diff options
author | alexvru <alexvru@ydb.tech> | 2023-06-24 10:47:13 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-06-24 10:47:13 +0300 |
commit | 62b675742156dd4471435b3da5dbca87065a2d0f (patch) | |
tree | 1a954e6011ab1f165f8f36a31f881894c442f0e9 /library/cpp/actors/core/event_pb.cpp | |
parent | 967e2cdf8e721697dad301820fd5959bba2755e5 (diff) | |
download | ydb-62b675742156dd4471435b3da5dbca87065a2d0f.tar.gz |
Fix coroutine event serializer
Diffstat (limited to 'library/cpp/actors/core/event_pb.cpp')
-rw-r--r-- | library/cpp/actors/core/event_pb.cpp | 59 |
1 files changed, 23 insertions, 36 deletions
diff --git a/library/cpp/actors/core/event_pb.cpp b/library/cpp/actors/core/event_pb.cpp index f5a50cba83..4c341d0c09 100644 --- a/library/cpp/actors/core/event_pb.cpp +++ b/library/cpp/actors/core/event_pb.cpp @@ -46,62 +46,47 @@ namespace NActors { return true; } - bool TCoroutineChunkSerializer::Produce(const void *data, size_t size) { + void TCoroutineChunkSerializer::Produce(const void *data, size_t size) { Y_VERIFY(size <= SizeRemain); SizeRemain -= size; TotalSerializedDataSize += size; - if (NumChunks) { - auto& last = Chunks[NumChunks - 1]; + if (!Chunks.empty()) { + auto& last = Chunks.back(); if (last.first + last.second == data) { last.second += size; // just extend the last buffer - return true; + return; } } - if (NumChunks == MaxChunks) { - InnerContext.SwitchTo(BufFeedContext); - if (CancelFlag || AbortFlag) { - return false; - } - } - - Y_VERIFY(NumChunks < MaxChunks); - Chunks[NumChunks++] = {static_cast<const char*>(data), size}; - return true; + Chunks.emplace_back(static_cast<const char*>(data), size); } bool TCoroutineChunkSerializer::WriteAliasedRaw(const void* data, int size) { Y_VERIFY(size >= 0); while (size) { - if (CancelFlag || AbortFlag) { - return false; - } else if (const size_t bytesToAppend = Min<size_t>(size, SizeRemain)) { + if (const size_t bytesToAppend = Min<size_t>(size, SizeRemain)) { + const void *produce = data; if ((reinterpret_cast<uintptr_t>(data) & 63) + bytesToAppend <= 64 && - (NumChunks == 0 || data != Chunks[NumChunks - 1].first + Chunks[NumChunks - 1].second)) { + (Chunks.empty() || data != Chunks.back().first + Chunks.back().second)) { memcpy(BufferPtr, data, bytesToAppend); - if (!Produce(BufferPtr, bytesToAppend)) { - return false; - } + produce = BufferPtr; BufferPtr += bytesToAppend; - } else { - if (!Produce(data, bytesToAppend)) { - return false; - } } + Produce(produce, bytesToAppend); data = static_cast<const char*>(data) + bytesToAppend; size -= bytesToAppend; } else { InnerContext.SwitchTo(BufFeedContext); + if (CancelFlag || AbortFlag) { + return false; + } } } return true; } bool TCoroutineChunkSerializer::Next(void** data, int* size) { - if (CancelFlag || AbortFlag) { - return false; - } if (!SizeRemain) { InnerContext.SwitchTo(BufFeedContext); if (CancelFlag || AbortFlag) { @@ -112,7 +97,8 @@ namespace NActors { *data = BufferPtr; *size = SizeRemain; BufferPtr += SizeRemain; - return Produce(*data, *size); + Produce(*data, *size); + return true; } void TCoroutineChunkSerializer::BackUp(int count) { @@ -120,13 +106,14 @@ namespace NActors { return; } Y_VERIFY(count > 0); - Y_VERIFY(NumChunks); - TChunk& buf = Chunks[NumChunks - 1]; + Y_VERIFY(!Chunks.empty()); + TChunk& buf = Chunks.back(); Y_VERIFY((size_t)count <= buf.second); - Y_VERIFY(buf.first + buf.second == BufferPtr); + Y_VERIFY(buf.first + buf.second == BufferPtr, "buf# %p:%zu BufferPtr# %p SizeRemain# %zu NumChunks# %zu", + buf.first, buf.second, BufferPtr, SizeRemain, Chunks.size()); buf.second -= count; if (!buf.second) { - --NumChunks; + Chunks.pop_back(); } BufferPtr -= count; SizeRemain += count; @@ -153,7 +140,7 @@ namespace NActors { return WriteAliasedRaw(s->data(), s->length()); } - std::pair<TCoroutineChunkSerializer::TChunk*, TCoroutineChunkSerializer::TChunk*> TCoroutineChunkSerializer::FeedBuf(void* data, size_t size) { + std::span<TCoroutineChunkSerializer::TChunk> TCoroutineChunkSerializer::FeedBuf(void* data, size_t size) { // fill in base params BufferPtr = static_cast<char*>(data); SizeRemain = size; @@ -161,10 +148,10 @@ namespace NActors { // transfer control to the coroutine Y_VERIFY(Event); - NumChunks = 0; + Chunks.clear(); Resume(); - return {Chunks, Chunks + NumChunks}; + return Chunks; } void TCoroutineChunkSerializer::SetSerializingEvent(const IEventBase *event) { |