diff options
author | Nikita Vasilev <ns-vasilev@ydb.tech> | 2025-06-05 19:48:11 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-06-05 19:48:11 +0300 |
commit | 33701dfdce78bb10a31d619218be65775b3730e4 (patch) | |
tree | e2e47b816296546638ebe0045a635d4922814692 | |
parent | 0e700aea8a266197a8b54647882eb8aef1d699e4 (diff) | |
download | ydb-33701dfdce78bb10a31d619218be65775b3730e4.tar.gz |
Refactor write actor buffers (#19300)
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 10 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_write_actor.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_write_table.cpp | 25 |
3 files changed, 19 insertions, 22 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index b4e2b8bb8a6..9003a2b5d5d 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -266,10 +266,6 @@ public: if (TxManager) { TxManager->SetHasSnapshot(GetSnapshot().IsValid()); - - for (const ui64& shardId : TxManager->GetShards()) { - Stats->AffectedShards.insert(shardId); - } } if (!BufferActorId || (ReadOnlyTx && Request.LocksOp != ELocksOp::Rollback)) { @@ -392,6 +388,12 @@ public: } } + if (TxManager) { + for (const ui64& shardId : TxManager->GetShards()) { + Stats->AffectedShards.insert(shardId); + } + } + auto resultSize = ResponseEv->GetByteSize(); if (resultSize > (int)ReplySizeLimit) { TString message; diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index bb4c7dea7c2..20b38b07a18 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -1495,6 +1495,7 @@ private: WriteTableActor->Write(WriteToken, Batcher->Build()); if (Closed) { WriteTableActor->Close(WriteToken); + WriteTableActor->FlushBuffers(); WriteTableActor->Close(); } } catch (const TMemoryLimitExceededException&) { @@ -1536,6 +1537,10 @@ private: return; } + if (!Closed && outOfMemory) { + WriteTableActor->FlushBuffers(); + } + if (Closed || outOfMemory) { if (!WriteTableActor->FlushToShards()) { return; @@ -2025,6 +2030,7 @@ public: bool flushFailed = false; ForEachWriteActor([&](TKqpTableWriteActor* actor, const TActorId) { if (!flushFailed && actor->IsReady()) { + actor->FlushBuffers(); if (!actor->FlushToShards()) { flushFailed = true; } diff --git a/ydb/core/kqp/runtime/kqp_write_table.cpp b/ydb/core/kqp/runtime/kqp_write_table.cpp index 38b79509f47..c7824518a1a 100644 --- a/ydb/core/kqp/runtime/kqp_write_table.cpp +++ b/ydb/core/kqp/runtime/kqp_write_table.cpp @@ -1385,8 +1385,6 @@ public: for (const auto& [token, writeInfo] : WriteInfos) { if (writeInfo.Closed) { Close(token); - } else { - FlushSerializer(token, GetMemory() >= Settings.MemoryLimitTotal); } } } @@ -1442,10 +1440,6 @@ public: data->AttachAlloc(Alloc); } info.Serializer->AddData(std::move(data)); - - if (info.Metadata.Priority == 0) { - FlushSerializer(token, GetMemory() >= Settings.MemoryLimitTotal); - } } void Close(TWriteToken token) override { @@ -1453,10 +1447,6 @@ public: AFL_ENSURE(info.Serializer); info.Closed = true; info.Serializer->Close(); - if (info.Metadata.Priority == 0) { - FlushSerializer(token, true); - AFL_ENSURE(info.Serializer->IsFinished()); - } } void CleanupClosedTokens() override { @@ -1474,13 +1464,8 @@ public: void FlushBuffers() override { TVector<TWriteToken> writeTokensFoFlush; for (const auto& [token, writeInfo] : WriteInfos) { - AFL_ENSURE(writeInfo.Closed); - if (writeInfo.Metadata.Priority != 0) { - if (!writeInfo.Serializer->IsFinished()) { - writeTokensFoFlush.push_back(token); - } - } else { - AFL_ENSURE(writeInfo.Serializer->IsFinished()); + if ((writeInfo.Metadata.Priority == 0 || writeInfo.Closed) && !writeInfo.Serializer->IsFinished()) { + writeTokensFoFlush.push_back(token); } } @@ -1495,7 +1480,11 @@ public: for (const TWriteToken token : writeTokensFoFlush) { FlushSerializer(token, true); - AFL_ENSURE(WriteInfos.at(token).Serializer->IsFinished()); + const auto& writeInfo = WriteInfos.at(token); + if (writeInfo.Metadata.Priority != 0) { + AFL_ENSURE(writeInfo.Closed); + AFL_ENSURE(writeInfo.Serializer->IsFinished()); + } } } |