aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2022-11-17 17:10:48 +0300
committeralexvru <alexvru@ydb.tech>2022-11-17 17:10:48 +0300
commit22e27b93d470d33788f31ba8c357afc64e0f1b76 (patch)
treee7d460f1f3e152d91dbe43f60b858083b18b2392 /library/cpp
parent4a42af3f3b446ff6e05ca4503c96fcd1b501ebef (diff)
downloadydb-22e27b93d470d33788f31ba8c357afc64e0f1b76.tar.gz
Improve IC packet slicing
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/interconnect/interconnect_common.h2
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp31
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h2
-rw-r--r--library/cpp/actors/util/rope.h5
4 files changed, 21 insertions, 19 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_common.h b/library/cpp/actors/interconnect/interconnect_common.h
index 285709a00cf..ea6a5310d4e 100644
--- a/library/cpp/actors/interconnect/interconnect_common.h
+++ b/library/cpp/actors/interconnect/interconnect_common.h
@@ -46,6 +46,8 @@ namespace NActors {
TDuration MessagePendingTimeout = TDuration::Seconds(1); // timeout for which messages are queued while in PendingConnection state
ui64 MessagePendingSize = Max<ui64>(); // size of the queue
ui32 MaxSerializedEventSize = NActors::EventMaxByteSize;
+ ui32 PreallocatedBufferSize = 8 << 10; // 8 KB
+ ui32 NumPreallocatedBuffers = 16;
ui32 GetSendBufferSize() const {
ui32 res = 512 * 1024; // 512 kb is the default value for send buffer
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index 6c80ddd2959..5a4df9d7365 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -368,13 +368,15 @@ namespace NActors {
bool TInputSessionTCP::ReadMore() {
PreallocateBuffers();
- TStackVec<TIoVec, NumPreallocatedBuffers> buffs;
+ TStackVec<TIoVec, 16> buffs;
+ size_t offset = FirstBufferOffset;
for (const auto& item : Buffers) {
- TIoVec iov{item->GetBuffer(), item->GetCapacity()};
+ TIoVec iov{item->GetBuffer() + offset, item->GetCapacity() - offset};
buffs.push_back(iov);
if (Params.Encryption) {
break; // do not put more than one buffer in queue to prevent using ReadV
}
+ offset = 0;
}
const struct iovec* iovec = reinterpret_cast<const struct iovec*>(buffs.data());
@@ -421,15 +423,19 @@ namespace NActors {
Y_VERIFY(recvres > 0);
Metrics->AddTotalBytesRead(recvres);
- TDeque<TIntrusivePtr<TRopeAlignedBuffer>>::iterator it;
- for (it = Buffers.begin(); recvres; ++it) {
- Y_VERIFY(it != Buffers.end());
- const size_t bytesFromFrontBuffer = Min<size_t>(recvres, (*it)->GetCapacity());
- (*it)->AdjustSize(bytesFromFrontBuffer);
- IncomingData.Insert(IncomingData.End(), TRope(std::move(*it)));
- recvres -= bytesFromFrontBuffer;
+
+ while (recvres) {
+ Y_VERIFY(!Buffers.empty());
+ auto& buffer = Buffers.front();
+ const size_t bytes = Min<size_t>(recvres, buffer->GetCapacity() - FirstBufferOffset);
+ IncomingData.Insert(IncomingData.End(), TContiguousData{buffer, {buffer->GetBuffer() + FirstBufferOffset, bytes}});
+ recvres -= bytes;
+ FirstBufferOffset += bytes;
+ if (FirstBufferOffset == buffer->GetCapacity()) {
+ Buffers.pop_front();
+ FirstBufferOffset = 0;
+ }
}
- Buffers.erase(Buffers.begin(), it);
LastReceiveTimestamp = TActivationContext::Now();
@@ -439,9 +445,8 @@ namespace NActors {
void TInputSessionTCP::PreallocateBuffers() {
// ensure that we have exactly "numBuffers" in queue
LWPROBE_IF_TOO_LONG(SlowICReadLoopAdjustSize, ms) {
- const ui32 target = Params.Encryption ? 1 : NumPreallocatedBuffers;
- while (Buffers.size() < target) {
- Buffers.emplace_back(TRopeAlignedBuffer::Allocate(sizeof(TTcpPacketBuf)));
+ while (Buffers.size() < Common->Settings.NumPreallocatedBuffers) {
+ Buffers.emplace_back(TRopeAlignedBuffer::Allocate(Common->Settings.PreallocatedBufferSize));
}
}
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index 9933bd489ed..51c5bfa453f 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -257,8 +257,8 @@ namespace NActors {
void PassAway() override;
TDeque<TIntrusivePtr<TRopeAlignedBuffer>> Buffers;
+ size_t FirstBufferOffset = 0;
- static constexpr size_t NumPreallocatedBuffers = 16;
void PreallocateBuffers();
inline ui64 GetMaxCyclesPerEvent() const {
diff --git a/library/cpp/actors/util/rope.h b/library/cpp/actors/util/rope.h
index 6ac4aa13b37..2b2b2df19d1 100644
--- a/library/cpp/actors/util/rope.h
+++ b/library/cpp/actors/util/rope.h
@@ -73,11 +73,6 @@ public:
char *GetBuffer() {
return Data + Offset;
}
-
- void AdjustSize(size_t size) {
- Y_VERIFY(size <= Capacity);
- Size = size;
- }
};
namespace NRopeDetails {