diff options
author | Nikita Vasilev <ns-vasilev@ydb.tech> | 2025-03-24 15:56:34 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-24 15:56:34 +0300 |
commit | b7dfcdeb5b91e484351beeedf2beeca9698d5491 (patch) | |
tree | eb375147e5855ebd685e1a0fca17dbf014703391 | |
parent | 76677884a105e0cd67a634f897e2e05b61ac02a3 (diff) | |
download | ydb-b7dfcdeb5b91e484351beeedf2beeca9698d5491.tar.gz |
Fix participant nodes count for sinks (#16137)
-rw-r--r-- | .github/config/muted_ya.txt | 2 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp_tx_manager.cpp | 10 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp_tx_manager.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_write_actor.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_query_state.h | 6 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 10 | ||||
-rw-r--r-- | ydb/core/kqp/ut/query/kqp_stats_ut.cpp | 2 |
9 files changed, 38 insertions, 6 deletions
diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt index f567fca7e3d..46923926eea 100644 --- a/.github/config/muted_ya.txt +++ b/.github/config/muted_ya.txt @@ -27,8 +27,6 @@ ydb/core/kqp/ut/query KqpAnalyze.AnalyzeTable+ColumnStore ydb/core/kqp/ut/query KqpAnalyze.AnalyzeTable-ColumnStore ydb/core/kqp/ut/query KqpLimits.OutOfSpaceYQLUpsertFail+useSink ydb/core/kqp/ut/query KqpLimits.QSReplySizeEnsureMemoryLimits+useSink -ydb/core/kqp/ut/query KqpStats.OneShardLocalExec+UseSink -ydb/core/kqp/ut/query KqpStats.OneShardNonLocalExec+UseSink ydb/core/kqp/ut/query KqpStats.SysViewClientLost ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns ydb/core/kqp/ut/scheme KqpScheme.AlterAsyncReplication diff --git a/ydb/core/kqp/common/kqp_tx_manager.cpp b/ydb/core/kqp/common/kqp_tx_manager.cpp index cfdf91276e3..2d973deea18 100644 --- a/ydb/core/kqp/common/kqp_tx_manager.cpp +++ b/ydb/core/kqp/common/kqp_tx_manager.cpp @@ -162,6 +162,14 @@ public: return nullptr; } + void AddParticipantNode(const ui32 nodeId) override { + ParticipantNodes.insert(nodeId); + } + + const THashSet<ui32>& GetParticipantNodes() const override { + return ParticipantNodes; + } + void SetTopicOperations(NTopic::TTopicOperations&& topicOperations) override { TopicOperations = std::move(topicOperations); } @@ -506,6 +514,8 @@ private: THashMap<ui64, TShardInfo> ShardsInfo; std::unordered_set<TString> TablePathes; + THashSet<ui32> ParticipantNodes; + THashMap<TTableId, std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>>> TablePartitioning; bool AllowVolatile = false; diff --git a/ydb/core/kqp/common/kqp_tx_manager.h b/ydb/core/kqp/common/kqp_tx_manager.h index 91596b139fe..dc11e0b12be 100644 --- a/ydb/core/kqp/common/kqp_tx_manager.h +++ b/ydb/core/kqp/common/kqp_tx_manager.h @@ -71,6 +71,9 @@ public: virtual void BuildTopicTxs(NTopic::TTopicOperationTransactions& txs) = 0; virtual bool HasTopics() const = 0; + virtual void AddParticipantNode(const ui32 nodeId) = 0; + virtual const THashSet<ui32>& GetParticipantNodes() const = 0; + virtual bool IsTxPrepared() const = 0; virtual bool IsTxFinished() const = 0; diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index c3e0e508fd5..1eae29d58c4 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1277,6 +1277,10 @@ private: Stats->AddDatashardStats(std::move(*res->Record.MutableTxStats())); } + if (TxManager) { + TxManager->AddParticipantNode(ev->Sender.NodeId()); + } + switch (ev->Get()->GetStatus()) { case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED: { YQL_ENSURE(false); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 89774bf64cf..14e63bae93a 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -265,6 +265,9 @@ protected: for (auto& [shardId, nodeId] : ShardIdToNodeId) { ShardsOnNode[nodeId].push_back(shardId); ParticipantNodes.emplace(nodeId); + if (TxManager) { + TxManager->AddParticipantNode(nodeId); + } } if (IsDebugLogEnabled()) { diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index 70a1944c599..da31de6db07 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -548,6 +548,8 @@ public: << ", Cookie=" << ev->Cookie); UpdateStats(ev->Get()->Record.GetTxStats()); + TxManager->AddParticipantNode(ev->Sender.NodeId()); + switch (ev->Get()->GetStatus()) { case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED: { CA_LOG_E("Got UNSPECIFIED for table `" @@ -2349,6 +2351,8 @@ public: return (tableInfo.Pathes.size() == 1 ? "Table: " : "Tables: ") + builder; }; + TxManager->AddParticipantNode(ev->Sender.NodeId()); + // TODO: get rid of copy-paste switch (ev->Get()->GetStatus()) { case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED: { diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index ecbd3a5348e..57411dde596 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -176,12 +176,16 @@ public: if (RequestEv->GetRequestCtx() == nullptr) { return false; } - if (ParticipantNodes.size() == 1) { + if (IsSingleNodeExecution()) { return *ParticipantNodes.begin() == nodeId; } return false; } + bool IsSingleNodeExecution() const { + return ParticipantNodes.size() == 1; + } + NKikimrKqp::EQueryAction GetAction() const { return QueryAction; } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index c29342f91a9..229e439434f 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1627,8 +1627,12 @@ public: QueryState->QueryStats.Executions.back().Swap(executerResults.MutableStats()); } - for (auto nodeId : ev->ParticipantNodes) { - QueryState->ParticipantNodes.emplace(nodeId); + if (QueryState->TxCtx->TxManager) { + QueryState->ParticipantNodes = QueryState->TxCtx->TxManager->GetParticipantNodes(); + } else { + for (auto nodeId : ev->ParticipantNodes) { + QueryState->ParticipantNodes.emplace(nodeId); + } } if (response->GetStatus() != Ydb::StatusIds::SUCCESS) { @@ -2377,7 +2381,7 @@ public: QueryState->PoolHandlerActor = Nothing(); } - if (QueryState && QueryState->ParticipantNodes.size() == 1) { + if (QueryState && QueryState->IsSingleNodeExecution()) { Counters->TotalSingleNodeReqCount->Inc(); if (!QueryState->IsLocalExecution(SelfId().NodeId())) { Counters->NonLocalSingleNodeReqCount->Inc(); diff --git a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp index d79aafa49f4..a610a1005b3 100644 --- a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp @@ -685,6 +685,8 @@ Y_UNIT_TEST_TWIN(OneShardLocalExec, UseSink) { auto session = db.CreateSession().GetValueSync().GetSession(); TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters); + + UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), 1); { auto result = session.ExecuteDataQuery(R"( SELECT * FROM `/Root/KeyValue` WHERE Key = 1; |