LaunchAPPL/Serial: Various retransmission fixes

This commit is contained in:
Wolfgang Thaller 2018-04-23 00:32:43 +02:00
parent 033a1561e5
commit 89c6471153
4 changed files with 85 additions and 61 deletions

View File

@ -67,9 +67,14 @@ void ReliableStream::gotAck(uint8_t id)
{
printf("got ack %d\n", (int)id);
unsigned nAcked = (id - ackedOutputPacket) & 0xFF;
ackedOutputPacket += nAcked;
for(int i = 0; i < nAcked; i++)
sentPackets.pop_front();
if(nAcked <= sentPackets.size())
{
ackedOutputPacket += nAcked;
for(int i = 0; i < nAcked; i++)
sentPackets.pop_front();
sendPackets();
}
}
void ReliableStream::gotNack(uint8_t id)
@ -77,14 +82,18 @@ void ReliableStream::gotNack(uint8_t id)
printf("got nack %d\n", (int)id);
unsigned nAcked = (id - ackedOutputPacket) & 0xFF;
ackedOutputPacket += nAcked;
for(int i = 0; i < nAcked; i++)
sentPackets.pop_front();
if(nAcked <= sentPackets.size())
{
ackedOutputPacket += nAcked;
for(int i = 0; i < nAcked; i++)
sentPackets.pop_front();
sentOutputPacket = ackedOutputPacket;
sentOutputPacket = ackedOutputPacket;
packetsToSend.splice(packetsToSend.begin(), sentPackets);
flushWrite();
packetsToSend.splice(packetsToSend.begin(), sentPackets);
sendPackets();
}
}
void ReliableStream::processIncoming()
@ -114,7 +123,7 @@ void ReliableStream::processIncoming()
incomingPacket.clear();
}
void ReliableStream::sendPacket()
void ReliableStream::sendOnePacket()
{
if(packetsToSend.empty())
return;
@ -137,38 +146,40 @@ void ReliableStream::sendPacket()
int consumed = 0;
for(i = 0; i < n; i++)
{
if(p[i] == magic1[match++])
{
if(match == 4)
{
packet.push_back(magic2[0]);
packet.push_back(magic2[1]);
packet.push_back(magic2[2]);
packet.push_back(magic2[3]);
packet.push_back(kEscapedMagic1);
match = 0;
consumed = i+1;
}
}
if(p[i] == magic1[match])
match++;
else if(p[i] == magic1[0])
match = 1;
else
match = 0;
if(match == 4)
{
packet.push_back(magic2[0]);
packet.push_back(magic2[1]);
packet.push_back(magic2[2]);
packet.push_back(magic2[3]);
packet.push_back(kEscapedMagic1);
match = 0;
consumed = i+1;
}
if(match == 0)
while(consumed <= i)
packet.push_back(p[consumed++]);
if(p[i] == magic2[match2++])
{
if(match2 == 4)
{
match2 = 0;
packet.push_back(kEscapedMagic2);
}
}
if(p[i] == magic2[match2])
match2++;
else if(p[i] == magic2[0])
match2 = 1;
else
match2 = 0;
if(match2 == 4)
{
match2 = 0;
packet.push_back(kEscapedMagic2);
}
}
while(consumed < i)
@ -185,17 +196,19 @@ void ReliableStream::sendPacket()
stream.write(packet.data(), packet.size());
}
void ReliableStream::sendPackets()
{
while(!packetsToSend.empty() && sentPackets.size() < maxInFlight)
sendOnePacket();
}
void ReliableStream::flushWrite()
{
while(!packetsToSend.empty())
sendPacket();
sendPackets();
}
void ReliableStream::write(const void* p, size_t n)
{
const size_t packetSize = 1024;
while(n)
{
size_t n1 = n > packetSize ? packetSize : n;
@ -326,33 +339,24 @@ size_t ReliableStream::onReceive(const uint8_t* p, size_t n)
int match = 0, match2 = 0;
int i;
int consumed = 0;
for(i = 0; i < n; i++)
{
if(p[i] == magic1[match++])
{
if(match == 4)
{
state = State::waiting;
nack();
return i-3;
}
}
else
match = 0;
incomingPacket.push_back(p[i]);
if(match2 == 4)
{
match2 = 0;
consumed = i+1;
incomingPacket.pop_back();
switch(p[i])
{
case kEscapedMagic1:
incomingPacket.insert(incomingPacket.end(), begin(magic1), end(magic1));
std::copy(begin(magic1), end(magic1), incomingPacket.end()-4);
break;
case kEscapedMagic2:
incomingPacket.insert(incomingPacket.end(), begin(magic2), end(magic2));
break;
case kEndOfPacket:
incomingPacket.erase(incomingPacket.end()-4, incomingPacket.end());
processIncoming();
state = State::waiting;
return i + 1;
@ -362,19 +366,27 @@ size_t ReliableStream::onReceive(const uint8_t* p, size_t n)
return i + 1;
}
}
else if(p[i] == magic2[match2++])
;
else
{
if(p[i] != magic1[match])
match = 0;
if(p[i] == magic1[match])
match++;
if(p[i] != magic2[match2])
match2 = 0;
if(p[i] == magic2[match2])
match2++;
if(match == 4)
{
state = State::waiting;
nack();
return i-3;
}
if(match == 0 && match2 == 0)
while(consumed <= i)
incomingPacket.push_back(p[consumed++]);
}
return i - std::max(match, match2);
return n - std::max(match, match2);
}
break;
}

View File

@ -10,13 +10,15 @@ class ReliableStream : public Stream, public StreamListener
{
Stream& stream;
static const int packetSize = 256;
static const int maxInFlight = 4;
static const int packetSize = 1024;
unsigned receivedInputPacket = 0;
unsigned sentOutputPacket = 0;
unsigned ackedOutputPacket = 0;
void sendPacket();
void sendOnePacket();
void sendPackets();
void nack();
void ack();

View File

@ -3,15 +3,21 @@
#include <vector>
#include <string>
#include <stdint.h>
#include <iomanip>
class Forwarder : public Stream
{
public:
std::string prefix;
Forwarder *other;
std::vector<std::vector<uint8_t>> packets;
virtual void write(const void* p, size_t n)
{
std::cout << prefix << ": ";
for(int i = 0; i < n; i++)
std::cout << std::hex << std::setfill('0') << std::setw(2) << (int) ((uint8_t*)p)[i] << " ";
std::cout << std::endl;
other->enqueueReceive(p,n);
}
@ -56,6 +62,8 @@ int main()
Forwarder fwd1, fwd2;
fwd1.other = &fwd2;
fwd2.other = &fwd1;
fwd1.prefix = "1->2";
fwd2.prefix = "2->1";
ReliableStream rel1(fwd1), rel2(fwd2);
fwd1.setListener(&rel1);
@ -70,9 +78,11 @@ int main()
rel1.write("Hello, world.", 13);
rel1.flushWrite();
for(int i = 0; i < 100; i++)
for(int i = 0; i < 10; i++)
{
std::cout << "handle 1:\n";
fwd1.idle();
std::cout << "handle 2:\n";
fwd2.idle();
}
return 0;

View File

@ -39,7 +39,7 @@ private:
void setupNextError()
{
nextError += 8 * 1000 + 3;
nextError += rand() % (40 * 8 * 1000 + 3);
}
void maybeFlipBit(uint8_t* p, size_t n)