From 29ea4eeb548a98f60ebf35a20a38a10229628da0 Mon Sep 17 00:00:00 2001
From: makostrov <makostrov@yandex-team.com>
Date: Wed, 19 Jul 2023 14:13:25 +0300
Subject: kqp: delegate task execution to planner KIKIMR-18631

KIKIMR-18631

Introduce TTaskMeta::TTaskType and tag every task as Compute, Scan or DataShard when it is created.

Move compute task launching and the pending task/actor bookkeeping from the executers into TKqpPlanner.

Add an optimization: when all remote tasks land on a single node, execute them locally to avoid the overhead of remote task launching.

Fix the affected-shards statistics.

Move the calculation of scan task placement into the planner.
---
 ydb/core/kqp/executer_actor/kqp_data_executer.cpp |  98 ++++----------
 ydb/core/kqp/executer_actor/kqp_executer_impl.h   | 150 ++++++++++++----------
 ydb/core/kqp/executer_actor/kqp_planner.cpp       | 147 ++++++++++++++++++---
 ydb/core/kqp/executer_actor/kqp_planner.h         |  31 ++++-
 ydb/core/kqp/executer_actor/kqp_scan_executer.cpp |  77 +++++------
 ydb/core/kqp/executer_actor/kqp_tasks_graph.h     |   8 ++
 6 files changed, 305 insertions(+), 206 deletions(-)

diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 31920b9cb0..0e023ec038 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -163,10 +163,12 @@ public:
 
         if (IsDebugLogEnabled()) {
             auto sb = TStringBuilder() << "ActorState: " << CurrentStateFuncName()
-                << ", waiting for " << PendingComputeActors.size() << " compute actor(s) and "
+                << ", waiting for " << (Planner ? Planner->GetPendingComputeActors().size() : 0) << " compute actor(s) and "
                 << notFinished << " datashard(s): ";
-            for (const auto& shardId : PendingComputeActors) {
-                sb << "CA " << shardId.first << ", ";
+            if (Planner) {
+                for (const auto& shardId : Planner->GetPendingComputeActors()) {
+                    sb << "CA " << shardId.first << ", ";
+                }
             }
             for (const auto& [shardId, shardState] : ShardStates) {
                 if (shardState.State != TShardState::EState::Finished) {
@@ -1343,6 +1345,7 @@ private:
                 return TasksGraph.GetTask(it->second);
             }
             auto& task = TasksGraph.AddTask(stageInfo);
+            task.Meta.Type = TTaskMeta::TTaskType::DataShard;
             task.Meta.ExecuterId = SelfId();
             task.Meta.ShardId = shardId;
             shardTasks.emplace(shardId, task.Id);
@@ -1527,6 +1530,7 @@ private:
         for (ui32 i = 0; i < partitionsCount; ++i) {
             auto& task = TasksGraph.AddTask(stageInfo);
             task.Meta.ExecuterId = SelfId();
+            task.Meta.Type = TTaskMeta::TTaskType::Compute;
             LOG_D("Stage " << stageInfo.Id << " create compute task: " << task.Id);
         }
     }
@@ -1623,44 +1627,6 @@ private:
         YQL_ENSURE(result.second);
     }
 
-    void ExecuteDataComputeTask(ui64 taskId, bool shareMailbox) {
-        auto& task = TasksGraph.GetTask(taskId);
-        NYql::NDqProto::TDqTask* taskDesc = ArenaSerializeTaskToProto(TasksGraph, task);
-
-        TComputeRuntimeSettings settings;
-        if (Deadline) {
-            settings.Timeout = *Deadline - TAppData::TimeProvider->Now();
-        }
-        //settings.ExtraMemoryAllocationPool = NRm::EKqpMemoryPool::DataQuery;
-        settings.ExtraMemoryAllocationPool = NRm::EKqpMemoryPool::Unspecified;
-        settings.FailOnUndelivery = true;
-        settings.StatsMode = GetDqStatsMode(Request.StatsMode);
-        settings.UseSpilling = false;
-
-        TComputeMemoryLimits limits;
-        limits.ChannelBufferSize = 50_MB;
-        limits.MkqlLightProgramMemoryLimit = Request.MkqlMemoryLimit > 0 ? std::min(500_MB, Request.MkqlMemoryLimit) : 500_MB;
-        limits.MkqlHeavyProgramMemoryLimit = Request.MkqlMemoryLimit > 0 ? std::min(2_GB, Request.MkqlMemoryLimit) : 2_GB;
-
-        auto& taskOpts = taskDesc->GetProgram().GetSettings();
-        auto limit = taskOpts.GetHasMapJoin() /* || opts.GetHasSort()*/
-            ? limits.MkqlHeavyProgramMemoryLimit
-            : limits.MkqlLightProgramMemoryLimit;
-
-        limits.MemoryQuotaManager = std::make_shared<TGuaranteeQuotaManager>(limit, limit);
-
-        auto computeActor = CreateKqpComputeActor(SelfId(), TxId, taskDesc, AsyncIoFactory,
-            AppData()->FunctionRegistry, settings, limits, NWilson::TTraceId(), TasksGraph.GetMeta().GetArenaIntrusivePtr());
-
-        auto computeActorId = shareMailbox ? RegisterWithSameMailbox(computeActor) : Register(computeActor);
-        task.ComputeActorId = computeActorId;
-
-        LOG_D("Executing task: " << taskId << " on compute actor: " << task.ComputeActorId);
-
-        auto result = PendingComputeActors.emplace(task.ComputeActorId, TProgressStat());
-        YQL_ENSURE(result.second);
-    }
-
     void Execute() {
         NWilson::TSpan prepareTasksSpan(TWilsonKqp::DataExecuterPrepateTasks, ExecuterStateSpan.GetTraceId(), "PrepateTasks", NWilson::EFlags::AUTO_END);
         LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId);
@@ -2128,43 +2094,27 @@ private:
         NWilson::TSpan sendTasksSpan(TWilsonKqp::DataExecuterSendTasksAndTxs, ExecuterStateSpan.GetTraceId(), "SendTasksAndTxs", NWilson::EFlags::AUTO_END);
         LWTRACK(KqpDataExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, ComputeTasks.size(), DatashardTxs.size());
 
-        // first, start compute tasks
-        bool shareMailbox = (ComputeTasks.size() <= 1);
-        for (ui64 taskId : ComputeTasks) {
-            ExecuteDataComputeTask(taskId, shareMailbox);
+        for (auto& [shardId, tasks] : RemoteComputeTasks) {
+            auto it = ShardIdToNodeId.find(shardId);
+            YQL_ENSURE(it != ShardIdToNodeId.end());
+            for (ui64 taskId : tasks) {
+                auto& task = TasksGraph.GetTask(taskId);
+                task.Meta.NodeId = it->second;
+            }
         }
 
-        if (ComputeTasks.size() == 0 && RemoteComputeTasks.size() == 1 && !UnknownAffectedShardCount) {
-            // query affects a single key or shard, so it might be more effective
-            // to execute this task locally so we can avoid useless overhead for remote task launching.
-            for(auto& [shardId, tasks]: RemoteComputeTasks) {
-                for(ui64 taskId: tasks) {
-                    ExecuteDataComputeTask(taskId, true);
-                }
-            }
+        Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), {}, {}, GetSnapshot(),
+            Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, false, Nothing(),
+            ExecuterSpan, {}, ExecuterRetriesConfig, true /* isDataQuery */, Request.MkqlMemoryLimit, AsyncIoFactory, !UnknownAffectedShardCount);
 
-        } else {
-            THashMap<ui64, TVector<ui64>> tasksPerNode;
-            for (auto& [shardId, tasks] : RemoteComputeTasks) {
-                auto it = ShardIdToNodeId.find(shardId);
-                YQL_ENSURE(it != ShardIdToNodeId.end());
-                for (ui64 taskId : tasks) {
-                    PendingComputeTasks.insert(taskId);
-                    tasksPerNode[it->second].emplace_back(taskId);
-                }
-            }
-
-            Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), {}, std::move(tasksPerNode), GetSnapshot(),
-                Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, false, Nothing(),
-                ExecuterSpan, {}, ExecuterRetriesConfig, true /* isDataQuery */);
-            auto err = Planner->PlanExecution();
-            if (err) {
-                TlsActivationContext->Send(err.release());
-                return;
-            }
-            Planner->Submit();
+        auto err = Planner->PlanExecution();
+        if (err) {
+            TlsActivationContext->Send(err.release());
+            return;
         }
 
+        Planner->Submit();
+
         // then start data tasks with known actor ids of compute tasks
         for (auto& [shardId, shardTx] : DatashardTxs) {
             shardTx->SetType(NKikimrTxDataShard::KQP_TX_TYPE_DATA);
@@ -2229,9 +2179,9 @@ private:
             << ", topicTxs: " << Request.TopicOperations.GetSize()
             << ", volatile: " << VolatileTx
             << ", immediate: " << ImmediateTx
-            << ", pending compute tasks" << PendingComputeTasks.size()
+            << ", pending compute tasks" << (Planner ? Planner->GetPendingComputeTasks().size() : 0)
             << ", useFollowers: " << GetUseFollowers());
 
         LOG_T("Updating channels after the creation of compute actors");
         THashMap<TActorId, THashSet<ui64>> updates;
         for (ui64 taskId : ComputeTasks) {
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index bcb554a144..7fa3567aeb 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -245,25 +245,27 @@ protected:
             case NYql::NDqProto::COMPUTE_STATE_EXECUTING: {
                 // initial TEvState event from Compute Actor
                 // there can be race with RM answer
-                if (PendingComputeTasks.erase(taskId)) {
-                    auto it = PendingComputeActors.emplace(computeActor, TProgressStat());
-                    YQL_ENSURE(it.second);
+                if (Planner) {
+                    if (Planner->GetPendingComputeTasks().erase(taskId)) {
+                        auto it = Planner->GetPendingComputeActors().emplace(computeActor, TProgressStat());
+                        YQL_ENSURE(it.second);
 
-                    if (state.HasStats()) {
-                        it.first->second.Set(state.GetStats());
-                    }
-
-                    auto& task = TasksGraph.GetTask(taskId);
-                    task.ComputeActorId = computeActor;
-
-                    THashMap<TActorId, THashSet<ui64>> updates;
-                    CollectTaskChannelsUpdates(task, updates);
-                    PropagateChannelsUpdates(updates);
-                } else {
-                    auto it = PendingComputeActors.find(computeActor);
-                    if (it != PendingComputeActors.end()) {
                         if (state.HasStats()) {
-                            it->second.Set(state.GetStats());
+                            it.first->second.Set(state.GetStats());
+                        }
+
+                        auto& task = TasksGraph.GetTask(taskId);
+                        task.ComputeActorId = computeActor;
+
+                        THashMap<TActorId, THashSet<ui64>> updates;
+                        CollectTaskChannelsUpdates(task, updates);
+                        PropagateChannelsUpdates(updates);
+                    } else {
+                        auto it = Planner->GetPendingComputeActors().find(computeActor);
+                        if (it != Planner->GetPendingComputeActors().end()) {
+                            if (state.HasStats()) {
+                                it->second.Set(state.GetStats());
+                            }
                         }
                     }
                 }
@@ -279,26 +281,28 @@ protected:
                 LastTaskId = taskId;
                 LastComputeActorId = computeActor.ToString();
 
-                auto it = PendingComputeActors.find(computeActor);
-                if (it == PendingComputeActors.end()) {
-                    LOG_W("Got execution state for compute actor: " << computeActor
-                        << ", task: " << taskId
-                        << ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState())
-                        << ", too early (waiting reply from RM)");
-
-                    if (PendingComputeTasks.erase(taskId)) {
-                        LOG_E("Got execution state for compute actor: " << computeActor
-                            << ", for unknown task: " << state.GetTaskId()
-                            << ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState()));
-                        return;
-                    }
-                } else {
-                    if (state.HasStats()) {
-                        it->second.Set(state.GetStats());
+                if (Planner) {
+                    auto it = Planner->GetPendingComputeActors().find(computeActor);
+                    if (it == Planner->GetPendingComputeActors().end()) {
+                        LOG_W("Got execution state for compute actor: " << computeActor
+                            << ", task: " << taskId
+                            << ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState())
+                            << ", too early (waiting reply from RM)");
+
+                        if (Planner->GetPendingComputeTasks().erase(taskId)) {
+                            LOG_E("Got execution state for compute actor: " << computeActor
+                                << ", for unknown task: " << state.GetTaskId()
+                                << ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState()));
+                            return;
+                        }
+                    } else {
+                        if (state.HasStats()) {
+                            it->second.Set(state.GetStats());
+                        }
+                        LastStats.emplace_back(std::move(it->second));
+                        Planner->GetPendingComputeActors().erase(it);
+                        YQL_ENSURE(Planner->GetPendingComputeTasks().find(taskId) == Planner->GetPendingComputeTasks().end());
                     }
-                    LastStats.emplace_back(std::move(it->second));
-                    PendingComputeActors.erase(it);
-                    YQL_ENSURE(PendingComputeTasks.find(taskId) == PendingComputeTasks.end());
                 }
             }
         }
@@ -371,7 +375,7 @@ protected:
 
 protected:
     bool CheckExecutionComplete() {
-        if (PendingComputeActors.empty() && PendingComputeTasks.empty()) {
+        if (Planner && Planner->GetPendingComputeActors().empty() && Planner->GetPendingComputeTasks().empty()) {
             static_cast<TDerived*>(this)->Finalize();
             UpdateResourcesUsage(true);
             return true;
@@ -382,11 +386,13 @@ protected:
         if (IsDebugLogEnabled()) {
             TStringBuilder sb;
             sb << "Waiting for: ";
-            for (auto ct : PendingComputeTasks) {
-                sb << "CT " << ct << ", ";
-            }
-            for (auto ca : PendingComputeActors) {
-                sb << "CA " << ca.first << ", ";
+            if (Planner) {
+                for (auto ct : Planner->GetPendingComputeTasks()) {
+                    sb << "CT " << ct << ", ";
+                }
+                for (auto ca : Planner->GetPendingComputeActors()) {
+                    sb << "CA " << ca.first << ", ";
+                }
             }
             LOG_D(sb);
         }
@@ -434,15 +440,17 @@ protected:
         auto nodeId = ev->Get()->NodeId;
         LOG_N("Disconnected node " << nodeId);
 
-        for (auto computeActor : PendingComputeActors) {
-            if (computeActor.first.NodeId() == nodeId) {
-                return ReplyUnavailable(TStringBuilder() << "Connection with node " << nodeId << " lost.");
+        if (Planner) {
+            for (auto computeActor : Planner->GetPendingComputeActors()) {
+                if (computeActor.first.NodeId() == nodeId) {
+                    return ReplyUnavailable(TStringBuilder() << "Connection with node " << nodeId << " lost.");
+                }
             }
-        }
 
-        for (auto& task : TasksGraph.GetTasks()) {
-            if (task.Meta.NodeId == nodeId && PendingComputeTasks.contains(task.Id)) {
-                return ReplyUnavailable(TStringBuilder() << "Connection with node " << nodeId << " lost.");
+            for (auto& task : TasksGraph.GetTasks()) {
+                if (task.Meta.NodeId == nodeId && Planner->GetPendingComputeTasks().contains(task.Id)) {
+                    return ReplyUnavailable(TStringBuilder() << "Connection with node " << nodeId << " lost.");
+                }
             }
         }
     }
@@ -502,13 +510,15 @@ protected:
 
             LOG_D("Executing task: " << taskId << " on compute actor: " << task.ComputeActorId);
 
-            if (PendingComputeTasks.erase(taskId) == 0) {
-                LOG_D("Executing task: " << taskId << ", compute actor: " << task.ComputeActorId << ", already finished");
-            } else {
-                auto result = PendingComputeActors.emplace(std::make_pair(task.ComputeActorId, TProgressStat()));
-                YQL_ENSURE(result.second);
+            if (Planner) {
+                if (Planner->GetPendingComputeTasks().erase(taskId) == 0) {
+                    LOG_D("Executing task: " << taskId << ", compute actor: " << task.ComputeActorId << ", already finished");
+                } else {
+                    auto result = Planner->GetPendingComputeActors().emplace(std::make_pair(task.ComputeActorId, TProgressStat()));
+                    YQL_ENSURE(result.second);
 
-                CollectTaskChannelsUpdates(task, channelsUpdates);
+                    CollectTaskChannelsUpdates(task, channelsUpdates);
+                }
             }
         }
 
@@ -602,9 +612,11 @@ protected:
         LastResourceUsageUpdate = now;
 
         TProgressStat::TEntry consumption;
-        for (const auto& p : PendingComputeActors) {
-            const auto& t = p.second.GetLastUsage();
-            consumption += t;
+        if (Planner) {
+            for (const auto& p : Planner->GetPendingComputeActors()) {
+                const auto& t = p.second.GetLastUsage();
+                consumption += t;
+            }
         }
 
         for (const auto& p : LastStats) {
@@ -618,8 +630,10 @@ protected:
         if (ru <= 100 && !force)
             return;
 
-        for (auto& p : PendingComputeActors) {
-            p.second.Update();
+        if (Planner) {
+            for (auto& p : Planner->GetPendingComputeActors()) {
+                p.second.Update();
+            }
         }
 
         for (auto& p : LastStats) {
@@ -684,6 +698,7 @@ protected:
             task.Meta.Reads.ConstructInPlace();
             task.Meta.Reads->emplace_back(std::move(readInfo));
             task.Meta.ReadInfo.Reverse = op.GetReadRange().GetReverse();
+            task.Meta.Type = TTaskMeta::TTaskType::Compute;
 
             LOG_D("Stage " << stageInfo.Id << " create sysview scan task: " << task.Id);
         }
@@ -706,6 +721,8 @@ protected:
             input.SourceType = externalSource.GetType();
 
             task.Meta.DqTaskParams.emplace(externalSource.GetTaskParamKey(), partitionParam);
+            task.Meta.Type = TTaskMeta::TTaskType::Compute;
+
         }
     }
 
@@ -741,6 +758,7 @@ protected:
             YQL_ENSURE(!shardInfo.KeyWriteRanges);
 
             auto& task = TasksGraph.AddTask(stageInfo);
+            task.Meta.Type = TTaskMeta::TTaskType::Scan;
             task.Meta.ExecuterId = this->SelfId();
             if (auto ptr = ShardIdToNodeId.FindPtr(taskLocation)) {
                 task.Meta.NodeId = *ptr;
@@ -921,11 +939,13 @@ protected:
     virtual void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status,
         google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues)
     {
-        for (auto computeActor : PendingComputeActors) {
-            LOG_D("terminate compute actor " << computeActor.first);
+        if (Planner) {
+            for (auto computeActor : Planner->GetPendingComputeActors()) {
+                LOG_D("terminate compute actor " << computeActor.first);
 
-            auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDq::YdbStatusToDqStatus(status), "Terminate execution");
-            this->Send(computeActor.first, ev.Release());
+                auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDq::YdbStatusToDqStatus(status), "Terminate execution");
+                this->Send(computeActor.first, ev.Release());
+            }
         }
 
         auto& response = *ResponseEv->Record.MutableResponse();
@@ -1095,7 +1115,6 @@ protected:
 
     TActorId KqpTableResolverId;
     TActorId KqpShardsResolverId;
-    THashMap<TActorId, TProgressStat> PendingComputeActors; // Running compute actors (pure and DS)
     THashMap<TActorId, NYql::NDqProto::TComputeActorExtraData> ExtraData;
 
     TVector<TProgressStat> LastStats;
@@ -1109,7 +1128,6 @@ protected:
     NWilson::TSpan ExecuterStateSpan;
     NWilson::TSpan ExecuterTableResolveSpan;
 
-    THashSet<ui64> PendingComputeTasks; // Not started yet, waiting resources
     TMap<ui64, ui64> ShardIdToNodeId;
     TMap<ui64, TVector<ui64>> ShardsOnNode;
 
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp
index a3eda7ec17..b7d5257fb7 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp
@@ -9,6 +9,8 @@
 
 #include <util/generic/set.h>
 
+using namespace NActors;
+
 namespace NKikimr::NKqp {
 
 #define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << stream)
@@ -59,7 +61,7 @@ TKqpPlanner::TKqpPlanner(TKqpTasksGraph& graph, ui64 txId, const TActorId& execu
     bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
     TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot,
     const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
-    bool isDataQuery)
+    bool isDataQuery, ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, bool doOptimization)
     : TxId(txId)
     , ExecuterId(executer)
     , ComputeTasks(std::move(computeTasks))
@@ -76,6 +78,9 @@ TKqpPlanner::TKqpPlanner(TKqpTasksGraph& graph, ui64 txId, const TActorId& execu
     , ExecuterRetriesConfig(executerRetriesConfig)
     , TasksGraph(graph)
     , IsDataQuery(isDataQuery)
+    , MkqlMemoryLimit(mkqlMemoryLimit)
+    , AsyncIoFactory(asyncIoFactory)
+    , DoOptimization(doOptimization)
 {
     if (!Database) {
         // a piece of magic for tests
@@ -281,27 +286,131 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() {
     }
 }
 
+const IKqpGateway::TKqpSnapshot& TKqpPlanner::GetSnapshot() const {
+    return TasksGraph.GetMeta().Snapshot;
+}
+
+void TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, bool shareMailbox) {
+
+    auto& task = TasksGraph.GetTask(taskId);
+    NYql::NDqProto::TDqTask* taskDesc = ArenaSerializeTaskToProto(TasksGraph, task);
+
+    NYql::NDq::TComputeRuntimeSettings settings;
+    if (Deadline) {
+        settings.Timeout = Deadline - TAppData::TimeProvider->Now();
+    }
+    //settings.ExtraMemoryAllocationPool = NRm::EKqpMemoryPool::DataQuery;
+    settings.ExtraMemoryAllocationPool = NRm::EKqpMemoryPool::Unspecified;
+    settings.FailOnUndelivery = true;
+    settings.StatsMode = GetDqStatsMode(StatsMode);
+    settings.UseSpilling = false;
+
+    NYql::NDq::TComputeMemoryLimits limits;
+    limits.ChannelBufferSize = 50_MB;
+    limits.MkqlLightProgramMemoryLimit = MkqlMemoryLimit > 0 ? std::min(500_MB, MkqlMemoryLimit) : 500_MB;
+    limits.MkqlHeavyProgramMemoryLimit = MkqlMemoryLimit > 0 ? std::min(2_GB, MkqlMemoryLimit) : 2_GB;
+
+    auto& taskOpts = taskDesc->GetProgram().GetSettings();
+    auto limit = taskOpts.GetHasMapJoin() /* || opts.GetHasSort()*/
+        ? limits.MkqlHeavyProgramMemoryLimit
+        : limits.MkqlLightProgramMemoryLimit;
+
+    limits.MemoryQuotaManager = std::make_shared<NYql::NDq::TGuaranteeQuotaManager>(limit, limit);
+
+    auto computeActor = NKikimr::NKqp::CreateKqpComputeActor(ExecuterId, TxId, taskDesc, AsyncIoFactory,
+        AppData()->FunctionRegistry, settings, limits, NWilson::TTraceId(), TasksGraph.GetMeta().GetArenaIntrusivePtr());
+
+    auto computeActorId = shareMailbox ? TlsActivationContext->AsActorContext().RegisterWithSameMailbox(computeActor) : TlsActivationContext->AsActorContext().Register(computeActor);
+    task.ComputeActorId = computeActorId;
+
+    LOG_D("Executing task: " << taskId << " on compute actor: " << task.ComputeActorId);
+
+    auto result = PendingComputeActors.emplace(task.ComputeActorId, TProgressStat());
+    YQL_ENSURE(result.second);
+}
+
+ui32 TKqpPlanner::GetnScanTasks() {
+    return nScanTasks;
+}
+
+ui32 TKqpPlanner::GetnComputeTasks() {
+    return nComputeTasks;
+}
+
 std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() {
-    auto err = AssignTasksToNodes();
-    if (err) {
-        return err;
+    nScanTasks = 0;
+
+    for (auto& task : TasksGraph.GetTasks()) {
+        switch (task.Meta.Type) {
+            case TTaskMeta::TTaskType::Compute:
+                ComputeTasks.emplace_back(task.Id);
+                break;
+            case TTaskMeta::TTaskType::Scan:
+                TasksPerNode[task.Meta.NodeId].emplace_back(task.Id);
+                nScanTasks++;
+                break;
+        }
     }
 
-    for(auto& [nodeId, tasks] : TasksPerNode) {
-        SortUnique(tasks);
-        auto& request = Requests.emplace_back(std::move(tasks), CalcSendMessageFlagsForNode(nodeId), nodeId);
-        request.SerializedRequest = SerializeRequest(request);
-        auto ev = CheckTaskSize(TxId, request.SerializedRequest->Record.GetTasks());
-        if (ev != nullptr) {
-            return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.release());
+    LOG_D("Total tasks: " << nScanTasks + nComputeTasks << ", readonly: true"  // TODO ???
+        << ", " << nScanTasks << " scan tasks on " << TasksPerNode.size() << " nodes"
+        << ", execType: " << (IsDataQuery ? "Data" : "Scan")
+        << ", snapshot: {" << GetSnapshot().TxId << ", " << GetSnapshot().Step << "}");
+
+    nComputeTasks = ComputeTasks.size();
+
+    if (IsDataQuery) {
+        bool shareMailbox = (ComputeTasks.size() <= 1);
+        for (ui64 taskId : ComputeTasks) {
+            ExecuteDataComputeTask(taskId, shareMailbox);
         }
+        ComputeTasks.clear();
     }
+
+    if (nComputeTasks == 0 && TasksPerNode.size() == 1 && (AsyncIoFactory != nullptr) && DoOptimization) {
+        // query affects a single key or shard, so it might be more effective
+        // to execute this task locally so we can avoid useless overhead for remote task launching.
+        for(auto& [shardId, tasks]: TasksPerNode) {
+            for(ui64 taskId: tasks) {
+                ExecuteDataComputeTask(taskId, true);
+            }
+        }
+
+    } else {
+        for (ui64 taskId : ComputeTasks) {
+            PendingComputeTasks.insert(taskId);
+        }
+
+        for (auto& [shardId, tasks] : TasksPerNode) {
+            for (ui64 taskId : tasks) {
+                PendingComputeTasks.insert(taskId);
+            }
+        }
+
+        auto err = AssignTasksToNodes();
+        if (err) {
+            return err;
+        }
+
+        for(auto& [nodeId, tasks] : TasksPerNode) {
+            SortUnique(tasks);
+            auto& request = Requests.emplace_back(std::move(tasks), CalcSendMessageFlagsForNode(nodeId), nodeId);
+            request.SerializedRequest = SerializeRequest(request);
+            auto ev = CheckTaskSize(TxId, request.SerializedRequest->Record.GetTasks());
+            if (ev != nullptr) {
+                return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.release());
+            }
+        }
+
+    }
+
+
     return nullptr;
 }
 
 TString TKqpPlanner::GetEstimationsInfo() const {
     TStringStream ss;
-    ss << "ComputeTasks:" << ComputeTasks.size() << ";NodeTasks:";
+    ss << "ComputeTasks:" << nComputeTasks << ";NodeTasks:";
     if (auto it = TasksPerNode.find(ExecuterId.NodeId()); it != TasksPerNode.end()) {
         ss << it->second.size() << ";";
     } else {
@@ -311,12 +420,20 @@ TString TKqpPlanner::GetEstimationsInfo() const {
 }
 
 void TKqpPlanner::Unsubscribe() {
-    for(ui64 nodeId: TrackingNodes) {
+    for (ui64 nodeId: TrackingNodes) {
         TlsActivationContext->Send(std::make_unique<NActors::IEventHandle>(
             TActivationContext::InterconnectProxy(nodeId), ExecuterId, new TEvents::TEvUnsubscribe()));
     }
 }
 
+THashMap<TActorId, TProgressStat>& TKqpPlanner::GetPendingComputeActors() {
+    return PendingComputeActors;
+}
+
+THashSet<ui64>& TKqpPlanner::GetPendingComputeTasks() {
+    return PendingComputeTasks;
+}
+
 void TKqpPlanner::PrepareToProcess() {
     auto rmConfig = GetKqpResourceManager()->GetConfig();
 
@@ -361,11 +478,11 @@ std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 t
     const Ydb::Table::QueryStatsCollection::Mode& statsMode,
     bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
     TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
-    bool isDataQuery)
+    bool isDataQuery, ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, bool doOptimization)
 {
     return std::make_unique<TKqpPlanner>(tasksGraph, txId, executer, std::move(tasks), std::move(tasksPerNode), snapshot,
         database, userToken, deadline, statsMode, withSpilling, rlPath, executerSpan,
-        std::move(resourcesSnapshot), executerRetriesConfig, isDataQuery);
+        std::move(resourcesSnapshot), executerRetriesConfig, isDataQuery, mkqlMemoryLimit, asyncIoFactory, doOptimization);
 }
 
 } // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h
index 5803ca618e..997e2d4499 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.h
+++ b/ydb/core/kqp/executer_actor/kqp_planner.h
@@ -1,11 +1,15 @@
 #pragma once
 
+#include <ydb/core/base/appdata.h>
 #include <ydb/core/kqp/common/kqp.h>
+#include <ydb/core/kqp/compute_actor/kqp_compute_actor.h>
+#include <ydb/core/kqp/executer_actor/kqp_executer_stats.h>
 #include <ydb/core/kqp/gateway/kqp_gateway.h>
 #include <ydb/core/kqp/node_service/kqp_node_service.h>
 #include <ydb/core/kqp/rm_service/kqp_rm_service.h>
 #include <ydb/core/kqp/rm_service/kqp_resource_estimation.h>
 
+#include <library/cpp/actors/core/actor.h>
 #include <library/cpp/actors/core/actor_bootstrapped.h>
 #include <library/cpp/actors/wilson/wilson_span.h>
 #include <library/cpp/actors/core/hfunc.h>
@@ -40,7 +44,7 @@ public:
         const Ydb::Table::QueryStatsCollection::Mode& statsMode,
         bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& ExecuterSpan,
         TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
-        bool isDataQuery);
+        bool isDataQuery, ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, bool doOptimization);
 
     bool SendStartKqpTasksRequest(ui32 requestId, const TActorId& target);
     std::unique_ptr<IEventHandle> PlanExecution();
@@ -49,14 +53,24 @@ public:
     ui32 GetCurrentRetryDelay(ui32 requestId);
     void Unsubscribe();
 
-private:
+    THashMap<TActorId, TProgressStat>& GetPendingComputeActors();
+    THashSet<ui64>& GetPendingComputeTasks();
+
+    ui32 GetnScanTasks();
+    ui32 GetnComputeTasks();
 
+private:
+
+    const IKqpGateway::TKqpSnapshot& GetSnapshot() const;
+    void ExecuteDataComputeTask(ui64 taskId, bool shareMailbox);
     void PrepareToProcess();
     TString GetEstimationsInfo() const;
 
     std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> SerializeRequest(const TRequestData& requestData);
     ui32 CalcSendMessageFlagsForNode(ui32 nodeId);
 
+
+
 private:
     const ui64 TxId;
     const TActorId ExecuterId;
@@ -78,6 +92,16 @@ private:
     TVector<TRequestData> Requests;
     TKqpTasksGraph& TasksGraph;
     const bool IsDataQuery;
+    ui64 MkqlMemoryLimit;
+    NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
+    ui32 nComputeTasks = 0;
+    ui32 nScanTasks = 0;
+    bool DoOptimization;
+
+    THashMap<TActorId, TProgressStat> PendingComputeActors; // Running compute actors (pure and DS)
+    THashSet<ui64> PendingComputeTasks; // Not started yet, waiting resources
+
+
 };
 
 std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, TVector<ui64>&& tasks,
@@ -86,6 +110,7 @@ std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 t
     const Ydb::Table::QueryStatsCollection::Mode& statsMode,
     bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
     TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot,
-    const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig, bool isDataQuery);
+    const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig, bool isDataQuery,
+    ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, bool doOptimization);
 
 } // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index 575c2fdfa9..1a99aff3af 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -254,6 +254,7 @@ private:
             task.Meta.ExecuterId = SelfId();
             task.Meta.NodeId = nodeId;
             task.Meta.ScanTask = true;
+            task.Meta.Type = TTaskMeta::TTaskType::Scan;
             return task;
         }
 
@@ -264,6 +265,7 @@ private:
             auto& task = TasksGraph.AddTask(stageInfo);
             task.Meta.NodeId = nodeId;
             task.Meta.ScanTask = true;
+            task.Meta.Type = TTaskMeta::TTaskType::Scan;
             tasks.push_back(task.Id);
             ++cnt;
             return task;
@@ -400,6 +402,7 @@ private:
                         task.Meta.ExecuterId = SelfId();
                         task.Meta.NodeId = nodeId;
                         task.Meta.ScanTask = true;
+                        task.Meta.Type = TTaskMeta::TTaskType::Scan;
                         task.SetMetaId(metaGlueingId);
                     }
                 }
@@ -465,6 +468,7 @@ private:
 
         for (ui32 i = 0; i < partitionsCount; ++i) {
             auto& task = TasksGraph.AddTask(stageInfo);
+            task.Meta.Type = TTaskMeta::TTaskType::Compute;
             LOG_D("Stage " << stageInfo.Id << " create compute task: " << task.Id);
         }
     }
@@ -570,33 +574,21 @@ private:
             return;
         }
 
-        // NodeId -> {Tasks}
-        THashMap<ui64, TVector<ui64>> scanTasks;
         ui32 nShardScans = 0;
-        ui32 nScanTasks = 0;
-
         TVector<ui64> computeTasks;
 
         InitializeChannelProxies();
 
+        // collect shard scan statistics
         for (auto& task : TasksGraph.GetTasks()) {
             auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
-
+
             if (task.Meta.NodeId || stageInfo.Meta.IsSysView()) {
                 // Task with source
                 if (!task.Meta.Reads) {
-                    scanTasks[task.Meta.NodeId].emplace_back(task.Id);
-                    nScanTasks++;
                     continue;
                 }
 
-                if (stageInfo.Meta.IsSysView()) {
-                    computeTasks.emplace_back(task.Id);
-                } else {
-                    scanTasks[task.Meta.NodeId].emplace_back(task.Id);
-                    nScanTasks++;
-                }
-
                 nShardScans += task.Meta.Reads->size();
                 if (Stats) {
                     for(const auto& read: *task.Meta.Reads) {
@@ -604,17 +596,15 @@ private:
                     }
                 }
 
-            } else {
-                computeTasks.emplace_back(task.Id);
             }
         }
 
-
-        if (computeTasks.size() + nScanTasks > Request.MaxComputeActors) {
-            LOG_N("Too many compute actors: computeTasks=" << computeTasks.size() << ", scanTasks=" << nScanTasks);
+        if (TasksGraph.GetTasks().size() > Request.MaxComputeActors) {
+            // LOG_N("Too many compute actors: computeTasks=" << computeTasks.size() << ", scanTasks=" << nScanTasks);
+            LOG_N("Too many compute actors: totalTasks=" << TasksGraph.GetTasks().size());
             TBase::ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED,
                 YqlIssue({}, TIssuesIds::KIKIMR_PRECONDITION_FAILED, TStringBuilder()
-                    << "Requested too many execution units: " << (computeTasks.size() + nScanTasks)));
+                    << "Requested too many execution units: " << TasksGraph.GetTasks().size()));
             return;
         }
 
@@ -622,12 +612,9 @@ private:
             prepareTasksSpan.End();
         }
 
-        LOG_D("Total tasks: " << TasksGraph.GetTasks().size() << ", readonly: true"
-            << ", " << nScanTasks << " scan tasks on " << scanTasks.size() << " nodes"
-            << ", totalShardScans: " << nShardScans << ", execType: Scan"
-            << ", snapshot: {" << GetSnapshot().TxId << ", " << GetSnapshot().Step << "}");
+        LOG_D("TotalShardScans: " << nShardScans);
 
-        ExecuteScanTx(std::move(computeTasks), std::move(scanTasks), std::move(snapshot));
+        ExecuteScanTx(std::move(snapshot));
 
         Become(&TKqpScanExecuter::ExecuteState);
         if (ExecuterStateSpan) {
@@ -667,30 +654,22 @@ public:
     }
 
 private:
-    void ExecuteScanTx(TVector<ui64>&& computeTasks, THashMap<ui64, TVector<ui64>>&& tasksPerNode,
-        TVector<NKikimrKqp::TKqpNodeResources>&& snapshot) {
-        LWTRACK(KqpScanExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, computeTasks.size(), tasksPerNode.size());
-        for (const auto& [_, tasks]: tasksPerNode) {
-            for (const auto& task : tasks) {
-                PendingComputeTasks.insert(task);
-            }
-        }
-
-        for (auto& task : computeTasks) {
-            PendingComputeTasks.insert(task);
-        }
-
-        Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), std::move(computeTasks),
-            std::move(tasksPerNode), GetSnapshot(),
+    void ExecuteScanTx(TVector<NKikimrKqp::TKqpNodeResources>&& snapshot) {
+
+        Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), {},
+            {}, GetSnapshot(),
             Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, AppData()->EnableKqpSpilling,
-            Request.RlPath, ExecuterSpan, std::move(snapshot), ExecuterRetriesConfig, false /* isDataQuery */);
-        LOG_D("Execute scan tx, PendingComputeTasks: " << PendingComputeTasks.size());
+            Request.RlPath, ExecuterSpan, std::move(snapshot), ExecuterRetriesConfig, false /* isDataQuery */, Request.MkqlMemoryLimit, nullptr, false);
+
+        LOG_D("Execute scan tx, PendingComputeTasks: " << TasksGraph.GetTasks().size());
         auto err = Planner->PlanExecution();
         if (err) {
             TlsActivationContext->Send(err.release());
             return;
         }
 
+        LWTRACK(KqpScanExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, Planner->GetnComputeTasks(), Planner->GetnScanTasks());
+
         Planner->Submit();
     }
 
@@ -698,14 +677,16 @@ private:
     void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status,
         google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues) override
     {
-        if (!PendingComputeTasks.empty()) {
-            LOG_D("terminate pending resources request: " << Ydb::StatusIds::StatusCode_Name(status));
+        if (Planner) {
+            if (!Planner->GetPendingComputeTasks().empty()) {
+                LOG_D("terminate pending resources request: " << Ydb::StatusIds::StatusCode_Name(status));
 
-            auto ev = MakeHolder<TEvKqpNode::TEvCancelKqpTasksRequest>();
-            ev->Record.SetTxId(TxId);
-            ev->Record.SetReason(Ydb::StatusIds::StatusCode_Name(status));
+                auto ev = MakeHolder<TEvKqpNode::TEvCancelKqpTasksRequest>();
+                ev->Record.SetTxId(TxId);
+                ev->Record.SetReason(Ydb::StatusIds::StatusCode_Name(status));
 
-            Send(MakeKqpNodeServiceID(SelfId().NodeId()), ev.Release());
+                Send(MakeKqpNodeServiceID(SelfId().NodeId()), ev.Release());
+            }
         }
 
         TBase::ReplyErrorAndDie(status, issues);
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
index 829ba1304f..fbde955300 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
@@ -159,10 +159,18 @@ public:
     ui64 NodeId = 0;  // only in case of scans over persistent snapshots
     bool ScanTask = false;
     TActorId ExecuterId;
+    ui32 Type = Unknown;
 
     THashMap<TString, TString> DqTaskParams; // Params for sources/sinks
     THashMap<TString, TString> DqSecureParams;
 
+    enum TTaskType : ui32 {
+        Unknown = 0,
+        Compute = 1,
+        Scan = 2,
+        DataShard = 3,
+    };
+
     struct TColumn {
         ui32 Id = 0;
         NScheme::TTypeInfo Type;
-- 
cgit v1.2.3