diff --git a/LaunchAPPL/Client/Serial.cc b/LaunchAPPL/Client/Serial.cc index 11516ae943..22c5c0656e 100644 --- a/LaunchAPPL/Client/Serial.cc +++ b/LaunchAPPL/Client/Serial.cc @@ -2,6 +2,7 @@ #include "Launcher.h" #include "Utilities.h" #include "Stream.h" +#include "ReliableStream.h" #include #include #include @@ -29,6 +30,7 @@ public: class SerialLauncher : public Launcher { SerialStream stream; + ReliableStream rStream; public: SerialLauncher(po::variables_map& options); virtual ~SerialLauncher(); @@ -36,7 +38,7 @@ public: virtual bool Go(int timeout = 0); private: - void write(const void *p, size_t n) { stream.write(p, n); } + void write(const void *p, size_t n) { rStream.write(p, n); } ssize_t read(void * p, size_t n); }; @@ -56,7 +58,7 @@ SerialStream::SerialStream(po::variables_map &options) tios.c_lflag = 0; tios.c_oflag = 0; tios.c_cc[VTIME] = 0; /* inter-character timer unused */ - tios.c_cc[VMIN] = 1; /* blocking read until 5 chars received */ + tios.c_cc[VMIN] = 1; /* blocking read until 1 chars received */ tcsetattr(fd,TCSANOW,&tios); usleep(500000); } @@ -98,14 +100,14 @@ void SerialStream::wait() ssize_t n = ::read(fd, readBuffer, kReadBufferSize); if(n > 0) { - onReceive(readBuffer, n); + notifyReceive(readBuffer, n); } } } SerialLauncher::SerialLauncher(po::variables_map &options) - : Launcher(options), stream(options) + : Launcher(options), stream(options), rStream(stream) { } @@ -115,11 +117,12 @@ SerialLauncher::~SerialLauncher() ssize_t SerialLauncher::read(void *p, size_t n) { - ssize_t available = stream.read(p, n); + ssize_t available = rStream.read(p, n); while(!available) { + rStream.flushWrite(); stream.wait(); - available = stream.read(p, n); + available = rStream.read(p, n); } return available; } diff --git a/LaunchAPPL/Common/CMakeLists.txt b/LaunchAPPL/Common/CMakeLists.txt index 93247c6e0d..95f2a5a7d5 100644 --- a/LaunchAPPL/Common/CMakeLists.txt +++ b/LaunchAPPL/Common/CMakeLists.txt @@ -1,5 +1,11 @@ add_library(LaunchAPPLCommon Stream.h Stream.cc + ReliableStream.h + ReliableStream.cc + CRC32.h ) target_include_directories(LaunchAPPLCommon PUBLIC .) + +add_executable(TestLaunchAPPLCommon Test.cc) +target_link_libraries(TestLaunchAPPLCommon LaunchAPPLCommon) diff --git a/LaunchAPPL/Common/CRC32.h b/LaunchAPPL/Common/CRC32.h new file mode 100644 index 0000000000..775e9b1d57 --- /dev/null +++ b/LaunchAPPL/Common/CRC32.h @@ -0,0 +1,71 @@ +#ifndef CRC32_H_ +#define CRC32_H_ +#include + + +const uint32_t crc_table[256] = { + 0x00000000, 0x77073096, 0xee0e612c, 0x990951ba, 0x076dc419, + 0x706af48f, 0xe963a535, 0x9e6495a3, 0x0edb8832, 0x79dcb8a4, + 0xe0d5e91e, 0x97d2d988, 0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, + 0x90bf1d91, 0x1db71064, 0x6ab020f2, 0xf3b97148, 0x84be41de, + 0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7, 0x136c9856, + 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec, 0x14015c4f, 0x63066cd9, + 0xfa0f3d63, 0x8d080df5, 0x3b6e20c8, 0x4c69105e, 0xd56041e4, + 0xa2677172, 0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b, + 0x35b5a8fa, 0x42b2986c, 0xdbbbc9d6, 0xacbcf940, 0x32d86ce3, + 0x45df5c75, 0xdcd60dcf, 0xabd13d59, 0x26d930ac, 0x51de003a, + 0xc8d75180, 0xbfd06116, 0x21b4f4b5, 0x56b3c423, 0xcfba9599, + 0xb8bda50f, 0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924, + 0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d, 0x76dc4190, + 0x01db7106, 0x98d220bc, 0xefd5102a, 0x71b18589, 0x06b6b51f, + 0x9fbfe4a5, 0xe8b8d433, 0x7807c9a2, 0x0f00f934, 0x9609a88e, + 0xe10e9818, 0x7f6a0dbb, 0x086d3d2d, 0x91646c97, 0xe6635c01, + 0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e, 0x6c0695ed, + 0x1b01a57b, 0x8208f4c1, 0xf50fc457, 0x65b0d9c6, 0x12b7e950, + 0x8bbeb8ea, 0xfcb9887c, 0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, + 0xfbd44c65, 0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2, + 0x4adfa541, 0x3dd895d7, 0xa4d1c46d, 0xd3d6f4fb, 0x4369e96a, + 0x346ed9fc, 0xad678846, 0xda60b8d0, 0x44042d73, 0x33031de5, + 0xaa0a4c5f, 0xdd0d7cc9, 0x5005713c, 0x270241aa, 0xbe0b1010, + 0xc90c2086, 0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f, + 0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4, 0x59b33d17, + 0x2eb40d81, 0xb7bd5c3b, 0xc0ba6cad, 0xedb88320, 0x9abfb3b6, + 0x03b6e20c, 0x74b1d29a, 0xead54739, 0x9dd277af, 0x04db2615, + 0x73dc1683, 0xe3630b12, 0x94643b84, 0x0d6d6a3e, 0x7a6a5aa8, + 0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1, 0xf00f9344, + 0x8708a3d2, 0x1e01f268, 0x6906c2fe, 0xf762575d, 0x806567cb, + 0x196c3671, 0x6e6b06e7, 0xfed41b76, 0x89d32be0, 0x10da7a5a, + 0x67dd4acc, 0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5, + 0xd6d6a3e8, 0xa1d1937e, 0x38d8c2c4, 0x4fdff252, 0xd1bb67f1, + 0xa6bc5767, 0x3fb506dd, 0x48b2364b, 0xd80d2bda, 0xaf0a1b4c, + 0x36034af6, 0x41047a60, 0xdf60efc3, 0xa867df55, 0x316e8eef, + 0x4669be79, 0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236, + 0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f, 0xc5ba3bbe, + 0xb2bd0b28, 0x2bb45a92, 0x5cb36a04, 0xc2d7ffa7, 0xb5d0cf31, + 0x2cd99e8b, 0x5bdeae1d, 0x9b64c2b0, 0xec63f226, 0x756aa39c, + 0x026d930a, 0x9c0906a9, 0xeb0e363f, 0x72076785, 0x05005713, + 0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38, 0x92d28e9b, + 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21, 0x86d3d2d4, 0xf1d4e242, + 0x68ddb3f8, 0x1fda836e, 0x81be16cd, 0xf6b9265b, 0x6fb077e1, + 0x18b74777, 0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c, + 0x8f659eff, 0xf862ae69, 0x616bffd3, 0x166ccf45, 0xa00ae278, + 0xd70dd2ee, 0x4e048354, 0x3903b3c2, 0xa7672661, 0xd06016f7, + 0x4969474d, 0x3e6e77db, 0xaed16a4a, 0xd9d65adc, 0x40df0b66, + 0x37d83bf0, 0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9, + 0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6, 0xbad03605, + 0xcdd70693, 0x54de5729, 0x23d967bf, 0xb3667a2e, 0xc4614ab8, + 0x5d681b02, 0x2a6f2b94, 0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, + 0x2d02ef8d +}; + +template +inline uint32_t crc32(uint32_t crc, Iterator p, Iterator q) +{ + crc = crc ^ 0xffffffffL; + while (p != q) + { + crc = crc_table[((int)crc ^ (*p++)) & 0xff] ^ (crc >> 8); + } + return crc ^ 0xffffffffL; +} +#endif diff --git a/LaunchAPPL/Common/ReliableStream.cc b/LaunchAPPL/Common/ReliableStream.cc new file mode 100644 index 0000000000..49be3b29af --- /dev/null +++ b/LaunchAPPL/Common/ReliableStream.cc @@ -0,0 +1,345 @@ +#include "ReliableStream.h" +#include +#include "CRC32.h" + +using std::begin; +using std::end; + +const uint8_t magic1[4] = { 0xDE, 0xAD, 0xBE, 0xEF }; +const uint8_t magic2[4] = { 0xFA, 0xCE, 0xCA, 0xFE }; + +const uint32_t magic1_32 = 0xDEADBEEF; +const uint32_t magic2_32 = 0xFACECAFE; + +enum thing : uint8_t +{ + kDataPacket = 0, + kEndOfPacket, + kEscapedMagic1, + kEscapedMagic2, + kAck, + kNack +}; + +inline uint32_t readLong(uint8_t* p) +{ + uint32_t x; + x = *p++; + x = (x << 8) | *p++; + x = (x << 8) | *p++; + x = (x << 8) | *p++; + return x; +} + +ReliableStream::ReliableStream(Stream& stream) + : stream(stream) +{ + incomingPacket.reserve(packetSize + 4); +} +ReliableStream::~ReliableStream() +{ +} + + +void ReliableStream::sendPacket() +{ +} + +void ReliableStream::ack() +{ + uint8_t packet[] = { + magic1[0], magic1[1], magic1[2], magic1[3], + kAck, (uint8_t)~kAck, (uint8_t)receivedInputPacket, (uint8_t)~receivedInputPacket + }; + stream.write(packet, 8); + printf("ack sent\n"); +} + +void ReliableStream::nack() +{ + uint8_t packet[] = { + magic1[0], magic1[1], magic1[2], magic1[3], + kNack, (uint8_t)~kNack, (uint8_t)receivedInputPacket, (uint8_t)~receivedInputPacket + }; + stream.write(packet, 8); + printf("nack sent\n"); +} + +void ReliableStream::gotNack(uint8_t id) +{ + printf("got ack\n"); +} + +void ReliableStream::gotAck(uint8_t id) +{ + printf("got nack\n"); +} + +void ReliableStream::processIncoming() +{ + printf("Received packet %d - %d bytes\n", receivedInputPacket, (int)incomingPacket.size()); + if(incomingPacket.size() < 4) + { + nack(); + incomingPacket.clear(); + return; + } + uint32_t expectedCRC = crc32(0, incomingPacket.begin(), incomingPacket.end() - 4); + uint32_t receivedCRC = readLong(&incomingPacket[incomingPacket.size()-4]); + if(receivedCRC != expectedCRC) + { + printf("CRC mismatch %x != %x\n", receivedCRC, expectedCRC); + + nack(); + incomingPacket.clear(); + return; + } + + receivedInputPacket++; + printf("Verified packet %d - %d bytes\n", receivedInputPacket, (int)incomingPacket.size()); + ack(); + notifyReceive(incomingPacket.data(), incomingPacket.size() - 4); + incomingPacket.clear(); +} + +void ReliableStream::write(const void* p0, size_t n) +{ + const uint8_t* p = (const uint8_t*)p0; + + ++sentOutputPacket; + std::vector packet = { + magic1[0], magic1[1], magic1[2], magic1[3], + kDataPacket, (uint8_t)~kDataPacket, (uint8_t)sentOutputPacket, (uint8_t)~sentOutputPacket + }; + int match = 0, match2 = 0; + int i; + int consumed = 0; + for(i = 0; i < n; i++) + { + if(p[i] == magic1[match++]) + { + if(match == 4) + { + packet.push_back(magic2[0]); + packet.push_back(magic2[1]); + packet.push_back(magic2[2]); + packet.push_back(magic2[3]); + packet.push_back(kEscapedMagic1); + match = 0; + consumed = i+1; + } + } + else + match = 0; + + if(match == 0) + while(consumed <= i) + packet.push_back(p[consumed++]); + + + if(p[i] == magic2[match2++]) + { + if(match2 == 4) + { + match2 = 0; + packet.push_back(kEscapedMagic2); + } + } + else + { + match2 = 0; + } + } + uint32_t crc = crc32(0, p, p + n); + printf("sending crc: %x\n", crc); + + if(crc == magic1_32 || crc == magic2_32) + { + packet.push_back(magic2[0]); + packet.push_back(magic2[1]); + packet.push_back(magic2[2]); + packet.push_back(magic2[3]); + packet.push_back(crc == magic1_32 ? kEscapedMagic1 : kEscapedMagic2); + } + else + { + packet.push_back(crc >> 24); + packet.push_back(crc >> 16); + packet.push_back(crc >> 8); + packet.push_back(crc); + } + + packet.push_back(magic2[0]); + packet.push_back(magic2[1]); + packet.push_back(magic2[2]); + packet.push_back(magic2[3]); + packet.push_back(kEndOfPacket); + + stream.write(packet.data(), packet.size()); +} + +void ReliableStream::flushWrite() +{ + +} + +size_t ReliableStream::onReceive(const uint8_t* p, size_t n) +{ + printf("data available (%d) - state %d\n", (int)n, (int)state); + + switch(state) + { + case State::waiting: + if( (n > 0 && p[0] != magic1[0]) + || (n > 1 && p[1] != magic1[1]) + || (n > 2 && p[2] != magic1[2]) + || (n > 3 && p[3] != magic1[3]) ) + { + state = State::skipping; + nack(); + gotNack(ackedOutputPacket); + return n > 4 ? 4 : n; + } + if(n < 6) + return 0; + + if(p[5] != (uint8_t)~p[4]) + { + state = State::skipping; + nack(); + gotNack(ackedOutputPacket); + return 6; + } + + switch(p[4]) + { + case kAck: + if(n < 8) + return 0; + if(p[6] != (uint8_t)~p[7]) + { + state = State::skipping; + nack(); + gotNack(ackedOutputPacket); + return 8; + } + gotAck(p[6]); + return 8; + case kNack: + if(n < 8) + return 0; + if(p[6] != (uint8_t)~p[7]) + { + state = State::skipping; + nack(); + gotNack(ackedOutputPacket); + return 8; + } + gotNack(p[6]); + return 8; + case kDataPacket: + if(n < 8) + return 0; + if(p[6] != (uint8_t)~p[7]) + { + state = State::skipping; + nack(); + gotNack(ackedOutputPacket); + return 8; + } + if(p[6] != ((receivedInputPacket + 1) & 0xFF)) + { + nack(); + state = State::skipping; + return 8; + } + state = State::receiving; + return 8; + default: + state = State::skipping; + nack(); + gotNack(ackedOutputPacket); + return 8; + } + break; + + case State::skipping: + { + int match = 0; + int i; + for(i = 0; i < n; i++) + { + if(p[i] == magic1[match++]) + { + if(match == 4) + { + state = State::waiting; + return i-3; + } + + } + else + match = 0; + } + return i - match; + } + break; + + case State::receiving: + { + int match = 0, match2 = 0; + int i; + int consumed = 0; + for(i = 0; i < n; i++) + { + if(p[i] == magic1[match++]) + { + if(match == 4) + { + state = State::waiting; + nack(); + return i-3; + } + } + else + match = 0; + + if(match2 == 4) + { + match2 = 0; + consumed = i+1; + switch(p[i]) + { + case kEscapedMagic1: + incomingPacket.insert(incomingPacket.end(), begin(magic1), end(magic1)); + break; + case kEscapedMagic2: + incomingPacket.insert(incomingPacket.end(), begin(magic2), end(magic2)); + break; + case kEndOfPacket: + processIncoming(); + state = State::waiting; + return i + 1; + default: + state = State::waiting; + nack(); + return i + 1; + } + } + else if(p[i] == magic2[match2++]) + ; + else + { + match2 = 0; + } + + if(match == 0 && match2 == 0) + while(consumed <= i) + incomingPacket.push_back(p[consumed++]); + + } + return i - std::max(match, match2); + } + break; + } +} diff --git a/LaunchAPPL/Common/ReliableStream.h b/LaunchAPPL/Common/ReliableStream.h new file mode 100644 index 0000000000..48c963ee4d --- /dev/null +++ b/LaunchAPPL/Common/ReliableStream.h @@ -0,0 +1,45 @@ +#ifndef RELIABLESTREAM_H_ +#define RELIABLESTREAM_H_ + +#include "Stream.h" + +class ReliableStream : public Stream, public StreamListener +{ + Stream& stream; + + static const int packetSize = 256; + + int receivedInputPacket = 0; + int sentOutputPacket = 0; + int ackedOutputPacket = 0; + + void sendPacket(); + void nack(); + void ack(); + + void gotNack(uint8_t id); + void gotAck(uint8_t id); + + void processIncoming(); + + enum class State + { + waiting, + skipping, + receiving + }; + + State state = State::waiting; + std::vector incomingPacket; + +public: + ReliableStream(Stream& stream); + virtual ~ReliableStream(); + + virtual void write(const void* p, size_t n) override; + virtual void flushWrite() override; + + virtual size_t onReceive(const uint8_t* p, size_t n) override; +}; + +#endif diff --git a/LaunchAPPL/Common/Stream.cc b/LaunchAPPL/Common/Stream.cc index 42827b7fc0..f6f99d8b49 100644 --- a/LaunchAPPL/Common/Stream.cc +++ b/LaunchAPPL/Common/Stream.cc @@ -9,7 +9,7 @@ Stream::~Stream() { } -void Stream::onReceive(const uint8_t* p, size_t n) +void Stream::notifyReceive(const uint8_t* p, size_t n) { if(buffer_.empty()) { diff --git a/LaunchAPPL/Common/Stream.h b/LaunchAPPL/Common/Stream.h index 01598acf8d..6590f80277 100644 --- a/LaunchAPPL/Common/Stream.h +++ b/LaunchAPPL/Common/Stream.h @@ -1,3 +1,6 @@ +#ifndef STREAM_H_ +#define STREAM_H_ + #include #include #include @@ -10,7 +13,7 @@ public: class Stream { - StreamListener *listener_; + StreamListener *listener_ = nullptr; std::vector buffer_; public: Stream(); @@ -19,8 +22,11 @@ public: void setListener(StreamListener *l) { listener_ = l; } virtual void write(const void* p, size_t n) = 0; + virtual void flushWrite() {} long read(void *p, size_t n); protected: - void onReceive(const uint8_t* p, size_t n); + void notifyReceive(const uint8_t* p, size_t n); }; + +#endif diff --git a/LaunchAPPL/Common/Test.cc b/LaunchAPPL/Common/Test.cc new file mode 100644 index 0000000000..5ff1c8fb03 --- /dev/null +++ b/LaunchAPPL/Common/Test.cc @@ -0,0 +1,79 @@ +#include "ReliableStream.h" +#include +#include +#include +#include + +class Forwarder : public Stream +{ +public: + Forwarder *other; + std::vector> packets; + + virtual void write(const void* p, size_t n) + { + other->enqueueReceive(p,n); + } + + void enqueueReceive(const void* p, size_t n) + { + packets.emplace_back((const uint8_t*)p, ((const uint8_t*)p)+n); + } + + void idle() + { + std::vector> packets2; + packets2.swap(packets); + for(auto& p : packets2) + notifyReceive(p.data(), p.size()); + } +}; + +class DumpToConsole : public StreamListener +{ +public: + std::string prefix; + + DumpToConsole(std::string prefix) : prefix(prefix) {} + + size_t onReceive(const uint8_t* p, size_t n) + { + std::cout << prefix; + for(int i = 0; i < n; i++) + { + if(p[i] >= 128 || p[i] < 32) + std::cout << "\\x" << std::hex << (unsigned)p[i]; + else + std::cout << p[i]; + } + std::cout << std::endl; + return n; + } +}; + +int main() +{ + Forwarder fwd1, fwd2; + fwd1.other = &fwd2; + fwd2.other = &fwd1; + + ReliableStream rel1(fwd1), rel2(fwd2); + fwd1.setListener(&rel1); + fwd2.setListener(&rel2); + + DumpToConsole dump1("one:"); + DumpToConsole dump2("two:"); + + rel1.setListener(&dump1); + rel2.setListener(&dump2); + + rel1.write("Hello, world.", 13); + rel1.flushWrite(); + + for(int i = 0; i < 100; i++) + { + fwd1.idle(); + fwd2.idle(); + } + return 0; +} diff --git a/LaunchAPPL/Server/main.cc b/LaunchAPPL/Server/main.cc index 895bdfcb33..33d46a59af 100644 --- a/LaunchAPPL/Server/main.cc +++ b/LaunchAPPL/Server/main.cc @@ -6,6 +6,7 @@ #include #include +#include #include class MacSerialStream : public Stream @@ -73,10 +74,9 @@ void MacSerialStream::idle() { long count = 0; SerGetBuf(inRefNum, &count); - if(count) - printf("sergetbuf -> %ld\n", count); while(count > 0) { + printf("something received.\n"); long count1 = count > kReadBufferSize ? kReadBufferSize : count; ParamBlockRec pb; memset(&pb, 0, sizeof(pb)); @@ -88,8 +88,7 @@ void MacSerialStream::idle() return; count -= count1; - printf("onReceive: %ld\n", count1); - onReceive((uint8_t*)readBuffer, count1); + notifyReceive((uint8_t*)readBuffer, count1); Delay(20,nullptr); } } @@ -196,8 +195,10 @@ int main() { MacSerialStream stream; + ReliableStream rStream(stream); + stream.setListener(&rStream); Listener listener; - stream.setListener(&listener); + rStream.setListener(&listener); while(!Button()) {