aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-05-17 13:05:38 +0300
committeralexvru <alexvru@ydb.tech>2023-05-17 13:05:38 +0300
commit279acaad8dc9d7d8d2a596bd1d77cb1cf9e55a46 (patch)
tree6265cc7b56cb3fa103576de08b7c67b3a75063d1 /library/cpp
parent713edbc77c2822c6dab831a34c0e6e8e82028700 (diff)
downloadydb-279acaad8dc9d7d8d2a596bd1d77cb1cf9e55a46.tar.gz
Fix OpenSSL problem
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/interconnect/interconnect_stream.cpp12
-rw-r--r--library/cpp/actors/interconnect/interconnect_stream.h4
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp24
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h2
4 files changed, 25 insertions, 17 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_stream.cpp b/library/cpp/actors/interconnect/interconnect_stream.cpp
index af63e72fb40..3082e08a78e 100644
--- a/library/cpp/actors/interconnect/interconnect_stream.cpp
+++ b/library/cpp/actors/interconnect/interconnect_stream.cpp
@@ -214,8 +214,8 @@ namespace NInterconnect {
token.Request(read, write);
}
- bool TStreamSocket::ExpectingCertainWrite() const {
- return false;
+ size_t TStreamSocket::ExpectedWriteLength() const {
+ return 0;
}
//////////////////////////////////////////////////////
@@ -500,8 +500,8 @@ namespace NInterconnect {
return res;
}
- bool ExpectingCertainWrite() const {
- return BlockedSend.has_value();
+ size_t ExpectedWriteLength() const {
+ return BlockedSend ? BlockedSend->second : 0;
}
std::optional<std::pair<void*, size_t>> BlockedReceive;
@@ -651,7 +651,7 @@ namespace NInterconnect {
token.Request(WantRead(), WantWrite());
}
- bool TSecureSocket::ExpectingCertainWrite() const {
- return Impl->ExpectingCertainWrite();
+ size_t TSecureSocket::ExpectedWriteLength() const {
+ return Impl->ExpectedWriteLength();
}
}
diff --git a/library/cpp/actors/interconnect/interconnect_stream.h b/library/cpp/actors/interconnect/interconnect_stream.h
index a25185fc507..54139aed8dc 100644
--- a/library/cpp/actors/interconnect/interconnect_stream.h
+++ b/library/cpp/actors/interconnect/interconnect_stream.h
@@ -70,7 +70,7 @@ namespace NInterconnect {
virtual void Request(NActors::TPollerToken& token, bool read, bool write);
- virtual bool ExpectingCertainWrite() const;
+ virtual size_t ExpectedWriteLength() const;
};
class TSecureSocketContext {
@@ -123,7 +123,7 @@ namespace NInterconnect {
bool WantRead() const;
bool WantWrite() const;
void Request(NActors::TPollerToken& token, bool read, bool write) override;
- bool ExpectingCertainWrite() const override;
+ size_t ExpectedWriteLength() const override;
};
class TDatagramSocket: public TSocket {
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 2f2b175530d..a5d0b3db9d4 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -269,7 +269,7 @@ namespace NActors {
XdcStream.Rewind();
OutgoingOffset = XdcOffset = 0;
OutgoingIndex = 0;
- ForceCurrentPacket = false;
+ ForcedWriteLength = 0;
const ui64 serial = OutputCounter - SendQueue.size() + 1;
Y_VERIFY(serial > LastConfirmed, "%s serial# %" PRIu64 " LastConfirmed# %" PRIu64, LogPrefix.data(), serial, LastConfirmed);
@@ -589,11 +589,19 @@ namespace NActors {
Y_VERIFY_DEBUG(OutgoingIndex < SendQueue.size() || (OutgoingIndex == SendQueue.size() && !OutgoingOffset && !OutgoingStream));
if (OutOfBandStream) {
- // we need to align main stream to single packet boundary, so calculate the boundary
- bytesToSendInMain = sendQueueIt == SendQueue.end() ? 0 : // we have no packet to send
- ForceCurrentPacket || OutgoingOffset ? sendQueueIt->PacketSize - OutgoingOffset : // we have to finish current packet
- 0; // we haven't sent any of current packet content
- Y_VERIFY_DEBUG(bytesToSendInMain || !ForceCurrentPacket);
+ bytesToSendInMain = 0;
+
+ if (!ForcedWriteLength && OutgoingOffset) {
+ ForcedWriteLength = 1; // send at least one byte from current packet
+ }
+
+ // align send up to packet boundary
+ size_t offset = OutgoingOffset;
+ for (auto it = sendQueueIt; ForcedWriteLength; ++it, offset = 0) {
+ Y_VERIFY_DEBUG(it != SendQueue.end());
+ bytesToSendInMain += it->PacketSize - offset; // send remainder of current packet
+ ForcedWriteLength -= Min(it->PacketSize - offset, ForcedWriteLength);
+ }
}
if (bytesToSendInMain) {
@@ -611,10 +619,10 @@ namespace NActors {
bytesToSendInMain -= w;
}
- ForceCurrentPacket = Socket ? Socket->ExpectingCertainWrite() : false;
+ ForcedWriteLength = Socket ? Socket->ExpectedWriteLength() : 0;
}
- if (!bytesToSendInMain && !ForceCurrentPacket) {
+ if (!bytesToSendInMain && !ForcedWriteLength) {
if (const size_t w = process(OutOfBandStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, maxBytesAtOnce)) {
OutOfBandStream.DropFront(w);
BytesWrittenToSocket += w;
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index db245847c47..f11ae505f83 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -570,7 +570,7 @@ namespace NActors {
size_t OutgoingOffset = 0;
size_t XdcOffset = 0;
size_t OutgoingIndex = 0; // index into current packet in SendQueue
- bool ForceCurrentPacket = false;
+ size_t ForcedWriteLength = 0;
ui64 XdcBytesSent = 0;