diff options
author | Nikita Vasilev <ns-vasilev@ydb.tech> | 2025-06-01 13:53:27 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-06-01 13:53:27 +0300 |
commit | 829ac07a6fcceb9fcae299720a61075cd537aaa1 (patch) | |
tree | 23b46916981bc43f91c7c9b0e08d896ccc0a5619 | |
parent | d2eac74d96acdfa6a10a8a17a298630624ca2a73 (diff) | |
download | ydb-829ac07a6fcceb9fcae299720a61075cd537aaa1.tar.gz |
Refactor sink: shards state updating (#19071)
-rw-r--r-- | ydb/core/kqp/common/kqp_tx_manager.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_write_actor.cpp | 46 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_write_table.cpp | 48 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_write_table.h | 6 |
4 files changed, 70 insertions, 32 deletions
diff --git a/ydb/core/kqp/common/kqp_tx_manager.cpp b/ydb/core/kqp/common/kqp_tx_manager.cpp index bba5fc63987..b76a7ec14cf 100644 --- a/ydb/core/kqp/common/kqp_tx_manager.cpp +++ b/ydb/core/kqp/common/kqp_tx_manager.cpp @@ -312,7 +312,7 @@ public: bool NeedCommit() const override { AFL_ENSURE(ActionsCount != 1 || IsSingleShard()); // ActionsCount == 1 then IsSingleShard() - const bool dontNeedCommit = IsEmpty() || IsReadOnly() && ((ActionsCount == 1) || HasSnapshot()); + const bool dontNeedCommit = IsEmpty() || (IsReadOnly() && ((ActionsCount == 1) || HasSnapshot())); return !dontNeedCommit; } diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index aa0dd44e2cc..fb2a0a205f5 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -354,20 +354,13 @@ public: ShardedWriteController->Close(); } - void SetParentTraceId(NWilson::TTraceId traceId) { - ParentTraceId = std::move(traceId); + void CleanupClosedTokens() { + YQL_ENSURE(ShardedWriteController); + ShardedWriteController->CleanupClosedTokens(); } - void UpdateShards() { - // TODO: Maybe there are better ways to initialize new shards... - for (const auto& shardInfo : ShardedWriteController->GetPendingShards()) { - TxManager->AddShard(shardInfo.ShardId, IsOlap, TablePath); - IKqpTransactionManager::TActionFlags flags = IKqpTransactionManager::EAction::WRITE; - if (shardInfo.HasRead) { - flags |= IKqpTransactionManager::EAction::READ; - } - TxManager->AddAction(shardInfo.ShardId, flags); - } + void SetParentTraceId(NWilson::TTraceId traceId) { + ParentTraceId = std::move(traceId); } bool IsClosed() const { @@ -878,18 +871,30 @@ public: } } + void UpdateShards() { + for (const auto& shardInfo : ShardedWriteController->ExtractShardUpdates()) { + TxManager->AddShard(shardInfo.ShardId, IsOlap, TablePath); + IKqpTransactionManager::TActionFlags flags = IKqpTransactionManager::EAction::WRITE; + if (shardInfo.HasRead) { + flags |= IKqpTransactionManager::EAction::READ; + } + TxManager->AddAction(shardInfo.ShardId, flags); + } + } + void FlushBuffers() { ShardedWriteController->FlushBuffers(); UpdateShards(); } - bool Flush() { - for (const auto& shardInfo : ShardedWriteController->GetPendingShards()) { - if (!SendDataToShard(shardInfo.ShardId)) { - return false; + bool FlushToShards() { + bool ok = true; + ShardedWriteController->ForEachPendingShard([&](const auto& shardInfo) { + if (ok && !SendDataToShard(shardInfo.ShardId)) { + ok = false; } - } - return true; + }); + return ok; } bool SendDataToShard(const ui64 shardId) { @@ -1516,7 +1521,7 @@ private: } if (Closed || outOfMemory) { - if (!WriteTableActor->Flush()) { + if (!WriteTableActor->FlushToShards()) { return; } } @@ -1933,7 +1938,7 @@ public: CA_LOG_D("Flush data"); for (auto& [_, info] : WriteInfos) { if (info.WriteTableActor->IsReady()) { - if (!info.WriteTableActor->Flush()) { + if (!info.WriteTableActor->FlushToShards()) { return false; } } @@ -2807,6 +2812,7 @@ public: Y_ABORT_UNLESS(GetTotalMemory() == 0); for (auto& [_, info] : WriteInfos) { + info.WriteTableActor->CleanupClosedTokens(); info.WriteTableActor->Unlink(); } } diff --git a/ydb/core/kqp/runtime/kqp_write_table.cpp b/ydb/core/kqp/runtime/kqp_write_table.cpp index 70144eb0c10..cc6df787d62 100644 --- a/ydb/core/kqp/runtime/kqp_write_table.cpp +++ b/ydb/core/kqp/runtime/kqp_write_table.cpp @@ -1187,17 +1187,15 @@ public: return insertIt->second; } - TVector<IShardedWriteController::TPendingShardInfo> GetPendingShards() const { - TVector<IShardedWriteController::TPendingShardInfo> result; + void ForEachPendingShard(std::function<void(const IShardedWriteController::TPendingShardInfo&)>&& callback) const { for (const auto& [id, shard] : ShardsInfo) { if (!shard.IsEmpty() && shard.GetSendAttempts() == 0) { - result.push_back(IShardedWriteController::TPendingShardInfo{ + callback(IShardedWriteController::TPendingShardInfo{ .ShardId = id, .HasRead = shard.HasRead(), }); } } - return result; } bool Has(ui64 shardId) const { @@ -1382,6 +1380,18 @@ public: } } + void CleanupClosedTokens() override { + AFL_ENSURE(IsEmpty()); + for (auto it = WriteInfos.begin(); it != WriteInfos.end();) { + if (it->second.Closed) { + AFL_ENSURE(it->second.Serializer->IsFinished()); + it = WriteInfos.erase(it); + } else { + ++it; + } + } + } + void FlushBuffers() override { TVector<TWriteToken> writeTokensFoFlush; for (const auto& [token, writeInfo] : WriteInfos) { @@ -1420,8 +1430,14 @@ public: } } - TVector<TPendingShardInfo> GetPendingShards() const override { - return ShardsInfo.GetPendingShards(); + void ForEachPendingShard(std::function<void(const TPendingShardInfo&)>&& callback) const override { + ShardsInfo.ForEachPendingShard(std::move(callback)); + } + + std::vector<TPendingShardInfo> ExtractShardUpdates() override { + std::vector<TPendingShardInfo> shardUpdates; + std::swap(shardUpdates, ShardUpdates); + return shardUpdates; } TVector<ui64> GetShardsIds() const override { @@ -1580,11 +1596,16 @@ private: for (auto& [shardId, batches] : writeInfo.Serializer->FlushBatchesForce()) { for (auto& batch : batches) { if (batch && !batch->IsEmpty()) { + const bool hasRead = (writeInfo.Metadata.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT + || writeInfo.Metadata.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPDATE); ShardsInfo.GetShard(shardId).PushBatch(TBatchWithMetadata{ .Token = token, .Data = std::move(batch), - .HasRead = (writeInfo.Metadata.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT - || writeInfo.Metadata.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPDATE), + .HasRead = hasRead, + }); + ShardUpdates.push_back(IShardedWriteController::TPendingShardInfo{ + .ShardId = shardId, + .HasRead = hasRead, }); } } @@ -1597,11 +1618,16 @@ private: if (!batch || batch->IsEmpty()) { break; } + const bool hasRead = (writeInfo.Metadata.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT + || writeInfo.Metadata.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPDATE); shard.PushBatch(TBatchWithMetadata{ .Token = token, .Data = std::move(batch), - .HasRead = (writeInfo.Metadata.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT - || writeInfo.Metadata.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPDATE), + .HasRead = hasRead, + }); + ShardUpdates.push_back(IShardedWriteController::TPendingShardInfo{ + .ShardId = shardId, + .HasRead = hasRead, }); } } @@ -1615,6 +1641,7 @@ private: shard.MakeNextBatches(1); } else { shard.MakeNextBatches(std::nullopt); + AFL_ENSURE(shard.GetBatchesInFlight() == shard.Size()); } } } @@ -1646,6 +1673,7 @@ private: TWriteToken CurrentWriteToken = 0; TShardsInfo ShardsInfo; + std::vector<IShardedWriteController::TPendingShardInfo> ShardUpdates; std::optional<NSchemeCache::TSchemeCacheNavigate::TEntry> SchemeEntry; std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> Partitioning; diff --git a/ydb/core/kqp/runtime/kqp_write_table.h b/ydb/core/kqp/runtime/kqp_write_table.h index 81f005f55cd..9526b76ae28 100644 --- a/ydb/core/kqp/runtime/kqp_write_table.h +++ b/ydb/core/kqp/runtime/kqp_write_table.h @@ -69,6 +69,8 @@ public: virtual void Write(TWriteToken token, IDataBatchPtr&& data) = 0; virtual void Close(TWriteToken token) = 0; + virtual void CleanupClosedTokens() = 0; + virtual void FlushBuffers() = 0; virtual void Close() = 0; @@ -79,7 +81,9 @@ public: ui64 ShardId; bool HasRead; }; - virtual TVector<TPendingShardInfo> GetPendingShards() const = 0; + virtual void ForEachPendingShard(std::function<void(const TPendingShardInfo&)>&& callback) const = 0; + virtual std::vector<TPendingShardInfo> ExtractShardUpdates() = 0; + virtual ui64 GetShardsCount() const = 0; virtual TVector<ui64> GetShardsIds() const = 0; |