diff options
author | alexvru <alexvru@ydb.tech> | 2023-05-04 18:26:27 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-05-04 18:26:27 +0300 |
commit | 82951621d9c50b5360d435b26f44d38adfd03f0d (patch) | |
tree | 501ad80b2c6e500985329392f4376cd829d61d64 /library/cpp | |
parent | 4e5d5c7da7468ccd5fbd7f7a7af5146939839635 (diff) | |
download | ydb-82951621d9c50b5360d435b26f44d38adfd03f0d.tar.gz |
Improve IC
Diffstat (limited to 'library/cpp')
5 files changed, 100 insertions, 47 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp index 374d11935e..3b3672b48e 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.cpp +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -203,7 +203,7 @@ namespace NActors { } } } else if (event.Buffer) { - while (const size_t numb = Min(External ? task.GetExternalFreeAmount() : task.GetInternalFreeAmount(), + while (const size_t numb = Min<size_t>(External ? task.GetExternalFreeAmount() : task.GetInternalFreeAmount(), Iter.ContiguousSize())) { const char *obuf = Iter.ContiguousData(); addChunk(obuf, numb, true); @@ -257,6 +257,9 @@ namespace NActors { return false; } + // clear external checksum for this chunk + task.ExternalChecksum = 0; + auto partBookmark = task.Bookmark(partSize); size_t bytesSerialized = 0; @@ -274,12 +277,8 @@ namespace NActors { *ptr++ = static_cast<ui8>(EXdcCommand::PUSH_DATA); *reinterpret_cast<ui16*>(ptr) = bytesSerialized; ptr += sizeof(ui16); - if (!Params.Encryption) { - ui32 checksum = 0; - task.XdcStream.ScanLastBytes(bytesSerialized, [&](TContiguousSpan span) { - checksum = Crc32cExtendMSanCompatible(checksum, span.data(), span.size()); - }); - *reinterpret_cast<ui32*>(ptr) = checksum; + if (task.Checksumming()) { + *reinterpret_cast<ui32*>(ptr) = task.ExternalChecksum; } task.WriteBookmark(std::move(partBookmark), buffer, partSize); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index 43253a2c5f..961bf7caff 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -670,21 +670,26 @@ namespace NActors { return recvres; } + constexpr ui64 GetUsageCountClearMask(size_t items, int bits) { + ui64 mask = 0; + for (size_t i = 0; i < items; ++i) { + mask |= ui64(1 << bits - 2) << i * bits; + } + return mask; + } + bool TInputSessionTCP::ReadMore() { PreallocateBuffers(); TStackVec<TIoVec, MaxBuffers> buffs; - size_t offset = FirstBufferOffset; - for (const auto& item : Buffers) { - TIoVec iov{item->GetBuffer() + offset, item->GetCapacity() - offset}; - buffs.push_back(iov); + for (auto& item : Buffers) { + buffs.push_back({item.GetDataMut(), item.size()}); if (Params.Encryption) { break; // do not put more than one buffer in queue to prevent using ReadV } #ifdef _win_ break; // do the same thing for Windows build #endif - offset = 0; } ssize_t recvres = Read(*Socket, PollerToken, &Context->MainReadPending, buffs.data(), buffs.size()); @@ -701,27 +706,46 @@ namespace NActors { while (recvres) { Y_VERIFY(!Buffers.empty()); auto& buffer = Buffers.front(); - const size_t bytes = Min<size_t>(recvres, buffer->GetCapacity() - FirstBufferOffset); - IncomingData.Insert(IncomingData.End(), TRcBuf{buffer, {buffer->GetBuffer() + FirstBufferOffset, bytes}}); + const size_t bytes = Min<size_t>(recvres, buffer.size()); recvres -= bytes; - FirstBufferOffset += bytes; - if (FirstBufferOffset == buffer->GetCapacity()) { + if (const size_t newSize = buffer.size() - bytes) { + IncomingData.Insert(IncomingData.End(), TRcBuf(TRcBuf::Piece, buffer.data(), bytes, buffer)); + buffer.TrimFront(newSize); + } else { + IncomingData.Insert(IncomingData.End(), std::move(buffer)); Buffers.pop_front(); - FirstBufferOffset = 0; } ++numBuffersCovered; } if (Buffers.empty()) { // we have read all the data, increase number of buffers CurrentBuffers = Min(CurrentBuffers * 2, MaxBuffers); - } else if (++UsageHisto[numBuffersCovered - 1] == 64) { // time to shift - for (auto& value : UsageHisto) { - value /= 2; + } else { + Y_VERIFY_DEBUG(numBuffersCovered); + + const size_t index = numBuffersCovered - 1; + + static constexpr ui64 itemMask = (1 << BitsPerUsageCount) - 1; + + const size_t word = index / ItemsPerUsageCount; + const size_t offset = index % ItemsPerUsageCount * BitsPerUsageCount; + + if ((UsageHisto[word] >> offset & itemMask) == itemMask) { // time to shift + for (ui64& w : UsageHisto) { + static constexpr ui64 mask = GetUsageCountClearMask(ItemsPerUsageCount, BitsPerUsageCount); + w = (w & mask) >> 1; + } } - while (CurrentBuffers > 1 && !UsageHisto[CurrentBuffers - 1]) { - --CurrentBuffers; + UsageHisto[word] += 1 << offset; + + while (CurrentBuffers > 1) { + const size_t index = CurrentBuffers - 1; + if (UsageHisto[index / ItemsPerUsageCount] >> index % ItemsPerUsageCount * BitsPerUsageCount & itemMask) { + break; + } else { + --CurrentBuffers; + } } - Y_VERIFY_DEBUG(UsageHisto[CurrentBuffers - 1]); } LastReceiveTimestamp = TActivationContext::Monotonic(); @@ -904,7 +928,7 @@ namespace NActors { // ensure that we have exactly "numBuffers" in queue LWPROBE_IF_TOO_LONG(SlowICReadLoopAdjustSize, ms) { while (Buffers.size() < CurrentBuffers) { - Buffers.emplace_back(TRopeAlignedBuffer::Allocate(Common->Settings.PreallocatedBufferSize)); + Buffers.push_back(TRcBuf::Uninitialized(Common->Settings.PreallocatedBufferSize)); } } } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 0ca9f2f136..3ee04be604 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -232,7 +232,7 @@ namespace NActors { ReceiveContext->UnlockLastPacketSerialToConfirm(); auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, XdcSocket, ReceiveContext, Proxy->Common, Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), Params); - ReceiverId = Params.Encryption ? RegisterWithSameMailbox(actor.Release()) : Register(actor.Release(), TMailboxType::ReadAsFilled); + ReceiverId = RegisterWithSameMailbox(actor.Release()); // register our socket in poller actor LOG_DEBUG_IC_SESSION("ICS11", "registering socket in PollerActor"); @@ -806,7 +806,7 @@ namespace NActors { Y_VERIFY(outgoingStreamSizeAfter == outgoingStreamSizeBefore + packetSize && xdcStreamSizeAfter == xdcStreamSizeBefore + packet.GetExternalSize(), "outgoingStreamSizeBefore# %zu outgoingStreamSizeAfter# %zu packetSize# %zu" - " xdcStreamSizeBefore# %zu xdcStreamSizeAfter# %zu externalSize# %zu", + " xdcStreamSizeBefore# %zu xdcStreamSizeAfter# %zu externalSize# %" PRIu32, outgoingStreamSizeBefore, outgoingStreamSizeAfter, packetSize, xdcStreamSizeBefore, xdcStreamSizeAfter, packet.GetExternalSize()); #endif @@ -819,7 +819,7 @@ namespace NActors { }); } - LOG_DEBUG_IC_SESSION("ICS22", "outgoing packet Serial# %" PRIu64 " Confirm# %" PRIu64 " DataSize# %zu" + LOG_DEBUG_IC_SESSION("ICS22", "outgoing packet Serial# %" PRIu64 " Confirm# %" PRIu64 " DataSize# %" PRIu32 " InflightDataAmount# %" PRIu64, serial, lastInputSerial, packet.GetDataSize(), InflightDataAmount); // reset forced packet sending timestamp as we have confirmed all received data diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index 92146f6591..7841f8a848 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -316,12 +316,13 @@ namespace NActors { void PassAway() override; - TDeque<TIntrusivePtr<TRopeAlignedBuffer>> Buffers; - size_t FirstBufferOffset = 0; + TDeque<TRcBuf> Buffers; size_t CurrentBuffers = 1; // number of buffers currently required to allocate - static constexpr size_t MaxBuffers = 64; // maximum buffers possible - std::array<ui8, MaxBuffers> UsageHisto; // read count histogram + static constexpr size_t MaxBuffers = 72; // maximum buffers possible + static constexpr int BitsPerUsageCount = 5; + static constexpr size_t ItemsPerUsageCount = sizeof(ui64) * CHAR_BIT / BitsPerUsageCount; + std::array<ui64, (MaxBuffers + ItemsPerUsageCount - 1) / ItemsPerUsageCount> UsageHisto; // read count histogram void PreallocateBuffers(); diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h index 2ef72b1c21..4e4cfb4b1d 100644 --- a/library/cpp/actors/interconnect/packet.h +++ b/library/cpp/actors/interconnect/packet.h @@ -138,8 +138,15 @@ struct TTcpPacketOutTask : TNonCopyable { NInterconnect::TOutgoingStream& OutgoingStream; NInterconnect::TOutgoingStream& XdcStream; NInterconnect::TOutgoingStream::TBookmark HeaderBookmark; - size_t InternalSize = 0; - size_t ExternalSize = 0; + ui32 InternalSize = 0; + ui32 ExternalSize = 0; + + ui32 PreBookmarkChecksum = 0; + ui32 InternalChecksum = 0; + ui32 InternalChecksumLen = 0; + bool InsideBookmark = false; + + ui32 ExternalChecksum = 0; TTcpPacketOutTask(const TSessionParams& params, NInterconnect::TOutgoingStream& outgoingStream, NInterconnect::TOutgoingStream& xdcStream) @@ -151,6 +158,12 @@ struct TTcpPacketOutTask : TNonCopyable { // Preallocate some space to fill it later. NInterconnect::TOutgoingStream::TBookmark Bookmark(size_t len) { + if (Checksumming()) { + Y_VERIFY_DEBUG(!InsideBookmark); + InsideBookmark = true; + PreBookmarkChecksum = std::exchange(InternalChecksum, 0); + InternalChecksumLen = 0; + } Y_VERIFY_DEBUG(len <= GetInternalFreeAmount()); InternalSize += len; return OutgoingStream.Bookmark(len); @@ -158,6 +171,12 @@ struct TTcpPacketOutTask : TNonCopyable { // Write previously bookmarked space. void WriteBookmark(NInterconnect::TOutgoingStream::TBookmark&& bookmark, const void *buffer, size_t len) { + if (Checksumming()) { + Y_VERIFY_DEBUG(InsideBookmark); + InsideBookmark = false; + const ui32 bookmarkChecksum = Crc32cExtendMSanCompatible(PreBookmarkChecksum, buffer, len); + InternalChecksum = Crc32cCombine(bookmarkChecksum, InternalChecksum, InternalChecksumLen); + } OutgoingStream.WriteBookmark(std::move(bookmark), {static_cast<const char*>(buffer), len}); } @@ -177,6 +196,7 @@ struct TTcpPacketOutTask : TNonCopyable { Y_VERIFY_DEBUG(len <= (External ? GetExternalFreeAmount() : GetInternalFreeAmount())); (External ? ExternalSize : InternalSize) += len; (External ? XdcStream : OutgoingStream).Append({static_cast<const char*>(buffer), len}); + ProcessChecksum<External>(buffer, len); } // Write some data with copying. @@ -185,6 +205,19 @@ struct TTcpPacketOutTask : TNonCopyable { Y_VERIFY_DEBUG(len <= (External ? GetExternalFreeAmount() : GetInternalFreeAmount())); (External ? ExternalSize : InternalSize) += len; (External ? XdcStream : OutgoingStream).Write({static_cast<const char*>(buffer), len}); + ProcessChecksum<External>(buffer, len); + } + + template<bool External> + void ProcessChecksum(const void *buffer, size_t len) { + if (Checksumming()) { + if (External) { + ExternalChecksum = Crc32cExtendMSanCompatible(ExternalChecksum, buffer, len); + } else { + InternalChecksum = Crc32cExtendMSanCompatible(InternalChecksum, buffer, len); + InternalChecksumLen += len; + } + } } void Finish(ui64 serial, ui64 confirm) { @@ -198,17 +231,13 @@ struct TTcpPacketOutTask : TNonCopyable { }; if (Checksumming()) { - // pre-write header without checksum for correct checksum calculation - WriteBookmark(NInterconnect::TOutgoingStream::TBookmark(HeaderBookmark), &header, sizeof(header)); - - ui32 checksum = 0; - OutgoingStream.ScanLastBytes(GetPacketSize(), [&](TContiguousSpan span) { - checksum = Crc32cExtendMSanCompatible(checksum, span.data(), span.size()); - }); - header.Checksum = checksum; + Y_VERIFY_DEBUG(!InsideBookmark); + const ui32 headerChecksum = Crc32cExtendMSanCompatible(0, &header, sizeof(header)); + header.Checksum = Crc32cCombine(headerChecksum, InternalChecksum, InternalSize); } - WriteBookmark(std::exchange(HeaderBookmark, {}), &header, sizeof(header)); + OutgoingStream.WriteBookmark(std::exchange(HeaderBookmark, {}), {reinterpret_cast<const char*>(&header), + sizeof(header)}); } bool Checksumming() const { @@ -216,11 +245,11 @@ struct TTcpPacketOutTask : TNonCopyable { } bool IsEmpty() const { return GetDataSize() == 0; } - size_t GetDataSize() const { return InternalSize + ExternalSize; } - size_t GetPacketSize() const { return sizeof(TTcpPacketHeader_v2) + InternalSize; } - size_t GetInternalFreeAmount() const { return TTcpPacketBuf::PacketDataLen - InternalSize; } - size_t GetExternalFreeAmount() const { return 16384 - ExternalSize; } - size_t GetExternalSize() const { return ExternalSize; } + ui32 GetDataSize() const { return InternalSize + ExternalSize; } + ui32 GetPacketSize() const { return sizeof(TTcpPacketHeader_v2) + InternalSize; } + ui32 GetInternalFreeAmount() const { return TTcpPacketBuf::PacketDataLen - InternalSize; } + ui32 GetExternalFreeAmount() const { return 16384 - ExternalSize; } + ui32 GetExternalSize() const { return ExternalSize; } }; namespace NInterconnect::NDetail { |