From d9ab8bee37d0fd47342a0735417099150ce256db Mon Sep 17 00:00:00 2001 From: Wolfgang Thaller Date: Tue, 24 Apr 2018 01:42:36 +0200 Subject: [PATCH] LaunchAPPL/Serial: stream resetting --- LaunchAPPL/Client/Serial.cc | 24 ++++++++- LaunchAPPL/Common/ReliableStream.cc | 70 ++++++++++++++++++++----- LaunchAPPL/Common/ReliableStream.h | 26 ++++++---- LaunchAPPL/Common/Stream.cc | 37 ++++++++++++++ LaunchAPPL/Common/Stream.h | 20 ++++++++ LaunchAPPL/Common/Test.cc | 4 +- LaunchAPPL/Common/UnreliableStream.h | 12 ++--- LaunchAPPL/Server/LaunchAPPLServer.cc | 73 ++++++++++++++++++++++----- 8 files changed, 217 insertions(+), 49 deletions(-) diff --git a/LaunchAPPL/Client/Serial.cc b/LaunchAPPL/Client/Serial.cc index c705b16b13..99cf6c8b82 100644 --- a/LaunchAPPL/Client/Serial.cc +++ b/LaunchAPPL/Client/Serial.cc @@ -9,6 +9,7 @@ #include #include +#include namespace po = boost::program_options; @@ -38,7 +39,7 @@ public: virtual bool Go(int timeout = 0); private: - void write(const void *p, size_t n) { rStream.write(p, n); } + void write(const void *p, size_t n); ssize_t read(void * p, size_t n); }; @@ -64,6 +65,8 @@ SerialStream::SerialStream(po::variables_map &options) } SerialStream::~SerialStream() { + tcdrain(fd); + usleep(500000); close(fd); } @@ -107,7 +110,7 @@ void SerialStream::wait() SerialLauncher::SerialLauncher(po::variables_map &options) - : Launcher(options), stream(options), rStream(stream) + : Launcher(options), stream(options), rStream(&stream) { } @@ -127,10 +130,25 @@ ssize_t SerialLauncher::read(void *p, size_t n) return available; } +void SerialLauncher::write(const void *p, size_t n) +{ + while(!rStream.readyToWrite()) + stream.wait(); + rStream.write(p, n); +} + + bool SerialLauncher::Go(int timeout) { uint32_t tmp; + rStream.reset(1); + std::cout << "reset send.\n"; + while(!rStream.resetResponseArrived()) + stream.wait(); + + std::cout << "reset response received.\n"; + { std::ostringstream rsrcOut; app.resources.writeFork(rsrcOut); @@ -145,6 +163,8 @@ bool SerialLauncher::Go(int timeout) write(data.data(), data.size()); write(rsrc.data(), rsrc.size()); } + while(!rStream.allDataArrived()) + stream.wait(); read(&tmp, 4); tmp = ntohl(tmp); diff --git a/LaunchAPPL/Common/ReliableStream.cc b/LaunchAPPL/Common/ReliableStream.cc index 08211614ab..fa1c7f4e84 100644 --- a/LaunchAPPL/Common/ReliableStream.cc +++ b/LaunchAPPL/Common/ReliableStream.cc @@ -18,7 +18,9 @@ enum thing : uint8_t kEscapedMagic1, kEscapedMagic2, kAck, - kNack + kNack, + kReset1, + kReset2 }; inline uint32_t readLong(uint8_t* p) @@ -31,25 +33,44 @@ inline uint32_t readLong(uint8_t* p) return x; } -ReliableStream::ReliableStream(Stream& stream) - : stream(stream) +ReliableStream::ReliableStream(Stream* stream) + : StreamWrapper(stream) { incomingPacket.reserve(packetSize + 4); - stream.setListener(this); } -ReliableStream::~ReliableStream() + +void ReliableStream::reset(int sendReset) { + printf("reset %d\n", sendReset); + receivedInputPacket = 0; + sentOutputPacket = 0; + ackedOutputPacket = 0; + + incomingPacket.clear(); + state = State::waiting; + sentPackets.clear(); + packetsToSend.clear(); + + resetResponse = false; + + if(sendReset) + { + uint8_t resetKind = sendReset == 1 ? kReset1 : kReset2; + uint8_t packet[] = { + magic1[0], magic1[1], magic1[2], magic1[3], + resetKind, (uint8_t)~resetKind + }; + underlying().write(packet, 6); + } } - - void ReliableStream::ack() { uint8_t packet[] = { magic1[0], magic1[1], magic1[2], magic1[3], kAck, (uint8_t)~kAck, (uint8_t)receivedInputPacket, (uint8_t)~receivedInputPacket }; - stream.write(packet, 8); + underlying().write(packet, 8); //printf("ack sent\n"); } @@ -59,14 +80,14 @@ void ReliableStream::nack() magic1[0], magic1[1], magic1[2], magic1[3], kNack, (uint8_t)~kNack, (uint8_t)receivedInputPacket, (uint8_t)~receivedInputPacket }; - stream.write(packet, 8); + underlying().write(packet, 8); //printf("nack sent\n"); } void ReliableStream::gotAck(uint8_t id) { - printf("got ack %d\n", (int)id); unsigned nAcked = (id - ackedOutputPacket) & 0xFF; + printf("got ack %d -> %u packets of %u acked\n", (int)id, nAcked, (unsigned)sentPackets.size()); if(nAcked <= sentPackets.size()) { ackedOutputPacket += nAcked; @@ -98,7 +119,7 @@ void ReliableStream::gotNack(uint8_t id) void ReliableStream::processIncoming() { - printf("Received packet %d - %d bytes\n", receivedInputPacket, (int)incomingPacket.size()); + printf("Received packet %d - %d bytes\n", receivedInputPacket + 1, (int)incomingPacket.size()); if(incomingPacket.size() < 4) { nack(); @@ -193,11 +214,13 @@ void ReliableStream::sendOnePacket() packet.push_back(kEndOfPacket); printf("sent packet: %d, total %d bytes\n", sentOutputPacket, (int)packet.size()); - stream.write(packet.data(), packet.size()); + printf("sendOnePacket: %d - %d packets, next = %d\n", (int)packetsToSend.size(), (int)sentPackets.size(), sentOutputPacket + 1); + underlying().write(packet.data(), packet.size()); } void ReliableStream::sendPackets() { + printf("sendPackets: %d - %d packets, next = %d\n", (int)packetsToSend.size(), (int)sentPackets.size(), sentOutputPacket + 1); while(!packetsToSend.empty() && sentPackets.size() < maxInFlight) sendOnePacket(); } @@ -229,6 +252,7 @@ void ReliableStream::write(const void* p, size_t n) } flushWrite(); } +//#include size_t ReliableStream::onReceive(const uint8_t* p, size_t n) { @@ -245,7 +269,15 @@ size_t ReliableStream::onReceive(const uint8_t* p, size_t n) printf("no magic\n"); nack(); gotNack(ackedOutputPacket); - return n > 4 ? 4 : n; + + if(p[0] != magic1[0]) + return 0; + else if(p[1] != magic1[1]) + return 1; + else if(p[2] != magic1[2]) + return 2; + else + return 3; } if(n < 6) return 0; @@ -304,6 +336,18 @@ size_t ReliableStream::onReceive(const uint8_t* p, size_t n) state = State::receiving; inputMatchMagic1 = inputMatchMagic2 = 0; return 8; + + case kReset1: + reset(2); + notifyReset(); + return 6; + + case kReset2: + reset(0); + resetResponse = true; + notifyReset(); + return 6; + default: state = State::skipping; nack(); diff --git a/LaunchAPPL/Common/ReliableStream.h b/LaunchAPPL/Common/ReliableStream.h index bfca35c81c..66b8dc53c8 100644 --- a/LaunchAPPL/Common/ReliableStream.h +++ b/LaunchAPPL/Common/ReliableStream.h @@ -6,16 +6,11 @@ #include #include -class ReliableStream : public Stream, public StreamListener +class ReliableStream : public StreamWrapper { - Stream& stream; - static const int maxInFlight = 4; static const int packetSize = 1024; - unsigned receivedInputPacket = 0; - unsigned sentOutputPacket = 0; - unsigned ackedOutputPacket = 0; void sendOnePacket(); void sendPackets(); @@ -34,6 +29,10 @@ class ReliableStream : public Stream, public StreamListener receiving }; + unsigned receivedInputPacket = 0; + unsigned sentOutputPacket = 0; + unsigned ackedOutputPacket = 0; + State state = State::waiting; std::vector incomingPacket; int inputMatchMagic1, inputMatchMagic2; @@ -41,14 +40,21 @@ class ReliableStream : public Stream, public StreamListener std::list> packetsToSend; std::list> sentPackets; -public: - ReliableStream(Stream& stream); - virtual ~ReliableStream(); + bool resetResponse = false; + virtual size_t onReceive(const uint8_t* p, size_t n); +public: + explicit ReliableStream(Stream* stream); + void reset(int sendReset); + bool resetResponseArrived() { return resetResponse; } + virtual void write(const void* p, size_t n) override; virtual void flushWrite() override; - virtual size_t onReceive(const uint8_t* p, size_t n) override; + + virtual bool readyToWrite() { return packetsToSend.empty() && underlying().readyToWrite(); } + bool allDataArrived() { return packetsToSend.empty() && sentPackets.empty() && underlying().readyToWrite(); } + }; #endif diff --git a/LaunchAPPL/Common/Stream.cc b/LaunchAPPL/Common/Stream.cc index f6f99d8b49..68aabae1cc 100644 --- a/LaunchAPPL/Common/Stream.cc +++ b/LaunchAPPL/Common/Stream.cc @@ -32,6 +32,12 @@ void Stream::notifyReceive(const uint8_t* p, size_t n) } } +void Stream::notifyReset() +{ + if(listener_) + listener_->onReset(); +} + long Stream::read(void *p, size_t n) { if(buffer_.size() <= n) @@ -48,3 +54,34 @@ long Stream::read(void *p, size_t n) return n; } } + +StreamWrapper::StreamWrapper(Stream* underlying) + : underlying_(underlying) +{ + if(underlying_) + underlying_->setListener(this); +} + +StreamWrapper::~StreamWrapper() +{ + if(underlying_) + underlying_->clearListener(this); +} + +StreamWrapper::StreamWrapper(StreamWrapper&& other) +{ + underlying_ = other.underlying_; + if(underlying_) + underlying_->setListener(this); + other.underlying_ = nullptr; +} + +StreamWrapper& StreamWrapper::operator=(StreamWrapper&& other) +{ + if(underlying_) + underlying_->clearListener(this); + underlying_ = other.underlying_; + if(underlying_) + underlying_->setListener(this); + other.underlying_ = nullptr; +} diff --git a/LaunchAPPL/Common/Stream.h b/LaunchAPPL/Common/Stream.h index 6590f80277..6342f7a2a6 100644 --- a/LaunchAPPL/Common/Stream.h +++ b/LaunchAPPL/Common/Stream.h @@ -9,6 +9,7 @@ class StreamListener { public: virtual size_t onReceive(const uint8_t* p, size_t n) = 0; + virtual void onReset() {} }; class Stream @@ -20,13 +21,32 @@ public: virtual ~Stream(); void setListener(StreamListener *l) { listener_ = l; } + void clearListener(StreamListener *l = nullptr) { if(!l || listener_ == l) listener_ = nullptr; } virtual void write(const void* p, size_t n) = 0; virtual void flushWrite() {} long read(void *p, size_t n); + virtual bool readyToWrite() { return true; } + bool readyToRead() { return !buffer_.empty(); } protected: void notifyReceive(const uint8_t* p, size_t n); + void notifyReset(); +}; + +class StreamWrapper : public Stream, private StreamListener +{ + Stream* underlying_; +public: + StreamWrapper(Stream* underlying_); + virtual ~StreamWrapper(); + + StreamWrapper(const StreamWrapper& other) = delete; + StreamWrapper& operator=(const StreamWrapper& other) = delete; + StreamWrapper(StreamWrapper&& other); + StreamWrapper& operator=(StreamWrapper&& other); + + Stream& underlying() const { return *underlying_; } }; #endif diff --git a/LaunchAPPL/Common/Test.cc b/LaunchAPPL/Common/Test.cc index 929882c058..43001715c7 100644 --- a/LaunchAPPL/Common/Test.cc +++ b/LaunchAPPL/Common/Test.cc @@ -65,9 +65,7 @@ int main() fwd1.prefix = "1->2"; fwd2.prefix = "2->1"; - ReliableStream rel1(fwd1), rel2(fwd2); - fwd1.setListener(&rel1); - fwd2.setListener(&rel2); + ReliableStream rel1(&fwd1), rel2(&fwd2); DumpToConsole dump1("one:"); DumpToConsole dump2("two:"); diff --git a/LaunchAPPL/Common/UnreliableStream.h b/LaunchAPPL/Common/UnreliableStream.h index 1c67639a7b..229b31fa30 100644 --- a/LaunchAPPL/Common/UnreliableStream.h +++ b/LaunchAPPL/Common/UnreliableStream.h @@ -4,15 +4,13 @@ #include "Stream.h" // A stream filter to simulate bit errors -class UnreliableStream : public Stream, private StreamListener +class UnreliableStream : public StreamWrapper { - Stream& stream_; int nextError = 0; public: - UnreliableStream(Stream& stream) - : stream_(stream) + UnreliableStream(Stream* stream) + : StreamWrapper(stream) { - stream_.setListener(this); setupNextError(); } virtual void write(const void* p, size_t n) override @@ -20,11 +18,11 @@ public: std::vector tmp(n); memcpy(tmp.data(), p, n); maybeFlipBit(tmp.data(), n); - stream_.write(tmp.data(),n); + underlying().write(tmp.data(),n); } virtual void flushWrite() override { - stream_.flushWrite(); + underlying().flushWrite(); } private: diff --git a/LaunchAPPL/Server/LaunchAPPLServer.cc b/LaunchAPPL/Server/LaunchAPPLServer.cc index 378b80077c..bcdc666f13 100644 --- a/LaunchAPPL/Server/LaunchAPPLServer.cc +++ b/LaunchAPPL/Server/LaunchAPPLServer.cc @@ -210,22 +210,37 @@ void SetStatus(AppStatus stat, int done = 0, int total = 0) InvalRect(&statusWindow->portRect); } -class Listener : public StreamListener +ProcessSerialNumber psn; + +class LaunchServer : public StreamListener { + Stream* stream; + uint32_t dataSize, rsrcSize; uint32_t remainingSize; short refNum; public: + LaunchServer(Stream* stream) : stream(stream) + { + stream->setListener(this); + } + enum class State { size, data, rsrc, launch, - stop + wait, + respond }; State state = State::size; + void onReset() + { + SetStatus(AppStatus::ready, 0, 0); + state = State::size; + } size_t onReceive(const uint8_t* p, size_t n) { @@ -317,13 +332,12 @@ int main() //#define SIMULATE_ERRORS #ifdef SIMULATE_ERRORS UnreliableStream uStream(stream); - ReliableStream rStream(uStream); + ReliableStream rStream(&uStream); #else - ReliableStream rStream(stream); + ReliableStream rStream(&stream); #endif - Listener listener; - rStream.setListener(&listener); + LaunchServer server(&rStream); for(;;) @@ -381,26 +395,57 @@ int main() stream.idle(); - if(listener.state == Listener::State::launch) + if(server.state == LaunchServer::State::launch) { - stream.close(); + //stream.close(); { LaunchParamBlockRec lpb; memset(&lpb, 0, sizeof(lpb)); - lpb.reserved1 = (unsigned long) "\pRetro68App"; + /* lpb.reserved1 = (unsigned long) "\pRetro68App"; lpb.reserved2 = 0; lpb.launchBlockID = extendedBlock; lpb.launchEPBLength = 6; lpb.launchFileFlags = 0; - lpb.launchControlFlags = 0xC000; + lpb.launchControlFlags = 0xC000;*/ + + FSSpec spec; + FSMakeFSSpec(0,0,"\pRetro68App",&spec); + lpb.launchBlockID = extendedBlock; + lpb.launchEPBLength = extendedBlockLen; + lpb.launchFileFlags = 0; + lpb.launchControlFlags = launchContinue; + lpb.launchAppSpec = &spec; + - printf("Launching...\n"); OSErr err = LaunchApplication(&lpb); - listener.state = Listener::State::size; + psn = lpb.launchProcessSN; + server.state = LaunchServer::State::wait; + + SetStatus(AppStatus::running, 0, 0); + + } + } + else if(server.state == LaunchServer::State::wait) + { + ProcessInfoRec info; + OSErr err = GetProcessInformation(&psn,&info); + if(err) + { + server.state = LaunchServer::State::respond; + SetStatus(AppStatus::uploading, 0, 0); uint32_t zero = 0; - stream.write(&zero, 4); - stream.flushWrite(); + rStream.write(&zero, 4); + rStream.flushWrite(); + } + } + else if(server.state == LaunchServer::State::respond) + { + if(rStream.allDataArrived()) + { + server.state = LaunchServer::State::size; + SetStatus(AppStatus::ready, 0, 0); + rStream.reset(0); } } }