diff options
author | shumkovnd <shumkovnd@yandex-team.com> | 2023-02-13 23:52:29 +0300 |
---|---|---|
committer | shumkovnd <shumkovnd@yandex-team.com> | 2023-02-13 23:52:29 +0300 |
commit | 0a57f2296f811e91347ef317aaa555a32a4693f1 (patch) | |
tree | 52091067683c907d031ef6f4be7ca5646c6c20c3 | |
parent | 59c29fbfd00b5c755e74098d19e1ae4a592edb60 (diff) | |
download | ydb-0a57f2296f811e91347ef317aaa555a32a4693f1.tar.gz |
add retries to TKqpDataExecuter
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 54 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 60 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner.cpp | 88 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner.h | 9 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 63 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 2 |
7 files changed, 140 insertions, 142 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 82c1b23ebe2..cf123827a02 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -48,19 +48,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da using TBase = TKqpExecuterBase<TKqpDataExecuter, EExecType::Data>; using TKqpSnapshot = IKqpGateway::TKqpSnapshot; - struct TEvPrivate { - enum EEv { - EvReattachToShard = EventSpaceBegin(TEvents::ES_PRIVATE), - }; - - struct TEvReattachToShard : public TEventLocal<TEvReattachToShard, EvReattachToShard> { - const ui64 TabletId; - - explicit TEvReattachToShard(ui64 tabletId) - : TabletId(tabletId) {} - }; - }; - struct TReattachState { TDuration Delay; TInstant Deadline; @@ -136,8 +123,8 @@ public: } TKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TMaybe<TString>& userToken, - TKqpRequestCounters::TPtr counters, bool streamResult) - : TBase(std::move(request), database, userToken, counters, TWilsonKqp::DataExecuter, "DataExecuter") + TKqpRequestCounters::TPtr counters, bool streamResult, ui32 executerDelayToRetryMs) + : TBase(std::move(request), database, userToken, counters, executerDelayToRetryMs, TWilsonKqp::DataExecuter, "DataExecuter") , StreamResult(streamResult) { YQL_ENSURE(Request.IsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED); @@ -788,6 +775,7 @@ private: hFunc(TEvPrivate::TEvReattachToShard, HandleExecute); hFunc(TEvPipeCache::TEvDeliveryProblem, HandleExecute); hFunc(TEvents::TEvUndelivered, HandleUndelivered); + hFunc(TEvPrivate::TEvRetry, HandleRetry); hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected); hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse); hFunc(TEvTxProxy::TEvProposeTransactionStatus, HandleExecute); @@ -2028,34 +2016,10 @@ private: } } - for (auto& [nodeId, tasks] : tasksPerNode) { - auto ev = MakeHolder<TEvKqpNode::TEvStartKqpTasksRequest>(); - - ev->Record.SetTxId(TxId); - ActorIdToProto(SelfId(), ev->Record.MutableExecuterActorId()); - - if (Deadline) { - TDuration timeout = *Deadline - TAppData::TimeProvider->Now(); - ev->Record.MutableRuntimeSettings()->SetTimeoutMs(timeout.MilliSeconds()); - } - - ev->Record.MutableRuntimeSettings()->SetExecType(NDqProto::TComputeRuntimeSettings::DATA); - ev->Record.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(Request.StatsMode)); - ev->Record.MutableRuntimeSettings()->SetUseLLVM(false); - ev->Record.SetStartAllOrFail(true); - - for (auto&& task : tasks) { - ev->Record.AddTasks()->Swap(&task); - } - - auto target = MakeKqpNodeServiceID(nodeId); - - ui32 flags = IEventHandle::FlagTrackDelivery; - if (SubscribedNodes.emplace(nodeId).second) { - flags |= IEventHandle::FlagSubscribeOnSession; - } - TlsActivationContext->Send(new IEventHandle(target, SelfId(), ev.Release(), flags, nodeId)); - } + Planner = CreateKqpPlanner(TxId, SelfId(), {}, std::move(tasksPerNode), Request.Snapshot, + Database, Nothing(), Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, + Request.DisableLlvmForUdfStages, Request.LlvmEnabled, false, Nothing(), ExecuterSpan, {}); + Planner->ProcessTasksForDataExecuter(); // then start data tasks with known actor ids of compute tasks for (auto& [shardId, shardTx] : DatashardTxs) { @@ -2363,9 +2327,9 @@ private: } // namespace IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TMaybe<TString>& userToken, - TKqpRequestCounters::TPtr counters, bool streamResult) + TKqpRequestCounters::TPtr counters, bool streamResult, ui32 executerDelayToRetryMs) { - return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult); + return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerDelayToRetryMs); } } // namespace NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index 8e4a08be433..9bc27a4a152 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -151,7 +151,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt if (request.Transactions.empty()) { // commit-only or rollback-only data transaction YQL_ENSURE(request.EraseLocks); - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerDelayToRetryMs); } TMaybe<NKqpProto::TKqpPhyTx::EType> txsType; @@ -167,13 +167,13 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt switch (*txsType) { case NKqpProto::TKqpPhyTx::TYPE_COMPUTE: case NKqpProto::TKqpPhyTx::TYPE_DATA: - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerDelayToRetryMs); case NKqpProto::TKqpPhyTx::TYPE_SCAN: return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerDelayToRetryMs); case NKqpProto::TKqpPhyTx::TYPE_GENERIC: - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, executerDelayToRetryMs); default: YQL_ENSURE(false, "Unsupported physical tx type: " << (ui32)*txsType); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 686207b901a..40c8f2a2808 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -2,6 +2,7 @@ #include "kqp_executer.h" #include "kqp_executer_stats.h" +#include "kqp_planner.h" #include "kqp_partition_helper.h" #include "kqp_table_resolver.h" #include "kqp_shards_resolver.h" @@ -76,14 +77,48 @@ TActorId ReportToRl(ui64 ru, const TString& database, const TString& userToken, template <class TDerived, EExecType ExecType> class TKqpExecuterBase : public TActorBootstrapped<TDerived> { +protected: + struct TEvPrivate { + enum EEv { + EvRetry = EventSpaceBegin(TEvents::ES_PRIVATE), + EvResourcesSnapshot, + EvReattachToShard, + }; + + struct TEvRetry : public TEventLocal<TEvRetry, EEv::EvRetry> { + ui32 RequestId; + TActorId Target; + + TEvRetry(ui64 requestId, const TActorId& target) + : RequestId(requestId) + , Target(target) {} + }; + + struct TEvResourcesSnapshot : public TEventLocal<TEvResourcesSnapshot, EEv::EvResourcesSnapshot> { + TVector<NKikimrKqp::TKqpNodeResources> Snapshot; + + TEvResourcesSnapshot(TVector<NKikimrKqp::TKqpNodeResources>&& snapshot) + : Snapshot(std::move(snapshot)) {} + }; + + struct TEvReattachToShard : public TEventLocal<TEvReattachToShard, EvReattachToShard> { + const ui64 TabletId; + + explicit TEvReattachToShard(ui64 tabletId) + : TabletId(tabletId) {} + }; + }; + public: TKqpExecuterBase(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TMaybe<TString>& userToken, - TKqpRequestCounters::TPtr counters, ui64 spanVerbosity = 0, TString spanName = "no_name") + TKqpRequestCounters::TPtr counters, ui32 executerDelayToRetryMs, ui64 spanVerbosity = 0, TString spanName = "no_name") : Request(std::move(request)) , Database(database) , UserToken(userToken) , Counters(counters) , ExecuterSpan(spanVerbosity, std::move(Request.TraceId), spanName) + , Planner(nullptr) + , ExecuterDelayToRetryMs(executerDelayToRetryMs) { ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(Request.TxAlloc); ResponseEv->Orbit = std::move(Request.Orbit); @@ -375,12 +410,17 @@ protected: } } - virtual void HandleUndelivered(TEvents::TEvUndelivered::TPtr& ev) { + void HandleUndelivered(TEvents::TEvUndelivered::TPtr& ev) { ui32 eventType = ev->Get()->SourceType; auto reason = ev->Get()->Reason; switch (eventType) { case TEvKqpNode::TEvStartKqpTasksRequest::EventType: { - InvalidateNode(ev->Cookie); + if (reason == TEvents::TEvUndelivered::EReason::ReasonActorUnknown) { + LOG_D("Schedule a retry by ActorUnknown reason, nodeId:" << ev->Sender.NodeId() << " requestId: " << ev->Cookie); + this->Schedule(TDuration::MilliSeconds(ExecuterDelayToRetryMs), new typename TEvPrivate::TEvRetry(ev->Cookie, ev->Sender)); + return; + } + InvalidateNode(ev->Sender.NodeId()); return InternalError(TStringBuilder() << "TEvKqpNode::TEvStartKqpTasksRequest lost: " << reason); } @@ -390,6 +430,15 @@ protected: } } + void HandleRetry(typename TEvPrivate::TEvRetry::TPtr& ev) { + if (Planner && Planner->SendStartKqpTasksRequest(ev->Get()->RequestId, ev->Get()->Target)) { + return; + } + InvalidateNode(Target.NodeId()); + return InternalError(TStringBuilder() + << "TEvKqpNode::TEvStartKqpTasksRequest lost: ActorUnknown"); + } + void HandleDisconnected(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) { auto nodeId = ev->Get()->NodeId; LOG_N("Disconnected node " << nodeId); @@ -1130,6 +1179,9 @@ protected: ui64 LastTaskId = 0; TString LastComputeActorId = ""; + std::unique_ptr<TKqpPlanner> Planner; + ui32 ExecuterDelayToRetryMs; + private: static constexpr TDuration ResourceUsageUpdateInterval = TDuration::MilliSeconds(100); }; @@ -1139,7 +1191,7 @@ private: IActor* CreateKqpLiteralExecuter(IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters); IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, - const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult); + const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult, ui32 executerDelayToRetryMs); IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 5567d314f81..0b8fa4cefc6 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -24,7 +24,7 @@ constexpr ui32 MEMORY_ESTIMATION_OVERFLOW = 2; constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 4; TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::TDqTask>&& computeTasks, - THashMap<ui64, TVector<NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot, + THashMap<ui64, TVector<NDqProto::TDqTask>>&& mainTasksPerNode, const IKqpGateway::TKqpSnapshot& snapshot, const TString& database, const TMaybe<TString>& userToken, TInstant deadline, const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan, @@ -32,7 +32,7 @@ TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto:: : TxId(txId) , ExecuterId(executer) , ComputeTasks(std::move(computeTasks)) - , ScanTasks(std::move(scanTasks)) + , MainTasksPerNode(std::move(mainTasksPerNode)) , Snapshot(snapshot) , Database(database) , UserToken(userToken) @@ -54,7 +54,7 @@ TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto:: } } -bool TKqpPlanner::SendKqpTasksRequest(ui32 requestId, const TActorId& target) { +bool TKqpPlanner::SendStartKqpTasksRequest(ui32 requestId, const TActorId& target) { auto& requestData = Requests[requestId]; if (requestData.RetryNumber == 3) { @@ -92,7 +92,45 @@ bool TKqpPlanner::SendKqpTasksRequest(ui32 requestId, const TActorId& target) { return true; } -void TKqpPlanner::Process() { +void TKqpPlanner::ProcessTasksForDataExecuter() { + + long requestsCnt = 0; + + for (auto& [nodeId, tasks] : MainTasksPerNode) { + + auto& requestData = Requests.emplace_back(); + + requestData.request.SetTxId(TxId); + ActorIdToProto(ExecuterId, requestData.request.MutableExecuterActorId()); + + if (Deadline) { + TDuration timeout = Deadline - TAppData::TimeProvider->Now(); + requestData.request.MutableRuntimeSettings()->SetTimeoutMs(timeout.MilliSeconds()); + } + + requestData.request.MutableRuntimeSettings()->SetExecType(NDqProto::TComputeRuntimeSettings::DATA); + requestData.request.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(StatsMode)); + requestData.request.MutableRuntimeSettings()->SetUseLLVM(false); + requestData.request.SetStartAllOrFail(true); + + for (auto&& task : tasks) { + requestData.request.AddTasks()->Swap(&task); + } + + auto target = MakeKqpNodeServiceID(nodeId); + + requestData.flag = CalcSendMessageFlagsForNode(nodeId); + requestsCnt++; + + SendStartKqpTasksRequest(Requests.size() - 1, target); + } + + if (ExecuterSpan) { + ExecuterSpan.Attribute("requestsCnt", requestsCnt); + } +} + +void TKqpPlanner::ProcessTasksForScanExecuter() { PrepareToProcess(); auto localResources = GetKqpResourceManager()->GetLocalResources(); @@ -145,15 +183,15 @@ void TKqpPlanner::Process() { AddScansToKqpNodeRequest(requestData.request, group.NodeId); auto target = MakeKqpNodeServiceID(group.NodeId); - requestData.flag = CalcSendMessageFlagsForNode(target.NodeId()); + requestData.flag = CalcSendMessageFlagsForNode(group.NodeId); - SendKqpTasksRequest(Requests.size() - 1, target); + SendStartKqpTasksRequest(Requests.size() - 1, target); ++requestsCnt; } TVector<ui64> nodes; - nodes.reserve(ScanTasks.size()); - for (auto& [nodeId, _]: ScanTasks) { + nodes.reserve(MainTasksPerNode.size()); + for (auto& [nodeId, _]: MainTasksPerNode) { nodes.push_back(nodeId); } @@ -163,12 +201,12 @@ void TKqpPlanner::Process() { AddScansToKqpNodeRequest(requestData.request, nodeId); auto target = MakeKqpNodeServiceID(nodeId); - requestData.flag = CalcSendMessageFlagsForNode(target.NodeId()); + requestData.flag = CalcSendMessageFlagsForNode(nodeId); LOG_D("Send request to kqpnode: " << target << ", node_id: " << ExecuterId.NodeId() << ", TxId: " << TxId); - SendKqpTasksRequest(Requests.size() - 1, target); + SendStartKqpTasksRequest(Requests.size() - 1, target); ++requestsCnt; } - Y_VERIFY(ScanTasks.empty()); + Y_VERIFY(MainTasksPerNode.empty()); } else { auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::PRECONDITION_FAILED, "Not enough resources to execute query"); @@ -185,7 +223,7 @@ void TKqpPlanner::PrepareToProcess() { auto rmConfig = GetKqpResourceManager()->GetConfig(); ui32 tasksCount = ComputeTasks.size(); - for (auto& [shardId, tasks] : ScanTasks) { + for (auto& [shardId, tasks] : MainTasksPerNode) { tasksCount += tasks.size(); } @@ -196,7 +234,7 @@ void TKqpPlanner::PrepareToProcess() { EstimateTaskResources(ComputeTasks[i], rmConfig, ResourceEstimations[i]); LocalRunMemoryEst += ResourceEstimations[i].TotalMemoryLimit; } - if (auto it = ScanTasks.find(ExecuterId.NodeId()); it != ScanTasks.end()) { + if (auto it = MainTasksPerNode.find(ExecuterId.NodeId()); it != MainTasksPerNode.end()) { for (size_t i = 0; i < it->second.size(); ++i) { EstimateTaskResources(it->second[i], rmConfig, ResourceEstimations[i + ComputeTasks.size()]); LocalRunMemoryEst += ResourceEstimations[i + ComputeTasks.size()].TotalMemoryLimit; @@ -209,8 +247,8 @@ ui64 TKqpPlanner::GetComputeTasksNumber() const { return ComputeTasks.size(); } -ui64 TKqpPlanner::GetScanTasksNumber() const { - return ScanTasks.size(); +ui64 TKqpPlanner::GetMainTasksNumber() const { + return MainTasksPerNode.size(); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -224,14 +262,14 @@ void TKqpPlanner::RunLocal(const TVector<NKikimrKqp::TKqpNodeResources>& snapsho AddScansToKqpNodeRequest(requestData.request, ExecuterId.NodeId()); auto target = MakeKqpNodeServiceID(ExecuterId.NodeId()); - requestData.flag = CalcSendMessageFlagsForNode(target.NodeId()); + requestData.flag = CalcSendMessageFlagsForNode(ExecuterId.NodeId()); LOG_D("Send request to kqpnode: " << target << ", node_id: " << ExecuterId.NodeId() << ", TxId: " << TxId); - SendKqpTasksRequest(Requests.size() - 1, target); + SendStartKqpTasksRequest(Requests.size() - 1, target); long requestsCnt = 1; TVector<ui64> nodes; - for (const auto& pair: ScanTasks) { + for (const auto& pair: MainTasksPerNode) { nodes.push_back(pair.first); YQL_ENSURE(pair.first != ExecuterId.NodeId()); } @@ -249,11 +287,11 @@ void TKqpPlanner::RunLocal(const TVector<NKikimrKqp::TKqpNodeResources>& snapsho auto target = MakeKqpNodeServiceID(nodeId); requestData.flag = CalcSendMessageFlagsForNode(target.NodeId()); - SendKqpTasksRequest(Requests.size() - 1, target); + SendStartKqpTasksRequest(Requests.size() - 1, target); requestsCnt++; } - Y_VERIFY(ScanTasks.size() == 0); + Y_VERIFY(MainTasksPerNode.size() == 0); if (ExecuterSpan) { ExecuterSpan.Attribute("requestsCnt", requestsCnt); @@ -313,12 +351,12 @@ void TKqpPlanner::PrepareKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& req void TKqpPlanner::AddScansToKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& request, ui64 nodeId) { if (!Snapshot.IsValid()) { - Y_ASSERT(ScanTasks.size() == 0); + Y_ASSERT(MainTasksPerNode.size() == 0); return; } bool withLLVM = true; - if (auto nodeTasks = ScanTasks.FindPtr(nodeId)) { + if (auto nodeTasks = MainTasksPerNode.FindPtr(nodeId)) { LOG_D("Adding " << nodeTasks->size() << " scans to KqpNode request"); request.MutableSnapshot()->SetTxId(Snapshot.TxId); @@ -331,7 +369,7 @@ void TKqpPlanner::AddScansToKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& AddSnapshotInfoToTaskInputs(task); request.AddTasks()->Swap(&task); } - ScanTasks.erase(nodeId); + MainTasksPerNode.erase(nodeId); } if (request.GetRuntimeSettings().GetUseLLVM()) { @@ -392,13 +430,13 @@ void TKqpPlanner::AddSnapshotInfoToTaskInputs(NYql::NDqProto::TDqTask& task) { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// std::unique_ptr<TKqpPlanner> CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NYql::NDqProto::TDqTask>&& tasks, - THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot, + THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& mainTasksPerNode, const IKqpGateway::TKqpSnapshot& snapshot, const TString& database, const TMaybe<TString>& userToken, TInstant deadline, const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan, TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot) { - return std::make_unique<TKqpPlanner>(txId, executer, std::move(tasks), std::move(scanTasks), snapshot, + return std::make_unique<TKqpPlanner>(txId, executer, std::move(tasks), std::move(mainTasksPerNode), snapshot, database, userToken, deadline, statsMode, disableLlvmForUdfStages, enableLlvm, withSpilling, rlPath, executerSpan, std::move(resourcesSnapshot)); } diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index 74ef4e4c1db..def2a6f3a8c 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -31,12 +31,13 @@ public: const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& ExecuterSpan, TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot); - bool SendKqpTasksRequest(ui32 requestId, const TActorId& target); + bool SendStartKqpTasksRequest(ui32 requestId, const TActorId& target); - void Process(); + void ProcessTasksForScanExecuter(); + void ProcessTasksForDataExecuter(); ui64 GetComputeTasksNumber() const; - ui64 GetScanTasksNumber() const; + ui64 GetMainTasksNumber() const; private: void PrepareToProcess(); @@ -52,7 +53,7 @@ private: const ui64 TxId; const TActorId ExecuterId; TVector<NYql::NDqProto::TDqTask> ComputeTasks; - THashMap<ui64, TVector<NYql::NDqProto::TDqTask>> ScanTasks; + THashMap<ui64, TVector<NYql::NDqProto::TDqTask>> MainTasksPerNode; const IKqpGateway::TKqpSnapshot Snapshot; TString Database; const TMaybe<TString> UserToken; diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 2737b45ccf9..128d66d7b9c 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -1,7 +1,6 @@ #include "kqp_executer.h" #include "kqp_executer_impl.h" #include "kqp_partition_helper.h" -#include "kqp_planner.h" #include "kqp_result_channel.h" #include "kqp_tasks_graph.h" #include "kqp_tasks_validate.h" @@ -40,29 +39,6 @@ namespace { class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Scan> { using TBase = TKqpExecuterBase<TKqpScanExecuter, EExecType::Scan>; - struct TEvPrivate { - enum EEv { - EvResourcesSnapshot = EventSpaceBegin(TEvents::ES_PRIVATE), - EvRetry - }; - - struct TEvResourcesSnapshot : public TEventLocal<TEvResourcesSnapshot, EEv::EvResourcesSnapshot> { - TVector<NKikimrKqp::TKqpNodeResources> Snapshot; - - TEvResourcesSnapshot(TVector<NKikimrKqp::TKqpNodeResources>&& snapshot) - : Snapshot(std::move(snapshot)) {} - }; - - struct TEvRetry : public TEventLocal<TEvRetry, EEv::EvRetry> { - ui32 RequestId; - TActorId Target; - - TEvRetry(ui64 requestId, const TActorId& target) - : RequestId(requestId) - , Target(target) {} - }; - }; - public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::KQP_EXECUTER_ACTOR; @@ -72,10 +48,8 @@ public: const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, ui32 executerDelayToRetryMs) - : TBase(std::move(request), database, userToken, counters, TWilsonKqp::ScanExecuter, "ScanExecuter") + : TBase(std::move(request), database, userToken, counters, executerDelayToRetryMs, TWilsonKqp::ScanExecuter, "ScanExecuter") , AggregationSettings(aggregation) - , Planner(nullptr) - , ExecuterDelayToRetryMs(executerDelayToRetryMs) { YQL_ENSURE(Request.Transactions.size() == 1); YQL_ENSURE(Request.DataShardLocks.empty()); @@ -749,38 +723,9 @@ private: std::move(scanTasks), Request.Snapshot, Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, Request.DisableLlvmForUdfStages, Request.LlvmEnabled, AppData()->EnableKqpSpilling, Request.RlPath, ExecuterSpan, std::move(snapshot)); - LOG_D("Execute scan tx, computeTasks: " << Planner->GetComputeTasksNumber() << ", scanTasks: " << Planner->GetScanTasksNumber()); - - Planner->Process(); - } + LOG_D("Execute scan tx, computeTasks: " << Planner->GetComputeTasksNumber() << ", scanTasks: " << Planner->GetMainTasksNumber()); - void HandleUndelivered(TEvents::TEvUndelivered::TPtr& ev) override { - ui32 eventType = ev->Get()->SourceType; - auto reason = ev->Get()->Reason; - switch (eventType) { - case TEvKqpNode::TEvStartKqpTasksRequest::EventType: { - if (reason == TEvents::TEvUndelivered::EReason::ReasonActorUnknown) { - LOG_D("Schedule a retry by ActorUnknown reason, nodeId:" << ev->Sender.NodeId() << " requestId: " << ev->Cookie); - Schedule(TDuration::MilliSeconds(ExecuterDelayToRetryMs), new TEvPrivate::TEvRetry(ev->Cookie, ev->Sender)); - return; - } - InvalidateNode(ev->Sender.NodeId()); - return InternalError(TStringBuilder() - << "TEvKqpNode::TEvStartKqpTasksRequest lost: " << reason); - } - default: { - LOG_E("Event lost, type: " << eventType << ", reason: " << reason); - } - } - } - - void HandleRetry(TEvPrivate::TEvRetry::TPtr& ev) { - if (Planner->SendKqpTasksRequest(ev->Get()->RequestId, ev->Get()->Target)) { - return; - } - InvalidateNode(Target.NodeId()); - return InternalError(TStringBuilder() - << "TEvKqpNode::TEvStartKqpTasksRequest lost: ActorUnknown"); + Planner->ProcessTasksForScanExecuter(); } private: @@ -846,8 +791,6 @@ public: } private: const NKikimrConfig::TTableServiceConfig::TAggregationConfig AggregationSettings; - std::unique_ptr<TKqpPlanner> Planner; - ui32 ExecuterDelayToRetryMs; }; } // namespace diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 0a2b64f8209..a353f50fc6b 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1236,7 +1236,7 @@ message TTableServiceConfig { optional TAggregationConfig AggregationConfig = 29; optional bool EnableKqpScanQueryStreamLookup = 30 [default = false]; optional bool EnableKqpDataQueryStreamLookup = 31 [default = false]; - optional uint32 ExecuterDelayToRetryMs = 32 [default = 200]; + optional uint32 ExecuterDelayToRetryMs = 32 [default = 400]; }; // Config describes immediate controls and allows |