diff options
author | shumkovnd <shumkovnd@yandex-team.com> | 2023-02-09 13:17:26 +0300 |
---|---|---|
committer | shumkovnd <shumkovnd@yandex-team.com> | 2023-02-09 13:17:26 +0300 |
commit | 016ae54b59f8e01d21a2d1573fb3e5ab39e734ee (patch) | |
tree | 143107ef698cd010b814046465dee80783ba5eff | |
parent | aecae6efeea7d5c240e51e4652fd7d7f40d0a874 (diff) | |
download | ydb-016ae54b59f8e01d21a2d1573fb3e5ab39e734ee.tar.gz |
add retries to TKqpScanExecuter
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 5 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner.cpp | 286 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner.h | 52 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner_strategy.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 106 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 1 |
10 files changed, 272 insertions, 198 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index 5c09f69138f..1d6d70929eb 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -84,7 +84,8 @@ struct TEvKqpExecuter { }; IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, - const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation); + const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, + const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, ui32 executerDelayToRetryMs); std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecutePure( IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index 79ff9fcdb41..8e4a08be433 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -145,7 +145,8 @@ TActorId ReportToRl(ui64 ru, const TString& database, const TString& userToken, //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, - const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation) + const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, + ui32 executerDelayToRetryMs) { if (request.Transactions.empty()) { // commit-only or rollback-only data transaction @@ -169,7 +170,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false); case NKqpProto::TKqpPhyTx::TYPE_SCAN: - return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation); + return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerDelayToRetryMs); case NKqpProto::TKqpPhyTx::TYPE_GENERIC: return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 43ee7a96981..75f50425cfd 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -375,7 +375,7 @@ protected: } } - void HandleUndelivered(TEvents::TEvUndelivered::TPtr& ev) { + virtual void HandleUndelivered(TEvents::TEvUndelivered::TPtr& ev) { ui32 eventType = ev->Get()->SourceType; auto reason = ev->Get()->Reason; switch (eventType) { @@ -1142,7 +1142,8 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult); IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, - const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation); + const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, + const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, ui32 executerDelayToRetryMs); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 6dbd70de951..cbbc58f8f6f 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -6,7 +6,6 @@ #include <ydb/core/kqp/common/kqp_yql.h> #include <ydb/core/base/appdata.h> #include <ydb/core/base/wilson.h> -#include <ydb/core/kqp/rm_service/kqp_resource_estimation.h> #include <util/generic/set.h> @@ -23,14 +22,15 @@ using namespace NYql; // So, we estimate total memory amount required for task as apriori task size multiplied by this constant. constexpr ui32 MEMORY_ESTIMATION_OVERFLOW = 2; -TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::TDqTask>&& tasks, +TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::TDqTask>&& computeTasks, THashMap<ui64, TVector<NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot, const TString& database, const TMaybe<TString>& userToken, TInstant deadline, const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm, - bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TTraceId traceId) + bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan, + TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot) : TxId(txId) , ExecuterId(executer) - , Tasks(std::move(tasks)) + , ComputeTasks(std::move(computeTasks)) , ScanTasks(std::move(scanTasks)) , Snapshot(snapshot) , Database(database) @@ -41,7 +41,8 @@ TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto:: , EnableLlvm(enableLlvm) , WithSpilling(withSpilling) , RlPath(rlPath) - , KqpPlannerSpan(TWilsonKqp::KqpPlanner, std::move(traceId), "KqpPlanner") + , ResourcesSnapshot(std::move(resourcesSnapshot)) + , ExecuterSpan(executerSpan) { if (!Database) { // a piece of magic for tests @@ -52,85 +53,61 @@ TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto:: } } -void TKqpPlanner::Bootstrap(const TActorContext&) { - GetKqpResourceManager()->RequestClusterResourcesInfo( - [as = TlsActivationContext->ActorSystem(), self = SelfId()](TVector<NKikimrKqp::TKqpNodeResources>&& resources) { - TAutoPtr<IEventHandle> eh = new IEventHandle(self, self, new TEvPrivate::TEvResourcesSnapshot(std::move(resources))); - as->Send(eh); - }); +bool TKqpPlanner::SendKqpTasksRequest(ui32 requestId, const TActorId& target) { + auto& requestData = Requests[requestId]; - Become(&TKqpPlanner::WaitState); -} - -void TKqpPlanner::WaitState(TAutoPtr<IEventHandle>& ev, const TActorContext&) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvPrivate::TEvResourcesSnapshot, HandleWait); - hFunc(TEvKqp::TEvAbortExecution, HandleWait); - default: - LOG_C("Unexpected event type: " << ev->GetTypeRewrite() << " at Wait state" - << ", event: " << (ev->HasEvent() ? ev->GetBase()->ToString().data() : "<serialized>")); + if (requestData.RetryNumber == 3) { + return false; } -} -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -/// Wait State -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -void TKqpPlanner::HandleWait(TEvPrivate::TEvResourcesSnapshot::TPtr& ev) { - if (ev->Get()->Snapshot.empty()) { - LOG_E("Can not find default state storage group for database " << Database); - RunLocal(ev->Get()->Snapshot); - return; + auto ev = MakeHolder<TEvKqpNode::TEvStartKqpTasksRequest>(); + ev->Record = requestData.request; + + if (requestData.RetryNumber == 1) { + LOG_D("Try to retry by ActorUnknown reason, nodeId: " << target.NodeId() << ", requestId: " << requestId); + } else if (requestData.RetryNumber == 2) { + TMaybe<ui32> targetNode; + for (size_t i = 0; i < ResourcesSnapshot.size(); ++i) { + if (!TrackingNodes.contains(ResourcesSnapshot[i].nodeid())) { + targetNode = ResourcesSnapshot[i].nodeid(); + break; + } + } + if (targetNode) { + LOG_D("Try to retry to another node, nodeId: " << *targetNode << ", requestId: " << requestId); + auto anotherTarget = MakeKqpNodeServiceID(*targetNode); + TlsActivationContext->Send(std::make_unique<NActors::IEventHandle>(anotherTarget, ExecuterId, ev.Release(), + IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, requestId, nullptr, ExecuterSpan.GetTraceId())); + requestData.RetryNumber++; + return true; + } + LOG_E("Retry failed because all nodes are busy, requestId: " << requestId); + return false; } + requestData.RetryNumber++; - Process(ev->Get()->Snapshot); + TlsActivationContext->Send(std::make_unique<NActors::IEventHandle>(target, ExecuterId, ev.Release(), + requestData.flag, requestId, nullptr, ExecuterSpan.GetTraceId())); + return true; } -void TKqpPlanner::HandleWait(TEvKqp::TEvAbortExecution::TPtr& ev) { - LOG_E("Terminate KqpPlanner, reason: " << ev->Get()->GetIssues().ToOneLineString()); - PassAway(); -} - -void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot) { - auto rmConfig = GetKqpResourceManager()->GetConfig(); - - ui32 tasksCount = Tasks.size(); - for (auto& [shardId, tasks] : ScanTasks) { - tasksCount += tasks.size(); - } - - TVector<TTaskResourceEstimation> est; - est.resize(tasksCount); - - ui64 localRunMemoryEst = 0; - - ui64 i = 0; - for (auto& task : Tasks) { - EstimateTaskResources(task, rmConfig, est[i]); - localRunMemoryEst += est[i].TotalMemoryLimit; - i++; - } - if (auto it = ScanTasks.find(SelfId().NodeId()); it != ScanTasks.end()) { - for (auto& task : it->second) { - EstimateTaskResources(task, rmConfig, est[i]); - localRunMemoryEst += est[i].TotalMemoryLimit; - i++; - } - } +void TKqpPlanner::Process() { + PrepareToProcess(); auto localResources = GetKqpResourceManager()->GetLocalResources(); - if (localRunMemoryEst * MEMORY_ESTIMATION_OVERFLOW <= localResources.Memory[NRm::EKqpMemoryPool::ScanQuery] && - tasksCount <= localResources.ExecutionUnits) + if (LocalRunMemoryEst * MEMORY_ESTIMATION_OVERFLOW <= localResources.Memory[NRm::EKqpMemoryPool::ScanQuery] && + ResourceEstimations.size() <= localResources.ExecutionUnits) { - RunLocal(snapshot); + RunLocal(ResourcesSnapshot); return; } - if (snapshot.empty() || (snapshot.size() == 1 && snapshot[0].GetNodeId() == SelfId().NodeId())) { + if (ResourcesSnapshot.empty() || (ResourcesSnapshot.size() == 1 && ResourcesSnapshot[0].GetNodeId() == ExecuterId.NodeId())) { // try to run without memory overflow settings - if (localRunMemoryEst <= localResources.Memory[NRm::EKqpMemoryPool::ScanQuery] && - tasksCount <= localResources.ExecutionUnits) + if (LocalRunMemoryEst <= localResources.Memory[NRm::EKqpMemoryPool::ScanQuery] && + ResourceEstimations.size() <= localResources.ExecutionUnits) { - RunLocal(snapshot); + RunLocal(ResourcesSnapshot); return; } @@ -138,12 +115,7 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::PRECONDITION_FAILED, "Not enough resources to execute query locally and no information about other nodes"); - if (KqpPlannerSpan) { - KqpPlannerSpan.EndError("Not enough resources to execute query locally and no information about other nodes"); - } - - Send(ExecuterId, ev.Release()); - PassAway(); + TlsActivationContext->Send(std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release())); return; } @@ -155,22 +127,24 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot } THashMap<ui64, size_t> nodeIdtoIdx; - for (size_t idx = 0; idx < snapshot.size(); ++idx) { - nodeIdtoIdx[snapshot[idx].nodeid()] = idx; + for (size_t idx = 0; idx < ResourcesSnapshot.size(); ++idx) { + nodeIdtoIdx[ResourcesSnapshot[idx].nodeid()] = idx; } - auto plan = planner->Plan(snapshot, std::move(est)); + auto plan = planner->Plan(ResourcesSnapshot, ResourceEstimations); long requestsCnt = 0; if (!plan.empty()) { for (auto& group : plan) { - auto ev = PrepareKqpNodeRequest(group.TaskIds); - AddScansToKqpNodeRequest(ev, group.NodeId); + auto& requestData = Requests.emplace_back(); + PrepareKqpNodeRequest(requestData.request, THashSet<ui64>(group.TaskIds.begin(), group.TaskIds.end())); + AddScansToKqpNodeRequest(requestData.request, group.NodeId); auto target = MakeKqpNodeServiceID(group.NodeId); - TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(), - CalcSendMessageFlagsForNode(target.NodeId()), 0, nullptr, KqpPlannerSpan.GetTraceId())); + requestData.flag = CalcSendMessageFlagsForNode(target.NodeId()); + + SendKqpTasksRequest(Requests.size() - 1, target); ++requestsCnt; } @@ -181,13 +155,14 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot } for (ui64 nodeId: nodes) { - auto ev = PrepareKqpNodeRequest({}); - AddScansToKqpNodeRequest(ev, nodeId); + auto& requestData = Requests.emplace_back(); + PrepareKqpNodeRequest(requestData.request, {}); + AddScansToKqpNodeRequest(requestData.request, nodeId); auto target = MakeKqpNodeServiceID(nodeId); - LOG_D("Send request to kqpnode: " << target << ", node_id: " << SelfId().NodeId() << ", TxId: " << TxId); - TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(), - CalcSendMessageFlagsForNode(target.NodeId()), nodeId, nullptr, KqpPlannerSpan.GetTraceId())); + requestData.flag = CalcSendMessageFlagsForNode(target.NodeId()); + LOG_D("Send request to kqpnode: " << target << ", node_id: " << ExecuterId.NodeId() << ", TxId: " << TxId); + SendKqpTasksRequest(Requests.size() - 1, target); ++requestsCnt; } Y_VERIFY(ScanTasks.empty()); @@ -195,19 +170,44 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::PRECONDITION_FAILED, "Not enough resources to execute query"); - if (KqpPlannerSpan) { - KqpPlannerSpan.EndError("Not enough resources to execute query"); - } + TlsActivationContext->Send(std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release())); + } - Send(ExecuterId, ev.Release()); + if (ExecuterSpan) { + ExecuterSpan.Attribute("requestsCnt", requestsCnt); } +} - if (KqpPlannerSpan) { - KqpPlannerSpan.Attribute("RequestsCnt", requestsCnt); - KqpPlannerSpan.EndOk(); +void TKqpPlanner::PrepareToProcess() { + auto rmConfig = GetKqpResourceManager()->GetConfig(); + + ui32 tasksCount = ComputeTasks.size(); + for (auto& [shardId, tasks] : ScanTasks) { + tasksCount += tasks.size(); } - PassAway(); + ResourceEstimations.resize(tasksCount); + LocalRunMemoryEst = 0; + + for (size_t i = 0; i < ComputeTasks.size(); ++i) { + EstimateTaskResources(ComputeTasks[i], rmConfig, ResourceEstimations[i]); + LocalRunMemoryEst += ResourceEstimations[i].TotalMemoryLimit; + } + if (auto it = ScanTasks.find(ExecuterId.NodeId()); it != ScanTasks.end()) { + for (size_t i = 0; i < it->second.size(); ++i) { + EstimateTaskResources(it->second[i], rmConfig, ResourceEstimations[i + ComputeTasks.size()]); + LocalRunMemoryEst += ResourceEstimations[i + ComputeTasks.size()].TotalMemoryLimit; + } + } + Sort(ResourceEstimations, [](const auto& l, const auto& r) { return l.TotalMemoryLimit > r.TotalMemoryLimit; }); +} + +ui64 TKqpPlanner::GetComputeTasksNumber() const { + return ComputeTasks.size(); +} + +ui64 TKqpPlanner::GetScanTasksNumber() const { + return ScanTasks.size(); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -216,18 +216,21 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot void TKqpPlanner::RunLocal(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot) { LOG_D("Execute query locally"); - auto ev = PrepareKqpNodeRequest({}); - AddScansToKqpNodeRequest(ev, SelfId().NodeId()); + auto& requestData = Requests.emplace_back(); + PrepareKqpNodeRequest(requestData.request, {}); + AddScansToKqpNodeRequest(requestData.request, ExecuterId.NodeId()); + + auto target = MakeKqpNodeServiceID(ExecuterId.NodeId()); + requestData.flag = CalcSendMessageFlagsForNode(target.NodeId()); + LOG_D("Send request to kqpnode: " << target << ", node_id: " << ExecuterId.NodeId() << ", TxId: " << TxId); + SendKqpTasksRequest(Requests.size() - 1, target); - auto target = MakeKqpNodeServiceID(SelfId().NodeId()); - LOG_D("Send request to kqpnode: " << target << ", node_id: " << SelfId().NodeId() << ", TxId: " << TxId); - TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(), IEventHandle::FlagTrackDelivery, 0, nullptr, KqpPlannerSpan.GetTraceId())); long requestsCnt = 1; TVector<ui64> nodes; for (const auto& pair: ScanTasks) { nodes.push_back(pair.first); - YQL_ENSURE(pair.first != SelfId().NodeId()); + YQL_ENSURE(pair.first != ExecuterId.NodeId()); } THashMap<ui64, size_t> nodeIdToIdx; @@ -237,70 +240,64 @@ void TKqpPlanner::RunLocal(const TVector<NKikimrKqp::TKqpNodeResources>& snapsho } for (auto nodeId: nodes) { - auto ev = PrepareKqpNodeRequest({}); - AddScansToKqpNodeRequest(ev, nodeId); + auto& requestData = Requests.emplace_back(); + PrepareKqpNodeRequest(requestData.request, {}); + AddScansToKqpNodeRequest(requestData.request, nodeId); + auto target = MakeKqpNodeServiceID(nodeId); - TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(), - CalcSendMessageFlagsForNode(target.NodeId()), 0, nullptr, KqpPlannerSpan.GetTraceId())); - ++requestsCnt; + requestData.flag = CalcSendMessageFlagsForNode(target.NodeId()); + SendKqpTasksRequest(Requests.size() - 1, target); + + requestsCnt++; } Y_VERIFY(ScanTasks.size() == 0); - if (KqpPlannerSpan) { - KqpPlannerSpan.Attribute("requestsCnt", requestsCnt); - KqpPlannerSpan.EndOk(); + if (ExecuterSpan) { + ExecuterSpan.Attribute("requestsCnt", requestsCnt); } - - PassAway(); -} - -void TKqpPlanner::PassAway() { - TBase::PassAway(); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -THolder<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::PrepareKqpNodeRequest(const TVector<ui64>& taskIds) { - auto ev = MakeHolder<TEvKqpNode::TEvStartKqpTasksRequest>(); - - ev->Record.SetTxId(TxId); - ActorIdToProto(ExecuterId, ev->Record.MutableExecuterActorId()); +void TKqpPlanner::PrepareKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& request, THashSet<ui64> taskIds) { + request.SetTxId(TxId); + ActorIdToProto(ExecuterId, request.MutableExecuterActorId()); bool withLLVM = EnableLlvm; if (taskIds.empty()) { - for (auto& taskDesc : Tasks) { + for (auto& taskDesc : ComputeTasks) { if (taskDesc.GetId()) { if (DisableLlvmForUdfStages && taskDesc.GetProgram().GetSettings().GetHasUdf()) { withLLVM = false; } AddSnapshotInfoToTaskInputs(taskDesc); - ev->Record.AddTasks()->Swap(&taskDesc); + request.AddTasks()->Swap(&taskDesc); } } } else { - for (auto& taskDesc : Tasks) { + for (auto& taskDesc : ComputeTasks) { if (taskDesc.GetId() && Find(taskIds, taskDesc.GetId()) != taskIds.end()) { if (DisableLlvmForUdfStages && taskDesc.GetProgram().GetSettings().GetHasUdf()) { withLLVM = false; } AddSnapshotInfoToTaskInputs(taskDesc); - ev->Record.AddTasks()->Swap(&taskDesc); + request.AddTasks()->Swap(&taskDesc); } } } if (Deadline) { TDuration timeout = Deadline - TAppData::TimeProvider->Now(); - ev->Record.MutableRuntimeSettings()->SetTimeoutMs(timeout.MilliSeconds()); + request.MutableRuntimeSettings()->SetTimeoutMs(timeout.MilliSeconds()); } - ev->Record.MutableRuntimeSettings()->SetExecType(NDqProto::TComputeRuntimeSettings::SCAN); - ev->Record.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(StatsMode)); - ev->Record.MutableRuntimeSettings()->SetUseLLVM(withLLVM); - ev->Record.MutableRuntimeSettings()->SetUseSpilling(WithSpilling); + request.MutableRuntimeSettings()->SetExecType(NDqProto::TComputeRuntimeSettings::SCAN); + request.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(StatsMode)); + request.MutableRuntimeSettings()->SetUseLLVM(withLLVM); + request.MutableRuntimeSettings()->SetUseSpilling(WithSpilling); if (RlPath) { - auto rlPath = ev->Record.MutableRuntimeSettings()->MutableRlPath(); + auto rlPath = request.MutableRuntimeSettings()->MutableRlPath(); rlPath->SetCoordinationNode(RlPath->GetCoordinationNode()); rlPath->SetResourcePath(RlPath->GetResourcePath()); rlPath->SetDatabase(Database); @@ -308,12 +305,10 @@ THolder<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::PrepareKqpNodeRequest( rlPath->SetToken(UserToken.GetRef()); } - ev->Record.SetStartAllOrFail(true); - - return ev; + request.SetStartAllOrFail(true); } -void TKqpPlanner::AddScansToKqpNodeRequest(THolder<TEvKqpNode::TEvStartKqpTasksRequest>& ev, ui64 nodeId) { +void TKqpPlanner::AddScansToKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& request, ui64 nodeId) { if (!Snapshot.IsValid()) { Y_ASSERT(ScanTasks.size() == 0); return; @@ -323,21 +318,21 @@ void TKqpPlanner::AddScansToKqpNodeRequest(THolder<TEvKqpNode::TEvStartKqpTasksR if (auto nodeTasks = ScanTasks.FindPtr(nodeId)) { LOG_D("Adding " << nodeTasks->size() << " scans to KqpNode request"); - ev->Record.MutableSnapshot()->SetTxId(Snapshot.TxId); - ev->Record.MutableSnapshot()->SetStep(Snapshot.Step); + request.MutableSnapshot()->SetTxId(Snapshot.TxId); + request.MutableSnapshot()->SetStep(Snapshot.Step); for (auto& task: *nodeTasks) { if (DisableLlvmForUdfStages && task.GetProgram().GetSettings().GetHasUdf()) { withLLVM = false; } AddSnapshotInfoToTaskInputs(task); - ev->Record.AddTasks()->Swap(&task); + request.AddTasks()->Swap(&task); } ScanTasks.erase(nodeId); } - if (ev->Record.GetRuntimeSettings().GetUseLLVM()) { - ev->Record.MutableRuntimeSettings()->SetUseLLVM(withLLVM); + if (request.GetRuntimeSettings().GetUseLLVM()) { + request.MutableRuntimeSettings()->SetUseLLVM(withLLVM); } } @@ -393,14 +388,15 @@ void TKqpPlanner::AddSnapshotInfoToTaskInputs(NYql::NDqProto::TDqTask& task) { } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -IActor* CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::TDqTask>&& tasks, - THashMap<ui64, TVector<NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot, - const TString& database, const TMaybe<TString>& token, TInstant deadline, +std::unique_ptr<TKqpPlanner> CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NYql::NDqProto::TDqTask>&& tasks, + THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot, + const TString& database, const TMaybe<TString>& userToken, TInstant deadline, const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm, - bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TTraceId traceId) + bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan, + TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot) { - return new TKqpPlanner(txId, executer, std::move(tasks), std::move(scanTasks), snapshot, - database, token, deadline, statsMode, disableLlvmForUdfStages, enableLlvm, withSpilling, rlPath, std::move(traceId)); + return std::make_unique<TKqpPlanner>(txId, executer, std::move(tasks), std::move(scanTasks), snapshot, + database, userToken, deadline, statsMode, disableLlvmForUdfStages, enableLlvm, withSpilling, rlPath, executerSpan, std::move(resourcesSnapshot)); } } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index 9e8b310f6fc..74ef4e4c1db 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -4,6 +4,7 @@ #include <ydb/core/kqp/gateway/kqp_gateway.h> #include <ydb/core/kqp/node_service/kqp_node_service.h> #include <ydb/core/kqp/rm_service/kqp_rm_service.h> +#include <ydb/core/kqp/rm_service/kqp_resource_estimation.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/wilson/wilson_span.h> @@ -15,20 +16,12 @@ namespace NKikimr::NKqp { -class TKqpPlanner : public TActorBootstrapped<TKqpPlanner> { - using TBase = TActorBootstrapped<TKqpPlanner>; +class TKqpPlanner { - struct TEvPrivate { - enum EEv { - EvResourcesSnapshot = EventSpaceBegin(TEvents::ES_PRIVATE) - }; - - struct TEvResourcesSnapshot : public TEventLocal<TEvResourcesSnapshot, EEv::EvResourcesSnapshot> { - TVector<NKikimrKqp::TKqpNodeResources> Snapshot; - - TEvResourcesSnapshot(TVector<NKikimrKqp::TKqpNodeResources>&& snapshot) - : Snapshot(std::move(snapshot)) {} - }; + struct RequestData { + NKikimrKqp::TEvStartKqpTasksRequest request; + ui32 flag; + ui32 RetryNumber = 0; }; public: @@ -36,23 +29,21 @@ public: THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot, const TString& database, const TMaybe<TString>& userToken, TInstant deadline, const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, - bool enableLlvm, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TTraceId traceId); + bool enableLlvm, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& ExecuterSpan, + TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot); + bool SendKqpTasksRequest(ui32 requestId, const TActorId& target); - void Bootstrap(const TActorContext& ctx); + void Process(); + ui64 GetComputeTasksNumber() const; + ui64 GetScanTasksNumber() const; private: - STATEFN(WaitState); + void PrepareToProcess(); - void HandleWait(TEvPrivate::TEvResourcesSnapshot::TPtr& ev); - void HandleWait(TEvKqp::TEvAbortExecution::TPtr& ev); - - void Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot); void RunLocal(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot); - void PassAway() override; - - THolder<TEvKqpNode::TEvStartKqpTasksRequest> PrepareKqpNodeRequest(const TVector<ui64>& taskIds); - void AddScansToKqpNodeRequest(THolder<TEvKqpNode::TEvStartKqpTasksRequest>& ev, ui64 nodeId); + void PrepareKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& request, THashSet<ui64> taskIds); + void AddScansToKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& request, ui64 nodeId); void AddSnapshotInfoToTaskInputs(NYql::NDqProto::TDqTask& task); ui32 CalcSendMessageFlagsForNode(ui32 nodeId); @@ -60,7 +51,7 @@ private: private: const ui64 TxId; const TActorId ExecuterId; - TVector<NYql::NDqProto::TDqTask> Tasks; + TVector<NYql::NDqProto::TDqTask> ComputeTasks; THashMap<ui64, TVector<NYql::NDqProto::TDqTask>> ScanTasks; const IKqpGateway::TKqpSnapshot Snapshot; TString Database; @@ -72,13 +63,18 @@ private: const bool WithSpilling; const TMaybe<NKikimrKqp::TRlPath> RlPath; THashSet<ui32> TrackingNodes; - NWilson::TSpan KqpPlannerSpan; + const TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot; + NWilson::TSpan& ExecuterSpan; + ui64 LocalRunMemoryEst; + TVector<TTaskResourceEstimation> ResourceEstimations; + TVector<RequestData> Requests; }; -IActor* CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NYql::NDqProto::TDqTask>&& tasks, +std::unique_ptr<TKqpPlanner> CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NYql::NDqProto::TDqTask>&& tasks, THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot, const TString& database, const TMaybe<TString>& userToken, TInstant deadline, const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm, - bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TTraceId traceId = {}); + bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan, + TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot); } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp b/ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp index b0f323dbc24..07c1457cdec 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp @@ -22,7 +22,7 @@ public: ~TKqpGreedyPlanner() override {} TVector<TResult> Plan(const TVector<NKikimrKqp::TKqpNodeResources>& nodeResources, - TVector<TTaskResourceEstimation>&& tasks) override + const TVector<TTaskResourceEstimation>& tasks) override { TVector<TResult> result; @@ -58,15 +58,13 @@ public: } } - Sort(tasks, [](const auto& l, const auto& r) { return l.TotalMemoryLimit > r.TotalMemoryLimit; }); - if (LogFunc) { - for (auto& task : tasks) { + for (const auto& task : tasks) { LogFunc(TStringBuilder() << "[TaskResources] task: " << task.TaskId << ", memory: " << task.TotalMemoryLimit); } } - for (auto& taskEstimation : tasks) { + for (const auto& taskEstimation : tasks) { TNodeDesc node = nodes.top(); if (node.RemainsComputeActors > 0 && diff --git a/ydb/core/kqp/executer_actor/kqp_planner_strategy.h b/ydb/core/kqp/executer_actor/kqp_planner_strategy.h index 6e61114cb01..a0733fd4116 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner_strategy.h +++ b/ydb/core/kqp/executer_actor/kqp_planner_strategy.h @@ -24,7 +24,7 @@ public: }; virtual TVector<TResult> Plan(const TVector<NKikimrKqp::TKqpNodeResources>& nodeResources, - TVector<TTaskResourceEstimation>&& estimatedResources) = 0; + const TVector<TTaskResourceEstimation>& estimatedResources) = 0; protected: TLogFunc LogFunc; diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 5bea057897e..a80a87d6f64 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -40,15 +40,42 @@ namespace { class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Scan> { using TBase = TKqpExecuterBase<TKqpScanExecuter, EExecType::Scan>; + struct TEvPrivate { + enum EEv { + EvResourcesSnapshot = EventSpaceBegin(TEvents::ES_PRIVATE), + EvRetry + }; + + struct TEvResourcesSnapshot : public TEventLocal<TEvResourcesSnapshot, EEv::EvResourcesSnapshot> { + TVector<NKikimrKqp::TKqpNodeResources> Snapshot; + + TEvResourcesSnapshot(TVector<NKikimrKqp::TKqpNodeResources>&& snapshot) + : Snapshot(std::move(snapshot)) {} + }; + + struct TEvRetry : public TEventLocal<TEvRetry, EEv::EvRetry> { + ui32 RequestId; + TActorId Target; + + TEvRetry(ui64 requestId, const TActorId& target) + : RequestId(requestId) + , Target(target) {} + }; + }; + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::KQP_EXECUTER_ACTOR; } TKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, - const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation) + const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, + const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, + ui32 executerDelayToRetryMs) : TBase(std::move(request), database, userToken, counters, TWilsonKqp::ScanExecuter, "ScanExecuter") , AggregationSettings(aggregation) + , Planner(nullptr) + , ExecuterDelayToRetryMs(executerDelayToRetryMs) { YQL_ENSURE(Request.Transactions.size() == 1); YQL_ENSURE(Request.DataShardLocks.empty()); @@ -77,6 +104,7 @@ public: switch (ev->GetTypeRewrite()) { hFunc(TEvKqpExecuter::TEvTableResolveStatus, HandleResolve); hFunc(TEvKqpExecuter::TEvShardsResolveStatus, HandleResolve); + hFunc(TEvPrivate::TEvResourcesSnapshot, HandleResolve); hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution); hFunc(TEvents::TEvWakeup, HandleTimeout); default: @@ -98,6 +126,7 @@ private: hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution); hFunc(TEvents::TEvWakeup, HandleTimeout); hFunc(TEvents::TEvUndelivered, HandleUndelivered); + hFunc(TEvPrivate::TEvRetry, HandleRetry); hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse); IgnoreFunc(TEvKqpNode::TEvCancelKqpTasksResponse); hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected); @@ -405,6 +434,14 @@ private: } } + void GetResourcesSnapshot() { + GetKqpResourceManager()->RequestClusterResourcesInfo( + [as = TlsActivationContext->ActorSystem(), self = SelfId()](TVector<NKikimrKqp::TKqpNodeResources>&& resources) { + TAutoPtr<IEventHandle> eh = new IEventHandle(self, self, new TEvPrivate::TEvResourcesSnapshot(std::move(resources))); + as->Send(eh); + }); + } + void HandleResolve(TEvKqpExecuter::TEvTableResolveStatus::TPtr& ev) { if (!TBase::HandleResolve(ev)) return; TSet<ui64> shardIds; @@ -420,17 +457,24 @@ private: auto kqpShardsResolver = CreateKqpShardsResolver(this->SelfId(), TxId, std::move(shardIds)); KqpShardsResolverId = this->RegisterWithSameMailbox(kqpShardsResolver); } else { - Execute(); + GetResourcesSnapshot(); } } - void HandleResolve(TEvKqpExecuter::TEvShardsResolveStatus::TPtr& ev) { if (!TBase::HandleResolve(ev)) return; - Execute(); + GetResourcesSnapshot(); } - void Execute() { + void HandleResolve(TEvPrivate::TEvResourcesSnapshot::TPtr& ev) { + if (ev->Get()->Snapshot.empty()) { + LOG_E("Can not find default state storage group for database " << Database); + } + + Execute(std::move(ev->Get()->Snapshot)); + } + + void Execute(TVector<NKikimrKqp::TKqpNodeResources>&& snapshot) { LWTRACK(KqpScanExecuterStartExecute, ResponseEv->Orbit, TxId); NWilson::TSpan prepareTasksSpan(TWilsonKqp::ScanExecuterPrepareTasks, ExecuterStateSpan.GetTraceId(), "PrepareTasks", NWilson::EFlags::AUTO_END); @@ -648,7 +692,7 @@ private: << ", totalShardScans: " << nShardScans << ", execType: Scan" << ", snapshot: {" << Request.Snapshot.TxId << ", " << Request.Snapshot.Step << "}"); - ExecuteScanTx(std::move(computeTasks), std::move(scanTasks)); + ExecuteScanTx(std::move(computeTasks), std::move(scanTasks), std::move(snapshot)); Become(&TKqpScanExecuter::ExecuteState); if (ExecuterStateSpan) { @@ -688,9 +732,9 @@ public: } private: - void ExecuteScanTx(TVector<NYql::NDqProto::TDqTask>&& computeTasks, THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks) { + void ExecuteScanTx(TVector<NYql::NDqProto::TDqTask>&& computeTasks, THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks, + TVector<NKikimrKqp::TKqpNodeResources>&& snapshot) { LWTRACK(KqpScanExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, computeTasks.size(), scanTasks.size()); - LOG_D("Execute scan tx, computeTasks: " << computeTasks.size() << ", scanTasks: " << scanTasks.size()); for (const auto& [_, tasks]: scanTasks) { for (const auto& task : tasks) { PendingComputeTasks.insert(task.GetId()); @@ -701,11 +745,42 @@ private: PendingComputeTasks.insert(taskDesc.GetId()); } - auto planner = CreateKqpPlanner(TxId, SelfId(), std::move(computeTasks), + Planner = CreateKqpPlanner(TxId, SelfId(), std::move(computeTasks), std::move(scanTasks), Request.Snapshot, Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, - Request.DisableLlvmForUdfStages, Request.LlvmEnabled, AppData()->EnableKqpSpilling, Request.RlPath, ExecuterSpan.GetTraceId()); - RegisterWithSameMailbox(planner); + Request.DisableLlvmForUdfStages, Request.LlvmEnabled, AppData()->EnableKqpSpilling, Request.RlPath, ExecuterSpan, std::move(snapshot)); + LOG_D("Execute scan tx, computeTasks: " << Planner->GetComputeTasksNumber() << ", scanTasks: " << Planner->GetScanTasksNumber()); + + Planner->Process(); + } + + void HandleUndelivered(TEvents::TEvUndelivered::TPtr& ev) override { + ui32 eventType = ev->Get()->SourceType; + auto reason = ev->Get()->Reason; + switch (eventType) { + case TEvKqpNode::TEvStartKqpTasksRequest::EventType: { + if (reason == TEvents::TEvUndelivered::EReason::ReasonActorUnknown) { + LOG_D("Schedule a retry by ActorUnknown reason, nodeId:" << ev->Sender.NodeId() << " requestId: " << ev->Cookie); + Schedule(TDuration::MilliSeconds(ExecuterDelayToRetryMs), new TEvPrivate::TEvRetry(ev->Cookie, ev->Sender)); + return; + } + InvalidateNode(ev->Sender.NodeId()); + return InternalError(TStringBuilder() + << "TEvKqpNode::TEvStartKqpTasksRequest lost: " << reason); + } + default: { + LOG_E("Event lost, type: " << eventType << ", reason: " << reason); + } + } + } + + void HandleRetry(TEvPrivate::TEvRetry::TPtr& ev) { + if (Planner->SendKqpTasksRequest(ev->Get()->RequestId, ev->Get()->Target)) { + return; + } + InvalidateNode(Target.NodeId()); + return InternalError(TStringBuilder() + << "TEvKqpNode::TEvStartKqpTasksRequest lost: ActorUnknown"); } private: @@ -769,15 +844,20 @@ public: channelDesc.SetIsPersistent(IsCrossShardChannel(TasksGraph, channel)); channelDesc.SetInMemory(channel.InMemory); } +private: const NKikimrConfig::TTableServiceConfig::TAggregationConfig AggregationSettings; + std::unique_ptr<TKqpPlanner> Planner; + ui32 ExecuterDelayToRetryMs; }; } // namespace IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, - const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation) + const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, + const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, + ui32 executerDelayToRetryMs) { - return new TKqpScanExecuter(std::move(request), database, userToken, counters, aggregation); + return new TKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerDelayToRetryMs); } } // namespace NKqp diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 8e087bace41..bc5fce41553 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1290,7 +1290,7 @@ public: auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database, (QueryState && QueryState->UserToken) ? TMaybe<TString>(QueryState->UserToken) : Nothing(), - RequestCounters, Settings.Service.GetAggregationConfig()); + RequestCounters, Settings.Service.GetAggregationConfig(), Settings.Service.GetExecuterDelayToRetryMs()); auto exId = RegisterWithSameMailbox(executerActor); LOG_D("Created new KQP executer: " << exId); diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 2ac9d4fb68b..a3d1e3184c3 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1236,6 +1236,7 @@ message TTableServiceConfig { optional TAggregationConfig AggregationConfig = 29; optional bool EnableKqpScanQueryStreamLookup = 30 [default = false]; optional bool EnableKqpDataQueryStreamLookup = 31 [default = false]; + optional uint32 ExecuterDelayToRetryMs = 32 [default = 200]; }; // Config describes immediate controls and allows |