diff options
author | alexvru <alexvru@ydb.tech> | 2023-06-24 10:47:13 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-06-24 10:47:13 +0300 |
commit | 62b675742156dd4471435b3da5dbca87065a2d0f (patch) | |
tree | 1a954e6011ab1f165f8f36a31f881894c442f0e9 /library/cpp/actors | |
parent | 967e2cdf8e721697dad301820fd5959bba2755e5 (diff) | |
download | ydb-62b675742156dd4471435b3da5dbca87065a2d0f.tar.gz |
Fix coroutine event serializer
Diffstat (limited to 'library/cpp/actors')
-rw-r--r-- | library/cpp/actors/core/event_pb.cpp | 59 | ||||
-rw-r--r-- | library/cpp/actors/core/event_pb.h | 9 | ||||
-rw-r--r-- | library/cpp/actors/core/event_pb_payload_ut.cpp | 4 | ||||
-rw-r--r-- | library/cpp/actors/core/event_pb_ut.cpp | 4 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_channel.cpp | 8 |
5 files changed, 36 insertions, 48 deletions
diff --git a/library/cpp/actors/core/event_pb.cpp b/library/cpp/actors/core/event_pb.cpp index f5a50cba83..4c341d0c09 100644 --- a/library/cpp/actors/core/event_pb.cpp +++ b/library/cpp/actors/core/event_pb.cpp @@ -46,62 +46,47 @@ namespace NActors { return true; } - bool TCoroutineChunkSerializer::Produce(const void *data, size_t size) { + void TCoroutineChunkSerializer::Produce(const void *data, size_t size) { Y_VERIFY(size <= SizeRemain); SizeRemain -= size; TotalSerializedDataSize += size; - if (NumChunks) { - auto& last = Chunks[NumChunks - 1]; + if (!Chunks.empty()) { + auto& last = Chunks.back(); if (last.first + last.second == data) { last.second += size; // just extend the last buffer - return true; + return; } } - if (NumChunks == MaxChunks) { - InnerContext.SwitchTo(BufFeedContext); - if (CancelFlag || AbortFlag) { - return false; - } - } - - Y_VERIFY(NumChunks < MaxChunks); - Chunks[NumChunks++] = {static_cast<const char*>(data), size}; - return true; + Chunks.emplace_back(static_cast<const char*>(data), size); } bool TCoroutineChunkSerializer::WriteAliasedRaw(const void* data, int size) { Y_VERIFY(size >= 0); while (size) { - if (CancelFlag || AbortFlag) { - return false; - } else if (const size_t bytesToAppend = Min<size_t>(size, SizeRemain)) { + if (const size_t bytesToAppend = Min<size_t>(size, SizeRemain)) { + const void *produce = data; if ((reinterpret_cast<uintptr_t>(data) & 63) + bytesToAppend <= 64 && - (NumChunks == 0 || data != Chunks[NumChunks - 1].first + Chunks[NumChunks - 1].second)) { + (Chunks.empty() || data != Chunks.back().first + Chunks.back().second)) { memcpy(BufferPtr, data, bytesToAppend); - if (!Produce(BufferPtr, bytesToAppend)) { - return false; - } + produce = BufferPtr; BufferPtr += bytesToAppend; - } else { - if (!Produce(data, bytesToAppend)) { - return false; - } } + Produce(produce, bytesToAppend); data = static_cast<const char*>(data) + bytesToAppend; size -= bytesToAppend; } else { InnerContext.SwitchTo(BufFeedContext); + if (CancelFlag || AbortFlag) { + return false; + } } } return true; } bool TCoroutineChunkSerializer::Next(void** data, int* size) { - if (CancelFlag || AbortFlag) { - return false; - } if (!SizeRemain) { InnerContext.SwitchTo(BufFeedContext); if (CancelFlag || AbortFlag) { @@ -112,7 +97,8 @@ namespace NActors { *data = BufferPtr; *size = SizeRemain; BufferPtr += SizeRemain; - return Produce(*data, *size); + Produce(*data, *size); + return true; } void TCoroutineChunkSerializer::BackUp(int count) { @@ -120,13 +106,14 @@ namespace NActors { return; } Y_VERIFY(count > 0); - Y_VERIFY(NumChunks); - TChunk& buf = Chunks[NumChunks - 1]; + Y_VERIFY(!Chunks.empty()); + TChunk& buf = Chunks.back(); Y_VERIFY((size_t)count <= buf.second); - Y_VERIFY(buf.first + buf.second == BufferPtr); + Y_VERIFY(buf.first + buf.second == BufferPtr, "buf# %p:%zu BufferPtr# %p SizeRemain# %zu NumChunks# %zu", + buf.first, buf.second, BufferPtr, SizeRemain, Chunks.size()); buf.second -= count; if (!buf.second) { - --NumChunks; + Chunks.pop_back(); } BufferPtr -= count; SizeRemain += count; @@ -153,7 +140,7 @@ namespace NActors { return WriteAliasedRaw(s->data(), s->length()); } - std::pair<TCoroutineChunkSerializer::TChunk*, TCoroutineChunkSerializer::TChunk*> TCoroutineChunkSerializer::FeedBuf(void* data, size_t size) { + std::span<TCoroutineChunkSerializer::TChunk> TCoroutineChunkSerializer::FeedBuf(void* data, size_t size) { // fill in base params BufferPtr = static_cast<char*>(data); SizeRemain = size; @@ -161,10 +148,10 @@ namespace NActors { // transfer control to the coroutine Y_VERIFY(Event); - NumChunks = 0; + Chunks.clear(); Resume(); - return {Chunks, Chunks + NumChunks}; + return Chunks; } void TCoroutineChunkSerializer::SetSerializingEvent(const IEventBase *event) { diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h index 30ffb278af..3543af4901 100644 --- a/library/cpp/actors/core/event_pb.h +++ b/library/cpp/actors/core/event_pb.h @@ -12,6 +12,7 @@ #include <util/string/builder.h> #include <util/thread/lfstack.h> #include <array> +#include <span> namespace NActors { @@ -77,7 +78,7 @@ namespace NActors { void SetSerializingEvent(const IEventBase *event); void Abort(); - std::pair<TChunk*, TChunk*> FeedBuf(void* data, size_t size); + std::span<TChunk> FeedBuf(void* data, size_t size); bool IsComplete() const { return !Event; } @@ -102,7 +103,7 @@ namespace NActors { protected: void DoRun() override; void Resume(); - bool Produce(const void *data, size_t size); + void Produce(const void *data, size_t size); i64 TotalSerializedDataSize; TMappedAllocation Stack; @@ -111,9 +112,7 @@ namespace NActors { TContMachineContext *BufFeedContext = nullptr; char *BufferPtr; size_t SizeRemain; - static constexpr size_t MaxChunks = 16; - TChunk Chunks[MaxChunks]; - size_t NumChunks = 0; + std::vector<TChunk> Chunks; const IEventBase *Event = nullptr; bool CancelFlag = false; bool AbortFlag; diff --git a/library/cpp/actors/core/event_pb_payload_ut.cpp b/library/cpp/actors/core/event_pb_payload_ut.cpp index 3f6d17b058..c75169db44 100644 --- a/library/cpp/actors/core/event_pb_payload_ut.cpp +++ b/library/cpp/actors/core/event_pb_payload_ut.cpp @@ -60,8 +60,8 @@ Y_UNIT_TEST_SUITE(TEventProtoWithPayload) { while (!chunker.IsComplete()) { char buffer[4096]; auto range = chunker.FeedBuf(buffer, sizeof(buffer)); - for (auto p = range.first; p != range.second; ++p) { - chunkerRes += TString(p->first, p->second); + for (auto [data, size] : range) { + chunkerRes += TString(data, size); } } UNIT_ASSERT_VALUES_EQUAL(chunkerRes, ser); diff --git a/library/cpp/actors/core/event_pb_ut.cpp b/library/cpp/actors/core/event_pb_ut.cpp index a16c3092b3..0dfd173651 100644 --- a/library/cpp/actors/core/event_pb_ut.cpp +++ b/library/cpp/actors/core/event_pb_ut.cpp @@ -61,8 +61,8 @@ Y_UNIT_TEST_SUITE(TEventSerialization) { TString bmChunkedSerialized; while (!chunker.IsComplete()) { auto range = chunker.FeedBuf(&buf1[0], sizeof(buf1)); - for (auto p = range.first; p != range.second; ++p) { - bmChunkedSerialized.append(p->first, p->second); + for (auto [data, size] : range) { + bmChunkedSerialized.append(data, size); } } UNIT_ASSERT_EQUAL(bmSerialized, bmChunkedSerialized); diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp index a5b02201a0..e69483a8e9 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.cpp +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -193,9 +193,11 @@ namespace NActors { if (!out.size()) { break; } - const auto [first, last] = Chunker.FeedBuf(out.data(), out.size()); - for (auto p = first; p != last; ++p) { - addChunk(p->first, p->second, false); + ui32 bytesFed = 0; + for (const auto& [buffer, size] : Chunker.FeedBuf(out.data(), out.size())) { + addChunk(buffer, size, false); + bytesFed += size; + Y_VERIFY(bytesFed <= out.size()); } complete = Chunker.IsComplete(); if (complete) { |