diff options
| author | gvit <[email protected]> | 2023-04-13 17:36:32 +0300 | 
|---|---|---|
| committer | gvit <[email protected]> | 2023-04-13 17:36:32 +0300 | 
| commit | 782ed55b35b9805318a8d26ef040ff62a3875b42 (patch) | |
| tree | 45784522f847a67bcb6f75e6cd7b6f0dc83e22ef | |
| parent | 04a2ef84c2a459822321d96cfbac1ee25222c23c (diff) | |
refactor kqp planner code
start refactoring kqp node service
continue refactoring task serialization process
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 24 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 4 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner.cpp | 376 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner.h | 54 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 46 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_graph.h | 4 | ||||
| -rw-r--r-- | ydb/core/kqp/rm_service/kqp_resource_estimation.cpp | 31 | ||||
| -rw-r--r-- | ydb/core/kqp/rm_service/kqp_resource_estimation.h | 9 | 
8 files changed, 250 insertions, 298 deletions
| diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 903b156e7b5..e3b1f758207 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1901,7 +1901,7 @@ private:              return;          } -        TasksGraph.GetMeta().Snapshot = TKqpSnapshot(record.GetSnapshotStep(), record.GetSnapshotTxId()); +        SetSnapshot(record.GetSnapshotStep(), record.GetSnapshotTxId());          ImmediateTx = true;          ContinueExecute(); @@ -2129,26 +2129,26 @@ private:              ExecuteDataComputeTask(std::move(taskDesc), shareMailbox);          } -        size_t remoteComputeTasksCnt = 0; -        THashMap<ui64, TVector<NDqProto::TDqTask>> tasksPerNode; +        THashMap<ui64, TVector<ui64>> tasksPerNode;          for (auto& [shardId, tasks] : RemoteComputeTasks) {              auto it = ShardIdToNodeId.find(shardId);              YQL_ENSURE(it != ShardIdToNodeId.end()); -              for (ui64 taskId : tasks) { -                const auto& task = TasksGraph.GetTask(taskId); -                remoteComputeTasksCnt += 1;                  PendingComputeTasks.insert(taskId); -                auto taskDesc = SerializeTaskToProto(TasksGraph, task); -                tasksPerNode[it->second].emplace_back(std::move(taskDesc)); +                tasksPerNode[it->second].emplace_back(taskId);              }          } -        Planner = CreateKqpPlanner(TxId, SelfId(), {}, std::move(tasksPerNode), GetSnapshot(), +        Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), {}, std::move(tasksPerNode), GetSnapshot(),              Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode,              Request.DisableLlvmForUdfStages, Request.LlvmEnabled, false, Nothing(), -            ExecuterSpan, {}, ExecuterRetriesConfig); -        Planner->ProcessTasksForDataExecuter(); +            ExecuterSpan, {}, ExecuterRetriesConfig, true /* isDataQuery */); +        auto err = Planner->PlanExecution(); +        if (err) { +            TlsActivationContext->Send(err.release()); +            return; +        } +        Planner->Submit();          // then start data tasks with known actor ids of compute tasks          for (auto& [shardId, shardTx] : DatashardTxs) { @@ -2214,7 +2214,7 @@ private:              << ", topicTxs: " << Request.TopicOperations.GetSize()              << ", volatile: " << VolatileTx              << ", immediate: " << ImmediateTx -            << ", remote tasks" << remoteComputeTasksCnt +            << ", pending compute tasks" << PendingComputeTasks.size()              << ", useFollowers: " << UseFollowers);          LOG_T("Updating channels after the creation of compute actors"); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 4e269ba263b..0c53539739d 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -952,6 +952,10 @@ protected:          return TasksGraph.GetMeta().Snapshot;      } +    void SetSnapshot(ui64 step, ui64 txId) { +        TasksGraph.GetMeta().SetSnapshot(step, txId); +    } +      IActor* CreateChannelProxy(const NYql::NDq::TChannel& channel) {          IActor* proxy; diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index c8270bb628a..8c74f71486b 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -18,22 +18,52 @@ namespace NKikimr::NKqp {  using namespace NYql; +namespace { + +const ui64 MaxTaskSize = 48_MB; + +template <class TCollection> +std::unique_ptr<TEvKqp::TEvAbortExecution> CheckTaskSize(ui64 TxId, const TCollection& tasks) { +    for (const auto& task : tasks) { +        if (ui32 size = task.ByteSize(); size > MaxTaskSize) { +            LOG_E("Abort execution. Task #" << task.GetId() << " size is too big: " << size << " > " << MaxTaskSize); +            return std::make_unique<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::ABORTED, +                TStringBuilder() << "Datashard program size limit exceeded (" << size << " > " << MaxTaskSize << ")"); +        } +    } +    return nullptr; +} + +void BuildInitialTaskResources(const TKqpTasksGraph& graph, ui64 taskId, TTaskResourceEstimation& ret) { +    const auto& task = graph.GetTask(taskId); +    const auto& stageInfo = graph.GetStageInfo(task.StageId); +    const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id); +    const auto& opts = stage.GetProgram().GetSettings(); +    ret.TaskId = task.Id; +    ret.ChannelBuffersCount += task.Inputs.size() ? 1 : 0; +    ret.ChannelBuffersCount += task.Outputs.size() ? 1 : 0; +    ret.HeavyProgram = opts.GetHasMapJoin(); +} + +} +  // Task can allocate extra memory during execution.  // So, we estimate total memory amount required for task as apriori task size multiplied by this constant.  constexpr ui32 MEMORY_ESTIMATION_OVERFLOW = 2;  constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 4; -TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::TDqTask>&& computeTasks, -    THashMap<ui64, TVector<NDqProto::TDqTask>>&& mainTasksPerNode, const IKqpGateway::TKqpSnapshot& snapshot, +TKqpPlanner::TKqpPlanner(const TKqpTasksGraph& graph, ui64 txId, const TActorId& executer, TVector<ui64>&& computeTasks, +    THashMap<ui64, TVector<ui64>>&& tasksPerNode, const IKqpGateway::TKqpSnapshot& snapshot,      const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,      const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm,      bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,      TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, -    const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig) +    const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, +    bool isDataQuery)      : TxId(txId)      , ExecuterId(executer)      , ComputeTasks(std::move(computeTasks)) -    , MainTasksPerNode(std::move(mainTasksPerNode)) +    , TasksPerNode(std::move(tasksPerNode))      , Snapshot(snapshot)      , Database(database)      , UserToken(userToken) @@ -46,6 +76,8 @@ TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::      , ResourcesSnapshot(std::move(resourcesSnapshot))      , ExecuterSpan(executerSpan)      , ExecuterRetriesConfig(executerRetriesConfig) +    , TasksGraph(graph) +    , IsDataQuery(isDataQuery)  {      if (!Database) {          // a piece of magic for tests @@ -57,14 +89,20 @@ TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::  }  bool TKqpPlanner::SendStartKqpTasksRequest(ui32 requestId, const TActorId& target) { +    YQL_ENSURE(requestId < Requests.size()); +      auto& requestData = Requests[requestId];      if (requestData.RetryNumber == ExecuterRetriesConfig.GetMaxRetryNumber() + 1) {          return false;      } -    auto ev = MakeHolder<TEvKqpNode::TEvStartKqpTasksRequest>(); -    ev->Record = requestData.request; +    std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> ev; +    if (Y_LIKELY(requestData.SerializedRequest)) { +        ev.reset(requestData.SerializedRequest.release()); +    } else { +        ev = SerializeRequest(requestData); +    }      if (requestData.RetryNumber == ExecuterRetriesConfig.GetMaxRetryNumber()) {          LOG_E("Retry failed by retries limit, requestId: " << requestId); @@ -78,7 +116,7 @@ bool TKqpPlanner::SendStartKqpTasksRequest(ui32 requestId, const TActorId& targe          if (targetNode) {              LOG_D("Try to retry to another node, nodeId: " << *targetNode << ", requestId: " << requestId);              auto anotherTarget = MakeKqpNodeServiceID(*targetNode); -            TlsActivationContext->Send(std::make_unique<NActors::IEventHandle>(anotherTarget, ExecuterId, ev.Release(), +            TlsActivationContext->Send(std::make_unique<NActors::IEventHandle>(anotherTarget, ExecuterId, ev.release(),                  IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, requestId,  nullptr, ExecuterSpan.GetTraceId()));              requestData.RetryNumber++;              return true; @@ -93,46 +131,71 @@ bool TKqpPlanner::SendStartKqpTasksRequest(ui32 requestId, const TActorId& targe      requestData.RetryNumber++; -    TlsActivationContext->Send(std::make_unique<NActors::IEventHandle>(target, ExecuterId, ev.Release(), -        requestData.flag, requestId,  nullptr, ExecuterSpan.GetTraceId())); +    TlsActivationContext->Send(std::make_unique<NActors::IEventHandle>(target, ExecuterId, ev.release(), +        requestData.Flag, requestId,  nullptr, ExecuterSpan.GetTraceId()));      return true;  } -void TKqpPlanner::ProcessTasksForDataExecuter() { - -    long requestsCnt = 0; - -    for (auto& [nodeId, tasks] : MainTasksPerNode) { +std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeRequest(const TRequestData& requestData) const { +    auto result = std::make_unique<TEvKqpNode::TEvStartKqpTasksRequest>(); +    auto& request = result->Record; +    request.SetTxId(TxId); +    ActorIdToProto(ExecuterId, request.MutableExecuterActorId()); -        auto& requestData = Requests.emplace_back(); +    if (Deadline) { +        TDuration timeout = Deadline - TAppData::TimeProvider->Now(); +        request.MutableRuntimeSettings()->SetTimeoutMs(timeout.MilliSeconds()); +    } -        requestData.request.SetTxId(TxId); -        ActorIdToProto(ExecuterId, requestData.request.MutableExecuterActorId()); +    bool enableLlvm = EnableLlvm; -        if (Deadline) { -            TDuration timeout = Deadline - TAppData::TimeProvider->Now(); -            requestData.request.MutableRuntimeSettings()->SetTimeoutMs(timeout.MilliSeconds()); +    for (ui64 taskId : requestData.TaskIds) { +        const auto& task = TasksGraph.GetTask(taskId); +        auto serializedTask = SerializeTaskToProto(TasksGraph, task); +        if (DisableLlvmForUdfStages && serializedTask.GetProgram().GetSettings().GetHasUdf()) { +            enableLlvm = false;          } +        request.AddTasks()->Swap(&serializedTask); +    } -        requestData.request.MutableRuntimeSettings()->SetExecType(NDqProto::TComputeRuntimeSettings::DATA); -        requestData.request.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(StatsMode)); -        requestData.request.MutableRuntimeSettings()->SetUseLLVM(false); -        requestData.request.SetStartAllOrFail(true); +    if (IsDataQuery) { +        request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::DATA); +        request.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(StatsMode)); +        request.MutableRuntimeSettings()->SetUseLLVM(false); +        request.SetStartAllOrFail(true); +    } else { +        request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::SCAN); +        request.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(StatsMode)); +        request.MutableRuntimeSettings()->SetUseLLVM(enableLlvm); +        request.SetStartAllOrFail(true); +    } -        for (auto&& task : tasks) { -            requestData.request.AddTasks()->Swap(&task); -        } +    if (RlPath) { +        auto rlPath = request.MutableRuntimeSettings()->MutableRlPath(); +        rlPath->SetCoordinationNode(RlPath->GetCoordinationNode()); +        rlPath->SetResourcePath(RlPath->GetResourcePath()); +        rlPath->SetDatabase(Database); +        if (UserToken) +            rlPath->SetToken(UserToken->GetSerializedToken()); +    } -        auto target = MakeKqpNodeServiceID(nodeId); +    if (Snapshot.IsValid()) { +        request.MutableSnapshot()->SetTxId(Snapshot.TxId); +        request.MutableSnapshot()->SetStep(Snapshot.Step); +    } -        requestData.flag = CalcSendMessageFlagsForNode(nodeId); -        requestsCnt++; +    return result; +} -        SendStartKqpTasksRequest(Requests.size() - 1, target); +void TKqpPlanner::Submit() { +    for (size_t reqId = 0; reqId < Requests.size(); ++reqId) { +        ui64 nodeId = Requests[reqId].NodeId; +        auto target = MakeKqpNodeServiceID(nodeId); +        SendStartKqpTasksRequest(reqId, target);      }      if (ExecuterSpan) { -        ExecuterSpan.Attribute("requestsCnt", requestsCnt); +        ExecuterSpan.Attribute("requestsCnt", static_cast<long>(Requests.size()));      }  } @@ -148,7 +211,10 @@ ui32 TKqpPlanner::GetCurrentRetryDelay(ui32 requestId) {      return requestData.CurrentDelay;  } -void TKqpPlanner::ProcessTasksForScanExecuter() { +std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() { +    if (ComputeTasks.empty()) +        return nullptr; +      PrepareToProcess();      auto localResources = GetKqpResourceManager()->GetLocalResources(); @@ -157,8 +223,12 @@ void TKqpPlanner::ProcessTasksForScanExecuter() {          ResourceEstimations.size() <= localResources.ExecutionUnits &&          ResourceEstimations.size() <= MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT)      { -        RunLocal(ResourcesSnapshot); -        return; +        ui64 selfNodeId = ExecuterId.NodeId(); +        for(ui64 taskId: ComputeTasks) { +            TasksPerNode[selfNodeId].push_back(taskId); +        } + +        return nullptr;      }      if (ResourcesSnapshot.empty() || (ResourcesSnapshot.size() == 1 && ResourcesSnapshot[0].GetNodeId() == ExecuterId.NodeId())) { @@ -166,8 +236,12 @@ void TKqpPlanner::ProcessTasksForScanExecuter() {          if (LocalRunMemoryEst <= localResources.Memory[NRm::EKqpMemoryPool::ScanQuery] &&              ResourceEstimations.size() <= localResources.ExecutionUnits)          { -            RunLocal(ResourcesSnapshot); -            return; +            ui64 localNodeId = ExecuterId.NodeId(); +            for(ui64 taskId: ComputeTasks) { +                TasksPerNode[localNodeId].push_back(taskId); +            } + +            return nullptr;          }          LOG_E("Not enough resources to execute query locally and no information about other nodes"); @@ -175,8 +249,7 @@ void TKqpPlanner::ProcessTasksForScanExecuter() {              "Not enough resources to execute query locally and no information about other nodes (estimation: "              + ToString(LocalRunMemoryEst) + ";" + GetEstimationsInfo() + ")"); -        TlsActivationContext->Send(std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release())); -        return; +        return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release());      }      auto planner = CreateKqpGreedyPlanner(); @@ -193,55 +266,53 @@ void TKqpPlanner::ProcessTasksForScanExecuter() {      auto plan = planner->Plan(ResourcesSnapshot, ResourceEstimations); -    long requestsCnt = 0; +    THashMap<ui64, ui64> alreadyAssigned; +    for(auto& [nodeId, tasks] : TasksPerNode) {  +        for(ui64 taskId: tasks) { +            alreadyAssigned.emplace(taskId, nodeId); +        } +    }      if (!plan.empty()) {          for (auto& group : plan) { -            auto& requestData = Requests.emplace_back(); -            PrepareKqpNodeRequest(requestData.request, THashSet<ui64>(group.TaskIds.begin(), group.TaskIds.end())); -            AddScansToKqpNodeRequest(requestData.request, group.NodeId); - -            auto target = MakeKqpNodeServiceID(group.NodeId); -            requestData.flag = CalcSendMessageFlagsForNode(group.NodeId); - -            SendStartKqpTasksRequest(Requests.size() - 1, target); -            ++requestsCnt; -        } - -        TVector<ui64> nodes; -        nodes.reserve(MainTasksPerNode.size()); -        for (auto& [nodeId, _]: MainTasksPerNode) { -            nodes.push_back(nodeId); +            for(ui64 taskId: group.TaskIds) { +                auto [it, success] = alreadyAssigned.emplace(taskId, group.NodeId); +                if (success) { +                    TasksPerNode[group.NodeId].push_back(taskId); +                } +            }          } -        for (ui64 nodeId: nodes) { -            auto& requestData = Requests.emplace_back(); -            PrepareKqpNodeRequest(requestData.request, {}); -            AddScansToKqpNodeRequest(requestData.request, nodeId); - -            auto target = MakeKqpNodeServiceID(nodeId); -            requestData.flag = CalcSendMessageFlagsForNode(nodeId); -            LOG_D("Send request to kqpnode: " << target << ", node_id: " << ExecuterId.NodeId() << ", TxId: " << TxId); -            SendStartKqpTasksRequest(Requests.size() - 1, target); -            ++requestsCnt; -        } -        Y_VERIFY(MainTasksPerNode.empty()); -    } else { +        return nullptr; +    }  else {          auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::PRECONDITION_FAILED,              "Not enough resources to execute query"); +        return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release()); +    } +} -        TlsActivationContext->Send(std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release())); +std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() { +    auto err = AssignTasksToNodes(); +    if (err) { +        return err;      } -    if (ExecuterSpan) { -        ExecuterSpan.Attribute("requestsCnt", requestsCnt); +    for(auto& [nodeId, tasks] : TasksPerNode) { +        SortUnique(tasks); +        auto& request = Requests.emplace_back(std::move(tasks), CalcSendMessageFlagsForNode(nodeId), nodeId); +        request.SerializedRequest = SerializeRequest(request); +        auto ev = CheckTaskSize(TxId, request.SerializedRequest->Record.GetTasks()); +        if (ev != nullptr) { +            return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.release()); +        }      } +    return nullptr;  }  TString TKqpPlanner::GetEstimationsInfo() const {      TStringStream ss;      ss << "ComputeTasks:" << ComputeTasks.size() << ";NodeTasks:"; -    if (auto it = MainTasksPerNode.find(ExecuterId.NodeId()); it != MainTasksPerNode.end()) { +    if (auto it = TasksPerNode.find(ExecuterId.NodeId()); it != TasksPerNode.end()) {          ss << it->second.size() << ";";      } else {          ss << "0;"; @@ -253,7 +324,7 @@ void TKqpPlanner::PrepareToProcess() {      auto rmConfig = GetKqpResourceManager()->GetConfig();      ui32 tasksCount = ComputeTasks.size(); -    for (auto& [shardId, tasks] : MainTasksPerNode) { +    for (auto& [shardId, tasks] : TasksPerNode) {          tasksCount += tasks.size();      } @@ -261,147 +332,21 @@ void TKqpPlanner::PrepareToProcess() {      LocalRunMemoryEst = 0;      for (size_t i = 0; i < ComputeTasks.size(); ++i) { -        EstimateTaskResources(ComputeTasks[i], rmConfig, ResourceEstimations[i], ComputeTasks.size()); +        BuildInitialTaskResources(TasksGraph, ComputeTasks[i], ResourceEstimations[i]); +        EstimateTaskResources(rmConfig, ResourceEstimations[i], ComputeTasks.size());          LocalRunMemoryEst += ResourceEstimations[i].TotalMemoryLimit;      } -    if (auto it = MainTasksPerNode.find(ExecuterId.NodeId()); it != MainTasksPerNode.end()) { -        for (size_t i = 0; i < it->second.size(); ++i) { -            EstimateTaskResources(it->second[i], rmConfig, ResourceEstimations[i + ComputeTasks.size()], it->second.size()); -            LocalRunMemoryEst += ResourceEstimations[i + ComputeTasks.size()].TotalMemoryLimit; -        } -    } -    Sort(ResourceEstimations, [](const auto& l, const auto& r) { return l.TotalMemoryLimit > r.TotalMemoryLimit; }); -} -ui64 TKqpPlanner::GetComputeTasksNumber() const { -    return ComputeTasks.size(); -} - -ui64 TKqpPlanner::GetMainTasksNumber() const { -    return MainTasksPerNode.size(); -} - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -/// Local Execution -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -void TKqpPlanner::RunLocal(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot) { -    LOG_D("Execute query locally"); - -    auto& requestData = Requests.emplace_back(); -    PrepareKqpNodeRequest(requestData.request, {}); -    AddScansToKqpNodeRequest(requestData.request, ExecuterId.NodeId()); - -    auto target = MakeKqpNodeServiceID(ExecuterId.NodeId()); -    requestData.flag = CalcSendMessageFlagsForNode(ExecuterId.NodeId()); -    LOG_D("Send request to kqpnode: " << target << ", node_id: " << ExecuterId.NodeId() << ", TxId: " << TxId); -    SendStartKqpTasksRequest(Requests.size() - 1, target); - -    long requestsCnt = 1; - -    TVector<ui64> nodes; -    for (const auto& pair: MainTasksPerNode) { -        nodes.push_back(pair.first); -        YQL_ENSURE(pair.first != ExecuterId.NodeId()); -    } - -    THashMap<ui64, size_t> nodeIdToIdx; -    for (size_t idx = 0; idx < snapshot.size(); ++idx) { -        nodeIdToIdx[snapshot[idx].nodeid()] = idx; -        LOG_D("snapshot #" << idx << ": " << snapshot[idx].ShortDebugString()); -    } - -    for (auto nodeId: nodes) { -        auto& requestData = Requests.emplace_back(); -        PrepareKqpNodeRequest(requestData.request, {}); -        AddScansToKqpNodeRequest(requestData.request, nodeId); - -        auto target = MakeKqpNodeServiceID(nodeId); -        requestData.flag = CalcSendMessageFlagsForNode(target.NodeId()); -        SendStartKqpTasksRequest(Requests.size() - 1, target); - -        requestsCnt++; -    } -    Y_VERIFY(MainTasksPerNode.size() == 0); - -    if (ExecuterSpan) { -        ExecuterSpan.Attribute("requestsCnt", requestsCnt); -    } -} - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -void TKqpPlanner::PrepareKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& request, THashSet<ui64> taskIds) { -    request.SetTxId(TxId); -    ActorIdToProto(ExecuterId, request.MutableExecuterActorId()); - -    bool withLLVM = EnableLlvm; - -    if (taskIds.empty()) { -        for (auto& taskDesc : ComputeTasks) { -            if (taskDesc.GetId()) { -                if (DisableLlvmForUdfStages && taskDesc.GetProgram().GetSettings().GetHasUdf()) { -                    withLLVM = false; -                } -                request.AddTasks()->Swap(&taskDesc); -            } +    ui32 currentEst = ComputeTasks.size(); +    for(auto& [nodeId, tasks] : TasksPerNode) { +        for (ui64 taskId: tasks) { +            BuildInitialTaskResources(TasksGraph, taskId, ResourceEstimations[currentEst]); +            EstimateTaskResources(rmConfig, ResourceEstimations[currentEst], tasks.size()); +            LocalRunMemoryEst += ResourceEstimations[currentEst].TotalMemoryLimit; +            ++currentEst;          } -    } else { -        for (auto& taskDesc : ComputeTasks) { -            if (taskDesc.GetId() && Find(taskIds, taskDesc.GetId()) != taskIds.end()) { -                if (DisableLlvmForUdfStages && taskDesc.GetProgram().GetSettings().GetHasUdf()) { -                    withLLVM = false; -                } -                request.AddTasks()->Swap(&taskDesc); -            } -        } -    } - -    if (Deadline) { -        TDuration timeout = Deadline - TAppData::TimeProvider->Now(); -        request.MutableRuntimeSettings()->SetTimeoutMs(timeout.MilliSeconds()); -    } - -    request.MutableRuntimeSettings()->SetExecType(NDqProto::TComputeRuntimeSettings::SCAN); -    request.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(StatsMode)); -    request.MutableRuntimeSettings()->SetUseLLVM(withLLVM); -    request.MutableRuntimeSettings()->SetUseSpilling(WithSpilling); - -    if (RlPath) { -        auto rlPath = request.MutableRuntimeSettings()->MutableRlPath(); -        rlPath->SetCoordinationNode(RlPath->GetCoordinationNode()); -        rlPath->SetResourcePath(RlPath->GetResourcePath()); -        rlPath->SetDatabase(Database); -        if (UserToken) -            rlPath->SetToken(UserToken->GetSerializedToken()); -    } - -    request.SetStartAllOrFail(true); -} - -void TKqpPlanner::AddScansToKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& request, ui64 nodeId) { -    if (!Snapshot.IsValid()) { -        Y_ASSERT(MainTasksPerNode.size() == 0); -        return; -    } - -    bool withLLVM = true; -    if (auto nodeTasks = MainTasksPerNode.FindPtr(nodeId)) { -        LOG_D("Adding " << nodeTasks->size() << " scans to KqpNode request"); - -        request.MutableSnapshot()->SetTxId(Snapshot.TxId); -        request.MutableSnapshot()->SetStep(Snapshot.Step); - -        for (auto& task: *nodeTasks) { -            if (DisableLlvmForUdfStages && task.GetProgram().GetSettings().GetHasUdf()) { -                withLLVM = false; -            } -            request.AddTasks()->Swap(&task); -        } -        MainTasksPerNode.erase(nodeId); -    } - -    if (request.GetRuntimeSettings().GetUseLLVM()) { -        request.MutableRuntimeSettings()->SetUseLLVM(withLLVM);      } +    Sort(ResourceEstimations, [](const auto& l, const auto& r) { return l.TotalMemoryLimit > r.TotalMemoryLimit; });  }  ui32 TKqpPlanner::CalcSendMessageFlagsForNode(ui32 nodeId) { @@ -413,16 +358,17 @@ ui32 TKqpPlanner::CalcSendMessageFlagsForNode(ui32 nodeId) {  }  //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -std::unique_ptr<TKqpPlanner> CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NYql::NDqProto::TDqTask>&& tasks, -    THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& mainTasksPerNode, const IKqpGateway::TKqpSnapshot& snapshot, +std::unique_ptr<TKqpPlanner> CreateKqpPlanner(const TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, TVector<ui64>&& tasks, +    THashMap<ui64, TVector<ui64>>&& tasksPerNode, const IKqpGateway::TKqpSnapshot& snapshot,      const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,      const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm,      bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan, -    TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig) +    TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, +    bool isDataQuery)  { -    return std::make_unique<TKqpPlanner>(txId, executer, std::move(tasks), std::move(mainTasksPerNode), snapshot, -        database, userToken, deadline, statsMode, disableLlvmForUdfStages, enableLlvm, withSpilling, rlPath, executerSpan,  -        std::move(resourcesSnapshot), executerRetriesConfig); +    return std::make_unique<TKqpPlanner>(tasksGraph, txId, executer, std::move(tasks), std::move(tasksPerNode), snapshot, +        database, userToken, deadline, statsMode, disableLlvmForUdfStages, enableLlvm, withSpilling, rlPath, executerSpan, +        std::move(resourcesSnapshot), executerRetriesConfig, isDataQuery);  }  } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index 1149c67901d..1f5766eea86 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -18,45 +18,49 @@ namespace NKikimr::NKqp {  class TKqpPlanner { -    struct RequestData { -        NKikimrKqp::TEvStartKqpTasksRequest request; -        ui32 flag; +    struct TRequestData { +        TVector<ui64> TaskIds; +        ui32 Flag; +        ui64 NodeId;          ui32 RetryNumber = 0;          ui32 CurrentDelay = 0; +        std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> SerializedRequest; + +        explicit TRequestData(TVector<ui64>&& taskIds, ui64 flag, ui64 nodeId) +            : TaskIds(std::move(taskIds)) +            , Flag(flag) +            , NodeId(nodeId) +        {}      };  public: -    TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NYql::NDqProto::TDqTask>&& tasks, -        THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot, +    TKqpPlanner(const TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, TVector<ui64>&& tasks, +        THashMap<ui64, TVector<ui64>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot,          const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,          const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages,          bool enableLlvm, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& ExecuterSpan, -        TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig); -    bool SendStartKqpTasksRequest(ui32 requestId, const TActorId& target); - -    void ProcessTasksForScanExecuter(); -    void ProcessTasksForDataExecuter(); - -    ui64 GetComputeTasksNumber() const; -    ui64 GetMainTasksNumber() const; +        TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, +        bool isDataQuery); +    bool SendStartKqpTasksRequest(ui32 requestId, const TActorId& target); +    std::unique_ptr<IEventHandle> PlanExecution(); +    std::unique_ptr<IEventHandle> AssignTasksToNodes(); +    void Submit();      ui32 GetCurrentRetryDelay(ui32 requestId); +  private: +      void PrepareToProcess();      TString GetEstimationsInfo() const; -    void RunLocal(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot); - -    void PrepareKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& request, THashSet<ui64> taskIds); -    void AddScansToKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& request, ui64 nodeId); - +    std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> SerializeRequest(const TRequestData& requestData) const;      ui32 CalcSendMessageFlagsForNode(ui32 nodeId);  private:      const ui64 TxId;      const TActorId ExecuterId; -    TVector<NYql::NDqProto::TDqTask> ComputeTasks; -    THashMap<ui64, TVector<NYql::NDqProto::TDqTask>> MainTasksPerNode; +    TVector<ui64> ComputeTasks; +    THashMap<ui64, TVector<ui64>> TasksPerNode;      const IKqpGateway::TKqpSnapshot Snapshot;      TString Database;      const TIntrusiveConstPtr<NACLib::TUserToken> UserToken; @@ -72,15 +76,17 @@ private:      const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig;      ui64 LocalRunMemoryEst;      TVector<TTaskResourceEstimation> ResourceEstimations; -    TVector<RequestData> Requests; +    TVector<TRequestData> Requests; +    const TKqpTasksGraph& TasksGraph; +    const bool IsDataQuery;  }; -std::unique_ptr<TKqpPlanner> CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NYql::NDqProto::TDqTask>&& tasks, -    THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot, +std::unique_ptr<TKqpPlanner> CreateKqpPlanner(const TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, TVector<ui64>&& tasks, +    THashMap<ui64, TVector<ui64>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot,      const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,      const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool disableLlvmForUdfStages, bool enableLlvm,      bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,      TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, -    const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig); +    const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig, bool isDataQuery);  } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 079ee399e9d..078ac610bd5 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -572,30 +572,29 @@ private:          }          // NodeId -> {Tasks} -        THashMap<ui64, TVector<NYql::NDqProto::TDqTask>> scanTasks; +        THashMap<ui64, TVector<ui64>> scanTasks;          ui32 nShardScans = 0;          ui32 nScanTasks = 0; -        TVector<NYql::NDqProto::TDqTask> computeTasks; +        TVector<ui64> computeTasks;          InitializeChannelProxies();          for (auto& task : TasksGraph.GetTasks()) {              auto& stageInfo = TasksGraph.GetStageInfo(task.StageId); -            NYql::NDqProto::TDqTask taskDesc = SerializeTaskToProto(TasksGraph, task);              if (task.Meta.NodeId || stageInfo.Meta.IsSysView()) {                  // Task with source                  if (!task.Meta.Reads) { -                    scanTasks[task.Meta.NodeId].emplace_back(std::move(taskDesc)); +                    scanTasks[task.Meta.NodeId].emplace_back(task.Id);                      nScanTasks++;                      continue;                  }                  if (stageInfo.Meta.IsSysView()) { -                    computeTasks.emplace_back(std::move(taskDesc)); +                    computeTasks.emplace_back(task.Id);                  } else { -                    scanTasks[task.Meta.NodeId].emplace_back(std::move(taskDesc)); +                    scanTasks[task.Meta.NodeId].emplace_back(task.Id);                      nScanTasks++;                  } @@ -607,7 +606,7 @@ private:                  }              } else { -                computeTasks.emplace_back(std::move(taskDesc)); +                computeTasks.emplace_back(task.Id);              }          } @@ -620,12 +619,6 @@ private:              return;          } -        bool fitSize = AllOf(scanTasks, [this](const auto& x){ return ValidateTaskSize(x.second); }) -                    && ValidateTaskSize(computeTasks); -        if (!fitSize) { -            return; -        } -          if (prepareTasksSpan) {              prepareTasksSpan.End();          } @@ -675,27 +668,32 @@ public:      }  private: -    void ExecuteScanTx(TVector<NYql::NDqProto::TDqTask>&& computeTasks, THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& scanTasks, +    void ExecuteScanTx(TVector<ui64>&& computeTasks, THashMap<ui64, TVector<ui64>>&& tasksPerNode,          TVector<NKikimrKqp::TKqpNodeResources>&& snapshot) { -        LWTRACK(KqpScanExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, computeTasks.size(), scanTasks.size()); -        for (const auto& [_, tasks]: scanTasks) { +        LWTRACK(KqpScanExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, computeTasks.size(), tasksPerNode.size()); +        for (const auto& [_, tasks]: tasksPerNode) {              for (const auto& task : tasks) { -                PendingComputeTasks.insert(task.GetId()); +                PendingComputeTasks.insert(task);              }          } -        for (auto& taskDesc : computeTasks) { -            PendingComputeTasks.insert(taskDesc.GetId()); +        for (auto& task : computeTasks) { +            PendingComputeTasks.insert(task);          } -        Planner = CreateKqpPlanner(TxId, SelfId(), std::move(computeTasks), -            std::move(scanTasks), GetSnapshot(), +        Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), std::move(computeTasks), +            std::move(tasksPerNode), GetSnapshot(),              Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode,              Request.DisableLlvmForUdfStages, Request.LlvmEnabled, AppData()->EnableKqpSpilling, -            Request.RlPath, ExecuterSpan, std::move(snapshot), ExecuterRetriesConfig); -        LOG_D("Execute scan tx, computeTasks: " << Planner->GetComputeTasksNumber() << ", scanTasks: " << Planner->GetMainTasksNumber()); +            Request.RlPath, ExecuterSpan, std::move(snapshot), ExecuterRetriesConfig, false /* isDataQuery */); +        LOG_D("Execute scan tx, PendingComputeTasks: " << PendingComputeTasks.size()); +        auto err = Planner->PlanExecution(); +        if (err) { +            TlsActivationContext->Send(err.release()); +            return; +        } -        Planner->ProcessTasksForScanExecuter(); +        Planner->Submit();      }  private: diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h index ad3777350a7..bbcebbba257 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h @@ -89,6 +89,10 @@ struct TGraphMeta {      IKqpGateway::TKqpSnapshot Snapshot;      std::unordered_map<ui64, TActorId> ResultChannelProxies;      TActorId ExecuterId; + +    void SetSnapshot(ui64 step, ui64 txId) { +        Snapshot = IKqpGateway::TKqpSnapshot(step, txId); +    }  };  struct TTaskInputMeta {}; diff --git a/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp b/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp index ff279362f0f..7e339fa24f6 100644 --- a/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp +++ b/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp @@ -8,17 +8,24 @@ using namespace NKikimrConfig;  TTaskResourceEstimation EstimateTaskResources(const TDqTask& task,      const TTableServiceConfig::TResourceManager& config, const ui32 tasksCount)  { -    TTaskResourceEstimation ret; -    EstimateTaskResources(task, config, ret, tasksCount); +    TTaskResourceEstimation ret = BuildInitialTaskResources(task); +    EstimateTaskResources(config, ret, tasksCount);      return ret;  } -void EstimateTaskResources(const TDqTask& task, const TTableServiceConfig::TResourceManager& config, -    TTaskResourceEstimation& ret, const ui32 tasksCount) -{ +TTaskResourceEstimation BuildInitialTaskResources(const TDqTask& task) { +    TTaskResourceEstimation ret; +    const auto& opts = task.GetProgram().GetSettings();      ret.TaskId = task.GetId();      ret.ChannelBuffersCount += task.GetInputs().size() ? 1 : 0;      ret.ChannelBuffersCount += task.GetOutputs().size() ? 1 : 0; +    ret.HeavyProgram = opts.GetHasMapJoin(); +    return ret; +} + +void EstimateTaskResources(const TTableServiceConfig::TResourceManager& config, +    TTaskResourceEstimation& ret, const ui32 tasksCount) +{      ui64 channelBuffersSize = ret.ChannelBuffersCount * config.GetChannelBufferSize();      if (channelBuffersSize > config.GetMaxTotalChannelBuffersSize()) { @@ -28,8 +35,7 @@ void EstimateTaskResources(const TDqTask& task, const TTableServiceConfig::TReso          ret.ChannelBufferMemoryLimit = config.GetChannelBufferSize();      } -    const auto& opts = task.GetProgram().GetSettings(); -    if (/* opts.GetHasSort() || */opts.GetHasMapJoin()) { +    if (ret.HeavyProgram) {          ret.MkqlProgramMemoryLimit = config.GetMkqlHeavyProgramMemoryLimit() / tasksCount;      } else {          ret.MkqlProgramMemoryLimit = config.GetMkqlLightProgramMemoryLimit() / tasksCount; @@ -39,15 +45,4 @@ void EstimateTaskResources(const TDqTask& task, const TTableServiceConfig::TReso          + ret.MkqlProgramMemoryLimit;  } -TVector<TTaskResourceEstimation> EstimateTasksResources(const TVector<NYql::NDqProto::TDqTask>& tasks, -    const TTableServiceConfig::TResourceManager& config, const ui32 tasksCount) -{ -    TVector<TTaskResourceEstimation> ret; -    ret.resize(tasks.size()); -    for (ui64 i = 0; i < tasks.size(); ++i) { -        EstimateTaskResources(tasks[i], config, ret[i], tasksCount); -    } -    return ret; -} -  } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/rm_service/kqp_resource_estimation.h b/ydb/core/kqp/rm_service/kqp_resource_estimation.h index 9abbf8dcc78..08c1d674f9a 100644 --- a/ydb/core/kqp/rm_service/kqp_resource_estimation.h +++ b/ydb/core/kqp/rm_service/kqp_resource_estimation.h @@ -14,6 +14,7 @@ struct TTaskResourceEstimation {      ui64 ChannelBufferMemoryLimit = 0;      ui64 MkqlProgramMemoryLimit = 0;      ui64 TotalMemoryLimit = 0; +    bool HeavyProgram = false;      TString ToString() const {          return TStringBuilder() << "TaskResourceEstimation{" @@ -26,13 +27,11 @@ struct TTaskResourceEstimation {      }  }; +TTaskResourceEstimation BuildInitialTaskResources(const NYql::NDqProto::TDqTask& task); +  TTaskResourceEstimation EstimateTaskResources(const NYql::NDqProto::TDqTask& task,      const NKikimrConfig::TTableServiceConfig::TResourceManager& config, const ui32 tasksCount); -void EstimateTaskResources(const NYql::NDqProto::TDqTask& task, -    const NKikimrConfig::TTableServiceConfig::TResourceManager& config, TTaskResourceEstimation& result, const ui32 tasksCount); - -TVector<TTaskResourceEstimation> EstimateTasksResources(const TVector<NYql::NDqProto::TDqTask>& tasks, -    const NKikimrConfig::TTableServiceConfig::TResourceManager& config, const ui32 tasksCount); +void EstimateTaskResources(const NKikimrConfig::TTableServiceConfig::TResourceManager& config, TTaskResourceEstimation& result, const ui32 tasksCount);  } // namespace NKikimr::NKqp | 
