aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikita Vasilev <ns-vasilev@ydb.tech>2025-03-24 15:56:34 +0300
committerGitHub <noreply@github.com>2025-03-24 15:56:34 +0300
commitb7dfcdeb5b91e484351beeedf2beeca9698d5491 (patch)
treeeb375147e5855ebd685e1a0fca17dbf014703391
parent76677884a105e0cd67a634f897e2e05b61ac02a3 (diff)
downloadydb-b7dfcdeb5b91e484351beeedf2beeca9698d5491.tar.gz
Fix participant nodes count for sinks (#16137)
-rw-r--r--.github/config/muted_ya.txt2
-rw-r--r--ydb/core/kqp/common/kqp_tx_manager.cpp10
-rw-r--r--ydb/core/kqp/common/kqp_tx_manager.h3
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp4
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h3
-rw-r--r--ydb/core/kqp/runtime/kqp_write_actor.cpp4
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.h6
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp10
-rw-r--r--ydb/core/kqp/ut/query/kqp_stats_ut.cpp2
9 files changed, 38 insertions, 6 deletions
diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt
index f567fca7e3d..46923926eea 100644
--- a/.github/config/muted_ya.txt
+++ b/.github/config/muted_ya.txt
@@ -27,8 +27,6 @@ ydb/core/kqp/ut/query KqpAnalyze.AnalyzeTable+ColumnStore
ydb/core/kqp/ut/query KqpAnalyze.AnalyzeTable-ColumnStore
ydb/core/kqp/ut/query KqpLimits.OutOfSpaceYQLUpsertFail+useSink
ydb/core/kqp/ut/query KqpLimits.QSReplySizeEnsureMemoryLimits+useSink
-ydb/core/kqp/ut/query KqpStats.OneShardLocalExec+UseSink
-ydb/core/kqp/ut/query KqpStats.OneShardNonLocalExec+UseSink
ydb/core/kqp/ut/query KqpStats.SysViewClientLost
ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns
ydb/core/kqp/ut/scheme KqpScheme.AlterAsyncReplication
diff --git a/ydb/core/kqp/common/kqp_tx_manager.cpp b/ydb/core/kqp/common/kqp_tx_manager.cpp
index cfdf91276e3..2d973deea18 100644
--- a/ydb/core/kqp/common/kqp_tx_manager.cpp
+++ b/ydb/core/kqp/common/kqp_tx_manager.cpp
@@ -162,6 +162,14 @@ public:
return nullptr;
}
+ void AddParticipantNode(const ui32 nodeId) override {
+ ParticipantNodes.insert(nodeId);
+ }
+
+ const THashSet<ui32>& GetParticipantNodes() const override {
+ return ParticipantNodes;
+ }
+
void SetTopicOperations(NTopic::TTopicOperations&& topicOperations) override {
TopicOperations = std::move(topicOperations);
}
@@ -506,6 +514,8 @@ private:
THashMap<ui64, TShardInfo> ShardsInfo;
std::unordered_set<TString> TablePathes;
+ THashSet<ui32> ParticipantNodes;
+
THashMap<TTableId, std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>>> TablePartitioning;
bool AllowVolatile = false;
diff --git a/ydb/core/kqp/common/kqp_tx_manager.h b/ydb/core/kqp/common/kqp_tx_manager.h
index 91596b139fe..dc11e0b12be 100644
--- a/ydb/core/kqp/common/kqp_tx_manager.h
+++ b/ydb/core/kqp/common/kqp_tx_manager.h
@@ -71,6 +71,9 @@ public:
virtual void BuildTopicTxs(NTopic::TTopicOperationTransactions& txs) = 0;
virtual bool HasTopics() const = 0;
+ virtual void AddParticipantNode(const ui32 nodeId) = 0;
+ virtual const THashSet<ui32>& GetParticipantNodes() const = 0;
+
virtual bool IsTxPrepared() const = 0;
virtual bool IsTxFinished() const = 0;
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index c3e0e508fd5..1eae29d58c4 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -1277,6 +1277,10 @@ private:
Stats->AddDatashardStats(std::move(*res->Record.MutableTxStats()));
}
+ if (TxManager) {
+ TxManager->AddParticipantNode(ev->Sender.NodeId());
+ }
+
switch (ev->Get()->GetStatus()) {
case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED: {
YQL_ENSURE(false);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 89774bf64cf..14e63bae93a 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -265,6 +265,9 @@ protected:
for (auto& [shardId, nodeId] : ShardIdToNodeId) {
ShardsOnNode[nodeId].push_back(shardId);
ParticipantNodes.emplace(nodeId);
+ if (TxManager) {
+ TxManager->AddParticipantNode(nodeId);
+ }
}
if (IsDebugLogEnabled()) {
diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp
index 70a1944c599..da31de6db07 100644
--- a/ydb/core/kqp/runtime/kqp_write_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp
@@ -548,6 +548,8 @@ public:
<< ", Cookie=" << ev->Cookie);
UpdateStats(ev->Get()->Record.GetTxStats());
+ TxManager->AddParticipantNode(ev->Sender.NodeId());
+
switch (ev->Get()->GetStatus()) {
case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED: {
CA_LOG_E("Got UNSPECIFIED for table `"
@@ -2349,6 +2351,8 @@ public:
return (tableInfo.Pathes.size() == 1 ? "Table: " : "Tables: ") + builder;
};
+ TxManager->AddParticipantNode(ev->Sender.NodeId());
+
// TODO: get rid of copy-paste
switch (ev->Get()->GetStatus()) {
case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED: {
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h
index ecbd3a5348e..57411dde596 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.h
+++ b/ydb/core/kqp/session_actor/kqp_query_state.h
@@ -176,12 +176,16 @@ public:
if (RequestEv->GetRequestCtx() == nullptr) {
return false;
}
- if (ParticipantNodes.size() == 1) {
+ if (IsSingleNodeExecution()) {
return *ParticipantNodes.begin() == nodeId;
}
return false;
}
+ bool IsSingleNodeExecution() const {
+ return ParticipantNodes.size() == 1;
+ }
+
NKikimrKqp::EQueryAction GetAction() const {
return QueryAction;
}
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index c29342f91a9..229e439434f 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -1627,8 +1627,12 @@ public:
QueryState->QueryStats.Executions.back().Swap(executerResults.MutableStats());
}
- for (auto nodeId : ev->ParticipantNodes) {
- QueryState->ParticipantNodes.emplace(nodeId);
+ if (QueryState->TxCtx->TxManager) {
+ QueryState->ParticipantNodes = QueryState->TxCtx->TxManager->GetParticipantNodes();
+ } else {
+ for (auto nodeId : ev->ParticipantNodes) {
+ QueryState->ParticipantNodes.emplace(nodeId);
+ }
}
if (response->GetStatus() != Ydb::StatusIds::SUCCESS) {
@@ -2377,7 +2381,7 @@ public:
QueryState->PoolHandlerActor = Nothing();
}
- if (QueryState && QueryState->ParticipantNodes.size() == 1) {
+ if (QueryState && QueryState->IsSingleNodeExecution()) {
Counters->TotalSingleNodeReqCount->Inc();
if (!QueryState->IsLocalExecution(SelfId().NodeId())) {
Counters->NonLocalSingleNodeReqCount->Inc();
diff --git a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp
index d79aafa49f4..a610a1005b3 100644
--- a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp
@@ -685,6 +685,8 @@ Y_UNIT_TEST_TWIN(OneShardLocalExec, UseSink) {
auto session = db.CreateSession().GetValueSync().GetSession();
TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
+
+ UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), 1);
{
auto result = session.ExecuteDataQuery(R"(
SELECT * FROM `/Root/KeyValue` WHERE Key = 1;