diff options
author | qrort <qrort@yandex-team.com> | 2023-08-28 18:55:31 +0300 |
---|---|---|
committer | qrort <qrort@yandex-team.com> | 2023-08-28 20:08:31 +0300 |
commit | 10771af7a3c8c46305042f827c94eab4a2ba9a70 (patch) | |
tree | 146045b0ad225c4a90383f88658c3ad9a2f30e6b | |
parent | 525cbbe410dcf45c23c93533299857deae7e66ab (diff) | |
download | ydb-10771af7a3c8c46305042f827c94eab4a2ba9a70.tar.gz |
KIKIMR-18918: skip present ShardId for iterator read tasks
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_validate.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy.cpp | 3 | ||||
-rw-r--r-- | ydb/core/kqp/ut/pg/kqp_pg_ut.cpp | 44 |
3 files changed, 51 insertions, 2 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_validate.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_validate.cpp index 3439e10bf9..861b9caf5b 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_validate.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_validate.cpp @@ -35,7 +35,11 @@ private: if (channel.DstTask) { auto& dstTask = TasksGraph.GetTask(channel.DstTask); - if (IsDataExec() && dstTask.Meta.ShardId) { + + auto& stageInfo = TasksGraph.GetStageInfo(dstTask.StageId); + auto& dstStage = stageInfo.Meta.GetStage(stageInfo.Id); + + if (IsDataExec() && dstTask.Meta.ShardId && dstStage.SourcesSize() == 0) { YQL_ENSURE(srcTask.Meta.ShardId, "Invalid channel from non-shard task to shard task" << ", channelId: " << channelId << ", srcTaskId: " << channel.SrcTask diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp index 87384c54e2..1f3a531fc9 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp @@ -379,8 +379,9 @@ protected: TMaybeNode<TExprBase> BuildJoin(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { + bool pushLeftStage = !KqpCtx.IsDataQuery() && AllowFuseJoinInputs(node); TExprBase output = DqBuildJoin(node, ctx, optCtx, *getParents(), IsGlobal, - /*pushLeftStage =*/ !KqpCtx.IsDataQuery() && AllowFuseJoinInputs(node), KqpCtx.Config->GetHashJoinMode() + pushLeftStage, KqpCtx.Config->GetHashJoinMode() ); DumpAppliedRule("BuildJoin", node.Ptr(), output.Ptr(), ctx); return output; diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp index 468b4fe3e5..9d077088f2 100644 --- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp +++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp @@ -1974,6 +1974,50 @@ Y_UNIT_TEST_SUITE(KqpPg) { UNIT_ASSERT(result.GetIssues().ToString().Contains("Cannot find table 'db.[/Root/test]'")); } } + + Y_UNIT_TEST(JoinWithQueryService) { + TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false)); + auto client = kikimr.GetTableClient(); + auto db = kikimr.GetQueryClient(); + auto settings = NYdb::NQuery::TExecuteQuerySettings() + .Syntax(NYdb::NQuery::ESyntax::Pg); + { + auto session = client.CreateSession().GetValueSync().GetSession(); + const auto query = Q_(R"( + --!syntax_pg + CREATE TABLE t1( + id1 int4 PRIMARY KEY, + val1 text + ))"); + auto result = session.ExecuteSchemeQuery(query).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + { + auto session = client.CreateSession().GetValueSync().GetSession(); + const auto query = Q_(R"( + --!syntax_pg + CREATE TABLE t2( + id2 int4 PRIMARY KEY, + val2 text + ))"); + auto result = session.ExecuteSchemeQuery(query).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + { + auto result = db.ExecuteQuery(R"( + INSERT INTO t1(id1, val1) VALUES (1, 'val1'); + INSERT INTO t2(id2, val2) VALUES (1, 'val2'); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + auto result = db.ExecuteQuery(R"( + SELECT * FROM t1 JOIN t2 ON t1.id1 = t2.id2; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([["1";"val1";"1";"val2"]])", FormatResultSetYson(result.GetResultSet(0))); + } + } } } // namespace NKqp |