diff --git a/LaunchAPPL/Common/ReliableStream.cc b/LaunchAPPL/Common/ReliableStream.cc index 7f07f5aef6..1f3598540a 100644 --- a/LaunchAPPL/Common/ReliableStream.cc +++ b/LaunchAPPL/Common/ReliableStream.cc @@ -152,6 +152,9 @@ void ReliableStream::sendOnePacket() const uint8_t* p = packetsToSend.front().data(); size_t n = packetsToSend.front().size(); + if(n < packetSize) + flushRequested = false; + ++sentOutputPacket; sentPackets.splice(sentPackets.end(), packetsToSend, packetsToSend.begin()); @@ -221,12 +224,14 @@ void ReliableStream::sendOnePacket() void ReliableStream::sendPackets() { //printf("sendPackets: %d - %d packets, next = %d\n", (int)packetsToSend.size(), (int)sentPackets.size(), sentOutputPacket + 1); - while(!packetsToSend.empty() && sentPackets.size() < maxInFlight) + while(!packetsToSend.empty() && sentPackets.size() < maxInFlight + && (flushRequested || sentPackets.front().size() >= packetSize)) sendOnePacket(); } void ReliableStream::flushWrite() { + flushRequested = true; sendPackets(); } @@ -234,13 +239,35 @@ void ReliableStream::write(const void* p, size_t n) { while(n) { - size_t n1 = n > packetSize ? packetSize : n; + size_t availableSpaceInPacket = 0; + if(!packetsToSend.empty()) + availableSpaceInPacket = packetSize - (packetsToSend.back().size()-4); - packetsToSend.emplace_back(); + uint32_t crc = 0; + + if(availableSpaceInPacket) + { + auto& packet = packetsToSend.back(); + + auto p = packet.end() - 4; + crc = (p[0] << 24) | (p[1] << 16) | (p[2] << 8) | p[3]; + packet.erase(p, packet.end()); + } + else + { + packetsToSend.emplace_back(); + availableSpaceInPacket = packetSize; + flushRequested = false; + } + + size_t n1 = n > availableSpaceInPacket ? availableSpaceInPacket : n; + auto& packet = packetsToSend.back(); - packet.reserve(n1 + 4); - packet.insert(packet.end(), (const uint8_t*)p, ((const uint8_t*)p)+n1); - uint32_t crc = crc32(0, packet.begin(), packet.end()); + packet.reserve(packetSize + 4); + + auto newData = packet.end(); + packet.insert(newData, (const uint8_t*)p, ((const uint8_t*)p)+n1); + crc = crc32(crc, newData, packet.end()); //printf("outgoing crc: %x (bytes: %d without crc and header)\n", (unsigned) crc, (int)packet.size()); packet.push_back(crc >> 24); packet.push_back(crc >> 16); diff --git a/LaunchAPPL/Common/ReliableStream.h b/LaunchAPPL/Common/ReliableStream.h index 6a913b846f..20e55b57e6 100644 --- a/LaunchAPPL/Common/ReliableStream.h +++ b/LaunchAPPL/Common/ReliableStream.h @@ -40,6 +40,7 @@ class ReliableStream : public StreamWrapper std::list> sentPackets; bool resetResponse = false; + bool flushRequested = false; virtual size_t onReceive(const uint8_t* p, size_t n); public: