about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author shumkovnd <shumkovnd@yandex-team.com> 2023-02-13 23:52:29 +0300
committer shumkovnd <shumkovnd@yandex-team.com> 2023-02-13 23:52:29 +0300
commit 0a57f2296f811e91347ef317aaa555a32a4693f1 (patch)
tree 52091067683c907d031ef6f4be7ca5646c6c20c3
parent 59c29fbfd00b5c755e74098d19e1ae4a592edb60 (diff)
download ydb-0a57f2296f811e91347ef317aaa555a32a4693f1.tar.gz
add retries to TKqpDataExecuter
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_data_executer.cpp 54
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_executer_impl.cpp 6
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_executer_impl.h 60
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_planner.cpp 88
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_planner.h 9
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_scan_executer.cpp 63
-rw-r--r-- ydb/core/protos/config.proto 2
7 files changed, 140 insertions, 142 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 82c1b23ebe2..cf123827a02 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -48,19 +48,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
using TBase = TKqpExecuterBase<TKqpDataExecuter, EExecType::Data>;
using TKqpSnapshot = IKqpGateway::TKqpSnapshot;
- struct TEvPrivate {
- enum EEv {
- EvReattachToShard = EventSpaceBegin(TEvents::ES_PRIVATE),
- };
-
- struct TEvReattachToShard : public TEventLocal<TEvReattachToShard, EvReattachToShard> {
- const ui64 TabletId;
-
- explicit TEvReattachToShard(ui64 tabletId)
- : TabletId(tabletId) {}
- };
- };
-
struct TReattachState {
TDuration Delay;
TInstant Deadline;
@@ -136,8 +123,8 @@ public:
}
TKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TMaybe<TString>& userToken,
- TKqpRequestCounters::TPtr counters, bool streamResult)
- : TBase(std::move(request), database, userToken, counters, TWilsonKqp::DataExecuter, "DataExecuter")
+ TKqpRequestCounters::TPtr counters, bool streamResult, ui32 executerDelayToRetryMs)
+ : TBase(std::move(request), database, userToken, counters, executerDelayToRetryMs, TWilsonKqp::DataExecuter, "DataExecuter")
, StreamResult(streamResult)
{
YQL_ENSURE(Request.IsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED);
@@ -788,6 +775,7 @@ private:
hFunc(TEvPrivate::TEvReattachToShard, HandleExecute);
hFunc(TEvPipeCache::TEvDeliveryProblem, HandleExecute);
hFunc(TEvents::TEvUndelivered, HandleUndelivered);
+ hFunc(TEvPrivate::TEvRetry, HandleRetry);
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected);
hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse);
hFunc(TEvTxProxy::TEvProposeTransactionStatus, HandleExecute);
@@ -2028,34 +2016,10 @@ private:
}
}
- for (auto& [nodeId, tasks] : tasksPerNode) {
- auto ev = MakeHolder<TEvKqpNode::TEvStartKqpTasksRequest>();
-
- ev->Record.SetTxId(TxId);
- ActorIdToProto(SelfId(), ev->Record.MutableExecuterActorId());
-
- if (Deadline) {
- TDuration timeout = *Deadline - TAppData::TimeProvider->Now();
- ev->Record.MutableRuntimeSettings()->SetTimeoutMs(timeout.MilliSeconds());
- }
-
- ev->Record.MutableRuntimeSettings()->SetExecType(NDqProto::TComputeRuntimeSettings::DATA);
- ev->Record.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(Request.StatsMode));
- ev->Record.MutableRuntimeSettings()->SetUseLLVM(false);
- ev->Record.SetStartAllOrFail(true);
-
- for (auto&& task : tasks) {
- ev->Record.AddTasks()->Swap(&task);
- }
-
- auto target = MakeKqpNodeServiceID(nodeId);
-
- ui32 flags = IEventHandle::FlagTrackDelivery;
- if (SubscribedNodes.emplace(nodeId).second) {
- flags |= IEventHandle::FlagSubscribeOnSession;
- }
- TlsActivationContext->Send(new IEventHandle(target, SelfId(), ev.Release(), flags, nodeId));
- }
+ Planner = CreateKqpPlanner(TxId, SelfId(), {}, std::move(tasksPerNode), Request.Snapshot,
+ Database, Nothing(), Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode,
+ Request.DisableLlvmForUdfStages, Request.LlvmEnabled, false, Nothing(), ExecuterSpan, {});
+ Planner->ProcessTasksForDataExecuter();
// then start data tasks with known actor ids of compute tasks
for (auto& [shardId, shardTx] : DatashardTxs) {
@@ -2363,9 +2327,9 @@ private:
} // namespace
IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TMaybe<TString>& userToken,
- TKqpRequestCounters::TPtr counters, bool streamResult)
+ TKqpRequestCounters::TPtr counters, bool streamResult, ui32 executerDelayToRetryMs)
{
- return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult);
+ return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerDelayToRetryMs);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
index 8e4a08be433..9bc27a4a152 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
@@ -151,7 +151,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
if (request.Transactions.empty()) {
// commit-only or rollback-only data transaction
YQL_ENSURE(request.EraseLocks);
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false);
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerDelayToRetryMs);
}
TMaybe<NKqpProto::TKqpPhyTx::EType> txsType;
@@ -167,13 +167,13 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
switch (*txsType) {
case NKqpProto::TKqpPhyTx::TYPE_COMPUTE:
case NKqpProto::TKqpPhyTx::TYPE_DATA:
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false);
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerDelayToRetryMs);
case NKqpProto::TKqpPhyTx::TYPE_SCAN:
return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerDelayToRetryMs);
case NKqpProto::TKqpPhyTx::TYPE_GENERIC:
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true);
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, executerDelayToRetryMs);
default:
YQL_ENSURE(false, "Unsupported physical tx type: " << (ui32)*txsType);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 686207b901a..40c8f2a2808 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -2,6 +2,7 @@
#include "kqp_executer.h"
#include "kqp_executer_stats.h"
+#include "kqp_planner.h"
#include "kqp_partition_helper.h"
#include "kqp_table_resolver.h"
#include "kqp_shards_resolver.h"
@@ -76,14 +77,48 @@ TActorId ReportToRl(ui64 ru, const TString& database, const TString& userToken,
template <class TDerived, EExecType ExecType>
class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
+protected:
+ struct TEvPrivate {
+ enum EEv {
+ EvRetry = EventSpaceBegin(TEvents::ES_PRIVATE),
+ EvResourcesSnapshot,
+ EvReattachToShard,
+ };
+
+ struct TEvRetry : public TEventLocal<TEvRetry, EEv::EvRetry> {
+ ui32 RequestId;
+ TActorId Target;
+
+ TEvRetry(ui64 requestId, const TActorId& target)
+ : RequestId(requestId)
+ , Target(target) {}
+ };
+
+ struct TEvResourcesSnapshot : public TEventLocal<TEvResourcesSnapshot, EEv::EvResourcesSnapshot> {
+ TVector<NKikimrKqp::TKqpNodeResources> Snapshot;
+
+ TEvResourcesSnapshot(TVector<NKikimrKqp::TKqpNodeResources>&& snapshot)
+ : Snapshot(std::move(snapshot)) {}
+ };
+
+ struct TEvReattachToShard : public TEventLocal<TEvReattachToShard, EvReattachToShard> {
+ const ui64 TabletId;
+
+ explicit TEvReattachToShard(ui64 tabletId)
+ : TabletId(tabletId) {}
+ };
+ };
+
public:
TKqpExecuterBase(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TMaybe<TString>& userToken,
- TKqpRequestCounters::TPtr counters, ui64 spanVerbosity = 0, TString spanName = "no_name")
+ TKqpRequestCounters::TPtr counters, ui32 executerDelayToRetryMs, ui64 spanVerbosity = 0, TString spanName = "no_name")
: Request(std::move(request))
, Database(database)
, UserToken(userToken)
, Counters(counters)
, ExecuterSpan(spanVerbosity, std::move(Request.TraceId), spanName)
+ , Planner(nullptr)
+ , ExecuterDelayToRetryMs(executerDelayToRetryMs)
{
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(Request.TxAlloc);
ResponseEv->Orbit = std::move(Request.Orbit);
@@ -375,12 +410,17 @@ protected:
}
}
- virtual void HandleUndelivered(TEvents::TEvUndelivered::TPtr& ev) {
+ void HandleUndelivered(TEvents::TEvUndelivered::TPtr& ev) {
ui32 eventType = ev->Get()->SourceType;
auto reason = ev->Get()->Reason;
switch (eventType) {
case TEvKqpNode::TEvStartKqpTasksRequest::EventType: {
- InvalidateNode(ev->Cookie);
+ if (reason == TEvents::TEvUndelivered::EReason::ReasonActorUnknown) {
+ LOG_D("Schedule a retry by ActorUnknown reason, nodeId:" << ev->Sender.NodeId() << " requestId: " << ev->Cookie);
+ this->Schedule(TDuration::MilliSeconds(ExecuterDelayToRetryMs), new typename TEvPrivate::TEvRetry(ev->Cookie, ev->Sender));
+ return;
+ }
+ InvalidateNode(ev->Sender.NodeId());
return InternalError(TStringBuilder()
<< "TEvKqpNode::TEvStartKqpTasksRequest lost: " << reason);
}
@@ -390,6 +430,15 @@ protected:
}
}
+ void HandleRetry(typename TEvPrivate::TEvRetry::TPtr& ev) {
+ if (Planner && Planner->SendStartKqpTasksRequest(ev->Get()->RequestId, ev->Get()->Target)) {
+ return;
+ }
+ InvalidateNode(Target.NodeId());
+ return InternalError(TStringBuilder()
+ << "TEvKqpNode::TEvStartKqpTasksRequest lost: ActorUnknown");
+ }
+
void HandleDisconnected(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
auto nodeId = ev->Get()->NodeId;
LOG_N("Disconnected node " << nodeId);
@@ -1130,6 +1179,9 @@ protected:
ui64 LastTaskId = 0;
TString LastComputeActorId = "";
+ std::unique_ptr<TKqpPlanner> Planner;
+ ui32 ExecuterDelayToRetryMs;
+
private:
static constexpr TDuration ResourceUsageUpdateInterval = TDuration::MilliSeconds(100);
};
@@ -1139,7 +1191,7 @@ private:
IActor* CreateKqpLiteralExecuter(IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters);
IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
- const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult);
+ const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult, ui32 executerDelayToRetryMs);
IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters,
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp
index 5567d314f81..0b8fa4cefc6 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp
@@ -24,7 +24,7 @@ constexpr ui32 MEMORY_ESTIMATION_OVERFLOW = 2;
constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 4;
TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::TDqTask>&& computeTasks,
- THashMap<ui64, TVector<NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot,
+ THashMap<ui64, TVector<NDqProto::TDqTask>>&& mainTasksPerNode, const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TMaybe<TString>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm,
bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
@@ -32,7 +32,7 @@ TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::
: TxId(txId)
, ExecuterId(executer)
, ComputeTasks(std::move(computeTasks))
- , ScanTasks(std::move(scanTasks))
+ , MainTasksPerNode(std::move(mainTasksPerNode))
, Snapshot(snapshot)
, Database(database)
, UserToken(userToken)
@@ -54,7 +54,7 @@ TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::
}
}
-bool TKqpPlanner::SendKqpTasksRequest(ui32 requestId, const TActorId& target) {
+bool TKqpPlanner::SendStartKqpTasksRequest(ui32 requestId, const TActorId& target) {
auto& requestData = Requests[requestId];
if (requestData.RetryNumber == 3) {
@@ -92,7 +92,45 @@ bool TKqpPlanner::SendKqpTasksRequest(ui32 requestId, const TActorId& target) {
return true;
}
-void TKqpPlanner::Process() {
+void TKqpPlanner::ProcessTasksForDataExecuter() {
+
+ long requestsCnt = 0;
+
+ for (auto& [nodeId, tasks] : MainTasksPerNode) {
+
+ auto& requestData = Requests.emplace_back();
+
+ requestData.request.SetTxId(TxId);
+ ActorIdToProto(ExecuterId, requestData.request.MutableExecuterActorId());
+
+ if (Deadline) {
+ TDuration timeout = Deadline - TAppData::TimeProvider->Now();
+ requestData.request.MutableRuntimeSettings()->SetTimeoutMs(timeout.MilliSeconds());
+ }
+
+ requestData.request.MutableRuntimeSettings()->SetExecType(NDqProto::TComputeRuntimeSettings::DATA);
+ requestData.request.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(StatsMode));
+ requestData.request.MutableRuntimeSettings()->SetUseLLVM(false);
+ requestData.request.SetStartAllOrFail(true);
+
+ for (auto&& task : tasks) {
+ requestData.request.AddTasks()->Swap(&task);
+ }
+
+ auto target = MakeKqpNodeServiceID(nodeId);
+
+ requestData.flag = CalcSendMessageFlagsForNode(nodeId);
+ requestsCnt++;
+
+ SendStartKqpTasksRequest(Requests.size() - 1, target);
+ }
+
+ if (ExecuterSpan) {
+ ExecuterSpan.Attribute("requestsCnt", requestsCnt);
+ }
+}
+
+void TKqpPlanner::ProcessTasksForScanExecuter() {
PrepareToProcess();
auto localResources = GetKqpResourceManager()->GetLocalResources();
@@ -145,15 +183,15 @@ void TKqpPlanner::Process() {
AddScansToKqpNodeRequest(requestData.request, group.NodeId);
auto target = MakeKqpNodeServiceID(group.NodeId);
- requestData.flag = CalcSendMessageFlagsForNode(target.NodeId());
+ requestData.flag = CalcSendMessageFlagsForNode(group.NodeId);
- SendKqpTasksRequest(Requests.size() - 1, target);
+ SendStartKqpTasksRequest(Requests.size() - 1, target);
++requestsCnt;
}
TVector<ui64> nodes;
- nodes.reserve(ScanTasks.size());
- for (auto& [nodeId, _]: ScanTasks) {
+ nodes.reserve(MainTasksPerNode.size());
+ for (auto& [nodeId, _]: MainTasksPerNode) {
nodes.push_back(nodeId);
}
@@ -163,12 +201,12 @@ void TKqpPlanner::Process() {
AddScansToKqpNodeRequest(requestData.request, nodeId);
auto target = MakeKqpNodeServiceID(nodeId);
- requestData.flag = CalcSendMessageFlagsForNode(target.NodeId());
+ requestData.flag = CalcSendMessageFlagsForNode(nodeId);
LOG_D("Send request to kqpnode: " << target << ", node_id: " << ExecuterId.NodeId() << ", TxId: " << TxId);
- SendKqpTasksRequest(Requests.size() - 1, target);
+ SendStartKqpTasksRequest(Requests.size() - 1, target);
++requestsCnt;
}
- Y_VERIFY(ScanTasks.empty());
+ Y_VERIFY(MainTasksPerNode.empty());
} else {
auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
"Not enough resources to execute query");
@@ -185,7 +223,7 @@ void TKqpPlanner::PrepareToProcess() {
auto rmConfig = GetKqpResourceManager()->GetConfig();
ui32 tasksCount = ComputeTasks.size();
- for (auto& [shardId, tasks] : ScanTasks) {
+ for (auto& [shardId, tasks] : MainTasksPerNode) {
tasksCount += tasks.size();
}
@@ -196,7 +234,7 @@ void TKqpPlanner::PrepareToProcess() {
EstimateTaskResources(ComputeTasks[i], rmConfig, ResourceEstimations[i]);
LocalRunMemoryEst += ResourceEstimations[i].TotalMemoryLimit;
}
- if (auto it = ScanTasks.find(ExecuterId.NodeId()); it != ScanTasks.end()) {
+ if (auto it = MainTasksPerNode.find(ExecuterId.NodeId()); it != MainTasksPerNode.end()) {
for (size_t i = 0; i < it->second.size(); ++i) {
EstimateTaskResources(it->second[i], rmConfig, ResourceEstimations[i + ComputeTasks.size()]);
LocalRunMemoryEst += ResourceEstimations[i + ComputeTasks.size()].TotalMemoryLimit;
@@ -209,8 +247,8 @@ ui64 TKqpPlanner::GetComputeTasksNumber() const {
return ComputeTasks.size();
}
-ui64 TKqpPlanner::GetScanTasksNumber() const {
- return ScanTasks.size();
+ui64 TKqpPlanner::GetMainTasksNumber() const {
+ return MainTasksPerNode.size();
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -224,14 +262,14 @@ void TKqpPlanner::RunLocal(const TVector<NKikimrKqp::TKqpNodeResources>& snapsho
AddScansToKqpNodeRequest(requestData.request, ExecuterId.NodeId());
auto target = MakeKqpNodeServiceID(ExecuterId.NodeId());
- requestData.flag = CalcSendMessageFlagsForNode(target.NodeId());
+ requestData.flag = CalcSendMessageFlagsForNode(ExecuterId.NodeId());
LOG_D("Send request to kqpnode: " << target << ", node_id: " << ExecuterId.NodeId() << ", TxId: " << TxId);
- SendKqpTasksRequest(Requests.size() - 1, target);
+ SendStartKqpTasksRequest(Requests.size() - 1, target);
long requestsCnt = 1;
TVector<ui64> nodes;
- for (const auto& pair: ScanTasks) {
+ for (const auto& pair: MainTasksPerNode) {
nodes.push_back(pair.first);
YQL_ENSURE(pair.first != ExecuterId.NodeId());
}
@@ -249,11 +287,11 @@ void TKqpPlanner::RunLocal(const TVector<NKikimrKqp::TKqpNodeResources>& snapsho
auto target = MakeKqpNodeServiceID(nodeId);
requestData.flag = CalcSendMessageFlagsForNode(target.NodeId());
- SendKqpTasksRequest(Requests.size() - 1, target);
+ SendStartKqpTasksRequest(Requests.size() - 1, target);
requestsCnt++;
}
- Y_VERIFY(ScanTasks.size() == 0);
+ Y_VERIFY(MainTasksPerNode.size() == 0);
if (ExecuterSpan) {
ExecuterSpan.Attribute("requestsCnt", requestsCnt);
@@ -313,12 +351,12 @@ void TKqpPlanner::PrepareKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& req
void TKqpPlanner::AddScansToKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& request, ui64 nodeId) {
if (!Snapshot.IsValid()) {
- Y_ASSERT(ScanTasks.size() == 0);
+ Y_ASSERT(MainTasksPerNode.size() == 0);
return;
}
bool withLLVM = true;
- if (auto nodeTasks = ScanTasks.FindPtr(nodeId)) {
+ if (auto nodeTasks = MainTasksPerNode.FindPtr(nodeId)) {
LOG_D("Adding " << nodeTasks->size() << " scans to KqpNode request");
request.MutableSnapshot()->SetTxId(Snapshot.TxId);
@@ -331,7 +369,7 @@ void TKqpPlanner::AddScansToKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest&
AddSnapshotInfoToTaskInputs(task);
request.AddTasks()->Swap(&task);
}
- ScanTasks.erase(nodeId);
+ MainTasksPerNode.erase(nodeId);
}
if (request.GetRuntimeSettings().GetUseLLVM()) {
@@ -392,13 +430,13 @@ void TKqpPlanner::AddSnapshotInfoToTaskInputs(NYql::NDqProto::TDqTask& task) {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
std::unique_ptr<TKqpPlanner> CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NYql::NDqProto::TDqTask>&& tasks,
- THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot,
+ THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& mainTasksPerNode, const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TMaybe<TString>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm,
bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot)
{
- return std::make_unique<TKqpPlanner>(txId, executer, std::move(tasks), std::move(scanTasks), snapshot,
+ return std::make_unique<TKqpPlanner>(txId, executer, std::move(tasks), std::move(mainTasksPerNode), snapshot,
database, userToken, deadline, statsMode, disableLlvmForUdfStages, enableLlvm, withSpilling, rlPath, executerSpan, std::move(resourcesSnapshot));
}
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h
index 74ef4e4c1db..def2a6f3a8c 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.h
+++ b/ydb/core/kqp/executer_actor/kqp_planner.h
@@ -31,12 +31,13 @@ public:
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages,
bool enableLlvm, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& ExecuterSpan,
TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot);
- bool SendKqpTasksRequest(ui32 requestId, const TActorId& target);
+ bool SendStartKqpTasksRequest(ui32 requestId, const TActorId& target);
- void Process();
+ void ProcessTasksForScanExecuter();
+ void ProcessTasksForDataExecuter();
ui64 GetComputeTasksNumber() const;
- ui64 GetScanTasksNumber() const;
+ ui64 GetMainTasksNumber() const;
private:
void PrepareToProcess();
@@ -52,7 +53,7 @@ private:
const ui64 TxId;
const TActorId ExecuterId;
TVector<NYql::NDqProto::TDqTask> ComputeTasks;
- THashMap<ui64, TVector<NYql::NDqProto::TDqTask>> ScanTasks;
+ THashMap<ui64, TVector<NYql::NDqProto::TDqTask>> MainTasksPerNode;
const IKqpGateway::TKqpSnapshot Snapshot;
TString Database;
const TMaybe<TString> UserToken;
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index 2737b45ccf9..128d66d7b9c 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -1,7 +1,6 @@
#include "kqp_executer.h"
#include "kqp_executer_impl.h"
#include "kqp_partition_helper.h"
-#include "kqp_planner.h"
#include "kqp_result_channel.h"
#include "kqp_tasks_graph.h"
#include "kqp_tasks_validate.h"
@@ -40,29 +39,6 @@ namespace {
class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Scan> {
using TBase = TKqpExecuterBase<TKqpScanExecuter, EExecType::Scan>;
- struct TEvPrivate {
- enum EEv {
- EvResourcesSnapshot = EventSpaceBegin(TEvents::ES_PRIVATE),
- EvRetry
- };
-
- struct TEvResourcesSnapshot : public TEventLocal<TEvResourcesSnapshot, EEv::EvResourcesSnapshot> {
- TVector<NKikimrKqp::TKqpNodeResources> Snapshot;
-
- TEvResourcesSnapshot(TVector<NKikimrKqp::TKqpNodeResources>&& snapshot)
- : Snapshot(std::move(snapshot)) {}
- };
-
- struct TEvRetry : public TEventLocal<TEvRetry, EEv::EvRetry> {
- ui32 RequestId;
- TActorId Target;
-
- TEvRetry(ui64 requestId, const TActorId& target)
- : RequestId(requestId)
- , Target(target) {}
- };
- };
-
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::KQP_EXECUTER_ACTOR;
@@ -72,10 +48,8 @@ public:
const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
ui32 executerDelayToRetryMs)
- : TBase(std::move(request), database, userToken, counters, TWilsonKqp::ScanExecuter, "ScanExecuter")
+ : TBase(std::move(request), database, userToken, counters, executerDelayToRetryMs, TWilsonKqp::ScanExecuter, "ScanExecuter")
, AggregationSettings(aggregation)
- , Planner(nullptr)
- , ExecuterDelayToRetryMs(executerDelayToRetryMs)
{
YQL_ENSURE(Request.Transactions.size() == 1);
YQL_ENSURE(Request.DataShardLocks.empty());
@@ -749,38 +723,9 @@ private:
std::move(scanTasks), Request.Snapshot,
Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode,
Request.DisableLlvmForUdfStages, Request.LlvmEnabled, AppData()->EnableKqpSpilling, Request.RlPath, ExecuterSpan, std::move(snapshot));
- LOG_D("Execute scan tx, computeTasks: " << Planner->GetComputeTasksNumber() << ", scanTasks: " << Planner->GetScanTasksNumber());
-
- Planner->Process();
- }
+ LOG_D("Execute scan tx, computeTasks: " << Planner->GetComputeTasksNumber() << ", scanTasks: " << Planner->GetMainTasksNumber());
- void HandleUndelivered(TEvents::TEvUndelivered::TPtr& ev) override {
- ui32 eventType = ev->Get()->SourceType;
- auto reason = ev->Get()->Reason;
- switch (eventType) {
- case TEvKqpNode::TEvStartKqpTasksRequest::EventType: {
- if (reason == TEvents::TEvUndelivered::EReason::ReasonActorUnknown) {
- LOG_D("Schedule a retry by ActorUnknown reason, nodeId:" << ev->Sender.NodeId() << " requestId: " << ev->Cookie);
- Schedule(TDuration::MilliSeconds(ExecuterDelayToRetryMs), new TEvPrivate::TEvRetry(ev->Cookie, ev->Sender));
- return;
- }
- InvalidateNode(ev->Sender.NodeId());
- return InternalError(TStringBuilder()
- << "TEvKqpNode::TEvStartKqpTasksRequest lost: " << reason);
- }
- default: {
- LOG_E("Event lost, type: " << eventType << ", reason: " << reason);
- }
- }
- }
-
- void HandleRetry(TEvPrivate::TEvRetry::TPtr& ev) {
- if (Planner->SendKqpTasksRequest(ev->Get()->RequestId, ev->Get()->Target)) {
- return;
- }
- InvalidateNode(Target.NodeId());
- return InternalError(TStringBuilder()
- << "TEvKqpNode::TEvStartKqpTasksRequest lost: ActorUnknown");
+ Planner->ProcessTasksForScanExecuter();
}
private:
@@ -846,8 +791,6 @@ public:
}
private:
const NKikimrConfig::TTableServiceConfig::TAggregationConfig AggregationSettings;
- std::unique_ptr<TKqpPlanner> Planner;
- ui32 ExecuterDelayToRetryMs;
};
} // namespace
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 0a2b64f8209..a353f50fc6b 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1236,7 +1236,7 @@ message TTableServiceConfig {
optional TAggregationConfig AggregationConfig = 29;
optional bool EnableKqpScanQueryStreamLookup = 30 [default = false];
optional bool EnableKqpDataQueryStreamLookup = 31 [default = false];
- optional uint32 ExecuterDelayToRetryMs = 32 [default = 200];
+ optional uint32 ExecuterDelayToRetryMs = 32 [default = 400];
};
// Config describes immediate controls and allows