aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2022-12-06 12:38:53 +0300
committerssmike <ssmike@ydb.tech>2022-12-06 12:38:53 +0300
commita13a044a719875aa309bee399534784dfe876d8c (patch)
tree2796500028f60a5a5962ff0aed016f281167e676
parent67aa93ab5fe5908e07d8eb8add192947f76f5886 (diff)
downloadydb-a13a044a719875aa309bee399534784dfe876d8c.tar.gz
refactor waitresolve statefn
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp34
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h64
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp43
3 files changed, 80 insertions, 61 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 1130e7a08e4..6297bdeeb21 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -260,6 +260,23 @@ public:
PassAway();
}
+ STATEFN(WaitResolveState) {
+ try {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvKqpExecuter::TEvTableResolveStatus, HandleResolve);
+ hFunc(TEvKqpExecuter::TEvShardsResolveStatus, HandleResolve);
+ hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
+ hFunc(TEvents::TEvWakeup, HandleTimeout);
+ default:
+ UnexpectedEvent("WaitResolveState", ev->GetTypeRewrite());
+ }
+
+ } catch (const yexception& e) {
+ InternalError(e.what());
+ }
+ ReportEventElapsedTime();
+ }
+
private:
STATEFN(PrepareState) {
try {
@@ -1388,11 +1405,6 @@ private:
YQL_ENSURE(result.second);
}
-public:
- void OnTablesResolve() override {
- Execute();
- }
-
void Execute() {
NWilson::TSpan prepareTasksSpan(TWilsonKqp::DataExecuterPrepateTasks, ExecuterStateSpan.GetTraceId(), "PrepateTasks", NWilson::EFlags::AUTO_END);
LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId);
@@ -1667,7 +1679,17 @@ public:
}
}
- void OnShardsResolve() override {
+ void HandleResolve(TEvKqpExecuter::TEvTableResolveStatus::TPtr& ev) {
+ if (!TBase::HandleResolve(ev)) return;
+ Execute();
+ }
+
+ void HandleResolve(TEvKqpExecuter::TEvShardsResolveStatus::TPtr& ev) {
+ if (!TBase::HandleResolve(ev)) return;
+ OnShardsResolve();
+ }
+
+ void OnShardsResolve() {
const bool forceSnapshot = (
ReadOnlyTx &&
!ImmediateTx &&
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index d7606afe190..a09eeda1ae4 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -123,70 +123,26 @@ public:
}
protected:
- TActorId KqpShardsResolverId;
-
- STATEFN(WaitResolveState) {
- try {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvKqpExecuter::TEvTableResolveStatus, HandleResolve);
- hFunc(TEvKqpExecuter::TEvShardsResolveStatus, HandleResolve);
- hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
- hFunc(TEvents::TEvWakeup, HandleTimeout);
- default:
- UnexpectedEvent("WaitResolveState", ev->GetTypeRewrite());
- }
-
- } catch (const yexception& e) {
- InternalError(e.what());
- }
- ReportEventElapsedTime();
- }
-
- TSet<ui64> CollectShards() {
- TSet<ui64> shardIds;
- for (auto& [stageId, stageInfo] : TasksGraph.GetStagesInfo()) {
- if (stageInfo.Meta.ShardKey) {
- for (auto& partition : stageInfo.Meta.ShardKey->GetPartitions()) {
- shardIds.insert(partition.ShardId);
- }
- }
- }
- return shardIds;
- }
-
- virtual void OnTablesResolve() {
- TSet<ui64> shardIds = CollectShards();
- if (shardIds.size() > 0) {
- LOG_D("Start resolving tablets nodes... (" << shardIds.size() << ")");
- auto kqpShardsResolver = CreateKqpShardsResolver(this->SelfId(), TxId, std::move(shardIds));
- KqpShardsResolverId = this->RegisterWithSameMailbox(kqpShardsResolver);
- } else {
- static_cast<TDerived*>(this)->Execute();
- }
- }
-
- void HandleResolve(TEvKqpExecuter::TEvTableResolveStatus::TPtr& ev) {
+ [[nodiscard]]
+ bool HandleResolve(TEvKqpExecuter::TEvTableResolveStatus::TPtr& ev) {
auto& reply = *ev->Get();
KqpTableResolverId = {};
if (reply.Status != Ydb::StatusIds::SUCCESS) {
ReplyErrorAndDie(reply.Status, reply.Issues);
- return;
+ return false;
}
if (ExecuterTableResolveSpan) {
ExecuterTableResolveSpan.End();
}
- OnTablesResolve();
- }
-
- virtual void OnShardsResolve() {
- static_cast<TDerived*>(this)->Execute();
+ return true;
}
- void HandleResolve(TEvKqpExecuter::TEvShardsResolveStatus::TPtr& ev) {
+ [[nodiscard]]
+ bool HandleResolve(TEvKqpExecuter::TEvShardsResolveStatus::TPtr& ev) {
auto& reply = *ev->Get();
KqpShardsResolverId = {};
@@ -197,7 +153,7 @@ protected:
LOG_W("Shards nodes resolve failed, status: " << Ydb::StatusIds_StatusCode_Name(reply.Status)
<< ", issues: " << reply.Issues.ToString());
ReplyErrorAndDie(reply.Status, reply.Issues);
- return;
+ return false;
}
LOG_D("Shards nodes resolved, success: " << reply.ShardNodes.size() << ", failed: " << reply.Unresolved);
@@ -221,8 +177,7 @@ protected:
}
LOG_D(sb);
}
-
- OnShardsResolve();
+ return true;
}
void HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
@@ -360,7 +315,7 @@ protected:
KqpTableResolverId = this->RegisterWithSameMailbox(kqpTableResolver);
LOG_T("Got request, become WaitResolveState");
- this->Become(&TKqpExecuterBase::WaitResolveState);
+ this->Become(&TDerived::WaitResolveState);
if (ExecuterStateSpan) {
ExecuterStateSpan.End();
ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::ExecuterWaitResolveState, ExecuterSpan.GetTraceId(), "WaitResolveState", NWilson::EFlags::AUTO_END);
@@ -1013,6 +968,7 @@ protected:
TKqpTableKeys TableKeys;
TActorId KqpTableResolverId;
+ TActorId KqpShardsResolverId;
THashMap<TActorId, TProgressStat> PendingComputeActors; // Running compute actors (pure and DS)
TVector<TProgressStat> LastStats;
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index b9b2661b179..63505936e38 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -70,6 +70,22 @@ public:
}
public:
+ STATEFN(WaitResolveState) {
+ try {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvKqpExecuter::TEvTableResolveStatus, HandleResolve);
+ hFunc(TEvKqpExecuter::TEvShardsResolveStatus, HandleResolve);
+ hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
+ hFunc(TEvents::TEvWakeup, HandleTimeout);
+ default:
+ UnexpectedEvent("WaitResolveState", ev->GetTypeRewrite());
+ }
+
+ } catch (const yexception& e) {
+ InternalError(e.what());
+ }
+ ReportEventElapsedTime();
+ }
private:
STATEFN(ExecuteState) {
@@ -465,7 +481,31 @@ private:
}
}
-public:
+ void HandleResolve(TEvKqpExecuter::TEvTableResolveStatus::TPtr& ev) {
+ if (!TBase::HandleResolve(ev)) return;
+ TSet<ui64> shardIds;
+ for (auto& [stageId, stageInfo] : TasksGraph.GetStagesInfo()) {
+ if (stageInfo.Meta.ShardKey) {
+ for (auto& partition : stageInfo.Meta.ShardKey->GetPartitions()) {
+ shardIds.insert(partition.ShardId);
+ }
+ }
+ }
+ if (shardIds) {
+ LOG_D("Start resolving tablets nodes... (" << shardIds.size() << ")");
+ auto kqpShardsResolver = CreateKqpShardsResolver(this->SelfId(), TxId, std::move(shardIds));
+ KqpShardsResolverId = this->RegisterWithSameMailbox(kqpShardsResolver);
+ } else {
+ Execute();
+ }
+ }
+
+
+ void HandleResolve(TEvKqpExecuter::TEvShardsResolveStatus::TPtr& ev) {
+ if (!TBase::HandleResolve(ev)) return;
+ Execute();
+ }
+
void Execute() {
LWTRACK(KqpScanExecuterStartExecute, ResponseEv->Orbit, TxId);
auto& funcRegistry = *AppData()->FunctionRegistry;
@@ -696,6 +736,7 @@ public:
}
}
+public:
void Finalize() {
auto& response = *ResponseEv->Record.MutableResponse();