aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-03-07 01:46:56 +0300
committergvit <gvit@ydb.tech>2023-03-07 01:46:56 +0300
commitdb461a7619dcd5bc525a809660d32a5331638e85 (patch)
tree9be85ef31e8166188c3666ee698b2d355c3e2a95
parentcab62bb86c1d8bb2c5289f4a20a707307605d55c (diff)
downloadydb-db461a7619dcd5bc525a809660d32a5331638e85.tar.gz
fill channel info for some tasks
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp22
1 files changed, 22 insertions, 0 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 525eb24a11..23fb451104 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -2169,6 +2169,28 @@ private:
YQL_ENSURE(it != ShardIdToNodeId.end());
for (auto& taskDesc : tasks) {
+ ui64 taskId = taskDesc.GetId();
+ auto& task = TasksGraph.GetTask(taskId);
+ for (ui64 outputIndex = 0; outputIndex < task.Outputs.size(); ++outputIndex) {
+ auto& output = task.Outputs[outputIndex];
+ auto* protoOutput = taskDesc.MutableOutputs(outputIndex);
+
+ for (ui64 outputChannelIndex = 0; outputChannelIndex < output.Channels.size(); ++outputChannelIndex) {
+ ui64 outputChannelId = output.Channels[outputChannelIndex];
+ auto* protoChannel = protoOutput->MutableChannels(outputChannelIndex);
+
+ ui64 dstTaskId = TasksGraph.GetChannel(outputChannelId).DstTask;
+ if (dstTaskId == 0) {
+ continue;
+ }
+
+ auto& dstTask = TasksGraph.GetTask(dstTaskId);
+ if (dstTask.ComputeActorId) {
+ protoChannel->MutableDstEndpoint()->Clear();
+ ActorIdToProto(dstTask.ComputeActorId, protoChannel->MutableDstEndpoint()->MutableActorId());
+ }
+ }
+ }
remoteComputeTasksCnt += 1;
FillInputSettings(taskDesc);
PendingComputeTasks.insert(taskDesc.GetId());