aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorspuchin <spuchin@ydb.tech>2022-10-05 11:00:39 +0300
committerspuchin <spuchin@ydb.tech>2022-10-05 11:00:39 +0300
commit042eb79d581762c5c3c023887e62fc7f9578ac65 (patch)
tree651d769388cc7d097fd719b67df61277498f37d7
parent3da5a97799524b18ff4d81d255a2ac84389c0c95 (diff)
downloadydb-042eb79d581762c5c3c023887e62fc7f9578ac65.tar.gz
Fix undefined output on broadcast with no output channels. ()
-rw-r--r--ydb/core/kqp/ut/kqp_ne_ut.cpp19
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h1
-rw-r--r--ydb/library/yql/dq/tasks/dq_connection_builder.h8
3 files changed, 23 insertions, 5 deletions
diff --git a/ydb/core/kqp/ut/kqp_ne_ut.cpp b/ydb/core/kqp/ut/kqp_ne_ut.cpp
index f05e8ace315..68aa49f9267 100644
--- a/ydb/core/kqp/ut/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_ne_ut.cpp
@@ -3600,6 +3600,25 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
CompareYson(R"([[[-1]];[[-1]];[[0]];[[0]];[[1]];[[1]]])",
FormatResultSetYson(result.GetResultSet(1)));
}
+
+ Y_UNIT_TEST(EmptyMapWithBroadcast) {
+ TKikimrRunner kikimr;
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+ PRAGMA kikimr.UseNewEngine = "true";
+
+ SELECT ts.Value1 AS c1, kv.Value AS c2, t.Name AS c3
+ FROM TwoShard AS ts
+ INNER JOIN KeyValue AS kv ON ts.Value2 = kv.Key
+ INNER JOIN Test AS t ON ts.Key = t.Group
+ WHERE ts.Key = 30;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));
+ }
}
} // namespace NKikimr::NKqp
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index d4c5702b673..eb2887d1604 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -1775,7 +1775,6 @@ private:
for (ui32 i = 0; i < Task.OutputsSize(); ++i) {
const auto& outputDesc = Task.GetOutputs(i);
Y_VERIFY(!outputDesc.HasSink() || outputDesc.ChannelsSize() == 0); // HasSink => no channels
- Y_VERIFY(outputDesc.HasSink() || outputDesc.ChannelsSize() > 0);
if (outputDesc.HasTransform()) {
auto result = OutputTransformsMap.emplace(std::piecewise_construct, std::make_tuple(i), std::make_tuple());
diff --git a/ydb/library/yql/dq/tasks/dq_connection_builder.h b/ydb/library/yql/dq/tasks/dq_connection_builder.h
index ee7f73726ee..854cb6977b8 100644
--- a/ydb/library/yql/dq/tasks/dq_connection_builder.h
+++ b/ydb/library/yql/dq/tasks/dq_connection_builder.h
@@ -197,6 +197,10 @@ void BuildBroadcastChannels(TGraph& graph, const typename TGraph::TStageInfoType
auto originTaskId = inputStageInfo.Tasks[0];
auto& targetTasks = stageInfo.Tasks;
+ auto& originTask = graph.GetTask(originTaskId);
+ auto& taskOutput = originTask.Outputs[outputIndex];
+ taskOutput.Type = TTaskOutputType::Broadcast;
+
for (size_t i = 0; i < targetTasks.size(); ++i) {
auto targetTaskId = targetTasks[i];
@@ -207,14 +211,10 @@ void BuildBroadcastChannels(TGraph& graph, const typename TGraph::TStageInfoType
channel.DstInputIndex = inputIndex;
channel.InMemory = !enableSpilling || inputStageInfo.OutputsCount == 1;
- auto& originTask = graph.GetTask(originTaskId);
auto& targetTask = graph.GetTask(targetTaskId);
auto& taskInput = targetTask.Inputs[inputIndex];
taskInput.Channels.push_back(channel.Id);
-
- auto& taskOutput = originTask.Outputs[outputIndex];
- taskOutput.Type = TTaskOutputType::Broadcast;
taskOutput.Channels.push_back(channel.Id);
logFunc(channel.Id, originTaskId, targetTaskId, "Broadcast/Broadcast", !channel.InMemory);