aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-04-26 10:57:10 +0300
committeralexvru <alexvru@ydb.tech>2023-04-26 10:57:10 +0300
commit9f32e2cd6cf2ae4d6651dfb413bc8401a86a597f (patch)
tree9bf57a3b9c7bdf6e001436c11499579062cdb1d6
parentc4f044f20585c080c4c1c6587148079ab7f59d0b (diff)
downloadydb-9f32e2cd6cf2ae4d6651dfb413bc8401a86a597f.tar.gz
Improve IC and XDC
-rw-r--r--library/cpp/actors/core/event_pb.cpp3
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.cpp5
-rw-r--r--library/cpp/actors/interconnect/interconnect_stream.cpp12
-rw-r--r--library/cpp/actors/interconnect/interconnect_stream.h5
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp20
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp70
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h5
-rw-r--r--library/cpp/actors/interconnect/packet.h14
8 files changed, 112 insertions, 22 deletions
diff --git a/library/cpp/actors/core/event_pb.cpp b/library/cpp/actors/core/event_pb.cpp
index 4e37f36f70..7b35f07e01 100644
--- a/library/cpp/actors/core/event_pb.cpp
+++ b/library/cpp/actors/core/event_pb.cpp
@@ -77,7 +77,8 @@ namespace NActors {
if (CancelFlag || AbortFlag) {
return false;
} else if (const size_t bytesToAppend = Min<size_t>(size, SizeRemain)) {
- if ((reinterpret_cast<uintptr_t>(data) & 63) + bytesToAppend <= 2 * 64) {
+ if ((reinterpret_cast<uintptr_t>(data) & 63) + bytesToAppend <= 2 * 64 &&
+ (NumChunks == 0 || data != Chunks[NumChunks - 1].first + Chunks[NumChunks - 1].second)) {
memcpy(BufferPtr, data, bytesToAppend);
if (!Produce(BufferPtr, bytesToAppend)) {
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp
index 371f9b7653..232f440cad 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.cpp
+++ b/library/cpp/actors/interconnect/interconnect_channel.cpp
@@ -32,7 +32,10 @@ namespace NActors {
event.Descr.Sender,
event.Descr.Cookie,
{},
- event.Descr.Checksum
+ event.Descr.Checksum,
+#if IC_FORCE_HARDENED_PACKET_CHECKS
+ event.EventSerializedSize
+#endif
};
traceId.Serialize(&descr.TraceId);
diff --git a/library/cpp/actors/interconnect/interconnect_stream.cpp b/library/cpp/actors/interconnect/interconnect_stream.cpp
index 96ee13a3f5..af63e72fb4 100644
--- a/library/cpp/actors/interconnect/interconnect_stream.cpp
+++ b/library/cpp/actors/interconnect/interconnect_stream.cpp
@@ -214,6 +214,10 @@ namespace NInterconnect {
token.Request(read, write);
}
+ bool TStreamSocket::ExpectingCertainWrite() const {
+ return false;
+ }
+
//////////////////////////////////////////////////////
TDatagramSocket::TPtr TDatagramSocket::Make(int domain) {
@@ -496,6 +500,10 @@ namespace NInterconnect {
return res;
}
+ bool ExpectingCertainWrite() const {
+ return BlockedSend.has_value();
+ }
+
std::optional<std::pair<void*, size_t>> BlockedReceive;
ssize_t Recv(void* msg, size_t len, TString *err) {
@@ -642,4 +650,8 @@ namespace NInterconnect {
void TSecureSocket::Request(NActors::TPollerToken& token, bool /*read*/, bool /*write*/) {
token.Request(WantRead(), WantWrite());
}
+
+ bool TSecureSocket::ExpectingCertainWrite() const {
+ return Impl->ExpectingCertainWrite();
+ }
}
diff --git a/library/cpp/actors/interconnect/interconnect_stream.h b/library/cpp/actors/interconnect/interconnect_stream.h
index 55438fef10..a25185fc50 100644
--- a/library/cpp/actors/interconnect/interconnect_stream.h
+++ b/library/cpp/actors/interconnect/interconnect_stream.h
@@ -69,6 +69,8 @@ namespace NInterconnect {
ui32 GetSendBufferSize() const;
virtual void Request(NActors::TPollerToken& token, bool read, bool write);
+
+ virtual bool ExpectingCertainWrite() const;
};
class TSecureSocketContext {
@@ -120,7 +122,8 @@ namespace NInterconnect {
bool WantRead() const;
bool WantWrite() const;
- virtual void Request(NActors::TPollerToken& token, bool read, bool write) override;
+ void Request(NActors::TPollerToken& token, bool read, bool write) override;
+ bool ExpectingCertainWrite() const override;
};
class TDatagramSocket: public TSocket {
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index 925dbef823..37e1bb8b5f 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -409,7 +409,10 @@ namespace NActors {
v2.Sender,
v2.Cookie,
NWilson::TTraceId(v2.TraceId),
- v2.Checksum
+ v2.Checksum,
+#if IC_FORCE_HARDENED_PACKET_CHECKS
+ v2.Len
+#endif
};
Metrics->IncInputChannelsIncomingEvents(channel);
@@ -420,9 +423,9 @@ namespace NActors {
}
// mark packet as processed
- ProcessInboundPacketQ(0);
XdcCatchStreamFinal = XdcCatchStreamFinalPending;
Context->LastProcessedSerial += !IgnorePayload;
+ ProcessInboundPacketQ(0);
++PacketsReadFromSocket;
++DataPacketsReadFromSocket;
@@ -442,8 +445,10 @@ namespace NActors {
break;
}
- if (!Context->AdvanceLastPacketSerialToConfirm(front.Serial)) {
- throw TExDestroySession{TDisconnectReason::NewSession()};
+ const bool success = Context->AdvanceLastPacketSerialToConfirm(front.Serial);
+ Y_VERIFY_DEBUG(Context->GetLastPacketSerialToConfirm() <= Context->LastProcessedSerial);
+ if (!success) {
+ throw TExReestablishConnection{TDisconnectReason::NewSession()};
}
}
}
@@ -531,6 +536,13 @@ namespace NActors {
}
auto& descr = *pendingEvent.EventData;
+#if IC_FORCE_HARDENED_PACKET_CHECKS
+ if (descr.Len != pendingEvent.Payload.GetSize()) {
+ LOG_CRIT_IC_SESSION("ICISxx", "event length mismatch Type# 0x%08" PRIx32 " received# %zu expected# %" PRIu32,
+ descr.Type, pendingEvent.Payload.GetSize(), descr.Len);
+ throw TExReestablishConnection{TDisconnectReason::ChecksumError()};
+ }
+#endif
if (descr.Checksum) {
ui32 checksum = 0;
for (const auto&& [data, size] : pendingEvent.Payload) {
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index e8fc974433..b0b6dc0b66 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -78,7 +78,7 @@ namespace NActors {
}
void TInterconnectSessionTCP::Terminate(TDisconnectReason reason) {
- LOG_INFO_IC_SESSION("ICS01", "socket: %" PRIi64, (Socket ? i64(*Socket) : -1));
+ LOG_INFO_IC_SESSION("ICS01", "socket: %" PRIi64 " reason# %s", (Socket ? i64(*Socket) : -1), reason.ToString().data());
IActor::InvokeOtherActor(*Proxy, &TInterconnectProxyTCP::UnregisterSession, this);
ShutdownSocket(std::move(reason));
@@ -277,9 +277,12 @@ namespace NActors {
// also reset SendQueuePos
// drop confirmed packets first as we do not need unwanted retransmissions
+ WriteSinglePacketAndDropConfirmed = false;
OutgoingStream.RewindToEnd();
XdcStream.RewindToEnd();
- DropConfirmed(nextPacket, true);
+ SendQueuePos = SendQueue.size();
+ SendOffset = 0;
+ DropConfirmed(nextPacket);
OutgoingStream.Rewind();
XdcStream.Rewind();
SendQueuePos = 0;
@@ -577,9 +580,9 @@ namespace NActors {
ui64 written = 0;
auto process = [&](NInterconnect::TOutgoingStream& stream, const TIntrusivePtr<NInterconnect::TStreamSocket>& socket,
- const TPollerToken::TPtr& token, bool *writeBlocked, auto&& callback) {
+ const TPollerToken::TPtr& token, bool *writeBlocked, size_t maxBytes, auto&& callback) {
while (stream && socket) {
- ssize_t r = Write(stream, *socket);
+ ssize_t r = Write(stream, *socket, maxBytes);
if (r == -1) {
*writeBlocked = true;
if (token) {
@@ -603,7 +606,11 @@ namespace NActors {
Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize());
- process(OutgoingStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, [&](size_t r) {
+ Y_VERIFY_DEBUG(!WriteSinglePacketAndDropConfirmed || SendQueuePos != SendQueue.size());
+ const size_t maxBytesFromMainStream = WriteSinglePacketAndDropConfirmed
+ ? SendQueue[SendQueuePos].PacketSize - SendOffset
+ : Max<size_t>();
+ process(OutgoingStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, maxBytesFromMainStream, [&](size_t r) {
Y_VERIFY(r <= BytesUnwritten);
BytesUnwritten -= r;
@@ -621,12 +628,16 @@ namespace NActors {
++PacketsWrittenToSocket;
++packets;
Y_VERIFY_DEBUG(SendOffset == 0 || SendQueuePos != SendQueue.size() - 1);
+
+ if (std::exchange(WriteSinglePacketAndDropConfirmed, false)) { // drop remaining already confirmed packets
+ DropConfirmed(LastConfirmed);
+ }
}
Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize());
});
- process(XdcStream, XdcSocket, XdcPollerToken, &ReceiveContext->XdcWriteBlocked, [&](size_t r) {
+ process(XdcStream, XdcSocket, XdcPollerToken, &ReceiveContext->XdcWriteBlocked, Max<size_t>(), [&](size_t r) {
XdcBytesSent += r;
XdcStream.Advance(r);
});
@@ -645,7 +656,8 @@ namespace NActors {
WriteBlockedByFullSendBuffer = writeBlockedByFullSendBuffer;
}
- ssize_t TInterconnectSessionTCP::Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket) {
+ ssize_t TInterconnectSessionTCP::Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket,
+ size_t maxBytes) {
LWPROBE_IF_TOO_LONG(SlowICWriteData, Proxy->PeerNodeId, ms) {
constexpr ui32 iovLimit = 256;
@@ -667,6 +679,21 @@ namespace NActors {
stream.ProduceIoVec(wbuffers, maxElementsInIOV);
Y_VERIFY(!wbuffers.empty());
+ if (maxBytes != Max<size_t>()) {
+ for (auto it = wbuffers.begin(); it != wbuffers.end(); ++it) {
+ if (maxBytes < it->Size) {
+ if (maxBytes) {
+ it->Size = maxBytes;
+ ++it;
+ }
+ wbuffers.erase(it, wbuffers.end());
+ break;
+ } else {
+ maxBytes -= it->Size;
+ }
+ }
+ }
+
TString err;
ssize_t r = 0;
{ // issue syscall with timing
@@ -828,7 +855,7 @@ namespace NActors {
return packetSize;
}
- bool TInterconnectSessionTCP::DropConfirmed(ui64 confirm, bool ignoreSendQueuePos) {
+ bool TInterconnectSessionTCP::DropConfirmed(ui64 confirm) {
LOG_DEBUG_IC_SESSION("ICS23", "confirm count: %" PRIu64, confirm);
Y_VERIFY(LastConfirmed <= confirm && confirm <= LastSentSerial && LastSentSerial <= OutputCounter,
@@ -845,14 +872,13 @@ namespace NActors {
size_t unsentBytesInSendQueue = 0;
for (size_t i = 0; i < SendQueue.size(); ++i) {
totalBytesInSendQueue += SendQueue[i].PacketSize;
- if (i > SendQueuePos) {
+ if (SendQueuePos <= i) {
unsentBytesInSendQueue += SendQueue[i].PacketSize;
- } else if (i == SendQueuePos) {
- unsentBytesInSendQueue += SendQueue[i].PacketSize - SendOffset;
}
}
+ unsentBytesInSendQueue -= SendOffset;
Y_VERIFY(totalBytesInSendQueue == OutgoingStream.CalculateOutgoingSize());
- Y_VERIFY(ignoreSendQueuePos || unsentBytesInSendQueue == OutgoingStream.CalculateUnsentSize());
+ Y_VERIFY(unsentBytesInSendQueue == OutgoingStream.CalculateUnsentSize());
}
#endif
@@ -865,14 +891,32 @@ namespace NActors {
if (front.Data && confirm < front.Serial) {
break;
}
+ if (!SendQueuePos) {
+ // we are sending this packet right now; it might be a race -- we are resending packets after reconnection,
+ // while getting confirmation for higher packet number (as we have sent it before); here we have to send
+ // these packets (if necessary) and skip the rest of them
+ if (!SendOffset && Socket && !Socket->ExpectingCertainWrite()) {
+ // just advance to next packet -- we won't send current one
+ ++SendQueuePos;
+ } else {
+ // can't skip packets now, make a mark and exit
+ WriteSinglePacketAndDropConfirmed = true;
+ break;
+ }
+ }
if (front.Data) {
lastDroppedSerial.emplace(front.Serial);
}
bytesDropped += front.PacketSize;
bytesDroppedFromXdc += front.ExternalSize;
++numDropped;
- Y_VERIFY_DEBUG(ignoreSendQueuePos || SendQueuePos != 0);
+ Y_VERIFY_DEBUG(SendQueuePos != 0);
+ }
+
+ if (!numDropped) {
+ return false;
}
+
const ui64 droppedDataAmount = bytesDropped + bytesDroppedFromXdc - sizeof(TTcpPacketHeader_v2) * numDropped;
OutgoingStream.DropFront(bytesDropped);
XdcStream.DropFront(bytesDroppedFromXdc);
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index 1437f7df2c..af6c14d824 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -459,11 +459,11 @@ namespace NActors {
void Handle(TEvPollerReady::TPtr& ev);
void Handle(TEvPollerRegisterResult::TPtr ev);
void WriteData();
- ssize_t Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket);
+ ssize_t Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket, size_t maxBytes);
ui64 MakePacket(bool data, TMaybe<ui64> pingMask = {});
void FillSendingBuffer(TTcpPacketOutTask& packet, ui64 serial);
- bool DropConfirmed(ui64 confirm, bool ignoreSendQueuePos = false);
+ bool DropConfirmed(ui64 confirm);
void ShutdownSocket(TDisconnectReason reason);
void StartHandshake();
@@ -532,6 +532,7 @@ namespace NActors {
std::deque<TOutgoingPacket> SendQueue; // packet boundaries
size_t SendQueuePos = 0; // packet being sent now
size_t SendOffset = 0;
+ bool WriteSinglePacketAndDropConfirmed = false;
ui64 XdcBytesSent = 0;
diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h
index 3f7eda1e9c..b4d672ee31 100644
--- a/library/cpp/actors/interconnect/packet.h
+++ b/library/cpp/actors/interconnect/packet.h
@@ -20,6 +20,14 @@
#define FORCE_EVENT_CHECKSUM 0
#endif
+// WARNING: turning this feature on will make protocol incompatible with ordinary Interconnect, use with caution
+#define IC_FORCE_HARDENED_PACKET_CHECKS 0
+
+#if IC_FORCE_HARDENED_PACKET_CHECKS
+#undef FORCE_EVENT_CHECKSUM
+#define FORCE_EVENT_CHECKSUM 1
+#endif
+
Y_FORCE_INLINE ui32 Crc32cExtendMSanCompatible(ui32 checksum, const void *data, size_t len) {
if constexpr (NSan::MSanIsOn()) {
const char *begin = static_cast<const char*>(data);
@@ -56,6 +64,9 @@ struct TEventData {
ui64 Cookie;
NWilson::TTraceId TraceId;
ui32 Checksum;
+#if IC_FORCE_HARDENED_PACKET_CHECKS
+ ui32 Len;
+#endif
};
#pragma pack(push, 1)
@@ -68,6 +79,9 @@ struct TEventDescr2 {
ui64 Cookie;
NWilson::TTraceId::TSerializedTraceId TraceId;
ui32 Checksum;
+#if IC_FORCE_HARDENED_PACKET_CHECKS
+ ui32 Len;
+#endif
};
#pragma pack(pop)