aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/core/event_pb.cpp
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-06-24 10:47:13 +0300
committeralexvru <alexvru@ydb.tech>2023-06-24 10:47:13 +0300
commit62b675742156dd4471435b3da5dbca87065a2d0f (patch)
tree1a954e6011ab1f165f8f36a31f881894c442f0e9 /library/cpp/actors/core/event_pb.cpp
parent967e2cdf8e721697dad301820fd5959bba2755e5 (diff)
downloadydb-62b675742156dd4471435b3da5dbca87065a2d0f.tar.gz
Fix coroutine event serializer
Diffstat (limited to 'library/cpp/actors/core/event_pb.cpp')
-rw-r--r--library/cpp/actors/core/event_pb.cpp59
1 files changed, 23 insertions, 36 deletions
diff --git a/library/cpp/actors/core/event_pb.cpp b/library/cpp/actors/core/event_pb.cpp
index f5a50cba83..4c341d0c09 100644
--- a/library/cpp/actors/core/event_pb.cpp
+++ b/library/cpp/actors/core/event_pb.cpp
@@ -46,62 +46,47 @@ namespace NActors {
return true;
}
- bool TCoroutineChunkSerializer::Produce(const void *data, size_t size) {
+ void TCoroutineChunkSerializer::Produce(const void *data, size_t size) {
Y_VERIFY(size <= SizeRemain);
SizeRemain -= size;
TotalSerializedDataSize += size;
- if (NumChunks) {
- auto& last = Chunks[NumChunks - 1];
+ if (!Chunks.empty()) {
+ auto& last = Chunks.back();
if (last.first + last.second == data) {
last.second += size; // just extend the last buffer
- return true;
+ return;
}
}
- if (NumChunks == MaxChunks) {
- InnerContext.SwitchTo(BufFeedContext);
- if (CancelFlag || AbortFlag) {
- return false;
- }
- }
-
- Y_VERIFY(NumChunks < MaxChunks);
- Chunks[NumChunks++] = {static_cast<const char*>(data), size};
- return true;
+ Chunks.emplace_back(static_cast<const char*>(data), size);
}
bool TCoroutineChunkSerializer::WriteAliasedRaw(const void* data, int size) {
Y_VERIFY(size >= 0);
while (size) {
- if (CancelFlag || AbortFlag) {
- return false;
- } else if (const size_t bytesToAppend = Min<size_t>(size, SizeRemain)) {
+ if (const size_t bytesToAppend = Min<size_t>(size, SizeRemain)) {
+ const void *produce = data;
if ((reinterpret_cast<uintptr_t>(data) & 63) + bytesToAppend <= 64 &&
- (NumChunks == 0 || data != Chunks[NumChunks - 1].first + Chunks[NumChunks - 1].second)) {
+ (Chunks.empty() || data != Chunks.back().first + Chunks.back().second)) {
memcpy(BufferPtr, data, bytesToAppend);
- if (!Produce(BufferPtr, bytesToAppend)) {
- return false;
- }
+ produce = BufferPtr;
BufferPtr += bytesToAppend;
- } else {
- if (!Produce(data, bytesToAppend)) {
- return false;
- }
}
+ Produce(produce, bytesToAppend);
data = static_cast<const char*>(data) + bytesToAppend;
size -= bytesToAppend;
} else {
InnerContext.SwitchTo(BufFeedContext);
+ if (CancelFlag || AbortFlag) {
+ return false;
+ }
}
}
return true;
}
bool TCoroutineChunkSerializer::Next(void** data, int* size) {
- if (CancelFlag || AbortFlag) {
- return false;
- }
if (!SizeRemain) {
InnerContext.SwitchTo(BufFeedContext);
if (CancelFlag || AbortFlag) {
@@ -112,7 +97,8 @@ namespace NActors {
*data = BufferPtr;
*size = SizeRemain;
BufferPtr += SizeRemain;
- return Produce(*data, *size);
+ Produce(*data, *size);
+ return true;
}
void TCoroutineChunkSerializer::BackUp(int count) {
@@ -120,13 +106,14 @@ namespace NActors {
return;
}
Y_VERIFY(count > 0);
- Y_VERIFY(NumChunks);
- TChunk& buf = Chunks[NumChunks - 1];
+ Y_VERIFY(!Chunks.empty());
+ TChunk& buf = Chunks.back();
Y_VERIFY((size_t)count <= buf.second);
- Y_VERIFY(buf.first + buf.second == BufferPtr);
+ Y_VERIFY(buf.first + buf.second == BufferPtr, "buf# %p:%zu BufferPtr# %p SizeRemain# %zu NumChunks# %zu",
+ buf.first, buf.second, BufferPtr, SizeRemain, Chunks.size());
buf.second -= count;
if (!buf.second) {
- --NumChunks;
+ Chunks.pop_back();
}
BufferPtr -= count;
SizeRemain += count;
@@ -153,7 +140,7 @@ namespace NActors {
return WriteAliasedRaw(s->data(), s->length());
}
- std::pair<TCoroutineChunkSerializer::TChunk*, TCoroutineChunkSerializer::TChunk*> TCoroutineChunkSerializer::FeedBuf(void* data, size_t size) {
+ std::span<TCoroutineChunkSerializer::TChunk> TCoroutineChunkSerializer::FeedBuf(void* data, size_t size) {
// fill in base params
BufferPtr = static_cast<char*>(data);
SizeRemain = size;
@@ -161,10 +148,10 @@ namespace NActors {
// transfer control to the coroutine
Y_VERIFY(Event);
- NumChunks = 0;
+ Chunks.clear();
Resume();
- return {Chunks, Chunks + NumChunks};
+ return Chunks;
}
void TCoroutineChunkSerializer::SetSerializingEvent(const IEventBase *event) {