diff options
author | ssmike <ssmike@ydb.tech> | 2022-12-06 12:38:53 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2022-12-06 12:38:53 +0300 |
commit | a13a044a719875aa309bee399534784dfe876d8c (patch) | |
tree | 2796500028f60a5a5962ff0aed016f281167e676 | |
parent | 67aa93ab5fe5908e07d8eb8add192947f76f5886 (diff) | |
download | ydb-a13a044a719875aa309bee399534784dfe876d8c.tar.gz |
refactor waitresolve statefn
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 34 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 64 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 43 |
3 files changed, 80 insertions, 61 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 1130e7a08e4..6297bdeeb21 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -260,6 +260,23 @@ public: PassAway(); } + STATEFN(WaitResolveState) { + try { + switch (ev->GetTypeRewrite()) { + hFunc(TEvKqpExecuter::TEvTableResolveStatus, HandleResolve); + hFunc(TEvKqpExecuter::TEvShardsResolveStatus, HandleResolve); + hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution); + hFunc(TEvents::TEvWakeup, HandleTimeout); + default: + UnexpectedEvent("WaitResolveState", ev->GetTypeRewrite()); + } + + } catch (const yexception& e) { + InternalError(e.what()); + } + ReportEventElapsedTime(); + } + private: STATEFN(PrepareState) { try { @@ -1388,11 +1405,6 @@ private: YQL_ENSURE(result.second); } -public: - void OnTablesResolve() override { - Execute(); - } - void Execute() { NWilson::TSpan prepareTasksSpan(TWilsonKqp::DataExecuterPrepateTasks, ExecuterStateSpan.GetTraceId(), "PrepateTasks", NWilson::EFlags::AUTO_END); LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId); @@ -1667,7 +1679,17 @@ public: } } - void OnShardsResolve() override { + void HandleResolve(TEvKqpExecuter::TEvTableResolveStatus::TPtr& ev) { + if (!TBase::HandleResolve(ev)) return; + Execute(); + } + + void HandleResolve(TEvKqpExecuter::TEvShardsResolveStatus::TPtr& ev) { + if (!TBase::HandleResolve(ev)) return; + OnShardsResolve(); + } + + void OnShardsResolve() { const bool forceSnapshot = ( ReadOnlyTx && !ImmediateTx && diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index d7606afe190..a09eeda1ae4 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -123,70 +123,26 @@ public: } protected: - TActorId KqpShardsResolverId; - - STATEFN(WaitResolveState) { - try { - switch (ev->GetTypeRewrite()) { - hFunc(TEvKqpExecuter::TEvTableResolveStatus, HandleResolve); - hFunc(TEvKqpExecuter::TEvShardsResolveStatus, HandleResolve); - hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution); - hFunc(TEvents::TEvWakeup, HandleTimeout); - default: - UnexpectedEvent("WaitResolveState", ev->GetTypeRewrite()); - } - - } catch (const yexception& e) { - InternalError(e.what()); - } - ReportEventElapsedTime(); - } - - TSet<ui64> CollectShards() { - TSet<ui64> shardIds; - for (auto& [stageId, stageInfo] : TasksGraph.GetStagesInfo()) { - if (stageInfo.Meta.ShardKey) { - for (auto& partition : stageInfo.Meta.ShardKey->GetPartitions()) { - shardIds.insert(partition.ShardId); - } - } - } - return shardIds; - } - - virtual void OnTablesResolve() { - TSet<ui64> shardIds = CollectShards(); - if (shardIds.size() > 0) { - LOG_D("Start resolving tablets nodes... (" << shardIds.size() << ")"); - auto kqpShardsResolver = CreateKqpShardsResolver(this->SelfId(), TxId, std::move(shardIds)); - KqpShardsResolverId = this->RegisterWithSameMailbox(kqpShardsResolver); - } else { - static_cast<TDerived*>(this)->Execute(); - } - } - - void HandleResolve(TEvKqpExecuter::TEvTableResolveStatus::TPtr& ev) { + [[nodiscard]] + bool HandleResolve(TEvKqpExecuter::TEvTableResolveStatus::TPtr& ev) { auto& reply = *ev->Get(); KqpTableResolverId = {}; if (reply.Status != Ydb::StatusIds::SUCCESS) { ReplyErrorAndDie(reply.Status, reply.Issues); - return; + return false; } if (ExecuterTableResolveSpan) { ExecuterTableResolveSpan.End(); } - OnTablesResolve(); - } - - virtual void OnShardsResolve() { - static_cast<TDerived*>(this)->Execute(); + return true; } - void HandleResolve(TEvKqpExecuter::TEvShardsResolveStatus::TPtr& ev) { + [[nodiscard]] + bool HandleResolve(TEvKqpExecuter::TEvShardsResolveStatus::TPtr& ev) { auto& reply = *ev->Get(); KqpShardsResolverId = {}; @@ -197,7 +153,7 @@ protected: LOG_W("Shards nodes resolve failed, status: " << Ydb::StatusIds_StatusCode_Name(reply.Status) << ", issues: " << reply.Issues.ToString()); ReplyErrorAndDie(reply.Status, reply.Issues); - return; + return false; } LOG_D("Shards nodes resolved, success: " << reply.ShardNodes.size() << ", failed: " << reply.Unresolved); @@ -221,8 +177,7 @@ protected: } LOG_D(sb); } - - OnShardsResolve(); + return true; } void HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) { @@ -360,7 +315,7 @@ protected: KqpTableResolverId = this->RegisterWithSameMailbox(kqpTableResolver); LOG_T("Got request, become WaitResolveState"); - this->Become(&TKqpExecuterBase::WaitResolveState); + this->Become(&TDerived::WaitResolveState); if (ExecuterStateSpan) { ExecuterStateSpan.End(); ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::ExecuterWaitResolveState, ExecuterSpan.GetTraceId(), "WaitResolveState", NWilson::EFlags::AUTO_END); @@ -1013,6 +968,7 @@ protected: TKqpTableKeys TableKeys; TActorId KqpTableResolverId; + TActorId KqpShardsResolverId; THashMap<TActorId, TProgressStat> PendingComputeActors; // Running compute actors (pure and DS) TVector<TProgressStat> LastStats; diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index b9b2661b179..63505936e38 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -70,6 +70,22 @@ public: } public: + STATEFN(WaitResolveState) { + try { + switch (ev->GetTypeRewrite()) { + hFunc(TEvKqpExecuter::TEvTableResolveStatus, HandleResolve); + hFunc(TEvKqpExecuter::TEvShardsResolveStatus, HandleResolve); + hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution); + hFunc(TEvents::TEvWakeup, HandleTimeout); + default: + UnexpectedEvent("WaitResolveState", ev->GetTypeRewrite()); + } + + } catch (const yexception& e) { + InternalError(e.what()); + } + ReportEventElapsedTime(); + } private: STATEFN(ExecuteState) { @@ -465,7 +481,31 @@ private: } } -public: + void HandleResolve(TEvKqpExecuter::TEvTableResolveStatus::TPtr& ev) { + if (!TBase::HandleResolve(ev)) return; + TSet<ui64> shardIds; + for (auto& [stageId, stageInfo] : TasksGraph.GetStagesInfo()) { + if (stageInfo.Meta.ShardKey) { + for (auto& partition : stageInfo.Meta.ShardKey->GetPartitions()) { + shardIds.insert(partition.ShardId); + } + } + } + if (shardIds) { + LOG_D("Start resolving tablets nodes... (" << shardIds.size() << ")"); + auto kqpShardsResolver = CreateKqpShardsResolver(this->SelfId(), TxId, std::move(shardIds)); + KqpShardsResolverId = this->RegisterWithSameMailbox(kqpShardsResolver); + } else { + Execute(); + } + } + + + void HandleResolve(TEvKqpExecuter::TEvShardsResolveStatus::TPtr& ev) { + if (!TBase::HandleResolve(ev)) return; + Execute(); + } + void Execute() { LWTRACK(KqpScanExecuterStartExecute, ResponseEv->Orbit, TxId); auto& funcRegistry = *AppData()->FunctionRegistry; @@ -696,6 +736,7 @@ public: } } +public: void Finalize() { auto& response = *ResponseEv->Record.MutableResponse(); |