summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <[email protected]>2022-12-05 14:33:29 +0300
committerssmike <[email protected]>2022-12-05 14:33:29 +0300
commite962cf51046a42137a0c3177ea395e2c7d502be2 (patch)
treeed2044fc8dd578061b5409838b70a90c5feb481c
parente73b7f27bad0c59484ae72d75cfc8a20f4ab8e79 (diff)
Resolve only shards used in remote tasks
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp123
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h42
2 files changed, 120 insertions, 45 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 3fef766480e..1130e7a08e4 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -1389,6 +1389,10 @@ private:
}
public:
+ void OnTablesResolve() override {
+ Execute();
+ }
+
void Execute() {
NWilson::TSpan prepareTasksSpan(TWilsonKqp::DataExecuterPrepateTasks, ExecuterStateSpan.GetTraceId(), "PrepateTasks", NWilson::EFlags::AUTO_END);
LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId);
@@ -1455,6 +1459,7 @@ public:
}
THashMap<ui64, TVector<NDqProto::TDqTask>> datashardTasks; // shardId -> [task]
+ THashMap<ui64, TVector<NDqProto::TDqTask>> remoteComputeTasks; // shardId -> [task]
TVector<NDqProto::TDqTask> computeTasks;
for (auto& task : TasksGraph.GetTasks()) {
@@ -1478,7 +1483,7 @@ public:
PrepareKqpTaskParameters(stage, stageInfo, task, taskDesc, typeEnv, holderFactory);
- if (task.Meta.ShardId) {
+ if (task.Meta.ShardId && (task.Meta.Reads || task.Meta.Writes)) {
NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta protoTaskMeta;
FillTableMeta(stageInfo, protoTaskMeta.MutableTable());
@@ -1578,7 +1583,11 @@ public:
taskDesc.MutableMeta()->PackFrom(protoTaskMeta);
computeTasks.emplace_back(std::move(taskDesc));
} else {
- computeTasks.emplace_back(std::move(taskDesc));
+ if (task.Meta.ShardId) {
+ remoteComputeTasks[task.Meta.ShardId].emplace_back(std::move(taskDesc));
+ } else {
+ computeTasks.emplace_back(std::move(taskDesc));
+ }
}
}
@@ -1635,6 +1644,30 @@ public:
ImmediateTx = true;
}
+ ComputeTasks = std::move(computeTasks);
+ DatashardTxs = std::move(datashardTxs);
+ TopicTxs = std::move(topicTxs);
+ RemoteComputeTasks = std::move(remoteComputeTasks);
+
+ if (prepareTasksSpan) {
+ prepareTasksSpan.End();
+ }
+
+ if (RemoteComputeTasks) {
+ TSet<ui64> shardIds;
+ for (const auto& [shardId, _] : RemoteComputeTasks) {
+ shardIds.insert(shardId);
+ }
+
+ auto kqpShardsResolver = CreateKqpShardsResolver(SelfId(), TxId, std::move(shardIds));
+ RegisterWithSameMailbox(kqpShardsResolver);
+ Become(&TKqpDataExecuter::WaitResolveState);
+ } else {
+ OnShardsResolve();
+ }
+ }
+
+ void OnShardsResolve() override {
const bool forceSnapshot = (
ReadOnlyTx &&
!ImmediateTx &&
@@ -1643,10 +1676,6 @@ public:
AppData()->FeatureFlags.GetEnableMvccSnapshotReads());
if (forceSnapshot) {
- ComputeTasks = std::move(computeTasks);
- DatashardTxs = std::move(datashardTxs);
- TopicTxs = std::move(topicTxs);
-
auto longTxService = NLongTxService::MakeLongTxServiceID(SelfId().NodeId());
Send(longTxService, new NLongTxService::TEvLongTxService::TEvAcquireReadSnapshot(Database));
@@ -1660,11 +1689,7 @@ public:
return;
}
- if (prepareTasksSpan) {
- prepareTasksSpan.End();
- }
-
- ContinueExecute(computeTasks, datashardTxs, topicTxs);
+ ContinueExecute();
}
private:
@@ -1694,20 +1719,13 @@ private:
Snapshot = TKqpSnapshot(record.GetSnapshotStep(), record.GetSnapshotTxId());
ImmediateTx = true;
- auto computeTasks = std::move(ComputeTasks);
- auto datashardTxs = std::move(DatashardTxs);
- auto topicTxs = std::move(TopicTxs);
-
- ContinueExecute(computeTasks, datashardTxs, topicTxs);
+ ContinueExecute();
}
using TDatashardTxs = THashMap<ui64, NKikimrTxDataShard::TKqpTransaction>;
using TTopicTabletTxs = THashMap<ui64, NKikimrPQ::TKqpTransaction>;
- void ContinueExecute(TVector<NDqProto::TDqTask>& computeTasks,
- TDatashardTxs& datashardTxs,
- TTopicTabletTxs& topicTxs)
- {
+ void ContinueExecute() {
UseFollowers = Request.IsolationLevel == NKikimrKqp::ISOLATION_LEVEL_READ_STALE;
if (!ImmediateTx) {
@@ -1725,11 +1743,11 @@ private:
if (Stats) {
//Stats->AffectedShards = datashardTxs.size();
- Stats->DatashardStats.reserve(datashardTxs.size());
+ Stats->DatashardStats.reserve(DatashardTxs.size());
//Stats->ComputeStats.reserve(computeTasks.size());
}
- Execute(computeTasks, datashardTxs, topicTxs);
+ ExecuteTasks();
if (ImmediateTx) {
LOG_T("Immediate tx, become ExecuteState");
@@ -1840,7 +1858,7 @@ private:
return datashardTxs;
}
- void Execute(TVector<NDqProto::TDqTask>& computeTasks, TDatashardTxs& datashardTxs, TTopicTabletTxs& topicTxs) {
+ void ExecuteTasks() {
auto lockTxId = Request.AcquireLocksTxId;
if (lockTxId.Defined() && *lockTxId == 0) {
lockTxId = TxId;
@@ -1848,18 +1866,60 @@ private:
}
NWilson::TSpan sendTasksSpan(TWilsonKqp::DataExecuterSendTasksAndTxs, ExecuterStateSpan.GetTraceId(), "SendTasksAndTxs", NWilson::EFlags::AUTO_END);
- LWTRACK(KqpDataExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, computeTasks.size(), datashardTxs.size());
+ LWTRACK(KqpDataExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, ComputeTasks.size(), DatashardTxs.size());
// first, start compute tasks
- TVector<ui64> computeTaskIds{Reserve(computeTasks.size())};
- for (auto&& taskDesc : computeTasks) {
+ TVector<ui64> computeTaskIds{Reserve(ComputeTasks.size())};
+ for (auto&& taskDesc : ComputeTasks) {
computeTaskIds.emplace_back(taskDesc.GetId());
FillInputSettings(taskDesc, lockTxId);
ExecuteDataComputeTask(std::move(taskDesc));
}
+ size_t remoteComputeTasksCnt = 0;
+ THashMap<ui64, TVector<NDqProto::TDqTask>> tasksPerNode;
+ for (auto& [shardId, tasks] : RemoteComputeTasks) {
+ auto it = ShardIdToNodeId.find(shardId);
+ YQL_ENSURE(it != ShardIdToNodeId.end());
+
+ for (auto& taskDesc : tasks) {
+ remoteComputeTasksCnt += 1;
+ FillInputSettings(taskDesc, lockTxId);
+ tasksPerNode[it->second].emplace_back(std::move(taskDesc));
+ }
+ }
+
+ for (auto& [nodeId, tasks] : tasksPerNode) {
+ auto ev = MakeHolder<TEvKqpNode::TEvStartKqpTasksRequest>();
+
+ ev->Record.SetTxId(TxId);
+ ActorIdToProto(SelfId(), ev->Record.MutableExecuterActorId());
+
+ if (Deadline) {
+ TDuration timeout = *Deadline - TAppData::TimeProvider->Now();
+ ev->Record.MutableRuntimeSettings()->SetTimeoutMs(timeout.MilliSeconds());
+ }
+
+ ev->Record.MutableRuntimeSettings()->SetExecType(NDqProto::TComputeRuntimeSettings::DATA);
+ ev->Record.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(Request.StatsMode));
+ ev->Record.MutableRuntimeSettings()->SetUseLLVM(false);
+ ev->Record.SetStartAllOrFail(true);
+
+ for (auto&& task : tasks) {
+ ev->Record.AddTasks()->Swap(&task);
+ }
+
+ auto target = MakeKqpNodeServiceID(nodeId);
+
+ ui32 flags = IEventHandle::FlagTrackDelivery;
+ if (SubscribedNodes.emplace(nodeId).second) {
+ flags |= IEventHandle::FlagSubscribeOnSession;
+ }
+ TlsActivationContext->Send(new IEventHandle(target, SelfId(), ev.Release(), flags));
+ }
+
// then start data tasks with known actor ids of compute tasks
- for (auto& [shardId, shardTx] : datashardTxs) {
+ for (auto& [shardId, shardTx] : DatashardTxs) {
shardTx.SetType(NKikimrTxDataShard::KQP_TX_TYPE_DATA);
for (auto& protoTask : *shardTx.MutableTasks()) {
@@ -1907,7 +1967,7 @@ private:
ExecuteDatashardTransaction(shardId, shardTx, lockTxId);
}
- ExecuteTopicTabletTransactions(topicTxs);
+ ExecuteTopicTabletTransactions(TopicTxs);
if (sendTasksSpan) {
sendTasksSpan.End();
@@ -1915,9 +1975,10 @@ private:
LOG_I("Total tasks: " << TasksGraph.GetTasks().size()
<< ", readonly: " << ReadOnlyTx
- << ", datashardTxs: " << datashardTxs.size()
+ << ", datashardTxs: " << DatashardTxs.size()
<< ", topicTxs: " << Request.TopicOperations.GetSize()
<< ", immediate: " << ImmediateTx
+ << ", remote tasks" << remoteComputeTasksCnt
<< ", useFollowers: " << UseFollowers);
LOG_T("Updating channels after the creation of compute actors");
@@ -2112,7 +2173,9 @@ private:
// Either requested or temporarily acquired snapshot
TKqpSnapshot Snapshot;
- // Temporary storage during snapshot acquisition
+ THashSet<ui64> SubscribedNodes;
+
+ THashMap<ui64, TVector<NDqProto::TDqTask>> RemoteComputeTasks;
TVector<NDqProto::TDqTask> ComputeTasks;
TDatashardTxs DatashardTxs;
TTopicTabletTxs TopicTxs;
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index cbf8ff70e43..d7606afe190 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -142,16 +142,7 @@ protected:
ReportEventElapsedTime();
}
- void HandleResolve(TEvKqpExecuter::TEvTableResolveStatus::TPtr& ev) {
- auto& reply = *ev->Get();
-
- KqpTableResolverId = {};
-
- if (reply.Status != Ydb::StatusIds::SUCCESS) {
- ReplyErrorAndDie(reply.Status, reply.Issues);
- return;
- }
-
+ TSet<ui64> CollectShards() {
TSet<ui64> shardIds;
for (auto& [stageId, stageInfo] : TasksGraph.GetStagesInfo()) {
if (stageInfo.Meta.ShardKey) {
@@ -160,11 +151,11 @@ protected:
}
}
}
+ return shardIds;
+ }
- if (ExecuterTableResolveSpan) {
- ExecuterTableResolveSpan.End();
- }
-
+ virtual void OnTablesResolve() {
+ TSet<ui64> shardIds = CollectShards();
if (shardIds.size() > 0) {
LOG_D("Start resolving tablets nodes... (" << shardIds.size() << ")");
auto kqpShardsResolver = CreateKqpShardsResolver(this->SelfId(), TxId, std::move(shardIds));
@@ -174,6 +165,27 @@ protected:
}
}
+ void HandleResolve(TEvKqpExecuter::TEvTableResolveStatus::TPtr& ev) {
+ auto& reply = *ev->Get();
+
+ KqpTableResolverId = {};
+
+ if (reply.Status != Ydb::StatusIds::SUCCESS) {
+ ReplyErrorAndDie(reply.Status, reply.Issues);
+ return;
+ }
+
+ if (ExecuterTableResolveSpan) {
+ ExecuterTableResolveSpan.End();
+ }
+
+ OnTablesResolve();
+ }
+
+ virtual void OnShardsResolve() {
+ static_cast<TDerived*>(this)->Execute();
+ }
+
void HandleResolve(TEvKqpExecuter::TEvShardsResolveStatus::TPtr& ev) {
auto& reply = *ev->Get();
@@ -210,7 +222,7 @@ protected:
LOG_D(sb);
}
- static_cast<TDerived*>(this)->Execute();
+ OnShardsResolve();
}
void HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {