about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author shumkovnd <shumkovnd@yandex-team.com> 2023-02-09 13:17:26 +0300
committer shumkovnd <shumkovnd@yandex-team.com> 2023-02-09 13:17:26 +0300
commit 016ae54b59f8e01d21a2d1573fb3e5ab39e734ee (patch)
tree 143107ef698cd010b814046465dee80783ba5eff
parent aecae6efeea7d5c240e51e4652fd7d7f40d0a874 (diff)
download ydb-016ae54b59f8e01d21a2d1573fb3e5ab39e734ee.tar.gz
add retries to TKqpScanExecuter
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_executer.h 3
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_executer_impl.cpp 5
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_executer_impl.h 5
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_planner.cpp 286
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_planner.h 52
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp 8
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_planner_strategy.h 2
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_scan_executer.cpp 106
-rw-r--r-- ydb/core/kqp/session_actor/kqp_session_actor.cpp 2
-rw-r--r-- ydb/core/protos/config.proto 1
10 files changed, 272 insertions, 198 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h
index 5c09f69138f..1d6d70929eb 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer.h
@@ -84,7 +84,8 @@ struct TEvKqpExecuter {
};
IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
- const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation);
+ const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters,
+ const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, ui32 executerDelayToRetryMs);
std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecutePure(
IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
index 79ff9fcdb41..8e4a08be433 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
@@ -145,7 +145,8 @@ TActorId ReportToRl(ui64 ru, const TString& database, const TString& userToken,
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
- const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation)
+ const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
+ ui32 executerDelayToRetryMs)
{
if (request.Transactions.empty()) {
// commit-only or rollback-only data transaction
@@ -169,7 +170,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false);
case NKqpProto::TKqpPhyTx::TYPE_SCAN:
- return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation);
+ return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerDelayToRetryMs);
case NKqpProto::TKqpPhyTx::TYPE_GENERIC:
return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 43ee7a96981..75f50425cfd 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -375,7 +375,7 @@ protected:
}
}
- void HandleUndelivered(TEvents::TEvUndelivered::TPtr& ev) {
+ virtual void HandleUndelivered(TEvents::TEvUndelivered::TPtr& ev) {
ui32 eventType = ev->Get()->SourceType;
auto reason = ev->Get()->Reason;
switch (eventType) {
@@ -1142,7 +1142,8 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const
const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult);
IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
- const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation);
+ const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters,
+ const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, ui32 executerDelayToRetryMs);
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp
index 6dbd70de951..cbbc58f8f6f 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp
@@ -6,7 +6,6 @@
#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/wilson.h>
-#include <ydb/core/kqp/rm_service/kqp_resource_estimation.h>
#include <util/generic/set.h>
@@ -23,14 +22,15 @@ using namespace NYql;
// So, we estimate total memory amount required for task as apriori task size multiplied by this constant.
constexpr ui32 MEMORY_ESTIMATION_OVERFLOW = 2;
-TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::TDqTask>&& tasks,
+TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::TDqTask>&& computeTasks,
THashMap<ui64, TVector<NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TMaybe<TString>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm,
- bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TTraceId traceId)
+ bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
+ TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot)
: TxId(txId)
, ExecuterId(executer)
- , Tasks(std::move(tasks))
+ , ComputeTasks(std::move(computeTasks))
, ScanTasks(std::move(scanTasks))
, Snapshot(snapshot)
, Database(database)
@@ -41,7 +41,8 @@ TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::
, EnableLlvm(enableLlvm)
, WithSpilling(withSpilling)
, RlPath(rlPath)
- , KqpPlannerSpan(TWilsonKqp::KqpPlanner, std::move(traceId), "KqpPlanner")
+ , ResourcesSnapshot(std::move(resourcesSnapshot))
+ , ExecuterSpan(executerSpan)
{
if (!Database) {
// a piece of magic for tests
@@ -52,85 +53,61 @@ TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::
}
}
-void TKqpPlanner::Bootstrap(const TActorContext&) {
- GetKqpResourceManager()->RequestClusterResourcesInfo(
- [as = TlsActivationContext->ActorSystem(), self = SelfId()](TVector<NKikimrKqp::TKqpNodeResources>&& resources) {
- TAutoPtr<IEventHandle> eh = new IEventHandle(self, self, new TEvPrivate::TEvResourcesSnapshot(std::move(resources)));
- as->Send(eh);
- });
+bool TKqpPlanner::SendKqpTasksRequest(ui32 requestId, const TActorId& target) {
+ auto& requestData = Requests[requestId];
- Become(&TKqpPlanner::WaitState);
-}
-
-void TKqpPlanner::WaitState(TAutoPtr<IEventHandle>& ev, const TActorContext&) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvPrivate::TEvResourcesSnapshot, HandleWait);
- hFunc(TEvKqp::TEvAbortExecution, HandleWait);
- default:
- LOG_C("Unexpected event type: " << ev->GetTypeRewrite() << " at Wait state"
- << ", event: " << (ev->HasEvent() ? ev->GetBase()->ToString().data() : "<serialized>"));
+ if (requestData.RetryNumber == 3) {
+ return false;
}
-}
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-/// Wait State
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-void TKqpPlanner::HandleWait(TEvPrivate::TEvResourcesSnapshot::TPtr& ev) {
- if (ev->Get()->Snapshot.empty()) {
- LOG_E("Can not find default state storage group for database " << Database);
- RunLocal(ev->Get()->Snapshot);
- return;
+ auto ev = MakeHolder<TEvKqpNode::TEvStartKqpTasksRequest>();
+ ev->Record = requestData.request;
+
+ if (requestData.RetryNumber == 1) {
+ LOG_D("Try to retry by ActorUnknown reason, nodeId: " << target.NodeId() << ", requestId: " << requestId);
+ } else if (requestData.RetryNumber == 2) {
+ TMaybe<ui32> targetNode;
+ for (size_t i = 0; i < ResourcesSnapshot.size(); ++i) {
+ if (!TrackingNodes.contains(ResourcesSnapshot[i].nodeid())) {
+ targetNode = ResourcesSnapshot[i].nodeid();
+ break;
+ }
+ }
+ if (targetNode) {
+ LOG_D("Try to retry to another node, nodeId: " << *targetNode << ", requestId: " << requestId);
+ auto anotherTarget = MakeKqpNodeServiceID(*targetNode);
+ TlsActivationContext->Send(std::make_unique<NActors::IEventHandle>(anotherTarget, ExecuterId, ev.Release(),
+ IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, requestId, nullptr, ExecuterSpan.GetTraceId()));
+ requestData.RetryNumber++;
+ return true;
+ }
+ LOG_E("Retry failed because all nodes are busy, requestId: " << requestId);
+ return false;
}
+ requestData.RetryNumber++;
- Process(ev->Get()->Snapshot);
+ TlsActivationContext->Send(std::make_unique<NActors::IEventHandle>(target, ExecuterId, ev.Release(),
+ requestData.flag, requestId, nullptr, ExecuterSpan.GetTraceId()));
+ return true;
}
-void TKqpPlanner::HandleWait(TEvKqp::TEvAbortExecution::TPtr& ev) {
- LOG_E("Terminate KqpPlanner, reason: " << ev->Get()->GetIssues().ToOneLineString());
- PassAway();
-}
-
-void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot) {
- auto rmConfig = GetKqpResourceManager()->GetConfig();
-
- ui32 tasksCount = Tasks.size();
- for (auto& [shardId, tasks] : ScanTasks) {
- tasksCount += tasks.size();
- }
-
- TVector<TTaskResourceEstimation> est;
- est.resize(tasksCount);
-
- ui64 localRunMemoryEst = 0;
-
- ui64 i = 0;
- for (auto& task : Tasks) {
- EstimateTaskResources(task, rmConfig, est[i]);
- localRunMemoryEst += est[i].TotalMemoryLimit;
- i++;
- }
- if (auto it = ScanTasks.find(SelfId().NodeId()); it != ScanTasks.end()) {
- for (auto& task : it->second) {
- EstimateTaskResources(task, rmConfig, est[i]);
- localRunMemoryEst += est[i].TotalMemoryLimit;
- i++;
- }
- }
+void TKqpPlanner::Process() {
+ PrepareToProcess();
auto localResources = GetKqpResourceManager()->GetLocalResources();
- if (localRunMemoryEst * MEMORY_ESTIMATION_OVERFLOW <= localResources.Memory[NRm::EKqpMemoryPool::ScanQuery] &&
- tasksCount <= localResources.ExecutionUnits)
+ if (LocalRunMemoryEst * MEMORY_ESTIMATION_OVERFLOW <= localResources.Memory[NRm::EKqpMemoryPool::ScanQuery] &&
+ ResourceEstimations.size() <= localResources.ExecutionUnits)
{
- RunLocal(snapshot);
+ RunLocal(ResourcesSnapshot);
return;
}
- if (snapshot.empty() || (snapshot.size() == 1 && snapshot[0].GetNodeId() == SelfId().NodeId())) {
+ if (ResourcesSnapshot.empty() || (ResourcesSnapshot.size() == 1 && ResourcesSnapshot[0].GetNodeId() == ExecuterId.NodeId())) {
// try to run without memory overflow settings
- if (localRunMemoryEst <= localResources.Memory[NRm::EKqpMemoryPool::ScanQuery] &&
- tasksCount <= localResources.ExecutionUnits)
+ if (LocalRunMemoryEst <= localResources.Memory[NRm::EKqpMemoryPool::ScanQuery] &&
+ ResourceEstimations.size() <= localResources.ExecutionUnits)
{
- RunLocal(snapshot);
+ RunLocal(ResourcesSnapshot);
return;
}
@@ -138,12 +115,7 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot
auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
"Not enough resources to execute query locally and no information about other nodes");
- if (KqpPlannerSpan) {
- KqpPlannerSpan.EndError("Not enough resources to execute query locally and no information about other nodes");
- }
-
- Send(ExecuterId, ev.Release());
- PassAway();
+ TlsActivationContext->Send(std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release()));
return;
}
@@ -155,22 +127,24 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot
}
THashMap<ui64, size_t> nodeIdtoIdx;
- for (size_t idx = 0; idx < snapshot.size(); ++idx) {
- nodeIdtoIdx[snapshot[idx].nodeid()] = idx;
+ for (size_t idx = 0; idx < ResourcesSnapshot.size(); ++idx) {
+ nodeIdtoIdx[ResourcesSnapshot[idx].nodeid()] = idx;
}
- auto plan = planner->Plan(snapshot, std::move(est));
+ auto plan = planner->Plan(ResourcesSnapshot, ResourceEstimations);
long requestsCnt = 0;
if (!plan.empty()) {
for (auto& group : plan) {
- auto ev = PrepareKqpNodeRequest(group.TaskIds);
- AddScansToKqpNodeRequest(ev, group.NodeId);
+ auto& requestData = Requests.emplace_back();
+ PrepareKqpNodeRequest(requestData.request, THashSet<ui64>(group.TaskIds.begin(), group.TaskIds.end()));
+ AddScansToKqpNodeRequest(requestData.request, group.NodeId);
auto target = MakeKqpNodeServiceID(group.NodeId);
- TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(),
- CalcSendMessageFlagsForNode(target.NodeId()), 0, nullptr, KqpPlannerSpan.GetTraceId()));
+ requestData.flag = CalcSendMessageFlagsForNode(target.NodeId());
+
+ SendKqpTasksRequest(Requests.size() - 1, target);
++requestsCnt;
}
@@ -181,13 +155,14 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot
}
for (ui64 nodeId: nodes) {
- auto ev = PrepareKqpNodeRequest({});
- AddScansToKqpNodeRequest(ev, nodeId);
+ auto& requestData = Requests.emplace_back();
+ PrepareKqpNodeRequest(requestData.request, {});
+ AddScansToKqpNodeRequest(requestData.request, nodeId);
auto target = MakeKqpNodeServiceID(nodeId);
- LOG_D("Send request to kqpnode: " << target << ", node_id: " << SelfId().NodeId() << ", TxId: " << TxId);
- TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(),
- CalcSendMessageFlagsForNode(target.NodeId()), nodeId, nullptr, KqpPlannerSpan.GetTraceId()));
+ requestData.flag = CalcSendMessageFlagsForNode(target.NodeId());
+ LOG_D("Send request to kqpnode: " << target << ", node_id: " << ExecuterId.NodeId() << ", TxId: " << TxId);
+ SendKqpTasksRequest(Requests.size() - 1, target);
++requestsCnt;
}
Y_VERIFY(ScanTasks.empty());
@@ -195,19 +170,44 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot
auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
"Not enough resources to execute query");
- if (KqpPlannerSpan) {
- KqpPlannerSpan.EndError("Not enough resources to execute query");
- }
+ TlsActivationContext->Send(std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release()));
+ }
- Send(ExecuterId, ev.Release());
+ if (ExecuterSpan) {
+ ExecuterSpan.Attribute("requestsCnt", requestsCnt);
}
+}
- if (KqpPlannerSpan) {
- KqpPlannerSpan.Attribute("RequestsCnt", requestsCnt);
- KqpPlannerSpan.EndOk();
+void TKqpPlanner::PrepareToProcess() {
+ auto rmConfig = GetKqpResourceManager()->GetConfig();
+
+ ui32 tasksCount = ComputeTasks.size();
+ for (auto& [shardId, tasks] : ScanTasks) {
+ tasksCount += tasks.size();
}
- PassAway();
+ ResourceEstimations.resize(tasksCount);
+ LocalRunMemoryEst = 0;
+
+ for (size_t i = 0; i < ComputeTasks.size(); ++i) {
+ EstimateTaskResources(ComputeTasks[i], rmConfig, ResourceEstimations[i]);
+ LocalRunMemoryEst += ResourceEstimations[i].TotalMemoryLimit;
+ }
+ if (auto it = ScanTasks.find(ExecuterId.NodeId()); it != ScanTasks.end()) {
+ for (size_t i = 0; i < it->second.size(); ++i) {
+ EstimateTaskResources(it->second[i], rmConfig, ResourceEstimations[i + ComputeTasks.size()]);
+ LocalRunMemoryEst += ResourceEstimations[i + ComputeTasks.size()].TotalMemoryLimit;
+ }
+ }
+ Sort(ResourceEstimations, [](const auto& l, const auto& r) { return l.TotalMemoryLimit > r.TotalMemoryLimit; });
+}
+
+ui64 TKqpPlanner::GetComputeTasksNumber() const {
+ return ComputeTasks.size();
+}
+
+ui64 TKqpPlanner::GetScanTasksNumber() const {
+ return ScanTasks.size();
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -216,18 +216,21 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot
void TKqpPlanner::RunLocal(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot) {
LOG_D("Execute query locally");
- auto ev = PrepareKqpNodeRequest({});
- AddScansToKqpNodeRequest(ev, SelfId().NodeId());
+ auto& requestData = Requests.emplace_back();
+ PrepareKqpNodeRequest(requestData.request, {});
+ AddScansToKqpNodeRequest(requestData.request, ExecuterId.NodeId());
+
+ auto target = MakeKqpNodeServiceID(ExecuterId.NodeId());
+ requestData.flag = CalcSendMessageFlagsForNode(target.NodeId());
+ LOG_D("Send request to kqpnode: " << target << ", node_id: " << ExecuterId.NodeId() << ", TxId: " << TxId);
+ SendKqpTasksRequest(Requests.size() - 1, target);
- auto target = MakeKqpNodeServiceID(SelfId().NodeId());
- LOG_D("Send request to kqpnode: " << target << ", node_id: " << SelfId().NodeId() << ", TxId: " << TxId);
- TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(), IEventHandle::FlagTrackDelivery, 0, nullptr, KqpPlannerSpan.GetTraceId()));
long requestsCnt = 1;
TVector<ui64> nodes;
for (const auto& pair: ScanTasks) {
nodes.push_back(pair.first);
- YQL_ENSURE(pair.first != SelfId().NodeId());
+ YQL_ENSURE(pair.first != ExecuterId.NodeId());
}
THashMap<ui64, size_t> nodeIdToIdx;
@@ -237,70 +240,64 @@ void TKqpPlanner::RunLocal(const TVector<NKikimrKqp::TKqpNodeResources>& snapsho
}
for (auto nodeId: nodes) {
- auto ev = PrepareKqpNodeRequest({});
- AddScansToKqpNodeRequest(ev, nodeId);
+ auto& requestData = Requests.emplace_back();
+ PrepareKqpNodeRequest(requestData.request, {});
+ AddScansToKqpNodeRequest(requestData.request, nodeId);
+
auto target = MakeKqpNodeServiceID(nodeId);
- TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(),
- CalcSendMessageFlagsForNode(target.NodeId()), 0, nullptr, KqpPlannerSpan.GetTraceId()));
- ++requestsCnt;
+ requestData.flag = CalcSendMessageFlagsForNode(target.NodeId());
+ SendKqpTasksRequest(Requests.size() - 1, target);
+
+ requestsCnt++;
}
Y_VERIFY(ScanTasks.size() == 0);
- if (KqpPlannerSpan) {
- KqpPlannerSpan.Attribute("requestsCnt", requestsCnt);
- KqpPlannerSpan.EndOk();
+ if (ExecuterSpan) {
+ ExecuterSpan.Attribute("requestsCnt", requestsCnt);
}
-
- PassAway();
-}
-
-void TKqpPlanner::PassAway() {
- TBase::PassAway();
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-THolder<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::PrepareKqpNodeRequest(const TVector<ui64>& taskIds) {
- auto ev = MakeHolder<TEvKqpNode::TEvStartKqpTasksRequest>();
-
- ev->Record.SetTxId(TxId);
- ActorIdToProto(ExecuterId, ev->Record.MutableExecuterActorId());
+void TKqpPlanner::PrepareKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& request, THashSet<ui64> taskIds) {
+ request.SetTxId(TxId);
+ ActorIdToProto(ExecuterId, request.MutableExecuterActorId());
bool withLLVM = EnableLlvm;
if (taskIds.empty()) {
- for (auto& taskDesc : Tasks) {
+ for (auto& taskDesc : ComputeTasks) {
if (taskDesc.GetId()) {
if (DisableLlvmForUdfStages && taskDesc.GetProgram().GetSettings().GetHasUdf()) {
withLLVM = false;
}
AddSnapshotInfoToTaskInputs(taskDesc);
- ev->Record.AddTasks()->Swap(&taskDesc);
+ request.AddTasks()->Swap(&taskDesc);
}
}
} else {
- for (auto& taskDesc : Tasks) {
+ for (auto& taskDesc : ComputeTasks) {
if (taskDesc.GetId() && Find(taskIds, taskDesc.GetId()) != taskIds.end()) {
if (DisableLlvmForUdfStages && taskDesc.GetProgram().GetSettings().GetHasUdf()) {
withLLVM = false;
}
AddSnapshotInfoToTaskInputs(taskDesc);
- ev->Record.AddTasks()->Swap(&taskDesc);
+ request.AddTasks()->Swap(&taskDesc);
}
}
}
if (Deadline) {
TDuration timeout = Deadline - TAppData::TimeProvider->Now();
- ev->Record.MutableRuntimeSettings()->SetTimeoutMs(timeout.MilliSeconds());
+ request.MutableRuntimeSettings()->SetTimeoutMs(timeout.MilliSeconds());
}
- ev->Record.MutableRuntimeSettings()->SetExecType(NDqProto::TComputeRuntimeSettings::SCAN);
- ev->Record.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(StatsMode));
- ev->Record.MutableRuntimeSettings()->SetUseLLVM(withLLVM);
- ev->Record.MutableRuntimeSettings()->SetUseSpilling(WithSpilling);
+ request.MutableRuntimeSettings()->SetExecType(NDqProto::TComputeRuntimeSettings::SCAN);
+ request.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(StatsMode));
+ request.MutableRuntimeSettings()->SetUseLLVM(withLLVM);
+ request.MutableRuntimeSettings()->SetUseSpilling(WithSpilling);
if (RlPath) {
- auto rlPath = ev->Record.MutableRuntimeSettings()->MutableRlPath();
+ auto rlPath = request.MutableRuntimeSettings()->MutableRlPath();
rlPath->SetCoordinationNode(RlPath->GetCoordinationNode());
rlPath->SetResourcePath(RlPath->GetResourcePath());
rlPath->SetDatabase(Database);
@@ -308,12 +305,10 @@ THolder<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::PrepareKqpNodeRequest(
rlPath->SetToken(UserToken.GetRef());
}
- ev->Record.SetStartAllOrFail(true);
-
- return ev;
+ request.SetStartAllOrFail(true);
}
-void TKqpPlanner::AddScansToKqpNodeRequest(THolder<TEvKqpNode::TEvStartKqpTasksRequest>& ev, ui64 nodeId) {
+void TKqpPlanner::AddScansToKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& request, ui64 nodeId) {
if (!Snapshot.IsValid()) {
Y_ASSERT(ScanTasks.size() == 0);
return;
@@ -323,21 +318,21 @@ void TKqpPlanner::AddScansToKqpNodeRequest(THolder<TEvKqpNode::TEvStartKqpTasksR
if (auto nodeTasks = ScanTasks.FindPtr(nodeId)) {
LOG_D("Adding " << nodeTasks->size() << " scans to KqpNode request");
- ev->Record.MutableSnapshot()->SetTxId(Snapshot.TxId);
- ev->Record.MutableSnapshot()->SetStep(Snapshot.Step);
+ request.MutableSnapshot()->SetTxId(Snapshot.TxId);
+ request.MutableSnapshot()->SetStep(Snapshot.Step);
for (auto& task: *nodeTasks) {
if (DisableLlvmForUdfStages && task.GetProgram().GetSettings().GetHasUdf()) {
withLLVM = false;
}
AddSnapshotInfoToTaskInputs(task);
- ev->Record.AddTasks()->Swap(&task);
+ request.AddTasks()->Swap(&task);
}
ScanTasks.erase(nodeId);
}
- if (ev->Record.GetRuntimeSettings().GetUseLLVM()) {
- ev->Record.MutableRuntimeSettings()->SetUseLLVM(withLLVM);
+ if (request.GetRuntimeSettings().GetUseLLVM()) {
+ request.MutableRuntimeSettings()->SetUseLLVM(withLLVM);
}
}
@@ -393,14 +388,15 @@ void TKqpPlanner::AddSnapshotInfoToTaskInputs(NYql::NDqProto::TDqTask& task) {
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-IActor* CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::TDqTask>&& tasks,
- THashMap<ui64, TVector<NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot,
- const TString& database, const TMaybe<TString>& token, TInstant deadline,
+std::unique_ptr<TKqpPlanner> CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NYql::NDqProto::TDqTask>&& tasks,
+ THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot,
+ const TString& database, const TMaybe<TString>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm,
- bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TTraceId traceId)
+ bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
+ TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot)
{
- return new TKqpPlanner(txId, executer, std::move(tasks), std::move(scanTasks), snapshot,
- database, token, deadline, statsMode, disableLlvmForUdfStages, enableLlvm, withSpilling, rlPath, std::move(traceId));
+ return std::make_unique<TKqpPlanner>(txId, executer, std::move(tasks), std::move(scanTasks), snapshot,
+ database, userToken, deadline, statsMode, disableLlvmForUdfStages, enableLlvm, withSpilling, rlPath, executerSpan, std::move(resourcesSnapshot));
}
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h
index 9e8b310f6fc..74ef4e4c1db 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.h
+++ b/ydb/core/kqp/executer_actor/kqp_planner.h
@@ -4,6 +4,7 @@
#include <ydb/core/kqp/gateway/kqp_gateway.h>
#include <ydb/core/kqp/node_service/kqp_node_service.h>
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
+#include <ydb/core/kqp/rm_service/kqp_resource_estimation.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/wilson/wilson_span.h>
@@ -15,20 +16,12 @@
namespace NKikimr::NKqp {
-class TKqpPlanner : public TActorBootstrapped<TKqpPlanner> {
- using TBase = TActorBootstrapped<TKqpPlanner>;
+class TKqpPlanner {
- struct TEvPrivate {
- enum EEv {
- EvResourcesSnapshot = EventSpaceBegin(TEvents::ES_PRIVATE)
- };
-
- struct TEvResourcesSnapshot : public TEventLocal<TEvResourcesSnapshot, EEv::EvResourcesSnapshot> {
- TVector<NKikimrKqp::TKqpNodeResources> Snapshot;
-
- TEvResourcesSnapshot(TVector<NKikimrKqp::TKqpNodeResources>&& snapshot)
- : Snapshot(std::move(snapshot)) {}
- };
+ struct RequestData {
+ NKikimrKqp::TEvStartKqpTasksRequest request;
+ ui32 flag;
+ ui32 RetryNumber = 0;
};
public:
@@ -36,23 +29,21 @@ public:
THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TMaybe<TString>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages,
- bool enableLlvm, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TTraceId traceId);
+ bool enableLlvm, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& ExecuterSpan,
+ TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot);
+ bool SendKqpTasksRequest(ui32 requestId, const TActorId& target);
- void Bootstrap(const TActorContext& ctx);
+ void Process();
+ ui64 GetComputeTasksNumber() const;
+ ui64 GetScanTasksNumber() const;
private:
- STATEFN(WaitState);
+ void PrepareToProcess();
- void HandleWait(TEvPrivate::TEvResourcesSnapshot::TPtr& ev);
- void HandleWait(TEvKqp::TEvAbortExecution::TPtr& ev);
-
- void Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot);
void RunLocal(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot);
- void PassAway() override;
-
- THolder<TEvKqpNode::TEvStartKqpTasksRequest> PrepareKqpNodeRequest(const TVector<ui64>& taskIds);
- void AddScansToKqpNodeRequest(THolder<TEvKqpNode::TEvStartKqpTasksRequest>& ev, ui64 nodeId);
+ void PrepareKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& request, THashSet<ui64> taskIds);
+ void AddScansToKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& request, ui64 nodeId);
void AddSnapshotInfoToTaskInputs(NYql::NDqProto::TDqTask& task);
ui32 CalcSendMessageFlagsForNode(ui32 nodeId);
@@ -60,7 +51,7 @@ private:
private:
const ui64 TxId;
const TActorId ExecuterId;
- TVector<NYql::NDqProto::TDqTask> Tasks;
+ TVector<NYql::NDqProto::TDqTask> ComputeTasks;
THashMap<ui64, TVector<NYql::NDqProto::TDqTask>> ScanTasks;
const IKqpGateway::TKqpSnapshot Snapshot;
TString Database;
@@ -72,13 +63,18 @@ private:
const bool WithSpilling;
const TMaybe<NKikimrKqp::TRlPath> RlPath;
THashSet<ui32> TrackingNodes;
- NWilson::TSpan KqpPlannerSpan;
+ const TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot;
+ NWilson::TSpan& ExecuterSpan;
+ ui64 LocalRunMemoryEst;
+ TVector<TTaskResourceEstimation> ResourceEstimations;
+ TVector<RequestData> Requests;
};
-IActor* CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NYql::NDqProto::TDqTask>&& tasks,
+std::unique_ptr<TKqpPlanner> CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NYql::NDqProto::TDqTask>&& tasks,
THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TMaybe<TString>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm,
- bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TTraceId traceId = {});
+ bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
+ TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot);
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp b/ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp
index b0f323dbc24..07c1457cdec 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp
@@ -22,7 +22,7 @@ public:
~TKqpGreedyPlanner() override {}
TVector<TResult> Plan(const TVector<NKikimrKqp::TKqpNodeResources>& nodeResources,
- TVector<TTaskResourceEstimation>&& tasks) override
+ const TVector<TTaskResourceEstimation>& tasks) override
{
TVector<TResult> result;
@@ -58,15 +58,13 @@ public:
}
}
- Sort(tasks, [](const auto& l, const auto& r) { return l.TotalMemoryLimit > r.TotalMemoryLimit; });
-
if (LogFunc) {
- for (auto& task : tasks) {
+ for (const auto& task : tasks) {
LogFunc(TStringBuilder() << "[TaskResources] task: " << task.TaskId << ", memory: " << task.TotalMemoryLimit);
}
}
- for (auto& taskEstimation : tasks) {
+ for (const auto& taskEstimation : tasks) {
TNodeDesc node = nodes.top();
if (node.RemainsComputeActors > 0 &&
diff --git a/ydb/core/kqp/executer_actor/kqp_planner_strategy.h b/ydb/core/kqp/executer_actor/kqp_planner_strategy.h
index 6e61114cb01..a0733fd4116 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner_strategy.h
+++ b/ydb/core/kqp/executer_actor/kqp_planner_strategy.h
@@ -24,7 +24,7 @@ public:
};
virtual TVector<TResult> Plan(const TVector<NKikimrKqp::TKqpNodeResources>& nodeResources,
- TVector<TTaskResourceEstimation>&& estimatedResources) = 0;
+ const TVector<TTaskResourceEstimation>& estimatedResources) = 0;
protected:
TLogFunc LogFunc;
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index 5bea057897e..a80a87d6f64 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -40,15 +40,42 @@ namespace {
class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Scan> {
using TBase = TKqpExecuterBase<TKqpScanExecuter, EExecType::Scan>;
+ struct TEvPrivate {
+ enum EEv {
+ EvResourcesSnapshot = EventSpaceBegin(TEvents::ES_PRIVATE),
+ EvRetry
+ };
+
+ struct TEvResourcesSnapshot : public TEventLocal<TEvResourcesSnapshot, EEv::EvResourcesSnapshot> {
+ TVector<NKikimrKqp::TKqpNodeResources> Snapshot;
+
+ TEvResourcesSnapshot(TVector<NKikimrKqp::TKqpNodeResources>&& snapshot)
+ : Snapshot(std::move(snapshot)) {}
+ };
+
+ struct TEvRetry : public TEventLocal<TEvRetry, EEv::EvRetry> {
+ ui32 RequestId;
+ TActorId Target;
+
+ TEvRetry(ui64 requestId, const TActorId& target)
+ : RequestId(requestId)
+ , Target(target) {}
+ };
+ };
+
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::KQP_EXECUTER_ACTOR;
}
TKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
- const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation)
+ const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters,
+ const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
+ ui32 executerDelayToRetryMs)
: TBase(std::move(request), database, userToken, counters, TWilsonKqp::ScanExecuter, "ScanExecuter")
, AggregationSettings(aggregation)
+ , Planner(nullptr)
+ , ExecuterDelayToRetryMs(executerDelayToRetryMs)
{
YQL_ENSURE(Request.Transactions.size() == 1);
YQL_ENSURE(Request.DataShardLocks.empty());
@@ -77,6 +104,7 @@ public:
switch (ev->GetTypeRewrite()) {
hFunc(TEvKqpExecuter::TEvTableResolveStatus, HandleResolve);
hFunc(TEvKqpExecuter::TEvShardsResolveStatus, HandleResolve);
+ hFunc(TEvPrivate::TEvResourcesSnapshot, HandleResolve);
hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
hFunc(TEvents::TEvWakeup, HandleTimeout);
default:
@@ -98,6 +126,7 @@ private:
hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
hFunc(TEvents::TEvWakeup, HandleTimeout);
hFunc(TEvents::TEvUndelivered, HandleUndelivered);
+ hFunc(TEvPrivate::TEvRetry, HandleRetry);
hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse);
IgnoreFunc(TEvKqpNode::TEvCancelKqpTasksResponse);
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected);
@@ -405,6 +434,14 @@ private:
}
}
+    // Asks the KQP resource manager for cluster resources info. The callback
+    // wraps whatever it receives into TEvPrivate::TEvResourcesSnapshot and sends
+    // it through the actor system with this actor's id in both address
+    // positions of the event handle.
+    void GetResourcesSnapshot() {
+        // Capture the actor system and our id by value instead of `this`.
+        // NOTE(review): presumably because the callback may run outside this
+        // actor's context -- confirm against the resource manager.
+        auto actorSystem = TlsActivationContext->ActorSystem();
+        const auto selfId = SelfId();
+
+        auto onResources = [actorSystem, selfId](TVector<NKikimrKqp::TKqpNodeResources>&& nodeResources) {
+            auto* snapshotEvent = new TEvPrivate::TEvResourcesSnapshot(std::move(nodeResources));
+            TAutoPtr<IEventHandle> handle = new IEventHandle(selfId, selfId, snapshotEvent);
+            actorSystem->Send(handle);
+        };
+
+        GetKqpResourceManager()->RequestClusterResourcesInfo(std::move(onResources));
+    }
+
void HandleResolve(TEvKqpExecuter::TEvTableResolveStatus::TPtr& ev) {
if (!TBase::HandleResolve(ev)) return;
TSet<ui64> shardIds;
@@ -420,17 +457,24 @@ private:
auto kqpShardsResolver = CreateKqpShardsResolver(this->SelfId(), TxId, std::move(shardIds));
KqpShardsResolverId = this->RegisterWithSameMailbox(kqpShardsResolver);
} else {
- Execute();
+ GetResourcesSnapshot();
}
}
-
void HandleResolve(TEvKqpExecuter::TEvShardsResolveStatus::TPtr& ev) {
if (!TBase::HandleResolve(ev)) return;
- Execute();
+ GetResourcesSnapshot();
}
- void Execute() {
+    // Receives the resources snapshot event and starts execution with it.
+    // An empty snapshot is only logged; execution proceeds regardless.
+    void HandleResolve(TEvPrivate::TEvResourcesSnapshot::TPtr& ev) {
+        auto& nodeResources = ev->Get()->Snapshot;
+
+        // NOTE(review): the message names a missing state storage group, but the
+        // only thing checked here is that the snapshot is empty -- confirm that
+        // this is the sole reason the resource manager reports no nodes.
+        const bool nothingReported = nodeResources.empty();
+        if (nothingReported) {
+            LOG_E("Can not find default state storage group for database " << Database);
+        }
+
+        // The snapshot is moved out of the event; the event must not be reused.
+        Execute(std::move(nodeResources));
+    }
+
+ void Execute(TVector<NKikimrKqp::TKqpNodeResources>&& snapshot) {
LWTRACK(KqpScanExecuterStartExecute, ResponseEv->Orbit, TxId);
NWilson::TSpan prepareTasksSpan(TWilsonKqp::ScanExecuterPrepareTasks, ExecuterStateSpan.GetTraceId(), "PrepareTasks", NWilson::EFlags::AUTO_END);
@@ -648,7 +692,7 @@ private:
<< ", totalShardScans: " << nShardScans << ", execType: Scan"
<< ", snapshot: {" << Request.Snapshot.TxId << ", " << Request.Snapshot.Step << "}");
- ExecuteScanTx(std::move(computeTasks), std::move(scanTasks));
+ ExecuteScanTx(std::move(computeTasks), std::move(scanTasks), std::move(snapshot));
Become(&TKqpScanExecuter::ExecuteState);
if (ExecuterStateSpan) {
@@ -688,9 +732,9 @@ public:
}
private:
- void ExecuteScanTx(TVector<NYql::NDqProto::TDqTask>&& computeTasks, THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks) {
+ void ExecuteScanTx(TVector<NYql::NDqProto::TDqTask>&& computeTasks, THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks,
+ TVector<NKikimrKqp::TKqpNodeResources>&& snapshot) {
LWTRACK(KqpScanExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, computeTasks.size(), scanTasks.size());
- LOG_D("Execute scan tx, computeTasks: " << computeTasks.size() << ", scanTasks: " << scanTasks.size());
for (const auto& [_, tasks]: scanTasks) {
for (const auto& task : tasks) {
PendingComputeTasks.insert(task.GetId());
@@ -701,11 +745,42 @@ private:
PendingComputeTasks.insert(taskDesc.GetId());
}
- auto planner = CreateKqpPlanner(TxId, SelfId(), std::move(computeTasks),
+ Planner = CreateKqpPlanner(TxId, SelfId(), std::move(computeTasks),
std::move(scanTasks), Request.Snapshot,
Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode,
- Request.DisableLlvmForUdfStages, Request.LlvmEnabled, AppData()->EnableKqpSpilling, Request.RlPath, ExecuterSpan.GetTraceId());
- RegisterWithSameMailbox(planner);
+ Request.DisableLlvmForUdfStages, Request.LlvmEnabled, AppData()->EnableKqpSpilling, Request.RlPath, ExecuterSpan, std::move(snapshot));
+ LOG_D("Execute scan tx, computeTasks: " << Planner->GetComputeTasksNumber() << ", scanTasks: " << Planner->GetScanTasksNumber());
+
+ Planner->Process();
+ }
+
+    // Reacts to an undelivered event.
+    //  * Anything other than TEvStartKqpTasksRequest is only logged.
+    //  * TEvStartKqpTasksRequest lost with ReasonActorUnknown schedules a
+    //    TEvRetry after ExecuterDelayToRetryMs milliseconds, carrying the
+    //    cookie and sender of the undelivered notification.
+    //  * TEvStartKqpTasksRequest lost for any other reason invalidates the
+    //    sender's node and fails with an internal error.
+    void HandleUndelivered(TEvents::TEvUndelivered::TPtr& ev) override {
+        const ui32 lostEventType = ev->Get()->SourceType;
+        const auto lostReason = ev->Get()->Reason;
+
+        if (lostEventType != TEvKqpNode::TEvStartKqpTasksRequest::EventType) {
+            LOG_E("Event lost, type: " << lostEventType << ", reason: " << lostReason);
+            return;
+        }
+
+        if (lostReason == TEvents::TEvUndelivered::EReason::ReasonActorUnknown) {
+            LOG_D("Schedule a retry by ActorUnknown reason, nodeId:" << ev->Sender.NodeId() << " requestId: " << ev->Cookie);
+            const auto retryDelay = TDuration::MilliSeconds(ExecuterDelayToRetryMs);
+            Schedule(retryDelay, new TEvPrivate::TEvRetry(ev->Cookie, ev->Sender));
+            return;
+        }
+
+        InvalidateNode(ev->Sender.NodeId());
+        InternalError(TStringBuilder()
+            << "TEvKqpNode::TEvStartKqpTasksRequest lost: " << lostReason);
+    }
+
+    // Handles a scheduled TEvRetry: asks the planner to send the start-tasks
+    // request identified by RequestId to the event's Target again. When the
+    // planner declines (returns false) or no planner exists, the target's node
+    // is invalidated and the query fails with an internal error.
+    void HandleRetry(TEvPrivate::TEvRetry::TPtr& ev) {
+        auto* retry = ev->Get();
+
+        // Planner starts out as nullptr; treat a missing planner like a refused
+        // retry rather than dereferencing a null pointer.
+        if (Planner && Planner->SendKqpTasksRequest(retry->RequestId, retry->Target)) {
+            return;
+        }
+
+        // Invalidate the node of the actor the retry was addressed to. The
+        // unqualified name `Target` used here before does not refer to the
+        // retry event's field.
+        InvalidateNode(retry->Target.NodeId());
+        return InternalError(TStringBuilder()
+            << "TEvKqpNode::TEvStartKqpTasksRequest lost: ActorUnknown");
+    }
private:
@@ -769,15 +844,20 @@ public:
channelDesc.SetIsPersistent(IsCrossShardChannel(TasksGraph, channel));
channelDesc.SetInMemory(channel.InMemory);
}
+private:
const NKikimrConfig::TTableServiceConfig::TAggregationConfig AggregationSettings;
+ std::unique_ptr<TKqpPlanner> Planner;
+ ui32 ExecuterDelayToRetryMs;
};
} // namespace
IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
- const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation)
+ const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters,
+ const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
+ ui32 executerDelayToRetryMs)
{
- return new TKqpScanExecuter(std::move(request), database, userToken, counters, aggregation);
+ return new TKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerDelayToRetryMs);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 8e087bace41..bc5fce41553 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -1290,7 +1290,7 @@ public:
auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database,
(QueryState && QueryState->UserToken) ? TMaybe<TString>(QueryState->UserToken) : Nothing(),
- RequestCounters, Settings.Service.GetAggregationConfig());
+ RequestCounters, Settings.Service.GetAggregationConfig(), Settings.Service.GetExecuterDelayToRetryMs());
auto exId = RegisterWithSameMailbox(executerActor);
LOG_D("Created new KQP executer: " << exId);
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 2ac9d4fb68b..a3d1e3184c3 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1236,6 +1236,7 @@ message TTableServiceConfig {
optional TAggregationConfig AggregationConfig = 29;
optional bool EnableKqpScanQueryStreamLookup = 30 [default = false];
optional bool EnableKqpDataQueryStreamLookup = 31 [default = false];
+ optional uint32 ExecuterDelayToRetryMs = 32 [default = 200];
};
// Config describes immediate controls and allows