about | summary | refs | log | tree | commit | diff | stats
path: root/library/cpp
diff options
context:
space:
mode:
author alexvru <alexvru@ydb.tech> 2023-05-04 18:26:27 +0300
committer alexvru <alexvru@ydb.tech> 2023-05-04 18:26:27 +0300
commit 82951621d9c50b5360d435b26f44d38adfd03f0d (patch)
tree 501ad80b2c6e500985329392f4376cd829d61d64 /library/cpp
parent 4e5d5c7da7468ccd5fbd7f7a7af5146939839635 (diff)
download ydb-82951621d9c50b5360d435b26f44d38adfd03f0d.tar.gz
Improve IC
Diffstat (limited to 'library/cpp')
-rw-r--r-- library/cpp/actors/interconnect/interconnect_channel.cpp 13
-rw-r--r-- library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp 58
-rw-r--r-- library/cpp/actors/interconnect/interconnect_tcp_session.cpp 6
-rw-r--r-- library/cpp/actors/interconnect/interconnect_tcp_session.h 9
-rw-r--r-- library/cpp/actors/interconnect/packet.h 61
5 files changed, 100 insertions, 47 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp
index 374d11935e..3b3672b48e 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.cpp
+++ b/library/cpp/actors/interconnect/interconnect_channel.cpp
@@ -203,7 +203,7 @@ namespace NActors {
}
}
} else if (event.Buffer) {
- while (const size_t numb = Min(External ? task.GetExternalFreeAmount() : task.GetInternalFreeAmount(),
+ while (const size_t numb = Min<size_t>(External ? task.GetExternalFreeAmount() : task.GetInternalFreeAmount(),
Iter.ContiguousSize())) {
const char *obuf = Iter.ContiguousData();
addChunk(obuf, numb, true);
@@ -257,6 +257,9 @@ namespace NActors {
return false;
}
+ // clear external checksum for this chunk
+ task.ExternalChecksum = 0;
+
auto partBookmark = task.Bookmark(partSize);
size_t bytesSerialized = 0;
@@ -274,12 +277,8 @@ namespace NActors {
*ptr++ = static_cast<ui8>(EXdcCommand::PUSH_DATA);
*reinterpret_cast<ui16*>(ptr) = bytesSerialized;
ptr += sizeof(ui16);
- if (!Params.Encryption) {
- ui32 checksum = 0;
- task.XdcStream.ScanLastBytes(bytesSerialized, [&](TContiguousSpan span) {
- checksum = Crc32cExtendMSanCompatible(checksum, span.data(), span.size());
- });
- *reinterpret_cast<ui32*>(ptr) = checksum;
+ if (task.Checksumming()) {
+ *reinterpret_cast<ui32*>(ptr) = task.ExternalChecksum;
}
task.WriteBookmark(std::move(partBookmark), buffer, partSize);
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index 43253a2c5f..961bf7caff 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -670,21 +670,26 @@ namespace NActors {
return recvres;
}
+ // Builds the AND-mask that is applied to a packed usage-histogram word right before the word is
+ // shifted right by one bit (the caller does `w = (w & mask) >> 1`). The word is treated as `items`
+ // counters of `bits` bits each, laid out from bit 0 upwards. Every bit of every counter is kept
+ // except the counter's lowest bit, so the following shift halves each counter without leaking its
+ // low bit into the counter stored below it.
+ constexpr ui64 GetUsageCountClearMask(size_t items, int bits) {
+     // (1 << bits) - 2 == all `bits` bits set except bit 0. The parentheses are required: binary '-'
+     // binds tighter than '<<', so `1 << bits - 2` would mean `1 << (bits - 2)`, i.e. a single bit.
+     const ui64 perItem = (ui64(1) << bits) - 2;
+     ui64 mask = 0;
+     for (size_t i = 0; i < items; ++i) {
+         mask |= perItem << (i * bits);
+     }
+     return mask;
+ }
+
bool TInputSessionTCP::ReadMore() {
PreallocateBuffers();
TStackVec<TIoVec, MaxBuffers> buffs;
- size_t offset = FirstBufferOffset;
- for (const auto& item : Buffers) {
- TIoVec iov{item->GetBuffer() + offset, item->GetCapacity() - offset};
- buffs.push_back(iov);
+ for (auto& item : Buffers) {
+ buffs.push_back({item.GetDataMut(), item.size()});
if (Params.Encryption) {
break; // do not put more than one buffer in queue to prevent using ReadV
}
#ifdef _win_
break; // do the same thing for Windows build
#endif
- offset = 0;
}
ssize_t recvres = Read(*Socket, PollerToken, &Context->MainReadPending, buffs.data(), buffs.size());
@@ -701,27 +706,46 @@ namespace NActors {
while (recvres) {
Y_VERIFY(!Buffers.empty());
auto& buffer = Buffers.front();
- const size_t bytes = Min<size_t>(recvres, buffer->GetCapacity() - FirstBufferOffset);
- IncomingData.Insert(IncomingData.End(), TRcBuf{buffer, {buffer->GetBuffer() + FirstBufferOffset, bytes}});
+ const size_t bytes = Min<size_t>(recvres, buffer.size());
recvres -= bytes;
- FirstBufferOffset += bytes;
- if (FirstBufferOffset == buffer->GetCapacity()) {
+ if (const size_t newSize = buffer.size() - bytes) {
+ IncomingData.Insert(IncomingData.End(), TRcBuf(TRcBuf::Piece, buffer.data(), bytes, buffer));
+ buffer.TrimFront(newSize);
+ } else {
+ IncomingData.Insert(IncomingData.End(), std::move(buffer));
Buffers.pop_front();
- FirstBufferOffset = 0;
}
++numBuffersCovered;
}
if (Buffers.empty()) { // we have read all the data, increase number of buffers
CurrentBuffers = Min(CurrentBuffers * 2, MaxBuffers);
- } else if (++UsageHisto[numBuffersCovered - 1] == 64) { // time to shift
- for (auto& value : UsageHisto) {
- value /= 2;
+ } else {
+ Y_VERIFY_DEBUG(numBuffersCovered);
+
+ const size_t index = numBuffersCovered - 1;
+
+ static constexpr ui64 itemMask = (1 << BitsPerUsageCount) - 1;
+
+ const size_t word = index / ItemsPerUsageCount;
+ const size_t offset = index % ItemsPerUsageCount * BitsPerUsageCount;
+
+ if ((UsageHisto[word] >> offset & itemMask) == itemMask) { // time to shift
+ for (ui64& w : UsageHisto) {
+ static constexpr ui64 mask = GetUsageCountClearMask(ItemsPerUsageCount, BitsPerUsageCount);
+ w = (w & mask) >> 1;
+ }
}
- while (CurrentBuffers > 1 && !UsageHisto[CurrentBuffers - 1]) {
- --CurrentBuffers;
+ UsageHisto[word] += 1 << offset;
+
+ while (CurrentBuffers > 1) {
+ const size_t index = CurrentBuffers - 1;
+ if (UsageHisto[index / ItemsPerUsageCount] >> index % ItemsPerUsageCount * BitsPerUsageCount & itemMask) {
+ break;
+ } else {
+ --CurrentBuffers;
+ }
}
- Y_VERIFY_DEBUG(UsageHisto[CurrentBuffers - 1]);
}
LastReceiveTimestamp = TActivationContext::Monotonic();
@@ -904,7 +928,7 @@ namespace NActors {
// ensure that we have exactly "numBuffers" in queue
LWPROBE_IF_TOO_LONG(SlowICReadLoopAdjustSize, ms) {
while (Buffers.size() < CurrentBuffers) {
- Buffers.emplace_back(TRopeAlignedBuffer::Allocate(Common->Settings.PreallocatedBufferSize));
+ Buffers.push_back(TRcBuf::Uninitialized(Common->Settings.PreallocatedBufferSize));
}
}
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 0ca9f2f136..3ee04be604 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -232,7 +232,7 @@ namespace NActors {
ReceiveContext->UnlockLastPacketSerialToConfirm();
auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, XdcSocket, ReceiveContext, Proxy->Common,
Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), Params);
- ReceiverId = Params.Encryption ? RegisterWithSameMailbox(actor.Release()) : Register(actor.Release(), TMailboxType::ReadAsFilled);
+ ReceiverId = RegisterWithSameMailbox(actor.Release());
// register our socket in poller actor
LOG_DEBUG_IC_SESSION("ICS11", "registering socket in PollerActor");
@@ -806,7 +806,7 @@ namespace NActors {
Y_VERIFY(outgoingStreamSizeAfter == outgoingStreamSizeBefore + packetSize &&
xdcStreamSizeAfter == xdcStreamSizeBefore + packet.GetExternalSize(),
"outgoingStreamSizeBefore# %zu outgoingStreamSizeAfter# %zu packetSize# %zu"
- " xdcStreamSizeBefore# %zu xdcStreamSizeAfter# %zu externalSize# %zu",
+ " xdcStreamSizeBefore# %zu xdcStreamSizeAfter# %zu externalSize# %" PRIu32,
outgoingStreamSizeBefore, outgoingStreamSizeAfter, packetSize,
xdcStreamSizeBefore, xdcStreamSizeAfter, packet.GetExternalSize());
#endif
@@ -819,7 +819,7 @@ namespace NActors {
});
}
- LOG_DEBUG_IC_SESSION("ICS22", "outgoing packet Serial# %" PRIu64 " Confirm# %" PRIu64 " DataSize# %zu"
+ LOG_DEBUG_IC_SESSION("ICS22", "outgoing packet Serial# %" PRIu64 " Confirm# %" PRIu64 " DataSize# %" PRIu32
" InflightDataAmount# %" PRIu64, serial, lastInputSerial, packet.GetDataSize(), InflightDataAmount);
// reset forced packet sending timestamp as we have confirmed all received data
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index 92146f6591..7841f8a848 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -316,12 +316,13 @@ namespace NActors {
void PassAway() override;
- TDeque<TIntrusivePtr<TRopeAlignedBuffer>> Buffers;
- size_t FirstBufferOffset = 0;
+ TDeque<TRcBuf> Buffers;
size_t CurrentBuffers = 1; // number of buffers currently required to allocate
- static constexpr size_t MaxBuffers = 64; // maximum buffers possible
- std::array<ui8, MaxBuffers> UsageHisto; // read count histogram
+ static constexpr size_t MaxBuffers = 72; // maximum buffers possible
+ static constexpr int BitsPerUsageCount = 5;
+ static constexpr size_t ItemsPerUsageCount = sizeof(ui64) * CHAR_BIT / BitsPerUsageCount;
+ std::array<ui64, (MaxBuffers + ItemsPerUsageCount - 1) / ItemsPerUsageCount> UsageHisto; // read count histogram
void PreallocateBuffers();
diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h
index 2ef72b1c21..4e4cfb4b1d 100644
--- a/library/cpp/actors/interconnect/packet.h
+++ b/library/cpp/actors/interconnect/packet.h
@@ -138,8 +138,15 @@ struct TTcpPacketOutTask : TNonCopyable {
NInterconnect::TOutgoingStream& OutgoingStream;
NInterconnect::TOutgoingStream& XdcStream;
NInterconnect::TOutgoingStream::TBookmark HeaderBookmark;
- size_t InternalSize = 0;
- size_t ExternalSize = 0;
+ ui32 InternalSize = 0;
+ ui32 ExternalSize = 0;
+
+ ui32 PreBookmarkChecksum = 0;
+ ui32 InternalChecksum = 0;
+ ui32 InternalChecksumLen = 0;
+ bool InsideBookmark = false;
+
+ ui32 ExternalChecksum = 0;
TTcpPacketOutTask(const TSessionParams& params, NInterconnect::TOutgoingStream& outgoingStream,
NInterconnect::TOutgoingStream& xdcStream)
@@ -151,6 +158,12 @@ struct TTcpPacketOutTask : TNonCopyable {
// Preallocate some space to fill it later.
NInterconnect::TOutgoingStream::TBookmark Bookmark(size_t len) {
+ if (Checksumming()) {
+ Y_VERIFY_DEBUG(!InsideBookmark);
+ InsideBookmark = true;
+ PreBookmarkChecksum = std::exchange(InternalChecksum, 0);
+ InternalChecksumLen = 0;
+ }
Y_VERIFY_DEBUG(len <= GetInternalFreeAmount());
InternalSize += len;
return OutgoingStream.Bookmark(len);
@@ -158,6 +171,12 @@ struct TTcpPacketOutTask : TNonCopyable {
// Write previously bookmarked space.
void WriteBookmark(NInterconnect::TOutgoingStream::TBookmark&& bookmark, const void *buffer, size_t len) {
+ if (Checksumming()) {
+ Y_VERIFY_DEBUG(InsideBookmark);
+ InsideBookmark = false;
+ const ui32 bookmarkChecksum = Crc32cExtendMSanCompatible(PreBookmarkChecksum, buffer, len);
+ InternalChecksum = Crc32cCombine(bookmarkChecksum, InternalChecksum, InternalChecksumLen);
+ }
OutgoingStream.WriteBookmark(std::move(bookmark), {static_cast<const char*>(buffer), len});
}
@@ -177,6 +196,7 @@ struct TTcpPacketOutTask : TNonCopyable {
Y_VERIFY_DEBUG(len <= (External ? GetExternalFreeAmount() : GetInternalFreeAmount()));
(External ? ExternalSize : InternalSize) += len;
(External ? XdcStream : OutgoingStream).Append({static_cast<const char*>(buffer), len});
+ ProcessChecksum<External>(buffer, len);
}
// Write some data with copying.
@@ -185,6 +205,19 @@ struct TTcpPacketOutTask : TNonCopyable {
Y_VERIFY_DEBUG(len <= (External ? GetExternalFreeAmount() : GetInternalFreeAmount()));
(External ? ExternalSize : InternalSize) += len;
(External ? XdcStream : OutgoingStream).Write({static_cast<const char*>(buffer), len});
+ ProcessChecksum<External>(buffer, len);
+ }
+
+ // Folds a just-written span into the running checksum of the stream it was written to.
+ // External == true updates ExternalChecksum; otherwise InternalChecksum is updated and the number
+ // of bytes it covers (InternalChecksumLen) is advanced. Does nothing when Checksumming() is false.
+ template<bool External>
+ void ProcessChecksum(const void *buffer, size_t len) {
+     if (!Checksumming()) {
+         return;
+     }
+     if (External) {
+         ExternalChecksum = Crc32cExtendMSanCompatible(ExternalChecksum, buffer, len);
+         return;
+     }
+     // internal payload: remember how many bytes this checksum spans as well
+     InternalChecksumLen += len;
+     InternalChecksum = Crc32cExtendMSanCompatible(InternalChecksum, buffer, len);
+ }
void Finish(ui64 serial, ui64 confirm) {
@@ -198,17 +231,13 @@ struct TTcpPacketOutTask : TNonCopyable {
};
if (Checksumming()) {
- // pre-write header without checksum for correct checksum calculation
- WriteBookmark(NInterconnect::TOutgoingStream::TBookmark(HeaderBookmark), &header, sizeof(header));
-
- ui32 checksum = 0;
- OutgoingStream.ScanLastBytes(GetPacketSize(), [&](TContiguousSpan span) {
- checksum = Crc32cExtendMSanCompatible(checksum, span.data(), span.size());
- });
- header.Checksum = checksum;
+ Y_VERIFY_DEBUG(!InsideBookmark);
+ const ui32 headerChecksum = Crc32cExtendMSanCompatible(0, &header, sizeof(header));
+ header.Checksum = Crc32cCombine(headerChecksum, InternalChecksum, InternalSize);
}
- WriteBookmark(std::exchange(HeaderBookmark, {}), &header, sizeof(header));
+ OutgoingStream.WriteBookmark(std::exchange(HeaderBookmark, {}), {reinterpret_cast<const char*>(&header),
+ sizeof(header)});
}
bool Checksumming() const {
@@ -216,11 +245,11 @@ struct TTcpPacketOutTask : TNonCopyable {
}
bool IsEmpty() const { return GetDataSize() == 0; }
- size_t GetDataSize() const { return InternalSize + ExternalSize; }
- size_t GetPacketSize() const { return sizeof(TTcpPacketHeader_v2) + InternalSize; }
- size_t GetInternalFreeAmount() const { return TTcpPacketBuf::PacketDataLen - InternalSize; }
- size_t GetExternalFreeAmount() const { return 16384 - ExternalSize; }
- size_t GetExternalSize() const { return ExternalSize; }
+ ui32 GetDataSize() const { return InternalSize + ExternalSize; }
+ ui32 GetPacketSize() const { return sizeof(TTcpPacketHeader_v2) + InternalSize; }
+ ui32 GetInternalFreeAmount() const { return TTcpPacketBuf::PacketDataLen - InternalSize; }
+ ui32 GetExternalFreeAmount() const { return 16384 - ExternalSize; }
+ ui32 GetExternalSize() const { return ExternalSize; }
};
namespace NInterconnect::NDetail {