author     makostrov <makostrov@yandex-team.com>  2023-07-19 14:13:25 +0300
committer  makostrov <makostrov@yandex-team.com>  2023-07-19 14:13:25 +0300
commit     29ea4eeb548a98f60ebf35a20a38a10229628da0 (patch)
tree       569052e6fd6ace4ddc2fcb196ec318f9411ba294
parent     b03dce345d49158a866df832fc0e4cd257893146 (diff)
download   ydb-29ea4eeb548a98f60ebf35a20a38a10229628da0.tar.gz
kqp: delegate task execution to the planner (KIKIMR-18631)

Fix TTaskType
Add compute tasks to the planner correctly
Add optimization: execute scan tasks locally when they all land on one node
Fix stats: affected shards
Shift the calculation of scan tasks to the planner
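
The last two points are the core of the change: every task in the graph now carries a TTaskMeta::TTaskType, and TKqpPlanner::PlanExecution uses it to split the graph into a flat compute-task list and per-node scan buckets. Below is a condensed, self-contained sketch of that classification; ETaskType's values and the bucketing logic mirror the diff that follows, while TTaskStub and the helper are simplified stand-ins for illustration only.

```cpp
#include <cstdint>
#include <unordered_map>
#include <vector>

// Mirrors TTaskMeta::TTaskType added in kqp_tasks_graph.h.
enum class ETaskType : uint32_t { Unknown = 0, Compute = 1, Scan = 2, DataShard = 3 };

// Simplified stand-in for a task in the graph.
struct TTaskStub {
    uint64_t Id = 0;
    uint64_t NodeId = 0;      // only meaningful for scan tasks
    ETaskType Type = ETaskType::Unknown;
};

// Shape of the loop TKqpPlanner::PlanExecution now runs: compute tasks go to a
// flat list, scan tasks are grouped by the node that owns the shard; datashard
// tasks are not planned here, they travel inside datashard transactions.
void ClassifyTasks(const std::vector<TTaskStub>& tasks,
                   std::vector<uint64_t>& computeTasks,
                   std::unordered_map<uint64_t, std::vector<uint64_t>>& scanTasksPerNode) {
    for (const auto& task : tasks) {
        switch (task.Type) {
            case ETaskType::Compute:
                computeTasks.push_back(task.Id);
                break;
            case ETaskType::Scan:
                scanTasksPerNode[task.NodeId].push_back(task.Id);
                break;
            default:
                break;
        }
    }
}
```

The task producers set the type at AddTask time, which is what the scattered `task.Meta.Type = ...` one-liners in kqp_data_executer.cpp, kqp_scan_executer.cpp and kqp_executer_impl.h below do.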
-rw-r--r--  ydb/core/kqp/executer_actor/kqp_data_executer.cpp |  99
-rw-r--r--  ydb/core/kqp/executer_actor/kqp_executer_impl.h   | 150
-rw-r--r--  ydb/core/kqp/executer_actor/kqp_planner.cpp       | 147
-rw-r--r--  ydb/core/kqp/executer_actor/kqp_planner.h         |  31
-rw-r--r--  ydb/core/kqp/executer_actor/kqp_scan_executer.cpp |  77
-rw-r--r--  ydb/core/kqp/executer_actor/kqp_tasks_graph.h     |   8
6 files changed, 306 insertions(+), 206 deletions(-)
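
Before the diff itself, a note on the control flow the patch converges on. Both kqp_data_executer.cpp and kqp_scan_executer.cpp stop launching compute actors and tracking pending tasks themselves: they construct a TKqpPlanner (whose constructor gains mkqlMemoryLimit, asyncIoFactory and doOptimization arguments), call PlanExecution(), and then Submit(). The sketch below shows only that shape; TPlannerStub and TErrorEvent are hypothetical stand-ins for TKqpPlanner and NActors::IEventHandle.

```cpp
#include <memory>
#include <utility>

// Hypothetical stand-ins; the real types are TKqpPlanner and NActors::IEventHandle.
struct TErrorEvent {};
struct TPlannerStub {
    // Returns an error event to bounce back to the executer, or nullptr on success.
    std::unique_ptr<TErrorEvent> PlanExecution() { return nullptr; }
    void Submit() {}
};

void SendToSelf(std::unique_ptr<TErrorEvent> /*ev*/) {
    // Real code: TlsActivationContext->Send(err.release());
}

// Pattern shared by both executers after this patch: plan, bail out on error,
// otherwise submit the planned tasks to the nodes chosen by the planner.
void DriveQueryThroughPlanner(TPlannerStub& planner) {
    if (auto err = planner.PlanExecution()) {
        SendToSelf(std::move(err));
        return;
    }
    planner.Submit();
}
```

The pending-task bookkeeping moves along with it: the executers now read Planner->GetPendingComputeTasks() and Planner->GetPendingComputeActors(), which is why the diff guards every such access with a `Planner` null check.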
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 31920b9cb0..0e023ec038 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -163,10 +163,12 @@ public: if (IsDebugLogEnabled()) { auto sb = TStringBuilder() << "ActorState: " << CurrentStateFuncName() - << ", waiting for " << PendingComputeActors.size() << " compute actor(s) and " + << ", waiting for " << (Planner ? Planner->GetPendingComputeActors().size() : 0) << " compute actor(s) and " << notFinished << " datashard(s): "; - for (const auto& shardId : PendingComputeActors) { - sb << "CA " << shardId.first << ", "; + if (Planner) { + for (const auto& shardId : Planner->GetPendingComputeActors()) { + sb << "CA " << shardId.first << ", "; + } } for (const auto& [shardId, shardState] : ShardStates) { if (shardState.State != TShardState::EState::Finished) { @@ -1343,6 +1345,7 @@ private: return TasksGraph.GetTask(it->second); } auto& task = TasksGraph.AddTask(stageInfo); + task.Meta.Type = TTaskMeta::TTaskType::DataShard; task.Meta.ExecuterId = SelfId(); task.Meta.ShardId = shardId; shardTasks.emplace(shardId, task.Id); @@ -1527,6 +1530,7 @@ private: for (ui32 i = 0; i < partitionsCount; ++i) { auto& task = TasksGraph.AddTask(stageInfo); task.Meta.ExecuterId = SelfId(); + task.Meta.Type = TTaskMeta::TTaskType::Compute; LOG_D("Stage " << stageInfo.Id << " create compute task: " << task.Id); } } @@ -1623,44 +1627,6 @@ private: YQL_ENSURE(result.second); } - void ExecuteDataComputeTask(ui64 taskId, bool shareMailbox) { - auto& task = TasksGraph.GetTask(taskId); - NYql::NDqProto::TDqTask* taskDesc = ArenaSerializeTaskToProto(TasksGraph, task); - - TComputeRuntimeSettings settings; - if (Deadline) { - settings.Timeout = *Deadline - TAppData::TimeProvider->Now(); - } - //settings.ExtraMemoryAllocationPool = NRm::EKqpMemoryPool::DataQuery; - settings.ExtraMemoryAllocationPool = NRm::EKqpMemoryPool::Unspecified; - settings.FailOnUndelivery = true; - settings.StatsMode = GetDqStatsMode(Request.StatsMode); - settings.UseSpilling = false; - - TComputeMemoryLimits limits; - limits.ChannelBufferSize = 50_MB; - limits.MkqlLightProgramMemoryLimit = Request.MkqlMemoryLimit > 0 ? std::min(500_MB, Request.MkqlMemoryLimit) : 500_MB; - limits.MkqlHeavyProgramMemoryLimit = Request.MkqlMemoryLimit > 0 ? std::min(2_GB, Request.MkqlMemoryLimit) : 2_GB; - - auto& taskOpts = taskDesc->GetProgram().GetSettings(); - auto limit = taskOpts.GetHasMapJoin() /* || opts.GetHasSort()*/ - ? limits.MkqlHeavyProgramMemoryLimit - : limits.MkqlLightProgramMemoryLimit; - - limits.MemoryQuotaManager = std::make_shared<TGuaranteeQuotaManager>(limit, limit); - - auto computeActor = CreateKqpComputeActor(SelfId(), TxId, taskDesc, AsyncIoFactory, - AppData()->FunctionRegistry, settings, limits, NWilson::TTraceId(), TasksGraph.GetMeta().GetArenaIntrusivePtr()); - - auto computeActorId = shareMailbox ? 
RegisterWithSameMailbox(computeActor) : Register(computeActor); - task.ComputeActorId = computeActorId; - - LOG_D("Executing task: " << taskId << " on compute actor: " << task.ComputeActorId); - - auto result = PendingComputeActors.emplace(task.ComputeActorId, TProgressStat()); - YQL_ENSURE(result.second); - } - void Execute() { NWilson::TSpan prepareTasksSpan(TWilsonKqp::DataExecuterPrepateTasks, ExecuterStateSpan.GetTraceId(), "PrepateTasks", NWilson::EFlags::AUTO_END); LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId); @@ -2128,43 +2094,27 @@ private: NWilson::TSpan sendTasksSpan(TWilsonKqp::DataExecuterSendTasksAndTxs, ExecuterStateSpan.GetTraceId(), "SendTasksAndTxs", NWilson::EFlags::AUTO_END); LWTRACK(KqpDataExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, ComputeTasks.size(), DatashardTxs.size()); - // first, start compute tasks - bool shareMailbox = (ComputeTasks.size() <= 1); - for (ui64 taskId : ComputeTasks) { - ExecuteDataComputeTask(taskId, shareMailbox); + for (auto& [shardId, tasks] : RemoteComputeTasks) { + auto it = ShardIdToNodeId.find(shardId); + YQL_ENSURE(it != ShardIdToNodeId.end()); + for (ui64 taskId : tasks) { + auto& task = TasksGraph.GetTask(taskId); + task.Meta.NodeId = it->second; + } } - if (ComputeTasks.size() == 0 && RemoteComputeTasks.size() == 1 && !UnknownAffectedShardCount) { - // query affects a single key or shard, so it might be more effective - // to execute this task locally so we can avoid useless overhead for remote task launching. - for(auto& [shardId, tasks]: RemoteComputeTasks) { - for(ui64 taskId: tasks) { - ExecuteDataComputeTask(taskId, true); - } - } + Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), {}, {}, GetSnapshot(), + Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, false, Nothing(), + ExecuterSpan, {}, ExecuterRetriesConfig, true /* isDataQuery */, Request.MkqlMemoryLimit, AsyncIoFactory, !UnknownAffectedShardCount); - } else { - THashMap<ui64, TVector<ui64>> tasksPerNode; - for (auto& [shardId, tasks] : RemoteComputeTasks) { - auto it = ShardIdToNodeId.find(shardId); - YQL_ENSURE(it != ShardIdToNodeId.end()); - for (ui64 taskId : tasks) { - PendingComputeTasks.insert(taskId); - tasksPerNode[it->second].emplace_back(taskId); - } - } - - Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), {}, std::move(tasksPerNode), GetSnapshot(), - Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, false, Nothing(), - ExecuterSpan, {}, ExecuterRetriesConfig, true /* isDataQuery */); - auto err = Planner->PlanExecution(); - if (err) { - TlsActivationContext->Send(err.release()); - return; - } - Planner->Submit(); + auto err = Planner->PlanExecution(); + if (err) { + TlsActivationContext->Send(err.release()); + return; } + Planner->Submit(); + // then start data tasks with known actor ids of compute tasks for (auto& [shardId, shardTx] : DatashardTxs) { shardTx->SetType(NKikimrTxDataShard::KQP_TX_TYPE_DATA); @@ -2229,9 +2179,10 @@ private: << ", topicTxs: " << Request.TopicOperations.GetSize() << ", volatile: " << VolatileTx << ", immediate: " << ImmediateTx - << ", pending compute tasks" << PendingComputeTasks.size() + << ", pending compute tasks" << (Planner ? 
Planner->GetPendingComputeTasks().size() : 0) << ", useFollowers: " << GetUseFollowers()); + // error LOG_T("Updating channels after the creation of compute actors"); THashMap<TActorId, THashSet<ui64>> updates; for (ui64 taskId : ComputeTasks) { diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index bcb554a144..7fa3567aeb 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -245,25 +245,27 @@ protected: case NYql::NDqProto::COMPUTE_STATE_EXECUTING: { // initial TEvState event from Compute Actor // there can be race with RM answer - if (PendingComputeTasks.erase(taskId)) { - auto it = PendingComputeActors.emplace(computeActor, TProgressStat()); - YQL_ENSURE(it.second); + if (Planner) { + if (Planner->GetPendingComputeTasks().erase(taskId)) { + auto it = Planner->GetPendingComputeActors().emplace(computeActor, TProgressStat()); + YQL_ENSURE(it.second); - if (state.HasStats()) { - it.first->second.Set(state.GetStats()); - } - - auto& task = TasksGraph.GetTask(taskId); - task.ComputeActorId = computeActor; - - THashMap<TActorId, THashSet<ui64>> updates; - CollectTaskChannelsUpdates(task, updates); - PropagateChannelsUpdates(updates); - } else { - auto it = PendingComputeActors.find(computeActor); - if (it != PendingComputeActors.end()) { if (state.HasStats()) { - it->second.Set(state.GetStats()); + it.first->second.Set(state.GetStats()); + } + + auto& task = TasksGraph.GetTask(taskId); + task.ComputeActorId = computeActor; + + THashMap<TActorId, THashSet<ui64>> updates; + CollectTaskChannelsUpdates(task, updates); + PropagateChannelsUpdates(updates); + } else { + auto it = Planner->GetPendingComputeActors().find(computeActor); + if (it != Planner->GetPendingComputeActors().end()) { + if (state.HasStats()) { + it->second.Set(state.GetStats()); + } } } } @@ -279,26 +281,28 @@ protected: LastTaskId = taskId; LastComputeActorId = computeActor.ToString(); - auto it = PendingComputeActors.find(computeActor); - if (it == PendingComputeActors.end()) { - LOG_W("Got execution state for compute actor: " << computeActor - << ", task: " << taskId - << ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState()) - << ", too early (waiting reply from RM)"); - - if (PendingComputeTasks.erase(taskId)) { - LOG_E("Got execution state for compute actor: " << computeActor - << ", for unknown task: " << state.GetTaskId() - << ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState())); - return; - } - } else { - if (state.HasStats()) { - it->second.Set(state.GetStats()); + if (Planner) { + auto it = Planner->GetPendingComputeActors().find(computeActor); + if (it == Planner->GetPendingComputeActors().end()) { + LOG_W("Got execution state for compute actor: " << computeActor + << ", task: " << taskId + << ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState()) + << ", too early (waiting reply from RM)"); + + if (Planner && Planner->GetPendingComputeTasks().erase(taskId)) { + LOG_E("Got execution state for compute actor: " << computeActor + << ", for unknown task: " << state.GetTaskId() + << ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState())); + return; + } + } else { + if (state.HasStats()) { + it->second.Set(state.GetStats()); + } + LastStats.emplace_back(std::move(it->second)); + 
Planner->GetPendingComputeActors().erase(it); + YQL_ENSURE(Planner->GetPendingComputeTasks().find(taskId) == Planner->GetPendingComputeTasks().end()); } - LastStats.emplace_back(std::move(it->second)); - PendingComputeActors.erase(it); - YQL_ENSURE(PendingComputeTasks.find(taskId) == PendingComputeTasks.end()); } } } @@ -371,7 +375,7 @@ protected: protected: bool CheckExecutionComplete() { - if (PendingComputeActors.empty() && PendingComputeTasks.empty()) { + if (Planner && Planner->GetPendingComputeActors().empty() && Planner->GetPendingComputeTasks().empty()) { static_cast<TDerived*>(this)->Finalize(); UpdateResourcesUsage(true); return true; @@ -382,11 +386,13 @@ protected: if (IsDebugLogEnabled()) { TStringBuilder sb; sb << "Waiting for: "; - for (auto ct : PendingComputeTasks) { - sb << "CT " << ct << ", "; - } - for (auto ca : PendingComputeActors) { - sb << "CA " << ca.first << ", "; + if (Planner) { + for (auto ct : Planner->GetPendingComputeTasks()) { + sb << "CT " << ct << ", "; + } + for (auto ca : Planner->GetPendingComputeActors()) { + sb << "CA " << ca.first << ", "; + } } LOG_D(sb); } @@ -434,15 +440,17 @@ protected: auto nodeId = ev->Get()->NodeId; LOG_N("Disconnected node " << nodeId); - for (auto computeActor : PendingComputeActors) { - if (computeActor.first.NodeId() == nodeId) { - return ReplyUnavailable(TStringBuilder() << "Connection with node " << nodeId << " lost."); + if (Planner) { + for (auto computeActor : Planner->GetPendingComputeActors()) { + if (computeActor.first.NodeId() == nodeId) { + return ReplyUnavailable(TStringBuilder() << "Connection with node " << nodeId << " lost."); + } } - } - for (auto& task : TasksGraph.GetTasks()) { - if (task.Meta.NodeId == nodeId && PendingComputeTasks.contains(task.Id)) { - return ReplyUnavailable(TStringBuilder() << "Connection with node " << nodeId << " lost."); + for (auto& task : TasksGraph.GetTasks()) { + if (task.Meta.NodeId == nodeId && Planner->GetPendingComputeTasks().contains(task.Id)) { + return ReplyUnavailable(TStringBuilder() << "Connection with node " << nodeId << " lost."); + } } } } @@ -502,13 +510,15 @@ protected: LOG_D("Executing task: " << taskId << " on compute actor: " << task.ComputeActorId); - if (PendingComputeTasks.erase(taskId) == 0) { - LOG_D("Executing task: " << taskId << ", compute actor: " << task.ComputeActorId << ", already finished"); - } else { - auto result = PendingComputeActors.emplace(std::make_pair(task.ComputeActorId, TProgressStat())); - YQL_ENSURE(result.second); + if (Planner) { + if (Planner->GetPendingComputeTasks().erase(taskId) == 0) { + LOG_D("Executing task: " << taskId << ", compute actor: " << task.ComputeActorId << ", already finished"); + } else { + auto result = Planner->GetPendingComputeActors().emplace(std::make_pair(task.ComputeActorId, TProgressStat())); + YQL_ENSURE(result.second); - CollectTaskChannelsUpdates(task, channelsUpdates); + CollectTaskChannelsUpdates(task, channelsUpdates); + } } } @@ -602,9 +612,11 @@ protected: LastResourceUsageUpdate = now; TProgressStat::TEntry consumption; - for (const auto& p : PendingComputeActors) { - const auto& t = p.second.GetLastUsage(); - consumption += t; + if (Planner) { + for (const auto& p : Planner->GetPendingComputeActors()) { + const auto& t = p.second.GetLastUsage(); + consumption += t; + } } for (const auto& p : LastStats) { @@ -618,8 +630,10 @@ protected: if (ru <= 100 && !force) return; - for (auto& p : PendingComputeActors) { - p.second.Update(); + if (Planner) { + for (auto& p : 
Planner->GetPendingComputeActors()) { + p.second.Update(); + } } for (auto& p : LastStats) { @@ -684,6 +698,7 @@ protected: task.Meta.Reads.ConstructInPlace(); task.Meta.Reads->emplace_back(std::move(readInfo)); task.Meta.ReadInfo.Reverse = op.GetReadRange().GetReverse(); + task.Meta.Type = TTaskMeta::TTaskType::Compute; LOG_D("Stage " << stageInfo.Id << " create sysview scan task: " << task.Id); } @@ -706,6 +721,8 @@ protected: input.SourceType = externalSource.GetType(); task.Meta.DqTaskParams.emplace(externalSource.GetTaskParamKey(), partitionParam); + task.Meta.Type = TTaskMeta::TTaskType::Compute; + } } @@ -741,6 +758,7 @@ protected: YQL_ENSURE(!shardInfo.KeyWriteRanges); auto& task = TasksGraph.AddTask(stageInfo); + task.Meta.Type = TTaskMeta::TTaskType::Scan; task.Meta.ExecuterId = this->SelfId(); if (auto ptr = ShardIdToNodeId.FindPtr(taskLocation)) { task.Meta.NodeId = *ptr; @@ -921,11 +939,13 @@ protected: virtual void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status, google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues) { - for (auto computeActor : PendingComputeActors) { - LOG_D("terminate compute actor " << computeActor.first); + if (Planner) { + for (auto computeActor : Planner->GetPendingComputeActors()) { + LOG_D("terminate compute actor " << computeActor.first); - auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDq::YdbStatusToDqStatus(status), "Terminate execution"); - this->Send(computeActor.first, ev.Release()); + auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDq::YdbStatusToDqStatus(status), "Terminate execution"); + this->Send(computeActor.first, ev.Release()); + } } auto& response = *ResponseEv->Record.MutableResponse(); @@ -1095,7 +1115,6 @@ protected: TActorId KqpTableResolverId; TActorId KqpShardsResolverId; - THashMap<TActorId, TProgressStat> PendingComputeActors; // Running compute actors (pure and DS) THashMap<TActorId, NYql::NDqProto::TComputeActorExtraData> ExtraData; TVector<TProgressStat> LastStats; @@ -1109,7 +1128,6 @@ protected: NWilson::TSpan ExecuterStateSpan; NWilson::TSpan ExecuterTableResolveSpan; - THashSet<ui64> PendingComputeTasks; // Not started yet, waiting resources TMap<ui64, ui64> ShardIdToNodeId; TMap<ui64, TVector<ui64>> ShardsOnNode; diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index a3eda7ec17..b7d5257fb7 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -9,6 +9,8 @@ #include <util/generic/set.h> +using namespace NActors; + namespace NKikimr::NKqp { #define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". 
" << stream) @@ -59,7 +61,7 @@ TKqpPlanner::TKqpPlanner(TKqpTasksGraph& graph, ui64 txId, const TActorId& execu bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan, TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, - bool isDataQuery) + bool isDataQuery, ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, bool doOptimization) : TxId(txId) , ExecuterId(executer) , ComputeTasks(std::move(computeTasks)) @@ -76,6 +78,9 @@ TKqpPlanner::TKqpPlanner(TKqpTasksGraph& graph, ui64 txId, const TActorId& execu , ExecuterRetriesConfig(executerRetriesConfig) , TasksGraph(graph) , IsDataQuery(isDataQuery) + , MkqlMemoryLimit(mkqlMemoryLimit) + , AsyncIoFactory(asyncIoFactory) + , DoOptimization(doOptimization) { if (!Database) { // a piece of magic for tests @@ -281,27 +286,131 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() { } } +const IKqpGateway::TKqpSnapshot& TKqpPlanner::GetSnapshot() const { + return TasksGraph.GetMeta().Snapshot; +} + +void TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, bool shareMailbox) { + + auto& task = TasksGraph.GetTask(taskId); + NYql::NDqProto::TDqTask* taskDesc = ArenaSerializeTaskToProto(TasksGraph, task); + + NYql::NDq::TComputeRuntimeSettings settings; + if (Deadline) { + settings.Timeout = Deadline - TAppData::TimeProvider->Now(); + } + //settings.ExtraMemoryAllocationPool = NRm::EKqpMemoryPool::DataQuery; + settings.ExtraMemoryAllocationPool = NRm::EKqpMemoryPool::Unspecified; + settings.FailOnUndelivery = true; + settings.StatsMode = GetDqStatsMode(StatsMode); + settings.UseSpilling = false; + + NYql::NDq::TComputeMemoryLimits limits; + limits.ChannelBufferSize = 50_MB; + limits.MkqlLightProgramMemoryLimit = MkqlMemoryLimit > 0 ? std::min(500_MB, MkqlMemoryLimit) : 500_MB; + limits.MkqlHeavyProgramMemoryLimit = MkqlMemoryLimit > 0 ? std::min(2_GB, MkqlMemoryLimit) : 2_GB; + + auto& taskOpts = taskDesc->GetProgram().GetSettings(); + auto limit = taskOpts.GetHasMapJoin() /* || opts.GetHasSort()*/ + ? limits.MkqlHeavyProgramMemoryLimit + : limits.MkqlLightProgramMemoryLimit; + + limits.MemoryQuotaManager = std::make_shared<NYql::NDq::TGuaranteeQuotaManager>(limit, limit); + + auto computeActor = NKikimr::NKqp::CreateKqpComputeActor(ExecuterId, TxId, taskDesc, AsyncIoFactory, + AppData()->FunctionRegistry, settings, limits, NWilson::TTraceId(), TasksGraph.GetMeta().GetArenaIntrusivePtr()); + + auto computeActorId = shareMailbox ? 
TlsActivationContext->AsActorContext().RegisterWithSameMailbox(computeActor) : TlsActivationContext->AsActorContext().Register(computeActor); + task.ComputeActorId = computeActorId; + + LOG_D("Executing task: " << taskId << " on compute actor: " << task.ComputeActorId); + + auto result = PendingComputeActors.emplace(task.ComputeActorId, TProgressStat()); + YQL_ENSURE(result.second); +} + +ui32 TKqpPlanner::GetnScanTasks() { + return nScanTasks; +} + +ui32 TKqpPlanner::GetnComputeTasks() { + return nComputeTasks; +} + std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() { - auto err = AssignTasksToNodes(); - if (err) { - return err; + nScanTasks = 0; + + for (auto& task : TasksGraph.GetTasks()) { + switch (task.Meta.Type) { + case TTaskMeta::TTaskType::Compute: + ComputeTasks.emplace_back(task.Id); + break; + case TTaskMeta::TTaskType::Scan: + TasksPerNode[task.Meta.NodeId].emplace_back(task.Id); + nScanTasks++; + break; + } } - for(auto& [nodeId, tasks] : TasksPerNode) { - SortUnique(tasks); - auto& request = Requests.emplace_back(std::move(tasks), CalcSendMessageFlagsForNode(nodeId), nodeId); - request.SerializedRequest = SerializeRequest(request); - auto ev = CheckTaskSize(TxId, request.SerializedRequest->Record.GetTasks()); - if (ev != nullptr) { - return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.release()); + LOG_D("Total tasks: " << nScanTasks + nComputeTasks << ", readonly: true" // TODO ??? + << ", " << nScanTasks << " scan tasks on " << TasksPerNode.size() << " nodes" + << ", execType: " << (IsDataQuery ? "Data" : "Scan") + << ", snapshot: {" << GetSnapshot().TxId << ", " << GetSnapshot().Step << "}"); + + nComputeTasks = ComputeTasks.size(); + + if (IsDataQuery) { + bool shareMailbox = (ComputeTasks.size() <= 1); + for (ui64 taskId : ComputeTasks) { + ExecuteDataComputeTask(taskId, shareMailbox); } + ComputeTasks.clear(); } + + if (nComputeTasks == 0 && TasksPerNode.size() == 1 && (AsyncIoFactory != nullptr) && DoOptimization) { + // query affects a single key or shard, so it might be more effective + // to execute this task locally so we can avoid useless overhead for remote task launching. 
+ for(auto& [shardId, tasks]: TasksPerNode) { + for(ui64 taskId: tasks) { + ExecuteDataComputeTask(taskId, true); + } + } + + } else { + for (ui64 taskId : ComputeTasks) { + PendingComputeTasks.insert(taskId); + } + + for (auto& [shardId, tasks] : TasksPerNode) { + for (ui64 taskId : tasks) { + PendingComputeTasks.insert(taskId); + } + } + + auto err = AssignTasksToNodes(); + if (err) { + return err; + } + + for(auto& [nodeId, tasks] : TasksPerNode) { + SortUnique(tasks); + auto& request = Requests.emplace_back(std::move(tasks), CalcSendMessageFlagsForNode(nodeId), nodeId); + request.SerializedRequest = SerializeRequest(request); + auto ev = CheckTaskSize(TxId, request.SerializedRequest->Record.GetTasks()); + if (ev != nullptr) { + return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.release()); + } + } + + } + + return nullptr; } TString TKqpPlanner::GetEstimationsInfo() const { TStringStream ss; - ss << "ComputeTasks:" << ComputeTasks.size() << ";NodeTasks:"; + ss << "ComputeTasks:" << nComputeTasks << ";NodeTasks:"; if (auto it = TasksPerNode.find(ExecuterId.NodeId()); it != TasksPerNode.end()) { ss << it->second.size() << ";"; } else { @@ -311,12 +420,20 @@ TString TKqpPlanner::GetEstimationsInfo() const { } void TKqpPlanner::Unsubscribe() { - for(ui64 nodeId: TrackingNodes) { + for (ui64 nodeId: TrackingNodes) { TlsActivationContext->Send(std::make_unique<NActors::IEventHandle>( TActivationContext::InterconnectProxy(nodeId), ExecuterId, new TEvents::TEvUnsubscribe())); } } +THashMap<TActorId, TProgressStat>& TKqpPlanner::GetPendingComputeActors() { + return PendingComputeActors; +} + +THashSet<ui64>& TKqpPlanner::GetPendingComputeTasks() { + return PendingComputeTasks; +} + void TKqpPlanner::PrepareToProcess() { auto rmConfig = GetKqpResourceManager()->GetConfig(); @@ -361,11 +478,11 @@ std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 t const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan, TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, - bool isDataQuery) + bool isDataQuery, ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, bool doOptimization) { return std::make_unique<TKqpPlanner>(tasksGraph, txId, executer, std::move(tasks), std::move(tasksPerNode), snapshot, database, userToken, deadline, statsMode, withSpilling, rlPath, executerSpan, - std::move(resourcesSnapshot), executerRetriesConfig, isDataQuery); + std::move(resourcesSnapshot), executerRetriesConfig, isDataQuery, mkqlMemoryLimit, asyncIoFactory, doOptimization); } } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index 5803ca618e..997e2d4499 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -1,11 +1,15 @@ #pragma once +#include <ydb/core/base/appdata.h> #include <ydb/core/kqp/common/kqp.h> +#include <ydb/core/kqp/compute_actor/kqp_compute_actor.h> +#include <ydb/core/kqp/executer_actor/kqp_executer_stats.h> #include <ydb/core/kqp/gateway/kqp_gateway.h> #include <ydb/core/kqp/node_service/kqp_node_service.h> #include <ydb/core/kqp/rm_service/kqp_rm_service.h> #include <ydb/core/kqp/rm_service/kqp_resource_estimation.h> +#include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include 
<library/cpp/actors/wilson/wilson_span.h> #include <library/cpp/actors/core/hfunc.h> @@ -40,7 +44,7 @@ public: const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& ExecuterSpan, TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, - bool isDataQuery); + bool isDataQuery, ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, bool doOptimization); bool SendStartKqpTasksRequest(ui32 requestId, const TActorId& target); std::unique_ptr<IEventHandle> PlanExecution(); @@ -49,14 +53,24 @@ public: ui32 GetCurrentRetryDelay(ui32 requestId); void Unsubscribe(); -private: + THashMap<TActorId, TProgressStat>& GetPendingComputeActors(); + THashSet<ui64>& GetPendingComputeTasks(); + + ui32 GetnScanTasks(); + ui32 GetnComputeTasks(); +private: + + const IKqpGateway::TKqpSnapshot& GetSnapshot() const; + void ExecuteDataComputeTask(ui64 taskId, bool shareMailbox); void PrepareToProcess(); TString GetEstimationsInfo() const; std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> SerializeRequest(const TRequestData& requestData); ui32 CalcSendMessageFlagsForNode(ui32 nodeId); + + private: const ui64 TxId; const TActorId ExecuterId; @@ -78,6 +92,16 @@ private: TVector<TRequestData> Requests; TKqpTasksGraph& TasksGraph; const bool IsDataQuery; + ui64 MkqlMemoryLimit; + NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory; + ui32 nComputeTasks = 0; + ui32 nScanTasks = 0; + bool DoOptimization; + + THashMap<TActorId, TProgressStat> PendingComputeActors; // Running compute actors (pure and DS) + THashSet<ui64> PendingComputeTasks; // Not started yet, waiting resources + + }; std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, TVector<ui64>&& tasks, @@ -86,6 +110,7 @@ std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 t const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan, TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig, bool isDataQuery); + const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig, bool isDataQuery, + ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, bool doOptimization); } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 575c2fdfa9..1a99aff3af 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -254,6 +254,7 @@ private: task.Meta.ExecuterId = SelfId(); task.Meta.NodeId = nodeId; task.Meta.ScanTask = true; + task.Meta.Type = TTaskMeta::TTaskType::Scan; return task; } @@ -264,6 +265,7 @@ private: auto& task = TasksGraph.AddTask(stageInfo); task.Meta.NodeId = nodeId; task.Meta.ScanTask = true; + task.Meta.Type = TTaskMeta::TTaskType::Scan; tasks.push_back(task.Id); ++cnt; return task; @@ -400,6 +402,7 @@ private: task.Meta.ExecuterId = SelfId(); task.Meta.NodeId = nodeId; task.Meta.ScanTask = true; + task.Meta.Type = TTaskMeta::TTaskType::Scan; task.SetMetaId(metaGlueingId); } } @@ -465,6 +468,7 @@ private: for (ui32 i = 0; i < partitionsCount; ++i) { auto& task = TasksGraph.AddTask(stageInfo); + task.Meta.Type = 
TTaskMeta::TTaskType::Compute; LOG_D("Stage " << stageInfo.Id << " create compute task: " << task.Id); } } @@ -570,33 +574,21 @@ private: return; } - // NodeId -> {Tasks} - THashMap<ui64, TVector<ui64>> scanTasks; ui32 nShardScans = 0; - ui32 nScanTasks = 0; - TVector<ui64> computeTasks; InitializeChannelProxies(); + // calc stats for (auto& task : TasksGraph.GetTasks()) { auto& stageInfo = TasksGraph.GetStageInfo(task.StageId); - + if (task.Meta.NodeId || stageInfo.Meta.IsSysView()) { // Task with source if (!task.Meta.Reads) { - scanTasks[task.Meta.NodeId].emplace_back(task.Id); - nScanTasks++; continue; } - if (stageInfo.Meta.IsSysView()) { - computeTasks.emplace_back(task.Id); - } else { - scanTasks[task.Meta.NodeId].emplace_back(task.Id); - nScanTasks++; - } - nShardScans += task.Meta.Reads->size(); if (Stats) { for(const auto& read: *task.Meta.Reads) { @@ -604,17 +596,15 @@ private: } } - } else { - computeTasks.emplace_back(task.Id); } } - - if (computeTasks.size() + nScanTasks > Request.MaxComputeActors) { - LOG_N("Too many compute actors: computeTasks=" << computeTasks.size() << ", scanTasks=" << nScanTasks); + if (TasksGraph.GetTasks().size() > Request.MaxComputeActors) { + // LOG_N("Too many compute actors: computeTasks=" << computeTasks.size() << ", scanTasks=" << nScanTasks); + LOG_N("Too many compute actors: totalTasks=" << TasksGraph.GetTasks().size()); TBase::ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED, YqlIssue({}, TIssuesIds::KIKIMR_PRECONDITION_FAILED, TStringBuilder() - << "Requested too many execution units: " << (computeTasks.size() + nScanTasks))); + << "Requested too many execution units: " << TasksGraph.GetTasks().size())); return; } @@ -622,12 +612,9 @@ private: prepareTasksSpan.End(); } - LOG_D("Total tasks: " << TasksGraph.GetTasks().size() << ", readonly: true" - << ", " << nScanTasks << " scan tasks on " << scanTasks.size() << " nodes" - << ", totalShardScans: " << nShardScans << ", execType: Scan" - << ", snapshot: {" << GetSnapshot().TxId << ", " << GetSnapshot().Step << "}"); + LOG_D("TotalShardScans: " << nShardScans); - ExecuteScanTx(std::move(computeTasks), std::move(scanTasks), std::move(snapshot)); + ExecuteScanTx(std::move(snapshot)); Become(&TKqpScanExecuter::ExecuteState); if (ExecuterStateSpan) { @@ -667,30 +654,22 @@ public: } private: - void ExecuteScanTx(TVector<ui64>&& computeTasks, THashMap<ui64, TVector<ui64>>&& tasksPerNode, - TVector<NKikimrKqp::TKqpNodeResources>&& snapshot) { - LWTRACK(KqpScanExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, computeTasks.size(), tasksPerNode.size()); - for (const auto& [_, tasks]: tasksPerNode) { - for (const auto& task : tasks) { - PendingComputeTasks.insert(task); - } - } - - for (auto& task : computeTasks) { - PendingComputeTasks.insert(task); - } - - Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), std::move(computeTasks), - std::move(tasksPerNode), GetSnapshot(), + void ExecuteScanTx(TVector<NKikimrKqp::TKqpNodeResources>&& snapshot) { + + Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), {}, + {}, GetSnapshot(), Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, AppData()->EnableKqpSpilling, - Request.RlPath, ExecuterSpan, std::move(snapshot), ExecuterRetriesConfig, false /* isDataQuery */); - LOG_D("Execute scan tx, PendingComputeTasks: " << PendingComputeTasks.size()); + Request.RlPath, ExecuterSpan, std::move(snapshot), ExecuterRetriesConfig, false /* isDataQuery */, Request.MkqlMemoryLimit, nullptr, false); + + LOG_D("Execute scan tx, 
PendingComputeTasks: " << TasksGraph.GetTasks().size()); auto err = Planner->PlanExecution(); if (err) { TlsActivationContext->Send(err.release()); return; } + LWTRACK(KqpScanExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, Planner->GetnComputeTasks(), Planner->GetnComputeTasks()); + Planner->Submit(); } @@ -698,14 +677,16 @@ private: void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status, google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues) override { - if (!PendingComputeTasks.empty()) { - LOG_D("terminate pending resources request: " << Ydb::StatusIds::StatusCode_Name(status)); + if (Planner) { + if (!Planner->GetPendingComputeTasks().empty()) { + LOG_D("terminate pending resources request: " << Ydb::StatusIds::StatusCode_Name(status)); - auto ev = MakeHolder<TEvKqpNode::TEvCancelKqpTasksRequest>(); - ev->Record.SetTxId(TxId); - ev->Record.SetReason(Ydb::StatusIds::StatusCode_Name(status)); + auto ev = MakeHolder<TEvKqpNode::TEvCancelKqpTasksRequest>(); + ev->Record.SetTxId(TxId); + ev->Record.SetReason(Ydb::StatusIds::StatusCode_Name(status)); - Send(MakeKqpNodeServiceID(SelfId().NodeId()), ev.Release()); + Send(MakeKqpNodeServiceID(SelfId().NodeId()), ev.Release()); + } } TBase::ReplyErrorAndDie(status, issues); diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h index 829ba1304f..fbde955300 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h @@ -159,10 +159,18 @@ public: ui64 NodeId = 0; // only in case of scans over persistent snapshots bool ScanTask = false; TActorId ExecuterId; + ui32 Type = Unknown; THashMap<TString, TString> DqTaskParams; // Params for sources/sinks THashMap<TString, TString> DqSecureParams; + enum TTaskType : ui32 { + Unknown = 0, + Compute = 1, + Scan = 2, + DataShard = 3, + }; + struct TColumn { ui32 Id = 0; NScheme::TTypeInfo Type; |
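
Two details from the planner-side code above are worth restating in isolation. First, ExecuteDataComputeTask (moved from the data executer into TKqpPlanner) picks the per-task MKQL memory quota by program weight and caps it with the request's MkqlMemoryLimit. A self-contained sketch of just that selection, with the 500 MB / 2 GB defaults taken from the diff; the helper name is invented for illustration.

```cpp
#include <algorithm>
#include <cstdint>

constexpr uint64_t MB = 1ull << 20;
constexpr uint64_t GB = 1ull << 30;

// Mirrors the limit selection in TKqpPlanner::ExecuteDataComputeTask: programs
// flagged as heavy (e.g. containing a map join) get the large quota, everything
// else the small one; a non-zero request-level limit caps both.
uint64_t ChooseMkqlProgramLimit(uint64_t requestMkqlMemoryLimit, bool hasMapJoin) {
    const uint64_t light = requestMkqlMemoryLimit > 0
        ? std::min(500 * MB, requestMkqlMemoryLimit) : 500 * MB;
    const uint64_t heavy = requestMkqlMemoryLimit > 0
        ? std::min(2 * GB, requestMkqlMemoryLimit) : 2 * GB;
    return hasMapJoin ? heavy : light;
}
```

The chosen value then seeds a TGuaranteeQuotaManager(limit, limit), so the compute actor's quota is both guaranteed and hard-capped at the same number.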
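
Second, the single-node optimization announced in the commit message is a one-line decision in PlanExecution: when there are no pure compute tasks, all remaining tasks sit on a single node, an async-IO factory is available, and the caller allowed it (the data executer passes !UnknownAffectedShardCount, the scan executer passes false), the tasks are started locally with a shared mailbox instead of going through remote task submission. A tiny sketch of that predicate, assuming the same inputs as the diff:

```cpp
#include <cstddef>

// Condensed from TKqpPlanner::PlanExecution: decide whether to start compute
// actors locally (shared mailbox, no remote launch round-trip) or to hand the
// tasks to kqp_node_service on their target nodes.
bool ShouldExecuteLocally(std::size_t nComputeTasks, std::size_t nodesWithTasks,
                          bool hasAsyncIoFactory, bool doOptimization) {
    return nComputeTasks == 0 && nodesWithTasks == 1 && hasAsyncIoFactory && doOptimization;
}
```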