diff options
author | spuchin <spuchin@ydb.tech> | 2022-10-05 11:00:39 +0300 |
---|---|---|
committer | spuchin <spuchin@ydb.tech> | 2022-10-05 11:00:39 +0300 |
commit | 042eb79d581762c5c3c023887e62fc7f9578ac65 (patch) | |
tree | 651d769388cc7d097fd719b67df61277498f37d7 | |
parent | 3da5a97799524b18ff4d81d255a2ac84389c0c95 (diff) | |
download | ydb-042eb79d581762c5c3c023887e62fc7f9578ac65.tar.gz |
Fix undefined output on broadcast with no output channels. ()
-rw-r--r-- | ydb/core/kqp/ut/kqp_ne_ut.cpp | 19 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h | 1 | ||||
-rw-r--r-- | ydb/library/yql/dq/tasks/dq_connection_builder.h | 8 |
3 files changed, 23 insertions, 5 deletions
diff --git a/ydb/core/kqp/ut/kqp_ne_ut.cpp b/ydb/core/kqp/ut/kqp_ne_ut.cpp index f05e8ace315..68aa49f9267 100644 --- a/ydb/core/kqp/ut/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/kqp_ne_ut.cpp @@ -3600,6 +3600,25 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { CompareYson(R"([[[-1]];[[-1]];[[0]];[[0]];[[1]];[[1]]])", FormatResultSetYson(result.GetResultSet(1))); } + + Y_UNIT_TEST(EmptyMapWithBroadcast) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + PRAGMA kikimr.UseNewEngine = "true"; + + SELECT ts.Value1 AS c1, kv.Value AS c2, t.Name AS c3 + FROM TwoShard AS ts + INNER JOIN KeyValue AS kv ON ts.Value2 = kv.Key + INNER JOIN Test AS t ON ts.Key = t.Group + WHERE ts.Key = 30; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); + } } } // namespace NKikimr::NKqp diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index d4c5702b673..eb2887d1604 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1775,7 +1775,6 @@ private: for (ui32 i = 0; i < Task.OutputsSize(); ++i) { const auto& outputDesc = Task.GetOutputs(i); Y_VERIFY(!outputDesc.HasSink() || outputDesc.ChannelsSize() == 0); // HasSink => no channels - Y_VERIFY(outputDesc.HasSink() || outputDesc.ChannelsSize() > 0); if (outputDesc.HasTransform()) { auto result = OutputTransformsMap.emplace(std::piecewise_construct, std::make_tuple(i), std::make_tuple()); diff --git a/ydb/library/yql/dq/tasks/dq_connection_builder.h b/ydb/library/yql/dq/tasks/dq_connection_builder.h index ee7f73726ee..854cb6977b8 100644 --- a/ydb/library/yql/dq/tasks/dq_connection_builder.h +++ b/ydb/library/yql/dq/tasks/dq_connection_builder.h @@ -197,6 +197,10 @@ void BuildBroadcastChannels(TGraph& graph, const typename TGraph::TStageInfoType auto originTaskId = inputStageInfo.Tasks[0]; auto& targetTasks = stageInfo.Tasks; + auto& originTask = graph.GetTask(originTaskId); + auto& taskOutput = originTask.Outputs[outputIndex]; + taskOutput.Type = TTaskOutputType::Broadcast; + for (size_t i = 0; i < targetTasks.size(); ++i) { auto targetTaskId = targetTasks[i]; @@ -207,14 +211,10 @@ void BuildBroadcastChannels(TGraph& graph, const typename TGraph::TStageInfoType channel.DstInputIndex = inputIndex; channel.InMemory = !enableSpilling || inputStageInfo.OutputsCount == 1; - auto& originTask = graph.GetTask(originTaskId); auto& targetTask = graph.GetTask(targetTaskId); auto& taskInput = targetTask.Inputs[inputIndex]; taskInput.Channels.push_back(channel.Id); - - auto& taskOutput = originTask.Outputs[outputIndex]; - taskOutput.Type = TTaskOutputType::Broadcast; taskOutput.Channels.push_back(channel.Id); logFunc(channel.Id, originTaskId, targetTaskId, "Broadcast/Broadcast", !channel.InMemory); |