#ifndef RELIABLESTREAM_H_ #define RELIABLESTREAM_H_ #include "Stream.h" #include #include class ReliableStream : public StreamWrapper { static const int maxInFlight = 3; static const int packetSize = 1024; void sendOnePacket(); void sendPackets(); void nack(); void ack(); void gotNack(uint8_t id); void gotAck(uint8_t id); void processIncoming(); enum class State { waiting, skipping, receiving }; unsigned receivedInputPacket = 0; unsigned sentOutputPacket = 0; unsigned ackedOutputPacket = 0; State state = State::waiting; std::vector incomingPacket; int inputMatchMagic1, inputMatchMagic2; std::list> packetsToSend; std::list> sentPackets; bool resetResponse = false; virtual size_t onReceive(const uint8_t* p, size_t n); public: explicit ReliableStream(Stream* stream); void reset(int sendReset); bool resetResponseArrived() { return resetResponse; } virtual void write(const void* p, size_t n) override; virtual void flushWrite() override; virtual bool readyToWrite() { return packetsToSend.empty() && underlying().readyToWrite(); } bool allDataArrived() { return packetsToSend.empty() && sentPackets.empty() && underlying().readyToWrite(); } }; #endif