diff options
author | alexvru <alexvru@ydb.tech> | 2022-11-17 17:10:48 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-11-17 17:10:48 +0300 |
commit | 22e27b93d470d33788f31ba8c357afc64e0f1b76 (patch) | |
tree | e7d460f1f3e152d91dbe43f60b858083b18b2392 /library/cpp | |
parent | 4a42af3f3b446ff6e05ca4503c96fcd1b501ebef (diff) | |
download | ydb-22e27b93d470d33788f31ba8c357afc64e0f1b76.tar.gz |
Improve IC packet slicing
Diffstat (limited to 'library/cpp')
4 files changed, 21 insertions, 19 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_common.h b/library/cpp/actors/interconnect/interconnect_common.h index 285709a00cf..ea6a5310d4e 100644 --- a/library/cpp/actors/interconnect/interconnect_common.h +++ b/library/cpp/actors/interconnect/interconnect_common.h @@ -46,6 +46,8 @@ namespace NActors { TDuration MessagePendingTimeout = TDuration::Seconds(1); // timeout for which messages are queued while in PendingConnection state ui64 MessagePendingSize = Max<ui64>(); // size of the queue ui32 MaxSerializedEventSize = NActors::EventMaxByteSize; + ui32 PreallocatedBufferSize = 8 << 10; // 8 KB + ui32 NumPreallocatedBuffers = 16; ui32 GetSendBufferSize() const { ui32 res = 512 * 1024; // 512 kb is the default value for send buffer diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index 6c80ddd2959..5a4df9d7365 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -368,13 +368,15 @@ namespace NActors { bool TInputSessionTCP::ReadMore() { PreallocateBuffers(); - TStackVec<TIoVec, NumPreallocatedBuffers> buffs; + TStackVec<TIoVec, 16> buffs; + size_t offset = FirstBufferOffset; for (const auto& item : Buffers) { - TIoVec iov{item->GetBuffer(), item->GetCapacity()}; + TIoVec iov{item->GetBuffer() + offset, item->GetCapacity() - offset}; buffs.push_back(iov); if (Params.Encryption) { break; // do not put more than one buffer in queue to prevent using ReadV } + offset = 0; } const struct iovec* iovec = reinterpret_cast<const struct iovec*>(buffs.data()); @@ -421,15 +423,19 @@ namespace NActors { Y_VERIFY(recvres > 0); Metrics->AddTotalBytesRead(recvres); - TDeque<TIntrusivePtr<TRopeAlignedBuffer>>::iterator it; - for (it = Buffers.begin(); recvres; ++it) { - Y_VERIFY(it != Buffers.end()); - const size_t bytesFromFrontBuffer = Min<size_t>(recvres, (*it)->GetCapacity()); - (*it)->AdjustSize(bytesFromFrontBuffer); - IncomingData.Insert(IncomingData.End(), TRope(std::move(*it))); - recvres -= bytesFromFrontBuffer; + + while (recvres) { + Y_VERIFY(!Buffers.empty()); + auto& buffer = Buffers.front(); + const size_t bytes = Min<size_t>(recvres, buffer->GetCapacity() - FirstBufferOffset); + IncomingData.Insert(IncomingData.End(), TContiguousData{buffer, {buffer->GetBuffer() + FirstBufferOffset, bytes}}); + recvres -= bytes; + FirstBufferOffset += bytes; + if (FirstBufferOffset == buffer->GetCapacity()) { + Buffers.pop_front(); + FirstBufferOffset = 0; + } } - Buffers.erase(Buffers.begin(), it); LastReceiveTimestamp = TActivationContext::Now(); @@ -439,9 +445,8 @@ namespace NActors { void TInputSessionTCP::PreallocateBuffers() { // ensure that we have exactly "numBuffers" in queue LWPROBE_IF_TOO_LONG(SlowICReadLoopAdjustSize, ms) { - const ui32 target = Params.Encryption ? 1 : NumPreallocatedBuffers; - while (Buffers.size() < target) { - Buffers.emplace_back(TRopeAlignedBuffer::Allocate(sizeof(TTcpPacketBuf))); + while (Buffers.size() < Common->Settings.NumPreallocatedBuffers) { + Buffers.emplace_back(TRopeAlignedBuffer::Allocate(Common->Settings.PreallocatedBufferSize)); } } } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index 9933bd489ed..51c5bfa453f 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -257,8 +257,8 @@ namespace NActors { void PassAway() override; TDeque<TIntrusivePtr<TRopeAlignedBuffer>> Buffers; + size_t FirstBufferOffset = 0; - static constexpr size_t NumPreallocatedBuffers = 16; void PreallocateBuffers(); inline ui64 GetMaxCyclesPerEvent() const { diff --git a/library/cpp/actors/util/rope.h b/library/cpp/actors/util/rope.h index 6ac4aa13b37..2b2b2df19d1 100644 --- a/library/cpp/actors/util/rope.h +++ b/library/cpp/actors/util/rope.h @@ -73,11 +73,6 @@ public: char *GetBuffer() { return Data + Offset; } - - void AdjustSize(size_t size) { - Y_VERIFY(size <= Capacity); - Size = size; - } }; namespace NRopeDetails { |