aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorva-kuznecov <va-kuznecov@ydb.tech>2023-08-29 16:31:07 +0300
committerva-kuznecov <va-kuznecov@ydb.tech>2023-08-29 21:27:18 +0300
commitea6d868f6e215711597d9ca9d3e6b29a982c0e86 (patch)
treed649038923a553bd494caa3f7b880e88f93d7ba3
parent0fd0f13a0c94444dc4c94a52621620eecae8faf1 (diff)
downloadydb-ea6d868f6e215711597d9ca9d3e6b29a982c0e86.tar.gz
Implement EquiJoin for pg_syntax KIKIMR-18645
-rw-r--r--ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp63
-rw-r--r--ydb/core/kqp/ut/pg/kqp_pg_ut.cpp51
2 files changed, 82 insertions, 32 deletions
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
index 38de2ebb3e9..1445fb74ac2 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
@@ -17,7 +17,7 @@ using namespace NYql::NNodes;
namespace {
bool GetEquiJoinKeyTypes(TExprBase leftInput, const TString& leftColumnName, const TKikimrTableDescription& rightTable,
- const TString& rightColumnName, const TDataExprType*& leftData, const TDataExprType*& rightData)
+ const TString& rightColumnName, const TTypeAnnotationNode*& leftData, const TTypeAnnotationNode*& rightData)
{
auto rightType = rightTable.GetColumnType(rightColumnName);
YQL_ENSURE(rightType);
@@ -25,12 +25,6 @@ bool GetEquiJoinKeyTypes(TExprBase leftInput, const TString& leftColumnName, con
rightType = rightType->Cast<TOptionalExprType>()->GetItemType();
}
- if (rightType->GetKind() != ETypeAnnotationKind::Data) {
- Y_ENSURE(rightType->GetKind() == ETypeAnnotationKind::Pg);
- return false;
- }
- rightData = rightType->Cast<TDataExprType>();
-
auto leftInputType = leftInput.Ref().GetTypeAnn();
YQL_ENSURE(leftInputType);
YQL_ENSURE(leftInputType->GetKind() == ETypeAnnotationKind::List);
@@ -45,10 +39,15 @@ bool GetEquiJoinKeyTypes(TExprBase leftInput, const TString& leftColumnName, con
leftType = leftType->Cast<TOptionalExprType>()->GetItemType();
}
- if (leftType->GetKind() != ETypeAnnotationKind::Data) {
- return false;
+ if (rightType->GetKind() != ETypeAnnotationKind::Data || leftType->GetKind() != ETypeAnnotationKind::Data) {
+ Y_ENSURE(rightType->GetKind() == ETypeAnnotationKind::Pg);
+ Y_ENSURE(leftType->GetKind() == ETypeAnnotationKind::Pg);
+ rightData = rightType->Cast<TPgExprType>();
+ leftData = leftType->Cast<TPgExprType>();
+ return true;
}
+ rightData = rightType->Cast<TDataExprType>();
leftData = leftType->Cast<TDataExprType>();
return true;
}
@@ -502,27 +501,41 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
.Name().Build(*leftColumn)
.Done().Ptr();
- const TDataExprType* leftDataType;
- const TDataExprType* rightDataType;
- if (!GetEquiJoinKeyTypes(join.LeftInput(), *leftColumn, rightTableDesc, rightColumnName, leftDataType, rightDataType)) {
+ const TTypeAnnotationNode* leftType;
+ const TTypeAnnotationNode* rightType;
+ if (!GetEquiJoinKeyTypes(join.LeftInput(), *leftColumn, rightTableDesc, rightColumnName, leftType, rightType)) {
return {};
}
- if (leftDataType != rightDataType) {
- bool canCast = IsDataTypeNumeric(leftDataType->GetSlot()) && IsDataTypeNumeric(rightDataType->GetSlot());
- if (!canCast) {
- canCast = leftDataType->GetName() == "Utf8" && rightDataType->GetName() == "String";
- }
- if (canCast) {
- DBG("------ cast " << leftDataType->GetName() << " to " << rightDataType->GetName());
- member = Build<TCoConvert>(ctx, join.Pos())
- .Input(member)
- .Type().Build(rightDataType->GetName())
- .Done().Ptr();
- } else {
- DBG("------ can not cast " << leftDataType->GetName() << " to " << rightDataType->GetName());
+ if (leftType->GetKind() == ETypeAnnotationKind::Pg) {
+ Y_ENSURE(rightType->GetKind() == ETypeAnnotationKind::Pg);
+ auto* leftPgType = static_cast<const TPgExprType*>(leftType);
+ auto* rightPgType = static_cast<const TPgExprType*>(rightType);
+ if (leftPgType != rightPgType) {
+ // TODO: Emit PgCast
return {};
}
+ } else {
+ Y_ENSURE(leftType->GetKind() == ETypeAnnotationKind::Data);
+ Y_ENSURE(rightType->GetKind() == ETypeAnnotationKind::Data);
+ auto* leftDataType = static_cast<const TDataExprType*>(leftType);
+ auto* rightDataType = static_cast<const TDataExprType*>(rightType);
+ if (leftDataType != rightDataType) {
+ bool canCast = IsDataTypeNumeric(leftDataType->GetSlot()) && IsDataTypeNumeric(rightDataType->GetSlot());
+ if (!canCast) {
+ canCast = leftDataType->GetName() == "Utf8" && rightDataType->GetName() == "String";
+ }
+ if (canCast) {
+ DBG("------ cast " << leftDataType->GetName() << " to " << rightDataType->GetName());
+ member = Build<TCoConvert>(ctx, join.Pos())
+ .Input(member)
+ .Type().Build(rightDataType->GetName())
+ .Done().Ptr();
+ } else {
+ DBG("------ can not cast " << leftDataType->GetName() << " to " << rightDataType->GetName());
+ return {};
+ }
+ }
}
}
diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
index 9d077088f23..bf1d4ad2678 100644
--- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
+++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
@@ -1839,7 +1839,7 @@ Y_UNIT_TEST_SUITE(KqpPg) {
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
- {
+ {
auto result = db.ExecuteQuery(R"(
SELECT * FROM test;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
@@ -1891,15 +1891,15 @@ Y_UNIT_TEST_SUITE(KqpPg) {
KeyColumnNames: ["key"],
SplitBoundary { KeyPrefix { Tuple { Optional { Text: "100" } } } }
)");
- auto db = kikimr.GetTableClient();
+ auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
auto describeResult = session.DescribeTable(
"/Root/PgTwoShard",
TDescribeTableSettings().WithTableStatistics(true).WithKeyShardBoundary(true)
).GetValueSync();
- UNIT_ASSERT_C(describeResult.IsSuccess(), describeResult.GetIssues().ToString());
+ UNIT_ASSERT_C(describeResult.IsSuccess(), describeResult.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(describeResult.GetTableDescription().GetPartitionsCount(), 2);
- }
+ }
{
auto result = db.ExecuteQuery(R"(
INSERT INTO PgTwoShard (key, value) VALUES (10, 10), (110, 110);
@@ -1912,7 +1912,7 @@ Y_UNIT_TEST_SUITE(KqpPg) {
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
- {
+ {
auto result = db.ExecuteQuery(R"(
SELECT * FROM PgTwoShard ORDER BY key;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
@@ -1922,7 +1922,7 @@ Y_UNIT_TEST_SUITE(KqpPg) {
)", FormatResultSetYson(result.GetResultSet(0)));
}
}
-
+
Y_UNIT_TEST(DropTableIfExists) {
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
{
@@ -1962,7 +1962,7 @@ Y_UNIT_TEST_SUITE(KqpPg) {
DROP TABLE IF EXISTS test;
)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
- }
+ }
{
auto db = kikimr.GetQueryClient();
auto settings = NYdb::NQuery::TExecuteQuerySettings()
@@ -2018,6 +2018,43 @@ Y_UNIT_TEST_SUITE(KqpPg) {
CompareYson(R"([["1";"val1";"1";"val2"]])", FormatResultSetYson(result.GetResultSet(0)));
}
}
+
+ Y_UNIT_TEST(EquiJoin) {
+ TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
+ auto db = kikimr.GetQueryClient();
+ auto settings = NYdb::NQuery::TExecuteQuerySettings().Syntax(NYdb::NQuery::ESyntax::Pg);
+ {
+ auto client = kikimr.GetTableClient();
+ auto session = client.CreateSession().GetValueSync().GetSession();
+ const auto query = Q_(R"_(
+ --!syntax_pg
+ CREATE TABLE left_table(id int4, val text, primary key(id));
+
+ CREATE TABLE right_table(id int4, val2 text, primary key(id));
+ )_");
+ auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ INSERT INTO left_table (id, val) VALUES (1, 'a'), (2, 'b'), (3, 'c');
+ INSERT INTO right_table (id, val2) VALUES (1, 'd'), (2, 'e'), (3, 'f');
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ auto result = db.ExecuteQuery(R"(
+ SELECT left_table.*, right_table.val2 FROM left_table, right_table WHERE left_table.id=right_table.id
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
+ CompareYson(R"(
+ [["1";"a";"d"];["2";"b";"e"];["3";"c";"f"]]
+ )", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ }
}
} // namespace NKqp