summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <[email protected]>2023-04-13 17:36:32 +0300
committergvit <[email protected]>2023-04-13 17:36:32 +0300
commit782ed55b35b9805318a8d26ef040ff62a3875b42 (patch)
tree45784522f847a67bcb6f75e6cd7b6f0dc83e22ef
parent04a2ef84c2a459822321d96cfbac1ee25222c23c (diff)
refactor kqp planner code
start refactoring kqp node service continue refactoring task serialization process
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp24
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h4
-rw-r--r--ydb/core/kqp/executer_actor/kqp_planner.cpp376
-rw-r--r--ydb/core/kqp/executer_actor/kqp_planner.h54
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp46
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.h4
-rw-r--r--ydb/core/kqp/rm_service/kqp_resource_estimation.cpp31
-rw-r--r--ydb/core/kqp/rm_service/kqp_resource_estimation.h9
8 files changed, 250 insertions, 298 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 903b156e7b5..e3b1f758207 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -1901,7 +1901,7 @@ private:
return;
}
- TasksGraph.GetMeta().Snapshot = TKqpSnapshot(record.GetSnapshotStep(), record.GetSnapshotTxId());
+ SetSnapshot(record.GetSnapshotStep(), record.GetSnapshotTxId());
ImmediateTx = true;
ContinueExecute();
@@ -2129,26 +2129,26 @@ private:
ExecuteDataComputeTask(std::move(taskDesc), shareMailbox);
}
- size_t remoteComputeTasksCnt = 0;
- THashMap<ui64, TVector<NDqProto::TDqTask>> tasksPerNode;
+ THashMap<ui64, TVector<ui64>> tasksPerNode;
for (auto& [shardId, tasks] : RemoteComputeTasks) {
auto it = ShardIdToNodeId.find(shardId);
YQL_ENSURE(it != ShardIdToNodeId.end());
-
for (ui64 taskId : tasks) {
- const auto& task = TasksGraph.GetTask(taskId);
- remoteComputeTasksCnt += 1;
PendingComputeTasks.insert(taskId);
- auto taskDesc = SerializeTaskToProto(TasksGraph, task);
- tasksPerNode[it->second].emplace_back(std::move(taskDesc));
+ tasksPerNode[it->second].emplace_back(taskId);
}
}
- Planner = CreateKqpPlanner(TxId, SelfId(), {}, std::move(tasksPerNode), GetSnapshot(),
+ Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), {}, std::move(tasksPerNode), GetSnapshot(),
Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode,
Request.DisableLlvmForUdfStages, Request.LlvmEnabled, false, Nothing(),
- ExecuterSpan, {}, ExecuterRetriesConfig);
- Planner->ProcessTasksForDataExecuter();
+ ExecuterSpan, {}, ExecuterRetriesConfig, true /* isDataQuery */);
+ auto err = Planner->PlanExecution();
+ if (err) {
+ TlsActivationContext->Send(err.release());
+ return;
+ }
+ Planner->Submit();
// then start data tasks with known actor ids of compute tasks
for (auto& [shardId, shardTx] : DatashardTxs) {
@@ -2214,7 +2214,7 @@ private:
<< ", topicTxs: " << Request.TopicOperations.GetSize()
<< ", volatile: " << VolatileTx
<< ", immediate: " << ImmediateTx
- << ", remote tasks" << remoteComputeTasksCnt
+ << ", pending compute tasks" << PendingComputeTasks.size()
<< ", useFollowers: " << UseFollowers);
LOG_T("Updating channels after the creation of compute actors");
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 4e269ba263b..0c53539739d 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -952,6 +952,10 @@ protected:
return TasksGraph.GetMeta().Snapshot;
}
+ void SetSnapshot(ui64 step, ui64 txId) {
+ TasksGraph.GetMeta().SetSnapshot(step, txId);
+ }
+
IActor* CreateChannelProxy(const NYql::NDq::TChannel& channel) {
IActor* proxy;
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp
index c8270bb628a..8c74f71486b 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp
@@ -18,22 +18,52 @@ namespace NKikimr::NKqp {
using namespace NYql;
+namespace {
+
+const ui64 MaxTaskSize = 48_MB;
+
+template <class TCollection>
+std::unique_ptr<TEvKqp::TEvAbortExecution> CheckTaskSize(ui64 TxId, const TCollection& tasks) {
+ for (const auto& task : tasks) {
+ if (ui32 size = task.ByteSize(); size > MaxTaskSize) {
+ LOG_E("Abort execution. Task #" << task.GetId() << " size is too big: " << size << " > " << MaxTaskSize);
+ return std::make_unique<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::ABORTED,
+ TStringBuilder() << "Datashard program size limit exceeded (" << size << " > " << MaxTaskSize << ")");
+ }
+ }
+ return nullptr;
+}
+
+void BuildInitialTaskResources(const TKqpTasksGraph& graph, ui64 taskId, TTaskResourceEstimation& ret) {
+ const auto& task = graph.GetTask(taskId);
+ const auto& stageInfo = graph.GetStageInfo(task.StageId);
+ const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id);
+ const auto& opts = stage.GetProgram().GetSettings();
+ ret.TaskId = task.Id;
+ ret.ChannelBuffersCount += task.Inputs.size() ? 1 : 0;
+ ret.ChannelBuffersCount += task.Outputs.size() ? 1 : 0;
+ ret.HeavyProgram = opts.GetHasMapJoin();
+}
+
+}
+
// Task can allocate extra memory during execution.
// So, we estimate total memory amount required for task as apriori task size multiplied by this constant.
constexpr ui32 MEMORY_ESTIMATION_OVERFLOW = 2;
constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 4;
-TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::TDqTask>&& computeTasks,
- THashMap<ui64, TVector<NDqProto::TDqTask>>&& mainTasksPerNode, const IKqpGateway::TKqpSnapshot& snapshot,
+TKqpPlanner::TKqpPlanner(const TKqpTasksGraph& graph, ui64 txId, const TActorId& executer, TVector<ui64>&& computeTasks,
+ THashMap<ui64, TVector<ui64>>&& tasksPerNode, const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm,
bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot,
- const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig)
+ const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
+ bool isDataQuery)
: TxId(txId)
, ExecuterId(executer)
, ComputeTasks(std::move(computeTasks))
- , MainTasksPerNode(std::move(mainTasksPerNode))
+ , TasksPerNode(std::move(tasksPerNode))
, Snapshot(snapshot)
, Database(database)
, UserToken(userToken)
@@ -46,6 +76,8 @@ TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::
, ResourcesSnapshot(std::move(resourcesSnapshot))
, ExecuterSpan(executerSpan)
, ExecuterRetriesConfig(executerRetriesConfig)
+ , TasksGraph(graph)
+ , IsDataQuery(isDataQuery)
{
if (!Database) {
// a piece of magic for tests
@@ -57,14 +89,20 @@ TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::
}
bool TKqpPlanner::SendStartKqpTasksRequest(ui32 requestId, const TActorId& target) {
+ YQL_ENSURE(requestId < Requests.size());
+
auto& requestData = Requests[requestId];
if (requestData.RetryNumber == ExecuterRetriesConfig.GetMaxRetryNumber() + 1) {
return false;
}
- auto ev = MakeHolder<TEvKqpNode::TEvStartKqpTasksRequest>();
- ev->Record = requestData.request;
+ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> ev;
+ if (Y_LIKELY(requestData.SerializedRequest)) {
+ ev.reset(requestData.SerializedRequest.release());
+ } else {
+ ev = SerializeRequest(requestData);
+ }
if (requestData.RetryNumber == ExecuterRetriesConfig.GetMaxRetryNumber()) {
LOG_E("Retry failed by retries limit, requestId: " << requestId);
@@ -78,7 +116,7 @@ bool TKqpPlanner::SendStartKqpTasksRequest(ui32 requestId, const TActorId& targe
if (targetNode) {
LOG_D("Try to retry to another node, nodeId: " << *targetNode << ", requestId: " << requestId);
auto anotherTarget = MakeKqpNodeServiceID(*targetNode);
- TlsActivationContext->Send(std::make_unique<NActors::IEventHandle>(anotherTarget, ExecuterId, ev.Release(),
+ TlsActivationContext->Send(std::make_unique<NActors::IEventHandle>(anotherTarget, ExecuterId, ev.release(),
IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, requestId, nullptr, ExecuterSpan.GetTraceId()));
requestData.RetryNumber++;
return true;
@@ -93,46 +131,71 @@ bool TKqpPlanner::SendStartKqpTasksRequest(ui32 requestId, const TActorId& targe
requestData.RetryNumber++;
- TlsActivationContext->Send(std::make_unique<NActors::IEventHandle>(target, ExecuterId, ev.Release(),
- requestData.flag, requestId, nullptr, ExecuterSpan.GetTraceId()));
+ TlsActivationContext->Send(std::make_unique<NActors::IEventHandle>(target, ExecuterId, ev.release(),
+ requestData.Flag, requestId, nullptr, ExecuterSpan.GetTraceId()));
return true;
}
-void TKqpPlanner::ProcessTasksForDataExecuter() {
-
- long requestsCnt = 0;
-
- for (auto& [nodeId, tasks] : MainTasksPerNode) {
+std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeRequest(const TRequestData& requestData) const {
+ auto result = std::make_unique<TEvKqpNode::TEvStartKqpTasksRequest>();
+ auto& request = result->Record;
+ request.SetTxId(TxId);
+ ActorIdToProto(ExecuterId, request.MutableExecuterActorId());
- auto& requestData = Requests.emplace_back();
+ if (Deadline) {
+ TDuration timeout = Deadline - TAppData::TimeProvider->Now();
+ request.MutableRuntimeSettings()->SetTimeoutMs(timeout.MilliSeconds());
+ }
- requestData.request.SetTxId(TxId);
- ActorIdToProto(ExecuterId, requestData.request.MutableExecuterActorId());
+ bool enableLlvm = EnableLlvm;
- if (Deadline) {
- TDuration timeout = Deadline - TAppData::TimeProvider->Now();
- requestData.request.MutableRuntimeSettings()->SetTimeoutMs(timeout.MilliSeconds());
+ for (ui64 taskId : requestData.TaskIds) {
+ const auto& task = TasksGraph.GetTask(taskId);
+ auto serializedTask = SerializeTaskToProto(TasksGraph, task);
+ if (DisableLlvmForUdfStages && serializedTask.GetProgram().GetSettings().GetHasUdf()) {
+ enableLlvm = false;
}
+ request.AddTasks()->Swap(&serializedTask);
+ }
- requestData.request.MutableRuntimeSettings()->SetExecType(NDqProto::TComputeRuntimeSettings::DATA);
- requestData.request.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(StatsMode));
- requestData.request.MutableRuntimeSettings()->SetUseLLVM(false);
- requestData.request.SetStartAllOrFail(true);
+ if (IsDataQuery) {
+ request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::DATA);
+ request.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(StatsMode));
+ request.MutableRuntimeSettings()->SetUseLLVM(false);
+ request.SetStartAllOrFail(true);
+ } else {
+ request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::SCAN);
+ request.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(StatsMode));
+ request.MutableRuntimeSettings()->SetUseLLVM(enableLlvm);
+ request.SetStartAllOrFail(true);
+ }
- for (auto&& task : tasks) {
- requestData.request.AddTasks()->Swap(&task);
- }
+ if (RlPath) {
+ auto rlPath = request.MutableRuntimeSettings()->MutableRlPath();
+ rlPath->SetCoordinationNode(RlPath->GetCoordinationNode());
+ rlPath->SetResourcePath(RlPath->GetResourcePath());
+ rlPath->SetDatabase(Database);
+ if (UserToken)
+ rlPath->SetToken(UserToken->GetSerializedToken());
+ }
- auto target = MakeKqpNodeServiceID(nodeId);
+ if (Snapshot.IsValid()) {
+ request.MutableSnapshot()->SetTxId(Snapshot.TxId);
+ request.MutableSnapshot()->SetStep(Snapshot.Step);
+ }
- requestData.flag = CalcSendMessageFlagsForNode(nodeId);
- requestsCnt++;
+ return result;
+}
- SendStartKqpTasksRequest(Requests.size() - 1, target);
+void TKqpPlanner::Submit() {
+ for (size_t reqId = 0; reqId < Requests.size(); ++reqId) {
+ ui64 nodeId = Requests[reqId].NodeId;
+ auto target = MakeKqpNodeServiceID(nodeId);
+ SendStartKqpTasksRequest(reqId, target);
}
if (ExecuterSpan) {
- ExecuterSpan.Attribute("requestsCnt", requestsCnt);
+ ExecuterSpan.Attribute("requestsCnt", static_cast<long>(Requests.size()));
}
}
@@ -148,7 +211,10 @@ ui32 TKqpPlanner::GetCurrentRetryDelay(ui32 requestId) {
return requestData.CurrentDelay;
}
-void TKqpPlanner::ProcessTasksForScanExecuter() {
+std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() {
+ if (ComputeTasks.empty())
+ return nullptr;
+
PrepareToProcess();
auto localResources = GetKqpResourceManager()->GetLocalResources();
@@ -157,8 +223,12 @@ void TKqpPlanner::ProcessTasksForScanExecuter() {
ResourceEstimations.size() <= localResources.ExecutionUnits &&
ResourceEstimations.size() <= MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT)
{
- RunLocal(ResourcesSnapshot);
- return;
+ ui64 selfNodeId = ExecuterId.NodeId();
+ for(ui64 taskId: ComputeTasks) {
+ TasksPerNode[selfNodeId].push_back(taskId);
+ }
+
+ return nullptr;
}
if (ResourcesSnapshot.empty() || (ResourcesSnapshot.size() == 1 && ResourcesSnapshot[0].GetNodeId() == ExecuterId.NodeId())) {
@@ -166,8 +236,12 @@ void TKqpPlanner::ProcessTasksForScanExecuter() {
if (LocalRunMemoryEst <= localResources.Memory[NRm::EKqpMemoryPool::ScanQuery] &&
ResourceEstimations.size() <= localResources.ExecutionUnits)
{
- RunLocal(ResourcesSnapshot);
- return;
+ ui64 localNodeId = ExecuterId.NodeId();
+ for(ui64 taskId: ComputeTasks) {
+ TasksPerNode[localNodeId].push_back(taskId);
+ }
+
+ return nullptr;
}
LOG_E("Not enough resources to execute query locally and no information about other nodes");
@@ -175,8 +249,7 @@ void TKqpPlanner::ProcessTasksForScanExecuter() {
"Not enough resources to execute query locally and no information about other nodes (estimation: "
+ ToString(LocalRunMemoryEst) + ";" + GetEstimationsInfo() + ")");
- TlsActivationContext->Send(std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release()));
- return;
+ return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release());
}
auto planner = CreateKqpGreedyPlanner();
@@ -193,55 +266,53 @@ void TKqpPlanner::ProcessTasksForScanExecuter() {
auto plan = planner->Plan(ResourcesSnapshot, ResourceEstimations);
- long requestsCnt = 0;
+ THashMap<ui64, ui64> alreadyAssigned;
+ for(auto& [nodeId, tasks] : TasksPerNode) {
+ for(ui64 taskId: tasks) {
+ alreadyAssigned.emplace(taskId, nodeId);
+ }
+ }
if (!plan.empty()) {
for (auto& group : plan) {
- auto& requestData = Requests.emplace_back();
- PrepareKqpNodeRequest(requestData.request, THashSet<ui64>(group.TaskIds.begin(), group.TaskIds.end()));
- AddScansToKqpNodeRequest(requestData.request, group.NodeId);
-
- auto target = MakeKqpNodeServiceID(group.NodeId);
- requestData.flag = CalcSendMessageFlagsForNode(group.NodeId);
-
- SendStartKqpTasksRequest(Requests.size() - 1, target);
- ++requestsCnt;
- }
-
- TVector<ui64> nodes;
- nodes.reserve(MainTasksPerNode.size());
- for (auto& [nodeId, _]: MainTasksPerNode) {
- nodes.push_back(nodeId);
+ for(ui64 taskId: group.TaskIds) {
+ auto [it, success] = alreadyAssigned.emplace(taskId, group.NodeId);
+ if (success) {
+ TasksPerNode[group.NodeId].push_back(taskId);
+ }
+ }
}
- for (ui64 nodeId: nodes) {
- auto& requestData = Requests.emplace_back();
- PrepareKqpNodeRequest(requestData.request, {});
- AddScansToKqpNodeRequest(requestData.request, nodeId);
-
- auto target = MakeKqpNodeServiceID(nodeId);
- requestData.flag = CalcSendMessageFlagsForNode(nodeId);
- LOG_D("Send request to kqpnode: " << target << ", node_id: " << ExecuterId.NodeId() << ", TxId: " << TxId);
- SendStartKqpTasksRequest(Requests.size() - 1, target);
- ++requestsCnt;
- }
- Y_VERIFY(MainTasksPerNode.empty());
- } else {
+ return nullptr;
+ } else {
auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
"Not enough resources to execute query");
+ return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release());
+ }
+}
- TlsActivationContext->Send(std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release()));
+std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() {
+ auto err = AssignTasksToNodes();
+ if (err) {
+ return err;
}
- if (ExecuterSpan) {
- ExecuterSpan.Attribute("requestsCnt", requestsCnt);
+ for(auto& [nodeId, tasks] : TasksPerNode) {
+ SortUnique(tasks);
+ auto& request = Requests.emplace_back(std::move(tasks), CalcSendMessageFlagsForNode(nodeId), nodeId);
+ request.SerializedRequest = SerializeRequest(request);
+ auto ev = CheckTaskSize(TxId, request.SerializedRequest->Record.GetTasks());
+ if (ev != nullptr) {
+ return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.release());
+ }
}
+ return nullptr;
}
TString TKqpPlanner::GetEstimationsInfo() const {
TStringStream ss;
ss << "ComputeTasks:" << ComputeTasks.size() << ";NodeTasks:";
- if (auto it = MainTasksPerNode.find(ExecuterId.NodeId()); it != MainTasksPerNode.end()) {
+ if (auto it = TasksPerNode.find(ExecuterId.NodeId()); it != TasksPerNode.end()) {
ss << it->second.size() << ";";
} else {
ss << "0;";
@@ -253,7 +324,7 @@ void TKqpPlanner::PrepareToProcess() {
auto rmConfig = GetKqpResourceManager()->GetConfig();
ui32 tasksCount = ComputeTasks.size();
- for (auto& [shardId, tasks] : MainTasksPerNode) {
+ for (auto& [shardId, tasks] : TasksPerNode) {
tasksCount += tasks.size();
}
@@ -261,147 +332,21 @@ void TKqpPlanner::PrepareToProcess() {
LocalRunMemoryEst = 0;
for (size_t i = 0; i < ComputeTasks.size(); ++i) {
- EstimateTaskResources(ComputeTasks[i], rmConfig, ResourceEstimations[i], ComputeTasks.size());
+ BuildInitialTaskResources(TasksGraph, ComputeTasks[i], ResourceEstimations[i]);
+ EstimateTaskResources(rmConfig, ResourceEstimations[i], ComputeTasks.size());
LocalRunMemoryEst += ResourceEstimations[i].TotalMemoryLimit;
}
- if (auto it = MainTasksPerNode.find(ExecuterId.NodeId()); it != MainTasksPerNode.end()) {
- for (size_t i = 0; i < it->second.size(); ++i) {
- EstimateTaskResources(it->second[i], rmConfig, ResourceEstimations[i + ComputeTasks.size()], it->second.size());
- LocalRunMemoryEst += ResourceEstimations[i + ComputeTasks.size()].TotalMemoryLimit;
- }
- }
- Sort(ResourceEstimations, [](const auto& l, const auto& r) { return l.TotalMemoryLimit > r.TotalMemoryLimit; });
-}
-ui64 TKqpPlanner::GetComputeTasksNumber() const {
- return ComputeTasks.size();
-}
-
-ui64 TKqpPlanner::GetMainTasksNumber() const {
- return MainTasksPerNode.size();
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-/// Local Execution
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-void TKqpPlanner::RunLocal(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot) {
- LOG_D("Execute query locally");
-
- auto& requestData = Requests.emplace_back();
- PrepareKqpNodeRequest(requestData.request, {});
- AddScansToKqpNodeRequest(requestData.request, ExecuterId.NodeId());
-
- auto target = MakeKqpNodeServiceID(ExecuterId.NodeId());
- requestData.flag = CalcSendMessageFlagsForNode(ExecuterId.NodeId());
- LOG_D("Send request to kqpnode: " << target << ", node_id: " << ExecuterId.NodeId() << ", TxId: " << TxId);
- SendStartKqpTasksRequest(Requests.size() - 1, target);
-
- long requestsCnt = 1;
-
- TVector<ui64> nodes;
- for (const auto& pair: MainTasksPerNode) {
- nodes.push_back(pair.first);
- YQL_ENSURE(pair.first != ExecuterId.NodeId());
- }
-
- THashMap<ui64, size_t> nodeIdToIdx;
- for (size_t idx = 0; idx < snapshot.size(); ++idx) {
- nodeIdToIdx[snapshot[idx].nodeid()] = idx;
- LOG_D("snapshot #" << idx << ": " << snapshot[idx].ShortDebugString());
- }
-
- for (auto nodeId: nodes) {
- auto& requestData = Requests.emplace_back();
- PrepareKqpNodeRequest(requestData.request, {});
- AddScansToKqpNodeRequest(requestData.request, nodeId);
-
- auto target = MakeKqpNodeServiceID(nodeId);
- requestData.flag = CalcSendMessageFlagsForNode(target.NodeId());
- SendStartKqpTasksRequest(Requests.size() - 1, target);
-
- requestsCnt++;
- }
- Y_VERIFY(MainTasksPerNode.size() == 0);
-
- if (ExecuterSpan) {
- ExecuterSpan.Attribute("requestsCnt", requestsCnt);
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-void TKqpPlanner::PrepareKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& request, THashSet<ui64> taskIds) {
- request.SetTxId(TxId);
- ActorIdToProto(ExecuterId, request.MutableExecuterActorId());
-
- bool withLLVM = EnableLlvm;
-
- if (taskIds.empty()) {
- for (auto& taskDesc : ComputeTasks) {
- if (taskDesc.GetId()) {
- if (DisableLlvmForUdfStages && taskDesc.GetProgram().GetSettings().GetHasUdf()) {
- withLLVM = false;
- }
- request.AddTasks()->Swap(&taskDesc);
- }
+ ui32 currentEst = ComputeTasks.size();
+ for(auto& [nodeId, tasks] : TasksPerNode) {
+ for (ui64 taskId: tasks) {
+ BuildInitialTaskResources(TasksGraph, taskId, ResourceEstimations[currentEst]);
+ EstimateTaskResources(rmConfig, ResourceEstimations[currentEst], tasks.size());
+ LocalRunMemoryEst += ResourceEstimations[currentEst].TotalMemoryLimit;
+ ++currentEst;
}
- } else {
- for (auto& taskDesc : ComputeTasks) {
- if (taskDesc.GetId() && Find(taskIds, taskDesc.GetId()) != taskIds.end()) {
- if (DisableLlvmForUdfStages && taskDesc.GetProgram().GetSettings().GetHasUdf()) {
- withLLVM = false;
- }
- request.AddTasks()->Swap(&taskDesc);
- }
- }
- }
-
- if (Deadline) {
- TDuration timeout = Deadline - TAppData::TimeProvider->Now();
- request.MutableRuntimeSettings()->SetTimeoutMs(timeout.MilliSeconds());
- }
-
- request.MutableRuntimeSettings()->SetExecType(NDqProto::TComputeRuntimeSettings::SCAN);
- request.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(StatsMode));
- request.MutableRuntimeSettings()->SetUseLLVM(withLLVM);
- request.MutableRuntimeSettings()->SetUseSpilling(WithSpilling);
-
- if (RlPath) {
- auto rlPath = request.MutableRuntimeSettings()->MutableRlPath();
- rlPath->SetCoordinationNode(RlPath->GetCoordinationNode());
- rlPath->SetResourcePath(RlPath->GetResourcePath());
- rlPath->SetDatabase(Database);
- if (UserToken)
- rlPath->SetToken(UserToken->GetSerializedToken());
- }
-
- request.SetStartAllOrFail(true);
-}
-
-void TKqpPlanner::AddScansToKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& request, ui64 nodeId) {
- if (!Snapshot.IsValid()) {
- Y_ASSERT(MainTasksPerNode.size() == 0);
- return;
- }
-
- bool withLLVM = true;
- if (auto nodeTasks = MainTasksPerNode.FindPtr(nodeId)) {
- LOG_D("Adding " << nodeTasks->size() << " scans to KqpNode request");
-
- request.MutableSnapshot()->SetTxId(Snapshot.TxId);
- request.MutableSnapshot()->SetStep(Snapshot.Step);
-
- for (auto& task: *nodeTasks) {
- if (DisableLlvmForUdfStages && task.GetProgram().GetSettings().GetHasUdf()) {
- withLLVM = false;
- }
- request.AddTasks()->Swap(&task);
- }
- MainTasksPerNode.erase(nodeId);
- }
-
- if (request.GetRuntimeSettings().GetUseLLVM()) {
- request.MutableRuntimeSettings()->SetUseLLVM(withLLVM);
}
+ Sort(ResourceEstimations, [](const auto& l, const auto& r) { return l.TotalMemoryLimit > r.TotalMemoryLimit; });
}
ui32 TKqpPlanner::CalcSendMessageFlagsForNode(ui32 nodeId) {
@@ -413,16 +358,17 @@ ui32 TKqpPlanner::CalcSendMessageFlagsForNode(ui32 nodeId) {
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-std::unique_ptr<TKqpPlanner> CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NYql::NDqProto::TDqTask>&& tasks,
- THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& mainTasksPerNode, const IKqpGateway::TKqpSnapshot& snapshot,
+std::unique_ptr<TKqpPlanner> CreateKqpPlanner(const TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, TVector<ui64>&& tasks,
+ THashMap<ui64, TVector<ui64>>&& tasksPerNode, const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm,
bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
- TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig)
+ TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
+ bool isDataQuery)
{
- return std::make_unique<TKqpPlanner>(txId, executer, std::move(tasks), std::move(mainTasksPerNode), snapshot,
- database, userToken, deadline, statsMode, disableLlvmForUdfStages, enableLlvm, withSpilling, rlPath, executerSpan,
- std::move(resourcesSnapshot), executerRetriesConfig);
+ return std::make_unique<TKqpPlanner>(tasksGraph, txId, executer, std::move(tasks), std::move(tasksPerNode), snapshot,
+ database, userToken, deadline, statsMode, disableLlvmForUdfStages, enableLlvm, withSpilling, rlPath, executerSpan,
+ std::move(resourcesSnapshot), executerRetriesConfig, isDataQuery);
}
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h
index 1149c67901d..1f5766eea86 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.h
+++ b/ydb/core/kqp/executer_actor/kqp_planner.h
@@ -18,45 +18,49 @@ namespace NKikimr::NKqp {
class TKqpPlanner {
- struct RequestData {
- NKikimrKqp::TEvStartKqpTasksRequest request;
- ui32 flag;
+ struct TRequestData {
+ TVector<ui64> TaskIds;
+ ui32 Flag;
+ ui64 NodeId;
ui32 RetryNumber = 0;
ui32 CurrentDelay = 0;
+ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> SerializedRequest;
+
+ explicit TRequestData(TVector<ui64>&& taskIds, ui64 flag, ui64 nodeId)
+ : TaskIds(std::move(taskIds))
+ , Flag(flag)
+ , NodeId(nodeId)
+ {}
};
public:
- TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NYql::NDqProto::TDqTask>&& tasks,
- THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot,
+ TKqpPlanner(const TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, TVector<ui64>&& tasks,
+ THashMap<ui64, TVector<ui64>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages,
bool enableLlvm, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& ExecuterSpan,
- TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig);
- bool SendStartKqpTasksRequest(ui32 requestId, const TActorId& target);
-
- void ProcessTasksForScanExecuter();
- void ProcessTasksForDataExecuter();
-
- ui64 GetComputeTasksNumber() const;
- ui64 GetMainTasksNumber() const;
+ TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
+ bool isDataQuery);
+ bool SendStartKqpTasksRequest(ui32 requestId, const TActorId& target);
+ std::unique_ptr<IEventHandle> PlanExecution();
+ std::unique_ptr<IEventHandle> AssignTasksToNodes();
+ void Submit();
ui32 GetCurrentRetryDelay(ui32 requestId);
+
private:
+
void PrepareToProcess();
TString GetEstimationsInfo() const;
- void RunLocal(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot);
-
- void PrepareKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& request, THashSet<ui64> taskIds);
- void AddScansToKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& request, ui64 nodeId);
-
+ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> SerializeRequest(const TRequestData& requestData) const;
ui32 CalcSendMessageFlagsForNode(ui32 nodeId);
private:
const ui64 TxId;
const TActorId ExecuterId;
- TVector<NYql::NDqProto::TDqTask> ComputeTasks;
- THashMap<ui64, TVector<NYql::NDqProto::TDqTask>> MainTasksPerNode;
+ TVector<ui64> ComputeTasks;
+ THashMap<ui64, TVector<ui64>> TasksPerNode;
const IKqpGateway::TKqpSnapshot Snapshot;
TString Database;
const TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
@@ -72,15 +76,17 @@ private:
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig;
ui64 LocalRunMemoryEst;
TVector<TTaskResourceEstimation> ResourceEstimations;
- TVector<RequestData> Requests;
+ TVector<TRequestData> Requests;
+ const TKqpTasksGraph& TasksGraph;
+ const bool IsDataQuery;
};
-std::unique_ptr<TKqpPlanner> CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NYql::NDqProto::TDqTask>&& tasks,
- THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot,
+std::unique_ptr<TKqpPlanner> CreateKqpPlanner(const TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, TVector<ui64>&& tasks,
+ THashMap<ui64, TVector<ui64>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm,
bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot,
- const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig);
+ const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig, bool isDataQuery);
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index 079ee399e9d..078ac610bd5 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -572,30 +572,29 @@ private:
}
// NodeId -> {Tasks}
- THashMap<ui64, TVector<NYql::NDqProto::TDqTask>> scanTasks;
+ THashMap<ui64, TVector<ui64>> scanTasks;
ui32 nShardScans = 0;
ui32 nScanTasks = 0;
- TVector<NYql::NDqProto::TDqTask> computeTasks;
+ TVector<ui64> computeTasks;
InitializeChannelProxies();
for (auto& task : TasksGraph.GetTasks()) {
auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
- NYql::NDqProto::TDqTask taskDesc = SerializeTaskToProto(TasksGraph, task);
if (task.Meta.NodeId || stageInfo.Meta.IsSysView()) {
// Task with source
if (!task.Meta.Reads) {
- scanTasks[task.Meta.NodeId].emplace_back(std::move(taskDesc));
+ scanTasks[task.Meta.NodeId].emplace_back(task.Id);
nScanTasks++;
continue;
}
if (stageInfo.Meta.IsSysView()) {
- computeTasks.emplace_back(std::move(taskDesc));
+ computeTasks.emplace_back(task.Id);
} else {
- scanTasks[task.Meta.NodeId].emplace_back(std::move(taskDesc));
+ scanTasks[task.Meta.NodeId].emplace_back(task.Id);
nScanTasks++;
}
@@ -607,7 +606,7 @@ private:
}
} else {
- computeTasks.emplace_back(std::move(taskDesc));
+ computeTasks.emplace_back(task.Id);
}
}
@@ -620,12 +619,6 @@ private:
return;
}
- bool fitSize = AllOf(scanTasks, [this](const auto& x){ return ValidateTaskSize(x.second); })
- && ValidateTaskSize(computeTasks);
- if (!fitSize) {
- return;
- }
-
if (prepareTasksSpan) {
prepareTasksSpan.End();
}
@@ -675,27 +668,32 @@ public:
}
private:
- void ExecuteScanTx(TVector<NYql::NDqProto::TDqTask>&& computeTasks, THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks,
+ void ExecuteScanTx(TVector<ui64>&& computeTasks, THashMap<ui64, TVector<ui64>>&& tasksPerNode,
TVector<NKikimrKqp::TKqpNodeResources>&& snapshot) {
- LWTRACK(KqpScanExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, computeTasks.size(), scanTasks.size());
- for (const auto& [_, tasks]: scanTasks) {
+ LWTRACK(KqpScanExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, computeTasks.size(), tasksPerNode.size());
+ for (const auto& [_, tasks]: tasksPerNode) {
for (const auto& task : tasks) {
- PendingComputeTasks.insert(task.GetId());
+ PendingComputeTasks.insert(task);
}
}
- for (auto& taskDesc : computeTasks) {
- PendingComputeTasks.insert(taskDesc.GetId());
+ for (auto& task : computeTasks) {
+ PendingComputeTasks.insert(task);
}
- Planner = CreateKqpPlanner(TxId, SelfId(), std::move(computeTasks),
- std::move(scanTasks), GetSnapshot(),
+ Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), std::move(computeTasks),
+ std::move(tasksPerNode), GetSnapshot(),
Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode,
Request.DisableLlvmForUdfStages, Request.LlvmEnabled, AppData()->EnableKqpSpilling,
- Request.RlPath, ExecuterSpan, std::move(snapshot), ExecuterRetriesConfig);
- LOG_D("Execute scan tx, computeTasks: " << Planner->GetComputeTasksNumber() << ", scanTasks: " << Planner->GetMainTasksNumber());
+ Request.RlPath, ExecuterSpan, std::move(snapshot), ExecuterRetriesConfig, false /* isDataQuery */);
+ LOG_D("Execute scan tx, PendingComputeTasks: " << PendingComputeTasks.size());
+ auto err = Planner->PlanExecution();
+ if (err) {
+ TlsActivationContext->Send(err.release());
+ return;
+ }
- Planner->ProcessTasksForScanExecuter();
+ Planner->Submit();
}
private:
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
index ad3777350a7..bbcebbba257 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
@@ -89,6 +89,10 @@ struct TGraphMeta {
IKqpGateway::TKqpSnapshot Snapshot;
std::unordered_map<ui64, TActorId> ResultChannelProxies;
TActorId ExecuterId;
+
+ void SetSnapshot(ui64 step, ui64 txId) {
+ Snapshot = IKqpGateway::TKqpSnapshot(step, txId);
+ }
};
struct TTaskInputMeta {};
diff --git a/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp b/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp
index ff279362f0f..7e339fa24f6 100644
--- a/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp
+++ b/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp
@@ -8,17 +8,24 @@ using namespace NKikimrConfig;
TTaskResourceEstimation EstimateTaskResources(const TDqTask& task,
const TTableServiceConfig::TResourceManager& config, const ui32 tasksCount)
{
- TTaskResourceEstimation ret;
- EstimateTaskResources(task, config, ret, tasksCount);
+ TTaskResourceEstimation ret = BuildInitialTaskResources(task);
+ EstimateTaskResources(config, ret, tasksCount);
return ret;
}
-void EstimateTaskResources(const TDqTask& task, const TTableServiceConfig::TResourceManager& config,
- TTaskResourceEstimation& ret, const ui32 tasksCount)
-{
+TTaskResourceEstimation BuildInitialTaskResources(const TDqTask& task) {
+ TTaskResourceEstimation ret;
+ const auto& opts = task.GetProgram().GetSettings();
ret.TaskId = task.GetId();
ret.ChannelBuffersCount += task.GetInputs().size() ? 1 : 0;
ret.ChannelBuffersCount += task.GetOutputs().size() ? 1 : 0;
+ ret.HeavyProgram = opts.GetHasMapJoin();
+ return ret;
+}
+
+void EstimateTaskResources(const TTableServiceConfig::TResourceManager& config,
+ TTaskResourceEstimation& ret, const ui32 tasksCount)
+{
ui64 channelBuffersSize = ret.ChannelBuffersCount * config.GetChannelBufferSize();
if (channelBuffersSize > config.GetMaxTotalChannelBuffersSize()) {
@@ -28,8 +35,7 @@ void EstimateTaskResources(const TDqTask& task, const TTableServiceConfig::TReso
ret.ChannelBufferMemoryLimit = config.GetChannelBufferSize();
}
- const auto& opts = task.GetProgram().GetSettings();
- if (/* opts.GetHasSort() || */opts.GetHasMapJoin()) {
+ if (ret.HeavyProgram) {
ret.MkqlProgramMemoryLimit = config.GetMkqlHeavyProgramMemoryLimit() / tasksCount;
} else {
ret.MkqlProgramMemoryLimit = config.GetMkqlLightProgramMemoryLimit() / tasksCount;
@@ -39,15 +45,4 @@ void EstimateTaskResources(const TDqTask& task, const TTableServiceConfig::TReso
+ ret.MkqlProgramMemoryLimit;
}
-TVector<TTaskResourceEstimation> EstimateTasksResources(const TVector<NYql::NDqProto::TDqTask>& tasks,
- const TTableServiceConfig::TResourceManager& config, const ui32 tasksCount)
-{
- TVector<TTaskResourceEstimation> ret;
- ret.resize(tasks.size());
- for (ui64 i = 0; i < tasks.size(); ++i) {
- EstimateTaskResources(tasks[i], config, ret[i], tasksCount);
- }
- return ret;
-}
-
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/rm_service/kqp_resource_estimation.h b/ydb/core/kqp/rm_service/kqp_resource_estimation.h
index 9abbf8dcc78..08c1d674f9a 100644
--- a/ydb/core/kqp/rm_service/kqp_resource_estimation.h
+++ b/ydb/core/kqp/rm_service/kqp_resource_estimation.h
@@ -14,6 +14,7 @@ struct TTaskResourceEstimation {
ui64 ChannelBufferMemoryLimit = 0;
ui64 MkqlProgramMemoryLimit = 0;
ui64 TotalMemoryLimit = 0;
+ bool HeavyProgram = false;
TString ToString() const {
return TStringBuilder() << "TaskResourceEstimation{"
@@ -26,13 +27,11 @@ struct TTaskResourceEstimation {
}
};
+TTaskResourceEstimation BuildInitialTaskResources(const NYql::NDqProto::TDqTask& task);
+
TTaskResourceEstimation EstimateTaskResources(const NYql::NDqProto::TDqTask& task,
const NKikimrConfig::TTableServiceConfig::TResourceManager& config, const ui32 tasksCount);
-void EstimateTaskResources(const NYql::NDqProto::TDqTask& task,
- const NKikimrConfig::TTableServiceConfig::TResourceManager& config, TTaskResourceEstimation& result, const ui32 tasksCount);
-
-TVector<TTaskResourceEstimation> EstimateTasksResources(const TVector<NYql::NDqProto::TDqTask>& tasks,
- const NKikimrConfig::TTableServiceConfig::TResourceManager& config, const ui32 tasksCount);
+void EstimateTaskResources(const NKikimrConfig::TTableServiceConfig::TResourceManager& config, TTaskResourceEstimation& result, const ui32 tasksCount);
} // namespace NKikimr::NKqp