aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-06-24 10:47:13 +0300
committeralexvru <alexvru@ydb.tech>2023-06-24 10:47:13 +0300
commit62b675742156dd4471435b3da5dbca87065a2d0f (patch)
tree1a954e6011ab1f165f8f36a31f881894c442f0e9 /library/cpp/actors
parent967e2cdf8e721697dad301820fd5959bba2755e5 (diff)
downloadydb-62b675742156dd4471435b3da5dbca87065a2d0f.tar.gz
Fix coroutine event serializer
Diffstat (limited to 'library/cpp/actors')
-rw-r--r--library/cpp/actors/core/event_pb.cpp59
-rw-r--r--library/cpp/actors/core/event_pb.h9
-rw-r--r--library/cpp/actors/core/event_pb_payload_ut.cpp4
-rw-r--r--library/cpp/actors/core/event_pb_ut.cpp4
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.cpp8
5 files changed, 36 insertions, 48 deletions
diff --git a/library/cpp/actors/core/event_pb.cpp b/library/cpp/actors/core/event_pb.cpp
index f5a50cba83..4c341d0c09 100644
--- a/library/cpp/actors/core/event_pb.cpp
+++ b/library/cpp/actors/core/event_pb.cpp
@@ -46,62 +46,47 @@ namespace NActors {
return true;
}
- bool TCoroutineChunkSerializer::Produce(const void *data, size_t size) {
+ void TCoroutineChunkSerializer::Produce(const void *data, size_t size) {
Y_VERIFY(size <= SizeRemain);
SizeRemain -= size;
TotalSerializedDataSize += size;
- if (NumChunks) {
- auto& last = Chunks[NumChunks - 1];
+ if (!Chunks.empty()) {
+ auto& last = Chunks.back();
if (last.first + last.second == data) {
last.second += size; // just extend the last buffer
- return true;
+ return;
}
}
- if (NumChunks == MaxChunks) {
- InnerContext.SwitchTo(BufFeedContext);
- if (CancelFlag || AbortFlag) {
- return false;
- }
- }
-
- Y_VERIFY(NumChunks < MaxChunks);
- Chunks[NumChunks++] = {static_cast<const char*>(data), size};
- return true;
+ Chunks.emplace_back(static_cast<const char*>(data), size);
}
bool TCoroutineChunkSerializer::WriteAliasedRaw(const void* data, int size) {
Y_VERIFY(size >= 0);
while (size) {
- if (CancelFlag || AbortFlag) {
- return false;
- } else if (const size_t bytesToAppend = Min<size_t>(size, SizeRemain)) {
+ if (const size_t bytesToAppend = Min<size_t>(size, SizeRemain)) {
+ const void *produce = data;
if ((reinterpret_cast<uintptr_t>(data) & 63) + bytesToAppend <= 64 &&
- (NumChunks == 0 || data != Chunks[NumChunks - 1].first + Chunks[NumChunks - 1].second)) {
+ (Chunks.empty() || data != Chunks.back().first + Chunks.back().second)) {
memcpy(BufferPtr, data, bytesToAppend);
- if (!Produce(BufferPtr, bytesToAppend)) {
- return false;
- }
+ produce = BufferPtr;
BufferPtr += bytesToAppend;
- } else {
- if (!Produce(data, bytesToAppend)) {
- return false;
- }
}
+ Produce(produce, bytesToAppend);
data = static_cast<const char*>(data) + bytesToAppend;
size -= bytesToAppend;
} else {
InnerContext.SwitchTo(BufFeedContext);
+ if (CancelFlag || AbortFlag) {
+ return false;
+ }
}
}
return true;
}
bool TCoroutineChunkSerializer::Next(void** data, int* size) {
- if (CancelFlag || AbortFlag) {
- return false;
- }
if (!SizeRemain) {
InnerContext.SwitchTo(BufFeedContext);
if (CancelFlag || AbortFlag) {
@@ -112,7 +97,8 @@ namespace NActors {
*data = BufferPtr;
*size = SizeRemain;
BufferPtr += SizeRemain;
- return Produce(*data, *size);
+ Produce(*data, *size);
+ return true;
}
void TCoroutineChunkSerializer::BackUp(int count) {
@@ -120,13 +106,14 @@ namespace NActors {
return;
}
Y_VERIFY(count > 0);
- Y_VERIFY(NumChunks);
- TChunk& buf = Chunks[NumChunks - 1];
+ Y_VERIFY(!Chunks.empty());
+ TChunk& buf = Chunks.back();
Y_VERIFY((size_t)count <= buf.second);
- Y_VERIFY(buf.first + buf.second == BufferPtr);
+ Y_VERIFY(buf.first + buf.second == BufferPtr, "buf# %p:%zu BufferPtr# %p SizeRemain# %zu NumChunks# %zu",
+ buf.first, buf.second, BufferPtr, SizeRemain, Chunks.size());
buf.second -= count;
if (!buf.second) {
- --NumChunks;
+ Chunks.pop_back();
}
BufferPtr -= count;
SizeRemain += count;
@@ -153,7 +140,7 @@ namespace NActors {
return WriteAliasedRaw(s->data(), s->length());
}
- std::pair<TCoroutineChunkSerializer::TChunk*, TCoroutineChunkSerializer::TChunk*> TCoroutineChunkSerializer::FeedBuf(void* data, size_t size) {
+ std::span<TCoroutineChunkSerializer::TChunk> TCoroutineChunkSerializer::FeedBuf(void* data, size_t size) {
// fill in base params
BufferPtr = static_cast<char*>(data);
SizeRemain = size;
@@ -161,10 +148,10 @@ namespace NActors {
// transfer control to the coroutine
Y_VERIFY(Event);
- NumChunks = 0;
+ Chunks.clear();
Resume();
- return {Chunks, Chunks + NumChunks};
+ return Chunks;
}
void TCoroutineChunkSerializer::SetSerializingEvent(const IEventBase *event) {
diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h
index 30ffb278af..3543af4901 100644
--- a/library/cpp/actors/core/event_pb.h
+++ b/library/cpp/actors/core/event_pb.h
@@ -12,6 +12,7 @@
#include <util/string/builder.h>
#include <util/thread/lfstack.h>
#include <array>
+#include <span>
namespace NActors {
@@ -77,7 +78,7 @@ namespace NActors {
void SetSerializingEvent(const IEventBase *event);
void Abort();
- std::pair<TChunk*, TChunk*> FeedBuf(void* data, size_t size);
+ std::span<TChunk> FeedBuf(void* data, size_t size);
bool IsComplete() const {
return !Event;
}
@@ -102,7 +103,7 @@ namespace NActors {
protected:
void DoRun() override;
void Resume();
- bool Produce(const void *data, size_t size);
+ void Produce(const void *data, size_t size);
i64 TotalSerializedDataSize;
TMappedAllocation Stack;
@@ -111,9 +112,7 @@ namespace NActors {
TContMachineContext *BufFeedContext = nullptr;
char *BufferPtr;
size_t SizeRemain;
- static constexpr size_t MaxChunks = 16;
- TChunk Chunks[MaxChunks];
- size_t NumChunks = 0;
+ std::vector<TChunk> Chunks;
const IEventBase *Event = nullptr;
bool CancelFlag = false;
bool AbortFlag;
diff --git a/library/cpp/actors/core/event_pb_payload_ut.cpp b/library/cpp/actors/core/event_pb_payload_ut.cpp
index 3f6d17b058..c75169db44 100644
--- a/library/cpp/actors/core/event_pb_payload_ut.cpp
+++ b/library/cpp/actors/core/event_pb_payload_ut.cpp
@@ -60,8 +60,8 @@ Y_UNIT_TEST_SUITE(TEventProtoWithPayload) {
while (!chunker.IsComplete()) {
char buffer[4096];
auto range = chunker.FeedBuf(buffer, sizeof(buffer));
- for (auto p = range.first; p != range.second; ++p) {
- chunkerRes += TString(p->first, p->second);
+ for (auto [data, size] : range) {
+ chunkerRes += TString(data, size);
}
}
UNIT_ASSERT_VALUES_EQUAL(chunkerRes, ser);
diff --git a/library/cpp/actors/core/event_pb_ut.cpp b/library/cpp/actors/core/event_pb_ut.cpp
index a16c3092b3..0dfd173651 100644
--- a/library/cpp/actors/core/event_pb_ut.cpp
+++ b/library/cpp/actors/core/event_pb_ut.cpp
@@ -61,8 +61,8 @@ Y_UNIT_TEST_SUITE(TEventSerialization) {
TString bmChunkedSerialized;
while (!chunker.IsComplete()) {
auto range = chunker.FeedBuf(&buf1[0], sizeof(buf1));
- for (auto p = range.first; p != range.second; ++p) {
- bmChunkedSerialized.append(p->first, p->second);
+ for (auto [data, size] : range) {
+ bmChunkedSerialized.append(data, size);
}
}
UNIT_ASSERT_EQUAL(bmSerialized, bmChunkedSerialized);
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp
index a5b02201a0..e69483a8e9 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.cpp
+++ b/library/cpp/actors/interconnect/interconnect_channel.cpp
@@ -193,9 +193,11 @@ namespace NActors {
if (!out.size()) {
break;
}
- const auto [first, last] = Chunker.FeedBuf(out.data(), out.size());
- for (auto p = first; p != last; ++p) {
- addChunk(p->first, p->second, false);
+ ui32 bytesFed = 0;
+ for (const auto& [buffer, size] : Chunker.FeedBuf(out.data(), out.size())) {
+ addChunk(buffer, size, false);
+ bytesFed += size;
+ Y_VERIFY(bytesFed <= out.size());
}
complete = Chunker.IsComplete();
if (complete) {