aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikita Vasilev <ns-vasilev@ydb.tech>2025-06-05 19:48:11 +0300
committerGitHub <noreply@github.com>2025-06-05 19:48:11 +0300
commit33701dfdce78bb10a31d619218be65775b3730e4 (patch)
treee2e47b816296546638ebe0045a635d4922814692
parent0e700aea8a266197a8b54647882eb8aef1d699e4 (diff)
downloadydb-33701dfdce78bb10a31d619218be65775b3730e4.tar.gz
Refactor write actor buffers (#19300)
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp10
-rw-r--r--ydb/core/kqp/runtime/kqp_write_actor.cpp6
-rw-r--r--ydb/core/kqp/runtime/kqp_write_table.cpp25
3 files changed, 19 insertions, 22 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index b4e2b8bb8a6..9003a2b5d5d 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -266,10 +266,6 @@ public:
if (TxManager) {
TxManager->SetHasSnapshot(GetSnapshot().IsValid());
-
- for (const ui64& shardId : TxManager->GetShards()) {
- Stats->AffectedShards.insert(shardId);
- }
}
if (!BufferActorId || (ReadOnlyTx && Request.LocksOp != ELocksOp::Rollback)) {
@@ -392,6 +388,12 @@ public:
}
}
+ if (TxManager) {
+ for (const ui64& shardId : TxManager->GetShards()) {
+ Stats->AffectedShards.insert(shardId);
+ }
+ }
+
auto resultSize = ResponseEv->GetByteSize();
if (resultSize > (int)ReplySizeLimit) {
TString message;
diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp
index bb4c7dea7c2..20b38b07a18 100644
--- a/ydb/core/kqp/runtime/kqp_write_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp
@@ -1495,6 +1495,7 @@ private:
WriteTableActor->Write(WriteToken, Batcher->Build());
if (Closed) {
WriteTableActor->Close(WriteToken);
+ WriteTableActor->FlushBuffers();
WriteTableActor->Close();
}
} catch (const TMemoryLimitExceededException&) {
@@ -1536,6 +1537,10 @@ private:
return;
}
+ if (!Closed && outOfMemory) {
+ WriteTableActor->FlushBuffers();
+ }
+
if (Closed || outOfMemory) {
if (!WriteTableActor->FlushToShards()) {
return;
@@ -2025,6 +2030,7 @@ public:
bool flushFailed = false;
ForEachWriteActor([&](TKqpTableWriteActor* actor, const TActorId) {
if (!flushFailed && actor->IsReady()) {
+ actor->FlushBuffers();
if (!actor->FlushToShards()) {
flushFailed = true;
}
diff --git a/ydb/core/kqp/runtime/kqp_write_table.cpp b/ydb/core/kqp/runtime/kqp_write_table.cpp
index 38b79509f47..c7824518a1a 100644
--- a/ydb/core/kqp/runtime/kqp_write_table.cpp
+++ b/ydb/core/kqp/runtime/kqp_write_table.cpp
@@ -1385,8 +1385,6 @@ public:
for (const auto& [token, writeInfo] : WriteInfos) {
if (writeInfo.Closed) {
Close(token);
- } else {
- FlushSerializer(token, GetMemory() >= Settings.MemoryLimitTotal);
}
}
}
@@ -1442,10 +1440,6 @@ public:
data->AttachAlloc(Alloc);
}
info.Serializer->AddData(std::move(data));
-
- if (info.Metadata.Priority == 0) {
- FlushSerializer(token, GetMemory() >= Settings.MemoryLimitTotal);
- }
}
void Close(TWriteToken token) override {
@@ -1453,10 +1447,6 @@ public:
AFL_ENSURE(info.Serializer);
info.Closed = true;
info.Serializer->Close();
- if (info.Metadata.Priority == 0) {
- FlushSerializer(token, true);
- AFL_ENSURE(info.Serializer->IsFinished());
- }
}
void CleanupClosedTokens() override {
@@ -1474,13 +1464,8 @@ public:
void FlushBuffers() override {
TVector<TWriteToken> writeTokensFoFlush;
for (const auto& [token, writeInfo] : WriteInfos) {
- AFL_ENSURE(writeInfo.Closed);
- if (writeInfo.Metadata.Priority != 0) {
- if (!writeInfo.Serializer->IsFinished()) {
- writeTokensFoFlush.push_back(token);
- }
- } else {
- AFL_ENSURE(writeInfo.Serializer->IsFinished());
+ if ((writeInfo.Metadata.Priority == 0 || writeInfo.Closed) && !writeInfo.Serializer->IsFinished()) {
+ writeTokensFoFlush.push_back(token);
}
}
@@ -1495,7 +1480,11 @@ public:
for (const TWriteToken token : writeTokensFoFlush) {
FlushSerializer(token, true);
- AFL_ENSURE(WriteInfos.at(token).Serializer->IsFinished());
+ const auto& writeInfo = WriteInfos.at(token);
+ if (writeInfo.Metadata.Priority != 0) {
+ AFL_ENSURE(writeInfo.Closed);
+ AFL_ENSURE(writeInfo.Serializer->IsFinished());
+ }
}
}