aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorqrort <qrort@yandex-team.com>2023-08-28 18:55:31 +0300
committerqrort <qrort@yandex-team.com>2023-08-28 20:08:31 +0300
commit10771af7a3c8c46305042f827c94eab4a2ba9a70 (patch)
tree146045b0ad225c4a90383f88658c3ad9a2f30e6b
parent525cbbe410dcf45c23c93533299857deae7e66ab (diff)
downloadydb-10771af7a3c8c46305042f827c94eab4a2ba9a70.tar.gz
KIKIMR-18918: skip present ShardId for iterator read tasks
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_validate.cpp6
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy.cpp3
-rw-r--r--ydb/core/kqp/ut/pg/kqp_pg_ut.cpp44
3 files changed, 51 insertions, 2 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_validate.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_validate.cpp
index 3439e10bf9..861b9caf5b 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_validate.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_validate.cpp
@@ -35,7 +35,11 @@ private:
if (channel.DstTask) {
auto& dstTask = TasksGraph.GetTask(channel.DstTask);
- if (IsDataExec() && dstTask.Meta.ShardId) {
+
+ auto& stageInfo = TasksGraph.GetStageInfo(dstTask.StageId);
+ auto& dstStage = stageInfo.Meta.GetStage(stageInfo.Id);
+
+ if (IsDataExec() && dstTask.Meta.ShardId && dstStage.SourcesSize() == 0) {
YQL_ENSURE(srcTask.Meta.ShardId, "Invalid channel from non-shard task to shard task"
<< ", channelId: " << channelId
<< ", srcTaskId: " << channel.SrcTask
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
index 87384c54e2..1f3a531fc9 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
@@ -379,8 +379,9 @@ protected:
TMaybeNode<TExprBase> BuildJoin(TExprBase node, TExprContext& ctx,
IOptimizationContext& optCtx, const TGetParents& getParents)
{
+ bool pushLeftStage = !KqpCtx.IsDataQuery() && AllowFuseJoinInputs(node);
TExprBase output = DqBuildJoin(node, ctx, optCtx, *getParents(), IsGlobal,
- /*pushLeftStage =*/ !KqpCtx.IsDataQuery() && AllowFuseJoinInputs(node), KqpCtx.Config->GetHashJoinMode()
+ pushLeftStage, KqpCtx.Config->GetHashJoinMode()
);
DumpAppliedRule("BuildJoin", node.Ptr(), output.Ptr(), ctx);
return output;
diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
index 468b4fe3e5..9d077088f2 100644
--- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
+++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
@@ -1974,6 +1974,50 @@ Y_UNIT_TEST_SUITE(KqpPg) {
UNIT_ASSERT(result.GetIssues().ToString().Contains("Cannot find table 'db.[/Root/test]'"));
}
}
+
+ Y_UNIT_TEST(JoinWithQueryService) {
+ TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
+ auto client = kikimr.GetTableClient();
+ auto db = kikimr.GetQueryClient();
+ auto settings = NYdb::NQuery::TExecuteQuerySettings()
+ .Syntax(NYdb::NQuery::ESyntax::Pg);
+ {
+ auto session = client.CreateSession().GetValueSync().GetSession();
+ const auto query = Q_(R"(
+ --!syntax_pg
+ CREATE TABLE t1(
+ id1 int4 PRIMARY KEY,
+ val1 text
+ ))");
+ auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ }
+ {
+ auto session = client.CreateSession().GetValueSync().GetSession();
+ const auto query = Q_(R"(
+ --!syntax_pg
+ CREATE TABLE t2(
+ id2 int4 PRIMARY KEY,
+ val2 text
+ ))");
+ auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ INSERT INTO t1(id1, val1) VALUES (1, 'val1');
+ INSERT INTO t2(id2, val2) VALUES (1, 'val2');
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ SELECT * FROM t1 JOIN t2 ON t1.id1 = t2.id2;
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([["1";"val1";"1";"val2"]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ }
}
} // namespace NKqp