author    makostrov <makostrov@yandex-team.com>  2023-07-19 14:13:25 +0300
committer makostrov <makostrov@yandex-team.com>  2023-07-19 14:13:25 +0300
commit    29ea4eeb548a98f60ebf35a20a38a10229628da0 (patch)
tree      569052e6fd6ace4ddc2fcb196ec318f9411ba294
parent    b03dce345d49158a866df832fc0e4cd257893146 (diff)
download  ydb-29ea4eeb548a98f60ebf35a20a38a10229628da0.tar.gz
kqp: delegate task execution to the planner (KIKIMR-18631)

KIKIMR-18631:
* Fix TTaskType
* Add compute tasks to the planner correctly
* Add an optimization: execute scan tasks locally when they all land on one node
* Fix stats: affected shards
* Shift the calculation of scan tasks to the planner
-rw-r--r--  ydb/core/kqp/executer_actor/kqp_data_executer.cpp   99
-rw-r--r--  ydb/core/kqp/executer_actor/kqp_executer_impl.h     150
-rw-r--r--  ydb/core/kqp/executer_actor/kqp_planner.cpp         147
-rw-r--r--  ydb/core/kqp/executer_actor/kqp_planner.h            31
-rw-r--r--  ydb/core/kqp/executer_actor/kqp_scan_executer.cpp    77
-rw-r--r--  ydb/core/kqp/executer_actor/kqp_tasks_graph.h         8
6 files changed, 306 insertions(+), 206 deletions(-)
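The core of the change is easiest to see in isolation: each task in the graph now carries a TTaskType tag (Compute, Scan, DataShard), and TKqpPlanner::PlanExecution, rather than the executer, walks the tasks graph and buckets the work into pure compute tasks and per-node scan tasks. The standalone sketch below mirrors only that classification step; Task, TaskType, and planTasks are simplified illustrative stand-ins, not the actual YDB classes from kqp_tasks_graph.h.

```cpp
#include <cstdint>
#include <iostream>
#include <unordered_map>
#include <vector>

// Simplified stand-in for TTaskMeta::TTaskType introduced by this commit.
enum class TaskType : uint32_t { Unknown = 0, Compute = 1, Scan = 2, DataShard = 3 };

// Simplified stand-in for a task in the tasks graph.
struct Task {
    uint64_t id = 0;
    uint64_t nodeId = 0;                 // meaningful for scan tasks pinned to a node
    TaskType type = TaskType::Unknown;
};

// Mirrors the grouping loop added at the top of TKqpPlanner::PlanExecution():
// compute tasks go into a flat list, scan tasks are bucketed per node.
void planTasks(const std::vector<Task>& tasks,
               std::vector<uint64_t>& computeTasks,
               std::unordered_map<uint64_t, std::vector<uint64_t>>& tasksPerNode) {
    for (const auto& task : tasks) {
        switch (task.type) {
            case TaskType::Compute:
                computeTasks.push_back(task.id);
                break;
            case TaskType::Scan:
                tasksPerNode[task.nodeId].push_back(task.id);
                break;
            default:
                break; // DataShard tasks are driven by datashard transactions, not the planner
        }
    }
}

int main() {
    std::vector<Task> tasks = {
        {1, 0, TaskType::Compute},
        {2, 7, TaskType::Scan},
        {3, 7, TaskType::Scan},
        {4, 9, TaskType::Scan},
    };
    std::vector<uint64_t> computeTasks;
    std::unordered_map<uint64_t, std::vector<uint64_t>> tasksPerNode;
    planTasks(tasks, computeTasks, tasksPerNode);
    std::cout << "compute tasks: " << computeTasks.size()
              << ", scan nodes: " << tasksPerNode.size() << "\n";
}
```

With the grouping done inside the planner, both the data executer and the scan executer can hand over an empty task list and let PlanExecution derive compute and scan buckets from the task types, which is exactly what the diff below does.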
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 31920b9cb0..0e023ec038 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -163,10 +163,12 @@ public:
if (IsDebugLogEnabled()) {
auto sb = TStringBuilder() << "ActorState: " << CurrentStateFuncName()
- << ", waiting for " << PendingComputeActors.size() << " compute actor(s) and "
+ << ", waiting for " << (Planner ? Planner->GetPendingComputeActors().size() : 0) << " compute actor(s) and "
<< notFinished << " datashard(s): ";
- for (const auto& shardId : PendingComputeActors) {
- sb << "CA " << shardId.first << ", ";
+ if (Planner) {
+ for (const auto& shardId : Planner->GetPendingComputeActors()) {
+ sb << "CA " << shardId.first << ", ";
+ }
}
for (const auto& [shardId, shardState] : ShardStates) {
if (shardState.State != TShardState::EState::Finished) {
@@ -1343,6 +1345,7 @@ private:
return TasksGraph.GetTask(it->second);
}
auto& task = TasksGraph.AddTask(stageInfo);
+ task.Meta.Type = TTaskMeta::TTaskType::DataShard;
task.Meta.ExecuterId = SelfId();
task.Meta.ShardId = shardId;
shardTasks.emplace(shardId, task.Id);
@@ -1527,6 +1530,7 @@ private:
for (ui32 i = 0; i < partitionsCount; ++i) {
auto& task = TasksGraph.AddTask(stageInfo);
task.Meta.ExecuterId = SelfId();
+ task.Meta.Type = TTaskMeta::TTaskType::Compute;
LOG_D("Stage " << stageInfo.Id << " create compute task: " << task.Id);
}
}
@@ -1623,44 +1627,6 @@ private:
YQL_ENSURE(result.second);
}
- void ExecuteDataComputeTask(ui64 taskId, bool shareMailbox) {
- auto& task = TasksGraph.GetTask(taskId);
- NYql::NDqProto::TDqTask* taskDesc = ArenaSerializeTaskToProto(TasksGraph, task);
-
- TComputeRuntimeSettings settings;
- if (Deadline) {
- settings.Timeout = *Deadline - TAppData::TimeProvider->Now();
- }
- //settings.ExtraMemoryAllocationPool = NRm::EKqpMemoryPool::DataQuery;
- settings.ExtraMemoryAllocationPool = NRm::EKqpMemoryPool::Unspecified;
- settings.FailOnUndelivery = true;
- settings.StatsMode = GetDqStatsMode(Request.StatsMode);
- settings.UseSpilling = false;
-
- TComputeMemoryLimits limits;
- limits.ChannelBufferSize = 50_MB;
- limits.MkqlLightProgramMemoryLimit = Request.MkqlMemoryLimit > 0 ? std::min(500_MB, Request.MkqlMemoryLimit) : 500_MB;
- limits.MkqlHeavyProgramMemoryLimit = Request.MkqlMemoryLimit > 0 ? std::min(2_GB, Request.MkqlMemoryLimit) : 2_GB;
-
- auto& taskOpts = taskDesc->GetProgram().GetSettings();
- auto limit = taskOpts.GetHasMapJoin() /* || opts.GetHasSort()*/
- ? limits.MkqlHeavyProgramMemoryLimit
- : limits.MkqlLightProgramMemoryLimit;
-
- limits.MemoryQuotaManager = std::make_shared<TGuaranteeQuotaManager>(limit, limit);
-
- auto computeActor = CreateKqpComputeActor(SelfId(), TxId, taskDesc, AsyncIoFactory,
- AppData()->FunctionRegistry, settings, limits, NWilson::TTraceId(), TasksGraph.GetMeta().GetArenaIntrusivePtr());
-
- auto computeActorId = shareMailbox ? RegisterWithSameMailbox(computeActor) : Register(computeActor);
- task.ComputeActorId = computeActorId;
-
- LOG_D("Executing task: " << taskId << " on compute actor: " << task.ComputeActorId);
-
- auto result = PendingComputeActors.emplace(task.ComputeActorId, TProgressStat());
- YQL_ENSURE(result.second);
- }
-
void Execute() {
NWilson::TSpan prepareTasksSpan(TWilsonKqp::DataExecuterPrepateTasks, ExecuterStateSpan.GetTraceId(), "PrepateTasks", NWilson::EFlags::AUTO_END);
LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId);
@@ -2128,43 +2094,27 @@ private:
NWilson::TSpan sendTasksSpan(TWilsonKqp::DataExecuterSendTasksAndTxs, ExecuterStateSpan.GetTraceId(), "SendTasksAndTxs", NWilson::EFlags::AUTO_END);
LWTRACK(KqpDataExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, ComputeTasks.size(), DatashardTxs.size());
- // first, start compute tasks
- bool shareMailbox = (ComputeTasks.size() <= 1);
- for (ui64 taskId : ComputeTasks) {
- ExecuteDataComputeTask(taskId, shareMailbox);
+ for (auto& [shardId, tasks] : RemoteComputeTasks) {
+ auto it = ShardIdToNodeId.find(shardId);
+ YQL_ENSURE(it != ShardIdToNodeId.end());
+ for (ui64 taskId : tasks) {
+ auto& task = TasksGraph.GetTask(taskId);
+ task.Meta.NodeId = it->second;
+ }
}
- if (ComputeTasks.size() == 0 && RemoteComputeTasks.size() == 1 && !UnknownAffectedShardCount) {
- // query affects a single key or shard, so it might be more effective
- // to execute this task locally so we can avoid useless overhead for remote task launching.
- for(auto& [shardId, tasks]: RemoteComputeTasks) {
- for(ui64 taskId: tasks) {
- ExecuteDataComputeTask(taskId, true);
- }
- }
+ Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), {}, {}, GetSnapshot(),
+ Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, false, Nothing(),
+ ExecuterSpan, {}, ExecuterRetriesConfig, true /* isDataQuery */, Request.MkqlMemoryLimit, AsyncIoFactory, !UnknownAffectedShardCount);
- } else {
- THashMap<ui64, TVector<ui64>> tasksPerNode;
- for (auto& [shardId, tasks] : RemoteComputeTasks) {
- auto it = ShardIdToNodeId.find(shardId);
- YQL_ENSURE(it != ShardIdToNodeId.end());
- for (ui64 taskId : tasks) {
- PendingComputeTasks.insert(taskId);
- tasksPerNode[it->second].emplace_back(taskId);
- }
- }
-
- Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), {}, std::move(tasksPerNode), GetSnapshot(),
- Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, false, Nothing(),
- ExecuterSpan, {}, ExecuterRetriesConfig, true /* isDataQuery */);
- auto err = Planner->PlanExecution();
- if (err) {
- TlsActivationContext->Send(err.release());
- return;
- }
- Planner->Submit();
+ auto err = Planner->PlanExecution();
+ if (err) {
+ TlsActivationContext->Send(err.release());
+ return;
}
+ Planner->Submit();
+
// then start data tasks with known actor ids of compute tasks
for (auto& [shardId, shardTx] : DatashardTxs) {
shardTx->SetType(NKikimrTxDataShard::KQP_TX_TYPE_DATA);
@@ -2229,9 +2179,10 @@ private:
<< ", topicTxs: " << Request.TopicOperations.GetSize()
<< ", volatile: " << VolatileTx
<< ", immediate: " << ImmediateTx
- << ", pending compute tasks" << PendingComputeTasks.size()
+ << ", pending compute tasks" << (Planner ? Planner->GetPendingComputeTasks().size() : 0)
<< ", useFollowers: " << GetUseFollowers());
+ // error
LOG_T("Updating channels after the creation of compute actors");
THashMap<TActorId, THashSet<ui64>> updates;
for (ui64 taskId : ComputeTasks) {
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index bcb554a144..7fa3567aeb 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -245,25 +245,27 @@ protected:
case NYql::NDqProto::COMPUTE_STATE_EXECUTING: {
// initial TEvState event from Compute Actor
// there can be race with RM answer
- if (PendingComputeTasks.erase(taskId)) {
- auto it = PendingComputeActors.emplace(computeActor, TProgressStat());
- YQL_ENSURE(it.second);
+ if (Planner) {
+ if (Planner->GetPendingComputeTasks().erase(taskId)) {
+ auto it = Planner->GetPendingComputeActors().emplace(computeActor, TProgressStat());
+ YQL_ENSURE(it.second);
- if (state.HasStats()) {
- it.first->second.Set(state.GetStats());
- }
-
- auto& task = TasksGraph.GetTask(taskId);
- task.ComputeActorId = computeActor;
-
- THashMap<TActorId, THashSet<ui64>> updates;
- CollectTaskChannelsUpdates(task, updates);
- PropagateChannelsUpdates(updates);
- } else {
- auto it = PendingComputeActors.find(computeActor);
- if (it != PendingComputeActors.end()) {
if (state.HasStats()) {
- it->second.Set(state.GetStats());
+ it.first->second.Set(state.GetStats());
+ }
+
+ auto& task = TasksGraph.GetTask(taskId);
+ task.ComputeActorId = computeActor;
+
+ THashMap<TActorId, THashSet<ui64>> updates;
+ CollectTaskChannelsUpdates(task, updates);
+ PropagateChannelsUpdates(updates);
+ } else {
+ auto it = Planner->GetPendingComputeActors().find(computeActor);
+ if (it != Planner->GetPendingComputeActors().end()) {
+ if (state.HasStats()) {
+ it->second.Set(state.GetStats());
+ }
}
}
}
@@ -279,26 +281,28 @@ protected:
LastTaskId = taskId;
LastComputeActorId = computeActor.ToString();
- auto it = PendingComputeActors.find(computeActor);
- if (it == PendingComputeActors.end()) {
- LOG_W("Got execution state for compute actor: " << computeActor
- << ", task: " << taskId
- << ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState())
- << ", too early (waiting reply from RM)");
-
- if (PendingComputeTasks.erase(taskId)) {
- LOG_E("Got execution state for compute actor: " << computeActor
- << ", for unknown task: " << state.GetTaskId()
- << ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState()));
- return;
- }
- } else {
- if (state.HasStats()) {
- it->second.Set(state.GetStats());
+ if (Planner) {
+ auto it = Planner->GetPendingComputeActors().find(computeActor);
+ if (it == Planner->GetPendingComputeActors().end()) {
+ LOG_W("Got execution state for compute actor: " << computeActor
+ << ", task: " << taskId
+ << ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState())
+ << ", too early (waiting reply from RM)");
+
+ if (Planner && Planner->GetPendingComputeTasks().erase(taskId)) {
+ LOG_E("Got execution state for compute actor: " << computeActor
+ << ", for unknown task: " << state.GetTaskId()
+ << ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState()));
+ return;
+ }
+ } else {
+ if (state.HasStats()) {
+ it->second.Set(state.GetStats());
+ }
+ LastStats.emplace_back(std::move(it->second));
+ Planner->GetPendingComputeActors().erase(it);
+ YQL_ENSURE(Planner->GetPendingComputeTasks().find(taskId) == Planner->GetPendingComputeTasks().end());
}
- LastStats.emplace_back(std::move(it->second));
- PendingComputeActors.erase(it);
- YQL_ENSURE(PendingComputeTasks.find(taskId) == PendingComputeTasks.end());
}
}
}
@@ -371,7 +375,7 @@ protected:
protected:
bool CheckExecutionComplete() {
- if (PendingComputeActors.empty() && PendingComputeTasks.empty()) {
+ if (Planner && Planner->GetPendingComputeActors().empty() && Planner->GetPendingComputeTasks().empty()) {
static_cast<TDerived*>(this)->Finalize();
UpdateResourcesUsage(true);
return true;
@@ -382,11 +386,13 @@ protected:
if (IsDebugLogEnabled()) {
TStringBuilder sb;
sb << "Waiting for: ";
- for (auto ct : PendingComputeTasks) {
- sb << "CT " << ct << ", ";
- }
- for (auto ca : PendingComputeActors) {
- sb << "CA " << ca.first << ", ";
+ if (Planner) {
+ for (auto ct : Planner->GetPendingComputeTasks()) {
+ sb << "CT " << ct << ", ";
+ }
+ for (auto ca : Planner->GetPendingComputeActors()) {
+ sb << "CA " << ca.first << ", ";
+ }
}
LOG_D(sb);
}
@@ -434,15 +440,17 @@ protected:
auto nodeId = ev->Get()->NodeId;
LOG_N("Disconnected node " << nodeId);
- for (auto computeActor : PendingComputeActors) {
- if (computeActor.first.NodeId() == nodeId) {
- return ReplyUnavailable(TStringBuilder() << "Connection with node " << nodeId << " lost.");
+ if (Planner) {
+ for (auto computeActor : Planner->GetPendingComputeActors()) {
+ if (computeActor.first.NodeId() == nodeId) {
+ return ReplyUnavailable(TStringBuilder() << "Connection with node " << nodeId << " lost.");
+ }
}
- }
- for (auto& task : TasksGraph.GetTasks()) {
- if (task.Meta.NodeId == nodeId && PendingComputeTasks.contains(task.Id)) {
- return ReplyUnavailable(TStringBuilder() << "Connection with node " << nodeId << " lost.");
+ for (auto& task : TasksGraph.GetTasks()) {
+ if (task.Meta.NodeId == nodeId && Planner->GetPendingComputeTasks().contains(task.Id)) {
+ return ReplyUnavailable(TStringBuilder() << "Connection with node " << nodeId << " lost.");
+ }
}
}
}
@@ -502,13 +510,15 @@ protected:
LOG_D("Executing task: " << taskId << " on compute actor: " << task.ComputeActorId);
- if (PendingComputeTasks.erase(taskId) == 0) {
- LOG_D("Executing task: " << taskId << ", compute actor: " << task.ComputeActorId << ", already finished");
- } else {
- auto result = PendingComputeActors.emplace(std::make_pair(task.ComputeActorId, TProgressStat()));
- YQL_ENSURE(result.second);
+ if (Planner) {
+ if (Planner->GetPendingComputeTasks().erase(taskId) == 0) {
+ LOG_D("Executing task: " << taskId << ", compute actor: " << task.ComputeActorId << ", already finished");
+ } else {
+ auto result = Planner->GetPendingComputeActors().emplace(std::make_pair(task.ComputeActorId, TProgressStat()));
+ YQL_ENSURE(result.second);
- CollectTaskChannelsUpdates(task, channelsUpdates);
+ CollectTaskChannelsUpdates(task, channelsUpdates);
+ }
}
}
@@ -602,9 +612,11 @@ protected:
LastResourceUsageUpdate = now;
TProgressStat::TEntry consumption;
- for (const auto& p : PendingComputeActors) {
- const auto& t = p.second.GetLastUsage();
- consumption += t;
+ if (Planner) {
+ for (const auto& p : Planner->GetPendingComputeActors()) {
+ const auto& t = p.second.GetLastUsage();
+ consumption += t;
+ }
}
for (const auto& p : LastStats) {
@@ -618,8 +630,10 @@ protected:
if (ru <= 100 && !force)
return;
- for (auto& p : PendingComputeActors) {
- p.second.Update();
+ if (Planner) {
+ for (auto& p : Planner->GetPendingComputeActors()) {
+ p.second.Update();
+ }
}
for (auto& p : LastStats) {
@@ -684,6 +698,7 @@ protected:
task.Meta.Reads.ConstructInPlace();
task.Meta.Reads->emplace_back(std::move(readInfo));
task.Meta.ReadInfo.Reverse = op.GetReadRange().GetReverse();
+ task.Meta.Type = TTaskMeta::TTaskType::Compute;
LOG_D("Stage " << stageInfo.Id << " create sysview scan task: " << task.Id);
}
@@ -706,6 +721,8 @@ protected:
input.SourceType = externalSource.GetType();
task.Meta.DqTaskParams.emplace(externalSource.GetTaskParamKey(), partitionParam);
+ task.Meta.Type = TTaskMeta::TTaskType::Compute;
+
}
}
@@ -741,6 +758,7 @@ protected:
YQL_ENSURE(!shardInfo.KeyWriteRanges);
auto& task = TasksGraph.AddTask(stageInfo);
+ task.Meta.Type = TTaskMeta::TTaskType::Scan;
task.Meta.ExecuterId = this->SelfId();
if (auto ptr = ShardIdToNodeId.FindPtr(taskLocation)) {
task.Meta.NodeId = *ptr;
@@ -921,11 +939,13 @@ protected:
virtual void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status,
google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues)
{
- for (auto computeActor : PendingComputeActors) {
- LOG_D("terminate compute actor " << computeActor.first);
+ if (Planner) {
+ for (auto computeActor : Planner->GetPendingComputeActors()) {
+ LOG_D("terminate compute actor " << computeActor.first);
- auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDq::YdbStatusToDqStatus(status), "Terminate execution");
- this->Send(computeActor.first, ev.Release());
+ auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDq::YdbStatusToDqStatus(status), "Terminate execution");
+ this->Send(computeActor.first, ev.Release());
+ }
}
auto& response = *ResponseEv->Record.MutableResponse();
@@ -1095,7 +1115,6 @@ protected:
TActorId KqpTableResolverId;
TActorId KqpShardsResolverId;
- THashMap<TActorId, TProgressStat> PendingComputeActors; // Running compute actors (pure and DS)
THashMap<TActorId, NYql::NDqProto::TComputeActorExtraData> ExtraData;
TVector<TProgressStat> LastStats;
@@ -1109,7 +1128,6 @@ protected:
NWilson::TSpan ExecuterStateSpan;
NWilson::TSpan ExecuterTableResolveSpan;
- THashSet<ui64> PendingComputeTasks; // Not started yet, waiting resources
TMap<ui64, ui64> ShardIdToNodeId;
TMap<ui64, TVector<ui64>> ShardsOnNode;
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp
index a3eda7ec17..b7d5257fb7 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp
@@ -9,6 +9,8 @@
#include <util/generic/set.h>
+using namespace NActors;
+
namespace NKikimr::NKqp {
#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << stream)
@@ -59,7 +61,7 @@ TKqpPlanner::TKqpPlanner(TKqpTasksGraph& graph, ui64 txId, const TActorId& execu
bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
- bool isDataQuery)
+ bool isDataQuery, ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, bool doOptimization)
: TxId(txId)
, ExecuterId(executer)
, ComputeTasks(std::move(computeTasks))
@@ -76,6 +78,9 @@ TKqpPlanner::TKqpPlanner(TKqpTasksGraph& graph, ui64 txId, const TActorId& execu
, ExecuterRetriesConfig(executerRetriesConfig)
, TasksGraph(graph)
, IsDataQuery(isDataQuery)
+ , MkqlMemoryLimit(mkqlMemoryLimit)
+ , AsyncIoFactory(asyncIoFactory)
+ , DoOptimization(doOptimization)
{
if (!Database) {
// a piece of magic for tests
@@ -281,27 +286,131 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() {
}
}
+const IKqpGateway::TKqpSnapshot& TKqpPlanner::GetSnapshot() const {
+ return TasksGraph.GetMeta().Snapshot;
+}
+
+void TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, bool shareMailbox) {
+
+ auto& task = TasksGraph.GetTask(taskId);
+ NYql::NDqProto::TDqTask* taskDesc = ArenaSerializeTaskToProto(TasksGraph, task);
+
+ NYql::NDq::TComputeRuntimeSettings settings;
+ if (Deadline) {
+ settings.Timeout = Deadline - TAppData::TimeProvider->Now();
+ }
+ //settings.ExtraMemoryAllocationPool = NRm::EKqpMemoryPool::DataQuery;
+ settings.ExtraMemoryAllocationPool = NRm::EKqpMemoryPool::Unspecified;
+ settings.FailOnUndelivery = true;
+ settings.StatsMode = GetDqStatsMode(StatsMode);
+ settings.UseSpilling = false;
+
+ NYql::NDq::TComputeMemoryLimits limits;
+ limits.ChannelBufferSize = 50_MB;
+ limits.MkqlLightProgramMemoryLimit = MkqlMemoryLimit > 0 ? std::min(500_MB, MkqlMemoryLimit) : 500_MB;
+ limits.MkqlHeavyProgramMemoryLimit = MkqlMemoryLimit > 0 ? std::min(2_GB, MkqlMemoryLimit) : 2_GB;
+
+ auto& taskOpts = taskDesc->GetProgram().GetSettings();
+ auto limit = taskOpts.GetHasMapJoin() /* || opts.GetHasSort()*/
+ ? limits.MkqlHeavyProgramMemoryLimit
+ : limits.MkqlLightProgramMemoryLimit;
+
+ limits.MemoryQuotaManager = std::make_shared<NYql::NDq::TGuaranteeQuotaManager>(limit, limit);
+
+ auto computeActor = NKikimr::NKqp::CreateKqpComputeActor(ExecuterId, TxId, taskDesc, AsyncIoFactory,
+ AppData()->FunctionRegistry, settings, limits, NWilson::TTraceId(), TasksGraph.GetMeta().GetArenaIntrusivePtr());
+
+ auto computeActorId = shareMailbox ? TlsActivationContext->AsActorContext().RegisterWithSameMailbox(computeActor) : TlsActivationContext->AsActorContext().Register(computeActor);
+ task.ComputeActorId = computeActorId;
+
+ LOG_D("Executing task: " << taskId << " on compute actor: " << task.ComputeActorId);
+
+ auto result = PendingComputeActors.emplace(task.ComputeActorId, TProgressStat());
+ YQL_ENSURE(result.second);
+}
+
+ui32 TKqpPlanner::GetnScanTasks() {
+ return nScanTasks;
+}
+
+ui32 TKqpPlanner::GetnComputeTasks() {
+ return nComputeTasks;
+}
+
std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() {
- auto err = AssignTasksToNodes();
- if (err) {
- return err;
+ nScanTasks = 0;
+
+ for (auto& task : TasksGraph.GetTasks()) {
+ switch (task.Meta.Type) {
+ case TTaskMeta::TTaskType::Compute:
+ ComputeTasks.emplace_back(task.Id);
+ break;
+ case TTaskMeta::TTaskType::Scan:
+ TasksPerNode[task.Meta.NodeId].emplace_back(task.Id);
+ nScanTasks++;
+ break;
+ }
}
- for(auto& [nodeId, tasks] : TasksPerNode) {
- SortUnique(tasks);
- auto& request = Requests.emplace_back(std::move(tasks), CalcSendMessageFlagsForNode(nodeId), nodeId);
- request.SerializedRequest = SerializeRequest(request);
- auto ev = CheckTaskSize(TxId, request.SerializedRequest->Record.GetTasks());
- if (ev != nullptr) {
- return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.release());
+ LOG_D("Total tasks: " << nScanTasks + nComputeTasks << ", readonly: true" // TODO ???
+ << ", " << nScanTasks << " scan tasks on " << TasksPerNode.size() << " nodes"
+ << ", execType: " << (IsDataQuery ? "Data" : "Scan")
+ << ", snapshot: {" << GetSnapshot().TxId << ", " << GetSnapshot().Step << "}");
+
+ nComputeTasks = ComputeTasks.size();
+
+ if (IsDataQuery) {
+ bool shareMailbox = (ComputeTasks.size() <= 1);
+ for (ui64 taskId : ComputeTasks) {
+ ExecuteDataComputeTask(taskId, shareMailbox);
}
+ ComputeTasks.clear();
}
+
+ if (nComputeTasks == 0 && TasksPerNode.size() == 1 && (AsyncIoFactory != nullptr) && DoOptimization) {
+ // query affects a single key or shard, so it might be more effective
+ // to execute this task locally so we can avoid useless overhead for remote task launching.
+ for(auto& [shardId, tasks]: TasksPerNode) {
+ for(ui64 taskId: tasks) {
+ ExecuteDataComputeTask(taskId, true);
+ }
+ }
+
+ } else {
+ for (ui64 taskId : ComputeTasks) {
+ PendingComputeTasks.insert(taskId);
+ }
+
+ for (auto& [shardId, tasks] : TasksPerNode) {
+ for (ui64 taskId : tasks) {
+ PendingComputeTasks.insert(taskId);
+ }
+ }
+
+ auto err = AssignTasksToNodes();
+ if (err) {
+ return err;
+ }
+
+ for(auto& [nodeId, tasks] : TasksPerNode) {
+ SortUnique(tasks);
+ auto& request = Requests.emplace_back(std::move(tasks), CalcSendMessageFlagsForNode(nodeId), nodeId);
+ request.SerializedRequest = SerializeRequest(request);
+ auto ev = CheckTaskSize(TxId, request.SerializedRequest->Record.GetTasks());
+ if (ev != nullptr) {
+ return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.release());
+ }
+ }
+
+ }
+
+
return nullptr;
}
TString TKqpPlanner::GetEstimationsInfo() const {
TStringStream ss;
- ss << "ComputeTasks:" << ComputeTasks.size() << ";NodeTasks:";
+ ss << "ComputeTasks:" << nComputeTasks << ";NodeTasks:";
if (auto it = TasksPerNode.find(ExecuterId.NodeId()); it != TasksPerNode.end()) {
ss << it->second.size() << ";";
} else {
@@ -311,12 +420,20 @@ TString TKqpPlanner::GetEstimationsInfo() const {
}
void TKqpPlanner::Unsubscribe() {
- for(ui64 nodeId: TrackingNodes) {
+ for (ui64 nodeId: TrackingNodes) {
TlsActivationContext->Send(std::make_unique<NActors::IEventHandle>(
TActivationContext::InterconnectProxy(nodeId), ExecuterId, new TEvents::TEvUnsubscribe()));
}
}
+THashMap<TActorId, TProgressStat>& TKqpPlanner::GetPendingComputeActors() {
+ return PendingComputeActors;
+}
+
+THashSet<ui64>& TKqpPlanner::GetPendingComputeTasks() {
+ return PendingComputeTasks;
+}
+
void TKqpPlanner::PrepareToProcess() {
auto rmConfig = GetKqpResourceManager()->GetConfig();
@@ -361,11 +478,11 @@ std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 t
const Ydb::Table::QueryStatsCollection::Mode& statsMode,
bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
- bool isDataQuery)
+ bool isDataQuery, ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, bool doOptimization)
{
return std::make_unique<TKqpPlanner>(tasksGraph, txId, executer, std::move(tasks), std::move(tasksPerNode), snapshot,
database, userToken, deadline, statsMode, withSpilling, rlPath, executerSpan,
- std::move(resourcesSnapshot), executerRetriesConfig, isDataQuery);
+ std::move(resourcesSnapshot), executerRetriesConfig, isDataQuery, mkqlMemoryLimit, asyncIoFactory, doOptimization);
}
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h
index 5803ca618e..997e2d4499 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.h
+++ b/ydb/core/kqp/executer_actor/kqp_planner.h
@@ -1,11 +1,15 @@
#pragma once
+#include <ydb/core/base/appdata.h>
#include <ydb/core/kqp/common/kqp.h>
+#include <ydb/core/kqp/compute_actor/kqp_compute_actor.h>
+#include <ydb/core/kqp/executer_actor/kqp_executer_stats.h>
#include <ydb/core/kqp/gateway/kqp_gateway.h>
#include <ydb/core/kqp/node_service/kqp_node_service.h>
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
#include <ydb/core/kqp/rm_service/kqp_resource_estimation.h>
+#include <library/cpp/actors/core/actor.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/wilson/wilson_span.h>
#include <library/cpp/actors/core/hfunc.h>
@@ -40,7 +44,7 @@ public:
const Ydb::Table::QueryStatsCollection::Mode& statsMode,
bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& ExecuterSpan,
TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
- bool isDataQuery);
+ bool isDataQuery, ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, bool doOptimization);
bool SendStartKqpTasksRequest(ui32 requestId, const TActorId& target);
std::unique_ptr<IEventHandle> PlanExecution();
@@ -49,14 +53,24 @@ public:
ui32 GetCurrentRetryDelay(ui32 requestId);
void Unsubscribe();
-private:
+ THashMap<TActorId, TProgressStat>& GetPendingComputeActors();
+ THashSet<ui64>& GetPendingComputeTasks();
+
+ ui32 GetnScanTasks();
+ ui32 GetnComputeTasks();
+private:
+
+ const IKqpGateway::TKqpSnapshot& GetSnapshot() const;
+ void ExecuteDataComputeTask(ui64 taskId, bool shareMailbox);
void PrepareToProcess();
TString GetEstimationsInfo() const;
std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> SerializeRequest(const TRequestData& requestData);
ui32 CalcSendMessageFlagsForNode(ui32 nodeId);
+
+
private:
const ui64 TxId;
const TActorId ExecuterId;
@@ -78,6 +92,16 @@ private:
TVector<TRequestData> Requests;
TKqpTasksGraph& TasksGraph;
const bool IsDataQuery;
+ ui64 MkqlMemoryLimit;
+ NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
+ ui32 nComputeTasks = 0;
+ ui32 nScanTasks = 0;
+ bool DoOptimization;
+
+ THashMap<TActorId, TProgressStat> PendingComputeActors; // Running compute actors (pure and DS)
+ THashSet<ui64> PendingComputeTasks; // Not started yet, waiting resources
+
+
};
std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, TVector<ui64>&& tasks,
@@ -86,6 +110,7 @@ std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 t
const Ydb::Table::QueryStatsCollection::Mode& statsMode,
bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot,
- const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig, bool isDataQuery);
+ const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig, bool isDataQuery,
+ ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, bool doOptimization);
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index 575c2fdfa9..1a99aff3af 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -254,6 +254,7 @@ private:
task.Meta.ExecuterId = SelfId();
task.Meta.NodeId = nodeId;
task.Meta.ScanTask = true;
+ task.Meta.Type = TTaskMeta::TTaskType::Scan;
return task;
}
@@ -264,6 +265,7 @@ private:
auto& task = TasksGraph.AddTask(stageInfo);
task.Meta.NodeId = nodeId;
task.Meta.ScanTask = true;
+ task.Meta.Type = TTaskMeta::TTaskType::Scan;
tasks.push_back(task.Id);
++cnt;
return task;
@@ -400,6 +402,7 @@ private:
task.Meta.ExecuterId = SelfId();
task.Meta.NodeId = nodeId;
task.Meta.ScanTask = true;
+ task.Meta.Type = TTaskMeta::TTaskType::Scan;
task.SetMetaId(metaGlueingId);
}
}
@@ -465,6 +468,7 @@ private:
for (ui32 i = 0; i < partitionsCount; ++i) {
auto& task = TasksGraph.AddTask(stageInfo);
+ task.Meta.Type = TTaskMeta::TTaskType::Compute;
LOG_D("Stage " << stageInfo.Id << " create compute task: " << task.Id);
}
}
@@ -570,33 +574,21 @@ private:
return;
}
- // NodeId -> {Tasks}
- THashMap<ui64, TVector<ui64>> scanTasks;
ui32 nShardScans = 0;
- ui32 nScanTasks = 0;
-
TVector<ui64> computeTasks;
InitializeChannelProxies();
+ // calc stats
for (auto& task : TasksGraph.GetTasks()) {
auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
-
+
if (task.Meta.NodeId || stageInfo.Meta.IsSysView()) {
// Task with source
if (!task.Meta.Reads) {
- scanTasks[task.Meta.NodeId].emplace_back(task.Id);
- nScanTasks++;
continue;
}
- if (stageInfo.Meta.IsSysView()) {
- computeTasks.emplace_back(task.Id);
- } else {
- scanTasks[task.Meta.NodeId].emplace_back(task.Id);
- nScanTasks++;
- }
-
nShardScans += task.Meta.Reads->size();
if (Stats) {
for(const auto& read: *task.Meta.Reads) {
@@ -604,17 +596,15 @@ private:
}
}
- } else {
- computeTasks.emplace_back(task.Id);
}
}
-
- if (computeTasks.size() + nScanTasks > Request.MaxComputeActors) {
- LOG_N("Too many compute actors: computeTasks=" << computeTasks.size() << ", scanTasks=" << nScanTasks);
+ if (TasksGraph.GetTasks().size() > Request.MaxComputeActors) {
+ // LOG_N("Too many compute actors: computeTasks=" << computeTasks.size() << ", scanTasks=" << nScanTasks);
+ LOG_N("Too many compute actors: totalTasks=" << TasksGraph.GetTasks().size());
TBase::ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED,
YqlIssue({}, TIssuesIds::KIKIMR_PRECONDITION_FAILED, TStringBuilder()
- << "Requested too many execution units: " << (computeTasks.size() + nScanTasks)));
+ << "Requested too many execution units: " << TasksGraph.GetTasks().size()));
return;
}
@@ -622,12 +612,9 @@ private:
prepareTasksSpan.End();
}
- LOG_D("Total tasks: " << TasksGraph.GetTasks().size() << ", readonly: true"
- << ", " << nScanTasks << " scan tasks on " << scanTasks.size() << " nodes"
- << ", totalShardScans: " << nShardScans << ", execType: Scan"
- << ", snapshot: {" << GetSnapshot().TxId << ", " << GetSnapshot().Step << "}");
+ LOG_D("TotalShardScans: " << nShardScans);
- ExecuteScanTx(std::move(computeTasks), std::move(scanTasks), std::move(snapshot));
+ ExecuteScanTx(std::move(snapshot));
Become(&TKqpScanExecuter::ExecuteState);
if (ExecuterStateSpan) {
@@ -667,30 +654,22 @@ public:
}
private:
- void ExecuteScanTx(TVector<ui64>&& computeTasks, THashMap<ui64, TVector<ui64>>&& tasksPerNode,
- TVector<NKikimrKqp::TKqpNodeResources>&& snapshot) {
- LWTRACK(KqpScanExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, computeTasks.size(), tasksPerNode.size());
- for (const auto& [_, tasks]: tasksPerNode) {
- for (const auto& task : tasks) {
- PendingComputeTasks.insert(task);
- }
- }
-
- for (auto& task : computeTasks) {
- PendingComputeTasks.insert(task);
- }
-
- Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), std::move(computeTasks),
- std::move(tasksPerNode), GetSnapshot(),
+ void ExecuteScanTx(TVector<NKikimrKqp::TKqpNodeResources>&& snapshot) {
+
+ Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), {},
+ {}, GetSnapshot(),
Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, AppData()->EnableKqpSpilling,
- Request.RlPath, ExecuterSpan, std::move(snapshot), ExecuterRetriesConfig, false /* isDataQuery */);
- LOG_D("Execute scan tx, PendingComputeTasks: " << PendingComputeTasks.size());
+ Request.RlPath, ExecuterSpan, std::move(snapshot), ExecuterRetriesConfig, false /* isDataQuery */, Request.MkqlMemoryLimit, nullptr, false);
+
+ LOG_D("Execute scan tx, PendingComputeTasks: " << TasksGraph.GetTasks().size());
auto err = Planner->PlanExecution();
if (err) {
TlsActivationContext->Send(err.release());
return;
}
+ LWTRACK(KqpScanExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, Planner->GetnComputeTasks(), Planner->GetnScanTasks());
+
Planner->Submit();
}
@@ -698,14 +677,16 @@ private:
void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status,
google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues) override
{
- if (!PendingComputeTasks.empty()) {
- LOG_D("terminate pending resources request: " << Ydb::StatusIds::StatusCode_Name(status));
+ if (Planner) {
+ if (!Planner->GetPendingComputeTasks().empty()) {
+ LOG_D("terminate pending resources request: " << Ydb::StatusIds::StatusCode_Name(status));
- auto ev = MakeHolder<TEvKqpNode::TEvCancelKqpTasksRequest>();
- ev->Record.SetTxId(TxId);
- ev->Record.SetReason(Ydb::StatusIds::StatusCode_Name(status));
+ auto ev = MakeHolder<TEvKqpNode::TEvCancelKqpTasksRequest>();
+ ev->Record.SetTxId(TxId);
+ ev->Record.SetReason(Ydb::StatusIds::StatusCode_Name(status));
- Send(MakeKqpNodeServiceID(SelfId().NodeId()), ev.Release());
+ Send(MakeKqpNodeServiceID(SelfId().NodeId()), ev.Release());
+ }
}
TBase::ReplyErrorAndDie(status, issues);
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
index 829ba1304f..fbde955300 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
@@ -159,10 +159,18 @@ public:
ui64 NodeId = 0; // only in case of scans over persistent snapshots
bool ScanTask = false;
TActorId ExecuterId;
+ ui32 Type = Unknown;
THashMap<TString, TString> DqTaskParams; // Params for sources/sinks
THashMap<TString, TString> DqSecureParams;
+ enum TTaskType : ui32 {
+ Unknown = 0,
+ Compute = 1,
+ Scan = 2,
+ DataShard = 3,
+ };
+
struct TColumn {
ui32 Id = 0;
NScheme::TTypeInfo Type;