diff options
| author | ssmike <[email protected]> | 2022-12-05 14:33:29 +0300 |
|---|---|---|
| committer | ssmike <[email protected]> | 2022-12-05 14:33:29 +0300 |
| commit | e962cf51046a42137a0c3177ea395e2c7d502be2 (patch) | |
| tree | ed2044fc8dd578061b5409838b70a90c5feb481c | |
| parent | e73b7f27bad0c59484ae72d75cfc8a20f4ab8e79 (diff) | |
Resolve only shards used in remote tasks
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 123 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 42 |
2 files changed, 120 insertions, 45 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 3fef766480e..1130e7a08e4 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1389,6 +1389,10 @@ private: } public: + void OnTablesResolve() override { + Execute(); + } + void Execute() { NWilson::TSpan prepareTasksSpan(TWilsonKqp::DataExecuterPrepateTasks, ExecuterStateSpan.GetTraceId(), "PrepateTasks", NWilson::EFlags::AUTO_END); LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId); @@ -1455,6 +1459,7 @@ public: } THashMap<ui64, TVector<NDqProto::TDqTask>> datashardTasks; // shardId -> [task] + THashMap<ui64, TVector<NDqProto::TDqTask>> remoteComputeTasks; // shardId -> [task] TVector<NDqProto::TDqTask> computeTasks; for (auto& task : TasksGraph.GetTasks()) { @@ -1478,7 +1483,7 @@ public: PrepareKqpTaskParameters(stage, stageInfo, task, taskDesc, typeEnv, holderFactory); - if (task.Meta.ShardId) { + if (task.Meta.ShardId && (task.Meta.Reads || task.Meta.Writes)) { NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta protoTaskMeta; FillTableMeta(stageInfo, protoTaskMeta.MutableTable()); @@ -1578,7 +1583,11 @@ public: taskDesc.MutableMeta()->PackFrom(protoTaskMeta); computeTasks.emplace_back(std::move(taskDesc)); } else { - computeTasks.emplace_back(std::move(taskDesc)); + if (task.Meta.ShardId) { + remoteComputeTasks[task.Meta.ShardId].emplace_back(std::move(taskDesc)); + } else { + computeTasks.emplace_back(std::move(taskDesc)); + } } } @@ -1635,6 +1644,30 @@ public: ImmediateTx = true; } + ComputeTasks = std::move(computeTasks); + DatashardTxs = std::move(datashardTxs); + TopicTxs = std::move(topicTxs); + RemoteComputeTasks = std::move(remoteComputeTasks); + + if (prepareTasksSpan) { + prepareTasksSpan.End(); + } + + if (RemoteComputeTasks) { + TSet<ui64> shardIds; + for (const auto& [shardId, _] : RemoteComputeTasks) { + shardIds.insert(shardId); + } + + auto kqpShardsResolver = CreateKqpShardsResolver(SelfId(), TxId, std::move(shardIds)); + RegisterWithSameMailbox(kqpShardsResolver); + Become(&TKqpDataExecuter::WaitResolveState); + } else { + OnShardsResolve(); + } + } + + void OnShardsResolve() override { const bool forceSnapshot = ( ReadOnlyTx && !ImmediateTx && @@ -1643,10 +1676,6 @@ public: AppData()->FeatureFlags.GetEnableMvccSnapshotReads()); if (forceSnapshot) { - ComputeTasks = std::move(computeTasks); - DatashardTxs = std::move(datashardTxs); - TopicTxs = std::move(topicTxs); - auto longTxService = NLongTxService::MakeLongTxServiceID(SelfId().NodeId()); Send(longTxService, new NLongTxService::TEvLongTxService::TEvAcquireReadSnapshot(Database)); @@ -1660,11 +1689,7 @@ public: return; } - if (prepareTasksSpan) { - prepareTasksSpan.End(); - } - - ContinueExecute(computeTasks, datashardTxs, topicTxs); + ContinueExecute(); } private: @@ -1694,20 +1719,13 @@ private: Snapshot = TKqpSnapshot(record.GetSnapshotStep(), record.GetSnapshotTxId()); ImmediateTx = true; - auto computeTasks = std::move(ComputeTasks); - auto datashardTxs = std::move(DatashardTxs); - auto topicTxs = std::move(TopicTxs); - - ContinueExecute(computeTasks, datashardTxs, topicTxs); + ContinueExecute(); } using TDatashardTxs = THashMap<ui64, NKikimrTxDataShard::TKqpTransaction>; using TTopicTabletTxs = THashMap<ui64, NKikimrPQ::TKqpTransaction>; - void ContinueExecute(TVector<NDqProto::TDqTask>& computeTasks, - TDatashardTxs& datashardTxs, - TTopicTabletTxs& topicTxs) - { + void ContinueExecute() { UseFollowers = Request.IsolationLevel == NKikimrKqp::ISOLATION_LEVEL_READ_STALE; if (!ImmediateTx) { @@ -1725,11 +1743,11 @@ private: if (Stats) { //Stats->AffectedShards = datashardTxs.size(); - Stats->DatashardStats.reserve(datashardTxs.size()); + Stats->DatashardStats.reserve(DatashardTxs.size()); //Stats->ComputeStats.reserve(computeTasks.size()); } - Execute(computeTasks, datashardTxs, topicTxs); + ExecuteTasks(); if (ImmediateTx) { LOG_T("Immediate tx, become ExecuteState"); @@ -1840,7 +1858,7 @@ private: return datashardTxs; } - void Execute(TVector<NDqProto::TDqTask>& computeTasks, TDatashardTxs& datashardTxs, TTopicTabletTxs& topicTxs) { + void ExecuteTasks() { auto lockTxId = Request.AcquireLocksTxId; if (lockTxId.Defined() && *lockTxId == 0) { lockTxId = TxId; @@ -1848,18 +1866,60 @@ private: } NWilson::TSpan sendTasksSpan(TWilsonKqp::DataExecuterSendTasksAndTxs, ExecuterStateSpan.GetTraceId(), "SendTasksAndTxs", NWilson::EFlags::AUTO_END); - LWTRACK(KqpDataExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, computeTasks.size(), datashardTxs.size()); + LWTRACK(KqpDataExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, ComputeTasks.size(), DatashardTxs.size()); // first, start compute tasks - TVector<ui64> computeTaskIds{Reserve(computeTasks.size())}; - for (auto&& taskDesc : computeTasks) { + TVector<ui64> computeTaskIds{Reserve(ComputeTasks.size())}; + for (auto&& taskDesc : ComputeTasks) { computeTaskIds.emplace_back(taskDesc.GetId()); FillInputSettings(taskDesc, lockTxId); ExecuteDataComputeTask(std::move(taskDesc)); } + size_t remoteComputeTasksCnt = 0; + THashMap<ui64, TVector<NDqProto::TDqTask>> tasksPerNode; + for (auto& [shardId, tasks] : RemoteComputeTasks) { + auto it = ShardIdToNodeId.find(shardId); + YQL_ENSURE(it != ShardIdToNodeId.end()); + + for (auto& taskDesc : tasks) { + remoteComputeTasksCnt += 1; + FillInputSettings(taskDesc, lockTxId); + tasksPerNode[it->second].emplace_back(std::move(taskDesc)); + } + } + + for (auto& [nodeId, tasks] : tasksPerNode) { + auto ev = MakeHolder<TEvKqpNode::TEvStartKqpTasksRequest>(); + + ev->Record.SetTxId(TxId); + ActorIdToProto(SelfId(), ev->Record.MutableExecuterActorId()); + + if (Deadline) { + TDuration timeout = *Deadline - TAppData::TimeProvider->Now(); + ev->Record.MutableRuntimeSettings()->SetTimeoutMs(timeout.MilliSeconds()); + } + + ev->Record.MutableRuntimeSettings()->SetExecType(NDqProto::TComputeRuntimeSettings::DATA); + ev->Record.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(Request.StatsMode)); + ev->Record.MutableRuntimeSettings()->SetUseLLVM(false); + ev->Record.SetStartAllOrFail(true); + + for (auto&& task : tasks) { + ev->Record.AddTasks()->Swap(&task); + } + + auto target = MakeKqpNodeServiceID(nodeId); + + ui32 flags = IEventHandle::FlagTrackDelivery; + if (SubscribedNodes.emplace(nodeId).second) { + flags |= IEventHandle::FlagSubscribeOnSession; + } + TlsActivationContext->Send(new IEventHandle(target, SelfId(), ev.Release(), flags)); + } + // then start data tasks with known actor ids of compute tasks - for (auto& [shardId, shardTx] : datashardTxs) { + for (auto& [shardId, shardTx] : DatashardTxs) { shardTx.SetType(NKikimrTxDataShard::KQP_TX_TYPE_DATA); for (auto& protoTask : *shardTx.MutableTasks()) { @@ -1907,7 +1967,7 @@ private: ExecuteDatashardTransaction(shardId, shardTx, lockTxId); } - ExecuteTopicTabletTransactions(topicTxs); + ExecuteTopicTabletTransactions(TopicTxs); if (sendTasksSpan) { sendTasksSpan.End(); @@ -1915,9 +1975,10 @@ private: LOG_I("Total tasks: " << TasksGraph.GetTasks().size() << ", readonly: " << ReadOnlyTx - << ", datashardTxs: " << datashardTxs.size() + << ", datashardTxs: " << DatashardTxs.size() << ", topicTxs: " << Request.TopicOperations.GetSize() << ", immediate: " << ImmediateTx + << ", remote tasks" << remoteComputeTasksCnt << ", useFollowers: " << UseFollowers); LOG_T("Updating channels after the creation of compute actors"); @@ -2112,7 +2173,9 @@ private: // Either requested or temporarily acquired snapshot TKqpSnapshot Snapshot; - // Temporary storage during snapshot acquisition + THashSet<ui64> SubscribedNodes; + + THashMap<ui64, TVector<NDqProto::TDqTask>> RemoteComputeTasks; TVector<NDqProto::TDqTask> ComputeTasks; TDatashardTxs DatashardTxs; TTopicTabletTxs TopicTxs; diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index cbf8ff70e43..d7606afe190 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -142,16 +142,7 @@ protected: ReportEventElapsedTime(); } - void HandleResolve(TEvKqpExecuter::TEvTableResolveStatus::TPtr& ev) { - auto& reply = *ev->Get(); - - KqpTableResolverId = {}; - - if (reply.Status != Ydb::StatusIds::SUCCESS) { - ReplyErrorAndDie(reply.Status, reply.Issues); - return; - } - + TSet<ui64> CollectShards() { TSet<ui64> shardIds; for (auto& [stageId, stageInfo] : TasksGraph.GetStagesInfo()) { if (stageInfo.Meta.ShardKey) { @@ -160,11 +151,11 @@ protected: } } } + return shardIds; + } - if (ExecuterTableResolveSpan) { - ExecuterTableResolveSpan.End(); - } - + virtual void OnTablesResolve() { + TSet<ui64> shardIds = CollectShards(); if (shardIds.size() > 0) { LOG_D("Start resolving tablets nodes... (" << shardIds.size() << ")"); auto kqpShardsResolver = CreateKqpShardsResolver(this->SelfId(), TxId, std::move(shardIds)); @@ -174,6 +165,27 @@ protected: } } + void HandleResolve(TEvKqpExecuter::TEvTableResolveStatus::TPtr& ev) { + auto& reply = *ev->Get(); + + KqpTableResolverId = {}; + + if (reply.Status != Ydb::StatusIds::SUCCESS) { + ReplyErrorAndDie(reply.Status, reply.Issues); + return; + } + + if (ExecuterTableResolveSpan) { + ExecuterTableResolveSpan.End(); + } + + OnTablesResolve(); + } + + virtual void OnShardsResolve() { + static_cast<TDerived*>(this)->Execute(); + } + void HandleResolve(TEvKqpExecuter::TEvShardsResolveStatus::TPtr& ev) { auto& reply = *ev->Get(); @@ -210,7 +222,7 @@ protected: LOG_D(sb); } - static_cast<TDerived*>(this)->Execute(); + OnShardsResolve(); } void HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) { |
