diff options
author | gvit <[email protected]> | 2023-04-13 17:36:32 +0300 |
---|---|---|
committer | gvit <[email protected]> | 2023-04-13 17:36:32 +0300 |
commit | 782ed55b35b9805318a8d26ef040ff62a3875b42 (patch) | |
tree | 45784522f847a67bcb6f75e6cd7b6f0dc83e22ef | |
parent | 04a2ef84c2a459822321d96cfbac1ee25222c23c (diff) |
refactor kqp planner code
start refactoring kqp node service
continue refactoring task serialization process
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 24 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 4 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner.cpp | 376 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner.h | 54 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 46 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_graph.h | 4 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_resource_estimation.cpp | 31 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_resource_estimation.h | 9 |
8 files changed, 250 insertions, 298 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 903b156e7b5..e3b1f758207 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1901,7 +1901,7 @@ private: return; } - TasksGraph.GetMeta().Snapshot = TKqpSnapshot(record.GetSnapshotStep(), record.GetSnapshotTxId()); + SetSnapshot(record.GetSnapshotStep(), record.GetSnapshotTxId()); ImmediateTx = true; ContinueExecute(); @@ -2129,26 +2129,26 @@ private: ExecuteDataComputeTask(std::move(taskDesc), shareMailbox); } - size_t remoteComputeTasksCnt = 0; - THashMap<ui64, TVector<NDqProto::TDqTask>> tasksPerNode; + THashMap<ui64, TVector<ui64>> tasksPerNode; for (auto& [shardId, tasks] : RemoteComputeTasks) { auto it = ShardIdToNodeId.find(shardId); YQL_ENSURE(it != ShardIdToNodeId.end()); - for (ui64 taskId : tasks) { - const auto& task = TasksGraph.GetTask(taskId); - remoteComputeTasksCnt += 1; PendingComputeTasks.insert(taskId); - auto taskDesc = SerializeTaskToProto(TasksGraph, task); - tasksPerNode[it->second].emplace_back(std::move(taskDesc)); + tasksPerNode[it->second].emplace_back(taskId); } } - Planner = CreateKqpPlanner(TxId, SelfId(), {}, std::move(tasksPerNode), GetSnapshot(), + Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), {}, std::move(tasksPerNode), GetSnapshot(), Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, Request.DisableLlvmForUdfStages, Request.LlvmEnabled, false, Nothing(), - ExecuterSpan, {}, ExecuterRetriesConfig); - Planner->ProcessTasksForDataExecuter(); + ExecuterSpan, {}, ExecuterRetriesConfig, true /* isDataQuery */); + auto err = Planner->PlanExecution(); + if (err) { + TlsActivationContext->Send(err.release()); + return; + } + Planner->Submit(); // then start data tasks with known actor ids of compute tasks for (auto& [shardId, shardTx] : DatashardTxs) { @@ -2214,7 +2214,7 @@ private: << ", topicTxs: " << Request.TopicOperations.GetSize() << ", volatile: " << VolatileTx << ", immediate: " << ImmediateTx - << ", remote tasks" << remoteComputeTasksCnt + << ", pending compute tasks" << PendingComputeTasks.size() << ", useFollowers: " << UseFollowers); LOG_T("Updating channels after the creation of compute actors"); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 4e269ba263b..0c53539739d 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -952,6 +952,10 @@ protected: return TasksGraph.GetMeta().Snapshot; } + void SetSnapshot(ui64 step, ui64 txId) { + TasksGraph.GetMeta().SetSnapshot(step, txId); + } + IActor* CreateChannelProxy(const NYql::NDq::TChannel& channel) { IActor* proxy; diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index c8270bb628a..8c74f71486b 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -18,22 +18,52 @@ namespace NKikimr::NKqp { using namespace NYql; +namespace { + +const ui64 MaxTaskSize = 48_MB; + +template <class TCollection> +std::unique_ptr<TEvKqp::TEvAbortExecution> CheckTaskSize(ui64 TxId, const TCollection& tasks) { + for (const auto& task : tasks) { + if (ui32 size = task.ByteSize(); size > MaxTaskSize) { + LOG_E("Abort execution. Task #" << task.GetId() << " size is too big: " << size << " > " << MaxTaskSize); + return std::make_unique<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::ABORTED, + TStringBuilder() << "Datashard program size limit exceeded (" << size << " > " << MaxTaskSize << ")"); + } + } + return nullptr; +} + +void BuildInitialTaskResources(const TKqpTasksGraph& graph, ui64 taskId, TTaskResourceEstimation& ret) { + const auto& task = graph.GetTask(taskId); + const auto& stageInfo = graph.GetStageInfo(task.StageId); + const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id); + const auto& opts = stage.GetProgram().GetSettings(); + ret.TaskId = task.Id; + ret.ChannelBuffersCount += task.Inputs.size() ? 1 : 0; + ret.ChannelBuffersCount += task.Outputs.size() ? 1 : 0; + ret.HeavyProgram = opts.GetHasMapJoin(); +} + +} + // Task can allocate extra memory during execution. // So, we estimate total memory amount required for task as apriori task size multiplied by this constant. constexpr ui32 MEMORY_ESTIMATION_OVERFLOW = 2; constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 4; -TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::TDqTask>&& computeTasks, - THashMap<ui64, TVector<NDqProto::TDqTask>>&& mainTasksPerNode, const IKqpGateway::TKqpSnapshot& snapshot, +TKqpPlanner::TKqpPlanner(const TKqpTasksGraph& graph, ui64 txId, const TActorId& executer, TVector<ui64>&& computeTasks, + THashMap<ui64, TVector<ui64>>&& tasksPerNode, const IKqpGateway::TKqpSnapshot& snapshot, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline, const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan, TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig) + const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, + bool isDataQuery) : TxId(txId) , ExecuterId(executer) , ComputeTasks(std::move(computeTasks)) - , MainTasksPerNode(std::move(mainTasksPerNode)) + , TasksPerNode(std::move(tasksPerNode)) , Snapshot(snapshot) , Database(database) , UserToken(userToken) @@ -46,6 +76,8 @@ TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto:: , ResourcesSnapshot(std::move(resourcesSnapshot)) , ExecuterSpan(executerSpan) , ExecuterRetriesConfig(executerRetriesConfig) + , TasksGraph(graph) + , IsDataQuery(isDataQuery) { if (!Database) { // a piece of magic for tests @@ -57,14 +89,20 @@ TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto:: } bool TKqpPlanner::SendStartKqpTasksRequest(ui32 requestId, const TActorId& target) { + YQL_ENSURE(requestId < Requests.size()); + auto& requestData = Requests[requestId]; if (requestData.RetryNumber == ExecuterRetriesConfig.GetMaxRetryNumber() + 1) { return false; } - auto ev = MakeHolder<TEvKqpNode::TEvStartKqpTasksRequest>(); - ev->Record = requestData.request; + std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> ev; + if (Y_LIKELY(requestData.SerializedRequest)) { + ev.reset(requestData.SerializedRequest.release()); + } else { + ev = SerializeRequest(requestData); + } if (requestData.RetryNumber == ExecuterRetriesConfig.GetMaxRetryNumber()) { LOG_E("Retry failed by retries limit, requestId: " << requestId); @@ -78,7 +116,7 @@ bool TKqpPlanner::SendStartKqpTasksRequest(ui32 requestId, const TActorId& targe if (targetNode) { LOG_D("Try to retry to another node, nodeId: " << *targetNode << ", requestId: " << requestId); auto anotherTarget = MakeKqpNodeServiceID(*targetNode); - TlsActivationContext->Send(std::make_unique<NActors::IEventHandle>(anotherTarget, ExecuterId, ev.Release(), + TlsActivationContext->Send(std::make_unique<NActors::IEventHandle>(anotherTarget, ExecuterId, ev.release(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, requestId, nullptr, ExecuterSpan.GetTraceId())); requestData.RetryNumber++; return true; @@ -93,46 +131,71 @@ bool TKqpPlanner::SendStartKqpTasksRequest(ui32 requestId, const TActorId& targe requestData.RetryNumber++; - TlsActivationContext->Send(std::make_unique<NActors::IEventHandle>(target, ExecuterId, ev.Release(), - requestData.flag, requestId, nullptr, ExecuterSpan.GetTraceId())); + TlsActivationContext->Send(std::make_unique<NActors::IEventHandle>(target, ExecuterId, ev.release(), + requestData.Flag, requestId, nullptr, ExecuterSpan.GetTraceId())); return true; } -void TKqpPlanner::ProcessTasksForDataExecuter() { - - long requestsCnt = 0; - - for (auto& [nodeId, tasks] : MainTasksPerNode) { +std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeRequest(const TRequestData& requestData) const { + auto result = std::make_unique<TEvKqpNode::TEvStartKqpTasksRequest>(); + auto& request = result->Record; + request.SetTxId(TxId); + ActorIdToProto(ExecuterId, request.MutableExecuterActorId()); - auto& requestData = Requests.emplace_back(); + if (Deadline) { + TDuration timeout = Deadline - TAppData::TimeProvider->Now(); + request.MutableRuntimeSettings()->SetTimeoutMs(timeout.MilliSeconds()); + } - requestData.request.SetTxId(TxId); - ActorIdToProto(ExecuterId, requestData.request.MutableExecuterActorId()); + bool enableLlvm = EnableLlvm; - if (Deadline) { - TDuration timeout = Deadline - TAppData::TimeProvider->Now(); - requestData.request.MutableRuntimeSettings()->SetTimeoutMs(timeout.MilliSeconds()); + for (ui64 taskId : requestData.TaskIds) { + const auto& task = TasksGraph.GetTask(taskId); + auto serializedTask = SerializeTaskToProto(TasksGraph, task); + if (DisableLlvmForUdfStages && serializedTask.GetProgram().GetSettings().GetHasUdf()) { + enableLlvm = false; } + request.AddTasks()->Swap(&serializedTask); + } - requestData.request.MutableRuntimeSettings()->SetExecType(NDqProto::TComputeRuntimeSettings::DATA); - requestData.request.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(StatsMode)); - requestData.request.MutableRuntimeSettings()->SetUseLLVM(false); - requestData.request.SetStartAllOrFail(true); + if (IsDataQuery) { + request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::DATA); + request.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(StatsMode)); + request.MutableRuntimeSettings()->SetUseLLVM(false); + request.SetStartAllOrFail(true); + } else { + request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::SCAN); + request.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(StatsMode)); + request.MutableRuntimeSettings()->SetUseLLVM(enableLlvm); + request.SetStartAllOrFail(true); + } - for (auto&& task : tasks) { - requestData.request.AddTasks()->Swap(&task); - } + if (RlPath) { + auto rlPath = request.MutableRuntimeSettings()->MutableRlPath(); + rlPath->SetCoordinationNode(RlPath->GetCoordinationNode()); + rlPath->SetResourcePath(RlPath->GetResourcePath()); + rlPath->SetDatabase(Database); + if (UserToken) + rlPath->SetToken(UserToken->GetSerializedToken()); + } - auto target = MakeKqpNodeServiceID(nodeId); + if (Snapshot.IsValid()) { + request.MutableSnapshot()->SetTxId(Snapshot.TxId); + request.MutableSnapshot()->SetStep(Snapshot.Step); + } - requestData.flag = CalcSendMessageFlagsForNode(nodeId); - requestsCnt++; + return result; +} - SendStartKqpTasksRequest(Requests.size() - 1, target); +void TKqpPlanner::Submit() { + for (size_t reqId = 0; reqId < Requests.size(); ++reqId) { + ui64 nodeId = Requests[reqId].NodeId; + auto target = MakeKqpNodeServiceID(nodeId); + SendStartKqpTasksRequest(reqId, target); } if (ExecuterSpan) { - ExecuterSpan.Attribute("requestsCnt", requestsCnt); + ExecuterSpan.Attribute("requestsCnt", static_cast<long>(Requests.size())); } } @@ -148,7 +211,10 @@ ui32 TKqpPlanner::GetCurrentRetryDelay(ui32 requestId) { return requestData.CurrentDelay; } -void TKqpPlanner::ProcessTasksForScanExecuter() { +std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() { + if (ComputeTasks.empty()) + return nullptr; + PrepareToProcess(); auto localResources = GetKqpResourceManager()->GetLocalResources(); @@ -157,8 +223,12 @@ void TKqpPlanner::ProcessTasksForScanExecuter() { ResourceEstimations.size() <= localResources.ExecutionUnits && ResourceEstimations.size() <= MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT) { - RunLocal(ResourcesSnapshot); - return; + ui64 selfNodeId = ExecuterId.NodeId(); + for(ui64 taskId: ComputeTasks) { + TasksPerNode[selfNodeId].push_back(taskId); + } + + return nullptr; } if (ResourcesSnapshot.empty() || (ResourcesSnapshot.size() == 1 && ResourcesSnapshot[0].GetNodeId() == ExecuterId.NodeId())) { @@ -166,8 +236,12 @@ void TKqpPlanner::ProcessTasksForScanExecuter() { if (LocalRunMemoryEst <= localResources.Memory[NRm::EKqpMemoryPool::ScanQuery] && ResourceEstimations.size() <= localResources.ExecutionUnits) { - RunLocal(ResourcesSnapshot); - return; + ui64 localNodeId = ExecuterId.NodeId(); + for(ui64 taskId: ComputeTasks) { + TasksPerNode[localNodeId].push_back(taskId); + } + + return nullptr; } LOG_E("Not enough resources to execute query locally and no information about other nodes"); @@ -175,8 +249,7 @@ void TKqpPlanner::ProcessTasksForScanExecuter() { "Not enough resources to execute query locally and no information about other nodes (estimation: " + ToString(LocalRunMemoryEst) + ";" + GetEstimationsInfo() + ")"); - TlsActivationContext->Send(std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release())); - return; + return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release()); } auto planner = CreateKqpGreedyPlanner(); @@ -193,55 +266,53 @@ void TKqpPlanner::ProcessTasksForScanExecuter() { auto plan = planner->Plan(ResourcesSnapshot, ResourceEstimations); - long requestsCnt = 0; + THashMap<ui64, ui64> alreadyAssigned; + for(auto& [nodeId, tasks] : TasksPerNode) { + for(ui64 taskId: tasks) { + alreadyAssigned.emplace(taskId, nodeId); + } + } if (!plan.empty()) { for (auto& group : plan) { - auto& requestData = Requests.emplace_back(); - PrepareKqpNodeRequest(requestData.request, THashSet<ui64>(group.TaskIds.begin(), group.TaskIds.end())); - AddScansToKqpNodeRequest(requestData.request, group.NodeId); - - auto target = MakeKqpNodeServiceID(group.NodeId); - requestData.flag = CalcSendMessageFlagsForNode(group.NodeId); - - SendStartKqpTasksRequest(Requests.size() - 1, target); - ++requestsCnt; - } - - TVector<ui64> nodes; - nodes.reserve(MainTasksPerNode.size()); - for (auto& [nodeId, _]: MainTasksPerNode) { - nodes.push_back(nodeId); + for(ui64 taskId: group.TaskIds) { + auto [it, success] = alreadyAssigned.emplace(taskId, group.NodeId); + if (success) { + TasksPerNode[group.NodeId].push_back(taskId); + } + } } - for (ui64 nodeId: nodes) { - auto& requestData = Requests.emplace_back(); - PrepareKqpNodeRequest(requestData.request, {}); - AddScansToKqpNodeRequest(requestData.request, nodeId); - - auto target = MakeKqpNodeServiceID(nodeId); - requestData.flag = CalcSendMessageFlagsForNode(nodeId); - LOG_D("Send request to kqpnode: " << target << ", node_id: " << ExecuterId.NodeId() << ", TxId: " << TxId); - SendStartKqpTasksRequest(Requests.size() - 1, target); - ++requestsCnt; - } - Y_VERIFY(MainTasksPerNode.empty()); - } else { + return nullptr; + } else { auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::PRECONDITION_FAILED, "Not enough resources to execute query"); + return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release()); + } +} - TlsActivationContext->Send(std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release())); +std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() { + auto err = AssignTasksToNodes(); + if (err) { + return err; } - if (ExecuterSpan) { - ExecuterSpan.Attribute("requestsCnt", requestsCnt); + for(auto& [nodeId, tasks] : TasksPerNode) { + SortUnique(tasks); + auto& request = Requests.emplace_back(std::move(tasks), CalcSendMessageFlagsForNode(nodeId), nodeId); + request.SerializedRequest = SerializeRequest(request); + auto ev = CheckTaskSize(TxId, request.SerializedRequest->Record.GetTasks()); + if (ev != nullptr) { + return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.release()); + } } + return nullptr; } TString TKqpPlanner::GetEstimationsInfo() const { TStringStream ss; ss << "ComputeTasks:" << ComputeTasks.size() << ";NodeTasks:"; - if (auto it = MainTasksPerNode.find(ExecuterId.NodeId()); it != MainTasksPerNode.end()) { + if (auto it = TasksPerNode.find(ExecuterId.NodeId()); it != TasksPerNode.end()) { ss << it->second.size() << ";"; } else { ss << "0;"; @@ -253,7 +324,7 @@ void TKqpPlanner::PrepareToProcess() { auto rmConfig = GetKqpResourceManager()->GetConfig(); ui32 tasksCount = ComputeTasks.size(); - for (auto& [shardId, tasks] : MainTasksPerNode) { + for (auto& [shardId, tasks] : TasksPerNode) { tasksCount += tasks.size(); } @@ -261,147 +332,21 @@ void TKqpPlanner::PrepareToProcess() { LocalRunMemoryEst = 0; for (size_t i = 0; i < ComputeTasks.size(); ++i) { - EstimateTaskResources(ComputeTasks[i], rmConfig, ResourceEstimations[i], ComputeTasks.size()); + BuildInitialTaskResources(TasksGraph, ComputeTasks[i], ResourceEstimations[i]); + EstimateTaskResources(rmConfig, ResourceEstimations[i], ComputeTasks.size()); LocalRunMemoryEst += ResourceEstimations[i].TotalMemoryLimit; } - if (auto it = MainTasksPerNode.find(ExecuterId.NodeId()); it != MainTasksPerNode.end()) { - for (size_t i = 0; i < it->second.size(); ++i) { - EstimateTaskResources(it->second[i], rmConfig, ResourceEstimations[i + ComputeTasks.size()], it->second.size()); - LocalRunMemoryEst += ResourceEstimations[i + ComputeTasks.size()].TotalMemoryLimit; - } - } - Sort(ResourceEstimations, [](const auto& l, const auto& r) { return l.TotalMemoryLimit > r.TotalMemoryLimit; }); -} -ui64 TKqpPlanner::GetComputeTasksNumber() const { - return ComputeTasks.size(); -} - -ui64 TKqpPlanner::GetMainTasksNumber() const { - return MainTasksPerNode.size(); -} - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -/// Local Execution -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -void TKqpPlanner::RunLocal(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot) { - LOG_D("Execute query locally"); - - auto& requestData = Requests.emplace_back(); - PrepareKqpNodeRequest(requestData.request, {}); - AddScansToKqpNodeRequest(requestData.request, ExecuterId.NodeId()); - - auto target = MakeKqpNodeServiceID(ExecuterId.NodeId()); - requestData.flag = CalcSendMessageFlagsForNode(ExecuterId.NodeId()); - LOG_D("Send request to kqpnode: " << target << ", node_id: " << ExecuterId.NodeId() << ", TxId: " << TxId); - SendStartKqpTasksRequest(Requests.size() - 1, target); - - long requestsCnt = 1; - - TVector<ui64> nodes; - for (const auto& pair: MainTasksPerNode) { - nodes.push_back(pair.first); - YQL_ENSURE(pair.first != ExecuterId.NodeId()); - } - - THashMap<ui64, size_t> nodeIdToIdx; - for (size_t idx = 0; idx < snapshot.size(); ++idx) { - nodeIdToIdx[snapshot[idx].nodeid()] = idx; - LOG_D("snapshot #" << idx << ": " << snapshot[idx].ShortDebugString()); - } - - for (auto nodeId: nodes) { - auto& requestData = Requests.emplace_back(); - PrepareKqpNodeRequest(requestData.request, {}); - AddScansToKqpNodeRequest(requestData.request, nodeId); - - auto target = MakeKqpNodeServiceID(nodeId); - requestData.flag = CalcSendMessageFlagsForNode(target.NodeId()); - SendStartKqpTasksRequest(Requests.size() - 1, target); - - requestsCnt++; - } - Y_VERIFY(MainTasksPerNode.size() == 0); - - if (ExecuterSpan) { - ExecuterSpan.Attribute("requestsCnt", requestsCnt); - } -} - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -void TKqpPlanner::PrepareKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& request, THashSet<ui64> taskIds) { - request.SetTxId(TxId); - ActorIdToProto(ExecuterId, request.MutableExecuterActorId()); - - bool withLLVM = EnableLlvm; - - if (taskIds.empty()) { - for (auto& taskDesc : ComputeTasks) { - if (taskDesc.GetId()) { - if (DisableLlvmForUdfStages && taskDesc.GetProgram().GetSettings().GetHasUdf()) { - withLLVM = false; - } - request.AddTasks()->Swap(&taskDesc); - } + ui32 currentEst = ComputeTasks.size(); + for(auto& [nodeId, tasks] : TasksPerNode) { + for (ui64 taskId: tasks) { + BuildInitialTaskResources(TasksGraph, taskId, ResourceEstimations[currentEst]); + EstimateTaskResources(rmConfig, ResourceEstimations[currentEst], tasks.size()); + LocalRunMemoryEst += ResourceEstimations[currentEst].TotalMemoryLimit; + ++currentEst; } - } else { - for (auto& taskDesc : ComputeTasks) { - if (taskDesc.GetId() && Find(taskIds, taskDesc.GetId()) != taskIds.end()) { - if (DisableLlvmForUdfStages && taskDesc.GetProgram().GetSettings().GetHasUdf()) { - withLLVM = false; - } - request.AddTasks()->Swap(&taskDesc); - } - } - } - - if (Deadline) { - TDuration timeout = Deadline - TAppData::TimeProvider->Now(); - request.MutableRuntimeSettings()->SetTimeoutMs(timeout.MilliSeconds()); - } - - request.MutableRuntimeSettings()->SetExecType(NDqProto::TComputeRuntimeSettings::SCAN); - request.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(StatsMode)); - request.MutableRuntimeSettings()->SetUseLLVM(withLLVM); - request.MutableRuntimeSettings()->SetUseSpilling(WithSpilling); - - if (RlPath) { - auto rlPath = request.MutableRuntimeSettings()->MutableRlPath(); - rlPath->SetCoordinationNode(RlPath->GetCoordinationNode()); - rlPath->SetResourcePath(RlPath->GetResourcePath()); - rlPath->SetDatabase(Database); - if (UserToken) - rlPath->SetToken(UserToken->GetSerializedToken()); - } - - request.SetStartAllOrFail(true); -} - -void TKqpPlanner::AddScansToKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& request, ui64 nodeId) { - if (!Snapshot.IsValid()) { - Y_ASSERT(MainTasksPerNode.size() == 0); - return; - } - - bool withLLVM = true; - if (auto nodeTasks = MainTasksPerNode.FindPtr(nodeId)) { - LOG_D("Adding " << nodeTasks->size() << " scans to KqpNode request"); - - request.MutableSnapshot()->SetTxId(Snapshot.TxId); - request.MutableSnapshot()->SetStep(Snapshot.Step); - - for (auto& task: *nodeTasks) { - if (DisableLlvmForUdfStages && task.GetProgram().GetSettings().GetHasUdf()) { - withLLVM = false; - } - request.AddTasks()->Swap(&task); - } - MainTasksPerNode.erase(nodeId); - } - - if (request.GetRuntimeSettings().GetUseLLVM()) { - request.MutableRuntimeSettings()->SetUseLLVM(withLLVM); } + Sort(ResourceEstimations, [](const auto& l, const auto& r) { return l.TotalMemoryLimit > r.TotalMemoryLimit; }); } ui32 TKqpPlanner::CalcSendMessageFlagsForNode(ui32 nodeId) { @@ -413,16 +358,17 @@ ui32 TKqpPlanner::CalcSendMessageFlagsForNode(ui32 nodeId) { } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -std::unique_ptr<TKqpPlanner> CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NYql::NDqProto::TDqTask>&& tasks, - THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& mainTasksPerNode, const IKqpGateway::TKqpSnapshot& snapshot, +std::unique_ptr<TKqpPlanner> CreateKqpPlanner(const TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, TVector<ui64>&& tasks, + THashMap<ui64, TVector<ui64>>&& tasksPerNode, const IKqpGateway::TKqpSnapshot& snapshot, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline, const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan, - TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig) + TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, + bool isDataQuery) { - return std::make_unique<TKqpPlanner>(txId, executer, std::move(tasks), std::move(mainTasksPerNode), snapshot, - database, userToken, deadline, statsMode, disableLlvmForUdfStages, enableLlvm, withSpilling, rlPath, executerSpan, - std::move(resourcesSnapshot), executerRetriesConfig); + return std::make_unique<TKqpPlanner>(tasksGraph, txId, executer, std::move(tasks), std::move(tasksPerNode), snapshot, + database, userToken, deadline, statsMode, disableLlvmForUdfStages, enableLlvm, withSpilling, rlPath, executerSpan, + std::move(resourcesSnapshot), executerRetriesConfig, isDataQuery); } } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index 1149c67901d..1f5766eea86 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -18,45 +18,49 @@ namespace NKikimr::NKqp { class TKqpPlanner { - struct RequestData { - NKikimrKqp::TEvStartKqpTasksRequest request; - ui32 flag; + struct TRequestData { + TVector<ui64> TaskIds; + ui32 Flag; + ui64 NodeId; ui32 RetryNumber = 0; ui32 CurrentDelay = 0; + std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> SerializedRequest; + + explicit TRequestData(TVector<ui64>&& taskIds, ui64 flag, ui64 nodeId) + : TaskIds(std::move(taskIds)) + , Flag(flag) + , NodeId(nodeId) + {} }; public: - TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NYql::NDqProto::TDqTask>&& tasks, - THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot, + TKqpPlanner(const TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, TVector<ui64>&& tasks, + THashMap<ui64, TVector<ui64>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline, const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& ExecuterSpan, - TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig); - bool SendStartKqpTasksRequest(ui32 requestId, const TActorId& target); - - void ProcessTasksForScanExecuter(); - void ProcessTasksForDataExecuter(); - - ui64 GetComputeTasksNumber() const; - ui64 GetMainTasksNumber() const; + TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, + bool isDataQuery); + bool SendStartKqpTasksRequest(ui32 requestId, const TActorId& target); + std::unique_ptr<IEventHandle> PlanExecution(); + std::unique_ptr<IEventHandle> AssignTasksToNodes(); + void Submit(); ui32 GetCurrentRetryDelay(ui32 requestId); + private: + void PrepareToProcess(); TString GetEstimationsInfo() const; - void RunLocal(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot); - - void PrepareKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& request, THashSet<ui64> taskIds); - void AddScansToKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& request, ui64 nodeId); - + std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> SerializeRequest(const TRequestData& requestData) const; ui32 CalcSendMessageFlagsForNode(ui32 nodeId); private: const ui64 TxId; const TActorId ExecuterId; - TVector<NYql::NDqProto::TDqTask> ComputeTasks; - THashMap<ui64, TVector<NYql::NDqProto::TDqTask>> MainTasksPerNode; + TVector<ui64> ComputeTasks; + THashMap<ui64, TVector<ui64>> TasksPerNode; const IKqpGateway::TKqpSnapshot Snapshot; TString Database; const TIntrusiveConstPtr<NACLib::TUserToken> UserToken; @@ -72,15 +76,17 @@ private: const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig; ui64 LocalRunMemoryEst; TVector<TTaskResourceEstimation> ResourceEstimations; - TVector<RequestData> Requests; + TVector<TRequestData> Requests; + const TKqpTasksGraph& TasksGraph; + const bool IsDataQuery; }; -std::unique_ptr<TKqpPlanner> CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NYql::NDqProto::TDqTask>&& tasks, - THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot, +std::unique_ptr<TKqpPlanner> CreateKqpPlanner(const TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, TVector<ui64>&& tasks, + THashMap<ui64, TVector<ui64>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline, const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan, TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig); + const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig, bool isDataQuery); } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 079ee399e9d..078ac610bd5 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -572,30 +572,29 @@ private: } // NodeId -> {Tasks} - THashMap<ui64, TVector<NYql::NDqProto::TDqTask>> scanTasks; + THashMap<ui64, TVector<ui64>> scanTasks; ui32 nShardScans = 0; ui32 nScanTasks = 0; - TVector<NYql::NDqProto::TDqTask> computeTasks; + TVector<ui64> computeTasks; InitializeChannelProxies(); for (auto& task : TasksGraph.GetTasks()) { auto& stageInfo = TasksGraph.GetStageInfo(task.StageId); - NYql::NDqProto::TDqTask taskDesc = SerializeTaskToProto(TasksGraph, task); if (task.Meta.NodeId || stageInfo.Meta.IsSysView()) { // Task with source if (!task.Meta.Reads) { - scanTasks[task.Meta.NodeId].emplace_back(std::move(taskDesc)); + scanTasks[task.Meta.NodeId].emplace_back(task.Id); nScanTasks++; continue; } if (stageInfo.Meta.IsSysView()) { - computeTasks.emplace_back(std::move(taskDesc)); + computeTasks.emplace_back(task.Id); } else { - scanTasks[task.Meta.NodeId].emplace_back(std::move(taskDesc)); + scanTasks[task.Meta.NodeId].emplace_back(task.Id); nScanTasks++; } @@ -607,7 +606,7 @@ private: } } else { - computeTasks.emplace_back(std::move(taskDesc)); + computeTasks.emplace_back(task.Id); } } @@ -620,12 +619,6 @@ private: return; } - bool fitSize = AllOf(scanTasks, [this](const auto& x){ return ValidateTaskSize(x.second); }) - && ValidateTaskSize(computeTasks); - if (!fitSize) { - return; - } - if (prepareTasksSpan) { prepareTasksSpan.End(); } @@ -675,27 +668,32 @@ public: } private: - void ExecuteScanTx(TVector<NYql::NDqProto::TDqTask>&& computeTasks, THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks, + void ExecuteScanTx(TVector<ui64>&& computeTasks, THashMap<ui64, TVector<ui64>>&& tasksPerNode, TVector<NKikimrKqp::TKqpNodeResources>&& snapshot) { - LWTRACK(KqpScanExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, computeTasks.size(), scanTasks.size()); - for (const auto& [_, tasks]: scanTasks) { + LWTRACK(KqpScanExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, computeTasks.size(), tasksPerNode.size()); + for (const auto& [_, tasks]: tasksPerNode) { for (const auto& task : tasks) { - PendingComputeTasks.insert(task.GetId()); + PendingComputeTasks.insert(task); } } - for (auto& taskDesc : computeTasks) { - PendingComputeTasks.insert(taskDesc.GetId()); + for (auto& task : computeTasks) { + PendingComputeTasks.insert(task); } - Planner = CreateKqpPlanner(TxId, SelfId(), std::move(computeTasks), - std::move(scanTasks), GetSnapshot(), + Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), std::move(computeTasks), + std::move(tasksPerNode), GetSnapshot(), Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, Request.DisableLlvmForUdfStages, Request.LlvmEnabled, AppData()->EnableKqpSpilling, - Request.RlPath, ExecuterSpan, std::move(snapshot), ExecuterRetriesConfig); - LOG_D("Execute scan tx, computeTasks: " << Planner->GetComputeTasksNumber() << ", scanTasks: " << Planner->GetMainTasksNumber()); + Request.RlPath, ExecuterSpan, std::move(snapshot), ExecuterRetriesConfig, false /* isDataQuery */); + LOG_D("Execute scan tx, PendingComputeTasks: " << PendingComputeTasks.size()); + auto err = Planner->PlanExecution(); + if (err) { + TlsActivationContext->Send(err.release()); + return; + } - Planner->ProcessTasksForScanExecuter(); + Planner->Submit(); } private: diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h index ad3777350a7..bbcebbba257 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h @@ -89,6 +89,10 @@ struct TGraphMeta { IKqpGateway::TKqpSnapshot Snapshot; std::unordered_map<ui64, TActorId> ResultChannelProxies; TActorId ExecuterId; + + void SetSnapshot(ui64 step, ui64 txId) { + Snapshot = IKqpGateway::TKqpSnapshot(step, txId); + } }; struct TTaskInputMeta {}; diff --git a/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp b/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp index ff279362f0f..7e339fa24f6 100644 --- a/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp +++ b/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp @@ -8,17 +8,24 @@ using namespace NKikimrConfig; TTaskResourceEstimation EstimateTaskResources(const TDqTask& task, const TTableServiceConfig::TResourceManager& config, const ui32 tasksCount) { - TTaskResourceEstimation ret; - EstimateTaskResources(task, config, ret, tasksCount); + TTaskResourceEstimation ret = BuildInitialTaskResources(task); + EstimateTaskResources(config, ret, tasksCount); return ret; } -void EstimateTaskResources(const TDqTask& task, const TTableServiceConfig::TResourceManager& config, - TTaskResourceEstimation& ret, const ui32 tasksCount) -{ +TTaskResourceEstimation BuildInitialTaskResources(const TDqTask& task) { + TTaskResourceEstimation ret; + const auto& opts = task.GetProgram().GetSettings(); ret.TaskId = task.GetId(); ret.ChannelBuffersCount += task.GetInputs().size() ? 1 : 0; ret.ChannelBuffersCount += task.GetOutputs().size() ? 1 : 0; + ret.HeavyProgram = opts.GetHasMapJoin(); + return ret; +} + +void EstimateTaskResources(const TTableServiceConfig::TResourceManager& config, + TTaskResourceEstimation& ret, const ui32 tasksCount) +{ ui64 channelBuffersSize = ret.ChannelBuffersCount * config.GetChannelBufferSize(); if (channelBuffersSize > config.GetMaxTotalChannelBuffersSize()) { @@ -28,8 +35,7 @@ void EstimateTaskResources(const TDqTask& task, const TTableServiceConfig::TReso ret.ChannelBufferMemoryLimit = config.GetChannelBufferSize(); } - const auto& opts = task.GetProgram().GetSettings(); - if (/* opts.GetHasSort() || */opts.GetHasMapJoin()) { + if (ret.HeavyProgram) { ret.MkqlProgramMemoryLimit = config.GetMkqlHeavyProgramMemoryLimit() / tasksCount; } else { ret.MkqlProgramMemoryLimit = config.GetMkqlLightProgramMemoryLimit() / tasksCount; @@ -39,15 +45,4 @@ void EstimateTaskResources(const TDqTask& task, const TTableServiceConfig::TReso + ret.MkqlProgramMemoryLimit; } -TVector<TTaskResourceEstimation> EstimateTasksResources(const TVector<NYql::NDqProto::TDqTask>& tasks, - const TTableServiceConfig::TResourceManager& config, const ui32 tasksCount) -{ - TVector<TTaskResourceEstimation> ret; - ret.resize(tasks.size()); - for (ui64 i = 0; i < tasks.size(); ++i) { - EstimateTaskResources(tasks[i], config, ret[i], tasksCount); - } - return ret; -} - } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/rm_service/kqp_resource_estimation.h b/ydb/core/kqp/rm_service/kqp_resource_estimation.h index 9abbf8dcc78..08c1d674f9a 100644 --- a/ydb/core/kqp/rm_service/kqp_resource_estimation.h +++ b/ydb/core/kqp/rm_service/kqp_resource_estimation.h @@ -14,6 +14,7 @@ struct TTaskResourceEstimation { ui64 ChannelBufferMemoryLimit = 0; ui64 MkqlProgramMemoryLimit = 0; ui64 TotalMemoryLimit = 0; + bool HeavyProgram = false; TString ToString() const { return TStringBuilder() << "TaskResourceEstimation{" @@ -26,13 +27,11 @@ struct TTaskResourceEstimation { } }; +TTaskResourceEstimation BuildInitialTaskResources(const NYql::NDqProto::TDqTask& task); + TTaskResourceEstimation EstimateTaskResources(const NYql::NDqProto::TDqTask& task, const NKikimrConfig::TTableServiceConfig::TResourceManager& config, const ui32 tasksCount); -void EstimateTaskResources(const NYql::NDqProto::TDqTask& task, - const NKikimrConfig::TTableServiceConfig::TResourceManager& config, TTaskResourceEstimation& result, const ui32 tasksCount); - -TVector<TTaskResourceEstimation> EstimateTasksResources(const TVector<NYql::NDqProto::TDqTask>& tasks, - const NKikimrConfig::TTableServiceConfig::TResourceManager& config, const ui32 tasksCount); +void EstimateTaskResources(const NKikimrConfig::TTableServiceConfig::TResourceManager& config, TTaskResourceEstimation& result, const ui32 tasksCount); } // namespace NKikimr::NKqp |