diff options
author | gvit <gvit@ydb.tech> | 2023-03-07 01:46:56 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-03-07 01:46:56 +0300 |
commit | db461a7619dcd5bc525a809660d32a5331638e85 (patch) | |
tree | 9be85ef31e8166188c3666ee698b2d355c3e2a95 | |
parent | cab62bb86c1d8bb2c5289f4a20a707307605d55c (diff) | |
download | ydb-db461a7619dcd5bc525a809660d32a5331638e85.tar.gz |
fill channel info for some tasks
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 22 |
1 files changed, 22 insertions, 0 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 525eb24a11..23fb451104 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2169,6 +2169,28 @@ private: YQL_ENSURE(it != ShardIdToNodeId.end()); for (auto& taskDesc : tasks) { + ui64 taskId = taskDesc.GetId(); + auto& task = TasksGraph.GetTask(taskId); + for (ui64 outputIndex = 0; outputIndex < task.Outputs.size(); ++outputIndex) { + auto& output = task.Outputs[outputIndex]; + auto* protoOutput = taskDesc.MutableOutputs(outputIndex); + + for (ui64 outputChannelIndex = 0; outputChannelIndex < output.Channels.size(); ++outputChannelIndex) { + ui64 outputChannelId = output.Channels[outputChannelIndex]; + auto* protoChannel = protoOutput->MutableChannels(outputChannelIndex); + + ui64 dstTaskId = TasksGraph.GetChannel(outputChannelId).DstTask; + if (dstTaskId == 0) { + continue; + } + + auto& dstTask = TasksGraph.GetTask(dstTaskId); + if (dstTask.ComputeActorId) { + protoChannel->MutableDstEndpoint()->Clear(); + ActorIdToProto(dstTask.ComputeActorId, protoChannel->MutableDstEndpoint()->MutableActorId()); + } + } + } remoteComputeTasksCnt += 1; FillInputSettings(taskDesc); PendingComputeTasks.insert(taskDesc.GetId()); |