summaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
diff options
context:
space:
mode:
authoralexvru <[email protected]>2023-05-17 13:05:38 +0300
committeralexvru <[email protected]>2023-05-17 13:05:38 +0300
commit279acaad8dc9d7d8d2a596bd1d77cb1cf9e55a46 (patch)
tree6265cc7b56cb3fa103576de08b7c67b3a75063d1 /library/cpp/actors/interconnect/interconnect_tcp_session.cpp
parent713edbc77c2822c6dab831a34c0e6e8e82028700 (diff)
Fix OpenSSL problem
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_tcp_session.cpp')
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp24
1 files changed, 16 insertions, 8 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 2f2b175530d..a5d0b3db9d4 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -269,7 +269,7 @@ namespace NActors {
XdcStream.Rewind();
OutgoingOffset = XdcOffset = 0;
OutgoingIndex = 0;
- ForceCurrentPacket = false;
+ ForcedWriteLength = 0;
const ui64 serial = OutputCounter - SendQueue.size() + 1;
Y_VERIFY(serial > LastConfirmed, "%s serial# %" PRIu64 " LastConfirmed# %" PRIu64, LogPrefix.data(), serial, LastConfirmed);
@@ -589,11 +589,19 @@ namespace NActors {
Y_VERIFY_DEBUG(OutgoingIndex < SendQueue.size() || (OutgoingIndex == SendQueue.size() && !OutgoingOffset && !OutgoingStream));
if (OutOfBandStream) {
- // we need to align main stream to single packet boundary, so calculate the boundary
- bytesToSendInMain = sendQueueIt == SendQueue.end() ? 0 : // we have no packet to send
- ForceCurrentPacket || OutgoingOffset ? sendQueueIt->PacketSize - OutgoingOffset : // we have to finish current packet
- 0; // we haven't sent any of current packet content
- Y_VERIFY_DEBUG(bytesToSendInMain || !ForceCurrentPacket);
+ bytesToSendInMain = 0;
+
+ if (!ForcedWriteLength && OutgoingOffset) {
+ ForcedWriteLength = 1; // send at least one byte from current packet
+ }
+
+ // align send up to packet boundary
+ size_t offset = OutgoingOffset;
+ for (auto it = sendQueueIt; ForcedWriteLength; ++it, offset = 0) {
+ Y_VERIFY_DEBUG(it != SendQueue.end());
+ bytesToSendInMain += it->PacketSize - offset; // send remainder of current packet
+ ForcedWriteLength -= Min(it->PacketSize - offset, ForcedWriteLength);
+ }
}
if (bytesToSendInMain) {
@@ -611,10 +619,10 @@ namespace NActors {
bytesToSendInMain -= w;
}
- ForceCurrentPacket = Socket ? Socket->ExpectingCertainWrite() : false;
+ ForcedWriteLength = Socket ? Socket->ExpectedWriteLength() : 0;
}
- if (!bytesToSendInMain && !ForceCurrentPacket) {
+ if (!bytesToSendInMain && !ForcedWriteLength) {
if (const size_t w = process(OutOfBandStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, maxBytesAtOnce)) {
OutOfBandStream.DropFront(w);
BytesWrittenToSocket += w;