diff options
author | alexvru <alexvru@ydb.tech> | 2023-05-17 13:05:38 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-05-17 13:05:38 +0300 |
commit | 279acaad8dc9d7d8d2a596bd1d77cb1cf9e55a46 (patch) | |
tree | 6265cc7b56cb3fa103576de08b7c67b3a75063d1 /library/cpp | |
parent | 713edbc77c2822c6dab831a34c0e6e8e82028700 (diff) | |
download | ydb-279acaad8dc9d7d8d2a596bd1d77cb1cf9e55a46.tar.gz |
Fix OpenSSL problem
Diffstat (limited to 'library/cpp')
4 files changed, 25 insertions, 17 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_stream.cpp b/library/cpp/actors/interconnect/interconnect_stream.cpp index af63e72fb40..3082e08a78e 100644 --- a/library/cpp/actors/interconnect/interconnect_stream.cpp +++ b/library/cpp/actors/interconnect/interconnect_stream.cpp @@ -214,8 +214,8 @@ namespace NInterconnect { token.Request(read, write); } - bool TStreamSocket::ExpectingCertainWrite() const { - return false; + size_t TStreamSocket::ExpectedWriteLength() const { + return 0; } ////////////////////////////////////////////////////// @@ -500,8 +500,8 @@ namespace NInterconnect { return res; } - bool ExpectingCertainWrite() const { - return BlockedSend.has_value(); + size_t ExpectedWriteLength() const { + return BlockedSend ? BlockedSend->second : 0; } std::optional<std::pair<void*, size_t>> BlockedReceive; @@ -651,7 +651,7 @@ namespace NInterconnect { token.Request(WantRead(), WantWrite()); } - bool TSecureSocket::ExpectingCertainWrite() const { - return Impl->ExpectingCertainWrite(); + size_t TSecureSocket::ExpectedWriteLength() const { + return Impl->ExpectedWriteLength(); } } diff --git a/library/cpp/actors/interconnect/interconnect_stream.h b/library/cpp/actors/interconnect/interconnect_stream.h index a25185fc507..54139aed8dc 100644 --- a/library/cpp/actors/interconnect/interconnect_stream.h +++ b/library/cpp/actors/interconnect/interconnect_stream.h @@ -70,7 +70,7 @@ namespace NInterconnect { virtual void Request(NActors::TPollerToken& token, bool read, bool write); - virtual bool ExpectingCertainWrite() const; + virtual size_t ExpectedWriteLength() const; }; class TSecureSocketContext { @@ -123,7 +123,7 @@ namespace NInterconnect { bool WantRead() const; bool WantWrite() const; void Request(NActors::TPollerToken& token, bool read, bool write) override; - bool ExpectingCertainWrite() const override; + size_t ExpectedWriteLength() const override; }; class TDatagramSocket: public TSocket { diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 2f2b175530d..a5d0b3db9d4 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -269,7 +269,7 @@ namespace NActors { XdcStream.Rewind(); OutgoingOffset = XdcOffset = 0; OutgoingIndex = 0; - ForceCurrentPacket = false; + ForcedWriteLength = 0; const ui64 serial = OutputCounter - SendQueue.size() + 1; Y_VERIFY(serial > LastConfirmed, "%s serial# %" PRIu64 " LastConfirmed# %" PRIu64, LogPrefix.data(), serial, LastConfirmed); @@ -589,11 +589,19 @@ namespace NActors { Y_VERIFY_DEBUG(OutgoingIndex < SendQueue.size() || (OutgoingIndex == SendQueue.size() && !OutgoingOffset && !OutgoingStream)); if (OutOfBandStream) { - // we need to align main stream to single packet boundary, so calculate the boundary - bytesToSendInMain = sendQueueIt == SendQueue.end() ? 0 : // we have no packet to send - ForceCurrentPacket || OutgoingOffset ? sendQueueIt->PacketSize - OutgoingOffset : // we have to finish current packet - 0; // we haven't sent any of current packet content - Y_VERIFY_DEBUG(bytesToSendInMain || !ForceCurrentPacket); + bytesToSendInMain = 0; + + if (!ForcedWriteLength && OutgoingOffset) { + ForcedWriteLength = 1; // send at least one byte from current packet + } + + // align send up to packet boundary + size_t offset = OutgoingOffset; + for (auto it = sendQueueIt; ForcedWriteLength; ++it, offset = 0) { + Y_VERIFY_DEBUG(it != SendQueue.end()); + bytesToSendInMain += it->PacketSize - offset; // send remainder of current packet + ForcedWriteLength -= Min(it->PacketSize - offset, ForcedWriteLength); + } } if (bytesToSendInMain) { @@ -611,10 +619,10 @@ namespace NActors { bytesToSendInMain -= w; } - ForceCurrentPacket = Socket ? Socket->ExpectingCertainWrite() : false; + ForcedWriteLength = Socket ? Socket->ExpectedWriteLength() : 0; } - if (!bytesToSendInMain && !ForceCurrentPacket) { + if (!bytesToSendInMain && !ForcedWriteLength) { if (const size_t w = process(OutOfBandStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, maxBytesAtOnce)) { OutOfBandStream.DropFront(w); BytesWrittenToSocket += w; diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index db245847c47..f11ae505f83 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -570,7 +570,7 @@ namespace NActors { size_t OutgoingOffset = 0; size_t XdcOffset = 0; size_t OutgoingIndex = 0; // index into current packet in SendQueue - bool ForceCurrentPacket = false; + size_t ForcedWriteLength = 0; ui64 XdcBytesSent = 0; |