about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Nikita Vasilev <ns-vasilev@ydb.tech> 2025-06-01 13:53:27 +0300
committer GitHub <noreply@github.com> 2025-06-01 13:53:27 +0300
commit 829ac07a6fcceb9fcae299720a61075cd537aaa1 (patch)
tree 23b46916981bc43f91c7c9b0e08d896ccc0a5619
parent d2eac74d96acdfa6a10a8a17a298630624ca2a73 (diff)
download ydb-829ac07a6fcceb9fcae299720a61075cd537aaa1.tar.gz
Refactor sink: shards state updating (#19071)
-rw-r--r-- ydb/core/kqp/common/kqp_tx_manager.cpp 2
-rw-r--r-- ydb/core/kqp/runtime/kqp_write_actor.cpp 46
-rw-r--r-- ydb/core/kqp/runtime/kqp_write_table.cpp 48
-rw-r--r-- ydb/core/kqp/runtime/kqp_write_table.h 6
4 files changed, 70 insertions, 32 deletions
diff --git a/ydb/core/kqp/common/kqp_tx_manager.cpp b/ydb/core/kqp/common/kqp_tx_manager.cpp
index bba5fc63987..b76a7ec14cf 100644
--- a/ydb/core/kqp/common/kqp_tx_manager.cpp
+++ b/ydb/core/kqp/common/kqp_tx_manager.cpp
@@ -312,7 +312,7 @@ public:
bool NeedCommit() const override {
AFL_ENSURE(ActionsCount != 1 || IsSingleShard()); // ActionsCount == 1 then IsSingleShard()
- const bool dontNeedCommit = IsEmpty() || IsReadOnly() && ((ActionsCount == 1) || HasSnapshot());
+ const bool dontNeedCommit = IsEmpty() || (IsReadOnly() && ((ActionsCount == 1) || HasSnapshot()));
return !dontNeedCommit;
}
diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp
index aa0dd44e2cc..fb2a0a205f5 100644
--- a/ydb/core/kqp/runtime/kqp_write_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp
@@ -354,20 +354,13 @@ public:
ShardedWriteController->Close();
}
- void SetParentTraceId(NWilson::TTraceId traceId) {
- ParentTraceId = std::move(traceId);
+ void CleanupClosedTokens() {
+ YQL_ENSURE(ShardedWriteController);
+ ShardedWriteController->CleanupClosedTokens();
}
- void UpdateShards() {
- // TODO: Maybe there are better ways to initialize new shards...
- for (const auto& shardInfo : ShardedWriteController->GetPendingShards()) {
- TxManager->AddShard(shardInfo.ShardId, IsOlap, TablePath);
- IKqpTransactionManager::TActionFlags flags = IKqpTransactionManager::EAction::WRITE;
- if (shardInfo.HasRead) {
- flags |= IKqpTransactionManager::EAction::READ;
- }
- TxManager->AddAction(shardInfo.ShardId, flags);
- }
+ void SetParentTraceId(NWilson::TTraceId traceId) {
+ ParentTraceId = std::move(traceId);
}
bool IsClosed() const {
@@ -878,18 +871,30 @@ public:
}
}
+ void UpdateShards() {
+ for (const auto& shardInfo : ShardedWriteController->ExtractShardUpdates()) {
+ TxManager->AddShard(shardInfo.ShardId, IsOlap, TablePath);
+ IKqpTransactionManager::TActionFlags flags = IKqpTransactionManager::EAction::WRITE;
+ if (shardInfo.HasRead) {
+ flags |= IKqpTransactionManager::EAction::READ;
+ }
+ TxManager->AddAction(shardInfo.ShardId, flags);
+ }
+ }
+
void FlushBuffers() {
ShardedWriteController->FlushBuffers();
UpdateShards();
}
- bool Flush() {
- for (const auto& shardInfo : ShardedWriteController->GetPendingShards()) {
- if (!SendDataToShard(shardInfo.ShardId)) {
- return false;
+ bool FlushToShards() {
+ bool ok = true;
+ ShardedWriteController->ForEachPendingShard([&](const auto& shardInfo) {
+ if (ok && !SendDataToShard(shardInfo.ShardId)) {
+ ok = false;
}
- }
- return true;
+ });
+ return ok;
}
bool SendDataToShard(const ui64 shardId) {
@@ -1516,7 +1521,7 @@ private:
}
if (Closed || outOfMemory) {
- if (!WriteTableActor->Flush()) {
+ if (!WriteTableActor->FlushToShards()) {
return;
}
}
@@ -1933,7 +1938,7 @@ public:
CA_LOG_D("Flush data");
for (auto& [_, info] : WriteInfos) {
if (info.WriteTableActor->IsReady()) {
- if (!info.WriteTableActor->Flush()) {
+ if (!info.WriteTableActor->FlushToShards()) {
return false;
}
}
@@ -2807,6 +2812,7 @@ public:
Y_ABORT_UNLESS(GetTotalMemory() == 0);
for (auto& [_, info] : WriteInfos) {
+ info.WriteTableActor->CleanupClosedTokens();
info.WriteTableActor->Unlink();
}
}
diff --git a/ydb/core/kqp/runtime/kqp_write_table.cpp b/ydb/core/kqp/runtime/kqp_write_table.cpp
index 70144eb0c10..cc6df787d62 100644
--- a/ydb/core/kqp/runtime/kqp_write_table.cpp
+++ b/ydb/core/kqp/runtime/kqp_write_table.cpp
@@ -1187,17 +1187,15 @@ public:
return insertIt->second;
}
- TVector<IShardedWriteController::TPendingShardInfo> GetPendingShards() const {
- TVector<IShardedWriteController::TPendingShardInfo> result;
+ void ForEachPendingShard(std::function<void(const IShardedWriteController::TPendingShardInfo&)>&& callback) const {
for (const auto& [id, shard] : ShardsInfo) {
if (!shard.IsEmpty() && shard.GetSendAttempts() == 0) {
- result.push_back(IShardedWriteController::TPendingShardInfo{
+ callback(IShardedWriteController::TPendingShardInfo{
.ShardId = id,
.HasRead = shard.HasRead(),
});
}
}
- return result;
}
bool Has(ui64 shardId) const {
@@ -1382,6 +1380,18 @@ public:
}
}
+ void CleanupClosedTokens() override {
+ AFL_ENSURE(IsEmpty());
+ for (auto it = WriteInfos.begin(); it != WriteInfos.end();) {
+ if (it->second.Closed) {
+ AFL_ENSURE(it->second.Serializer->IsFinished());
+ it = WriteInfos.erase(it);
+ } else {
+ ++it;
+ }
+ }
+ }
+
void FlushBuffers() override {
TVector<TWriteToken> writeTokensFoFlush;
for (const auto& [token, writeInfo] : WriteInfos) {
@@ -1420,8 +1430,14 @@ public:
}
}
- TVector<TPendingShardInfo> GetPendingShards() const override {
- return ShardsInfo.GetPendingShards();
+ void ForEachPendingShard(std::function<void(const TPendingShardInfo&)>&& callback) const override {
+ ShardsInfo.ForEachPendingShard(std::move(callback));
+ }
+
+ std::vector<TPendingShardInfo> ExtractShardUpdates() override {
+ std::vector<TPendingShardInfo> shardUpdates;
+ std::swap(shardUpdates, ShardUpdates);
+ return shardUpdates;
}
TVector<ui64> GetShardsIds() const override {
@@ -1580,11 +1596,16 @@ private:
for (auto& [shardId, batches] : writeInfo.Serializer->FlushBatchesForce()) {
for (auto& batch : batches) {
if (batch && !batch->IsEmpty()) {
+ const bool hasRead = (writeInfo.Metadata.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT
+ || writeInfo.Metadata.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPDATE);
ShardsInfo.GetShard(shardId).PushBatch(TBatchWithMetadata{
.Token = token,
.Data = std::move(batch),
- .HasRead = (writeInfo.Metadata.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT
- || writeInfo.Metadata.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPDATE),
+ .HasRead = hasRead,
+ });
+ ShardUpdates.push_back(IShardedWriteController::TPendingShardInfo{
+ .ShardId = shardId,
+ .HasRead = hasRead,
});
}
}
@@ -1597,11 +1618,16 @@ private:
if (!batch || batch->IsEmpty()) {
break;
}
+ const bool hasRead = (writeInfo.Metadata.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT
+ || writeInfo.Metadata.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPDATE);
shard.PushBatch(TBatchWithMetadata{
.Token = token,
.Data = std::move(batch),
- .HasRead = (writeInfo.Metadata.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT
- || writeInfo.Metadata.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPDATE),
+ .HasRead = hasRead,
+ });
+ ShardUpdates.push_back(IShardedWriteController::TPendingShardInfo{
+ .ShardId = shardId,
+ .HasRead = hasRead,
});
}
}
@@ -1615,6 +1641,7 @@ private:
shard.MakeNextBatches(1);
} else {
shard.MakeNextBatches(std::nullopt);
+ AFL_ENSURE(shard.GetBatchesInFlight() == shard.Size());
}
}
}
@@ -1646,6 +1673,7 @@ private:
TWriteToken CurrentWriteToken = 0;
TShardsInfo ShardsInfo;
+ std::vector<IShardedWriteController::TPendingShardInfo> ShardUpdates;
std::optional<NSchemeCache::TSchemeCacheNavigate::TEntry> SchemeEntry;
std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> Partitioning;
diff --git a/ydb/core/kqp/runtime/kqp_write_table.h b/ydb/core/kqp/runtime/kqp_write_table.h
index 81f005f55cd..9526b76ae28 100644
--- a/ydb/core/kqp/runtime/kqp_write_table.h
+++ b/ydb/core/kqp/runtime/kqp_write_table.h
@@ -69,6 +69,8 @@ public:
virtual void Write(TWriteToken token, IDataBatchPtr&& data) = 0;
virtual void Close(TWriteToken token) = 0;
+ virtual void CleanupClosedTokens() = 0;
+
virtual void FlushBuffers() = 0;
virtual void Close() = 0;
@@ -79,7 +81,9 @@ public:
ui64 ShardId;
bool HasRead;
};
- virtual TVector<TPendingShardInfo> GetPendingShards() const = 0;
+ virtual void ForEachPendingShard(std::function<void(const TPendingShardInfo&)>&& callback) const = 0;
+ virtual std::vector<TPendingShardInfo> ExtractShardUpdates() = 0;
+
virtual ui64 GetShardsCount() const = 0;
virtual TVector<ui64> GetShardsIds() const = 0;