aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIuliia Sidorina <yulia@ydb.tech>2024-07-18 18:24:54 +0200
committerGitHub <noreply@github.com>2024-07-18 18:24:54 +0200
commit6bb1a844163ee900ee36821a9ae355ea16f4a306 (patch)
treed81351e5e3ce756e126f9f742f8a5d279ad32cec
parenta22a03fda39dcde10f66b73acb1c93016c0ccc57 (diff)
downloadydb-6bb1a844163ee900ee36821a9ae355ea16f4a306.tar.gz
fix(kqp): return one result row per key from stream join actor for left semi strategy (#6642)
-rw-r--r--ydb/core/kqp/common/kqp_yql.h1
-rw-r--r--ydb/core/kqp/host/kqp_type_ann.cpp8
-rw-r--r--ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp6
-rw-r--r--ydb/core/kqp/query_compiler/kqp_query_compiler.cpp5
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp9
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_worker.h1
-rw-r--r--ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp64
-rw-r--r--ydb/core/protos/kqp_physical.proto1
8 files changed, 89 insertions, 6 deletions
diff --git a/ydb/core/kqp/common/kqp_yql.h b/ydb/core/kqp/common/kqp_yql.h
index f1a52fc7a8..dfd4bdbc49 100644
--- a/ydb/core/kqp/common/kqp_yql.h
+++ b/ydb/core/kqp/common/kqp_yql.h
@@ -48,6 +48,7 @@ constexpr TStringBuf KqpTableSinkName = "KqpTableSinkName";
static constexpr std::string_view TKqpStreamLookupStrategyName = "LookupRows"sv;
static constexpr std::string_view TKqpStreamLookupJoinStrategyName = "LookupJoinRows"sv;
+static constexpr std::string_view TKqpStreamLookupSemiJoinStrategyName = "LookupSemiJoinRows"sv;
struct TKqpReadTableSettings {
static constexpr TStringBuf SkipNullKeysSettingName = "SkipNullKeys";
diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp
index ab1c98f876..e9d17c031e 100644
--- a/ydb/core/kqp/host/kqp_type_ann.cpp
+++ b/ydb/core/kqp/host/kqp_type_ann.cpp
@@ -502,7 +502,9 @@ TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, cons
return TStatus::Error;
}
- if (lookupStrategy->Content() == TKqpStreamLookupJoinStrategyName) {
+ if (lookupStrategy->Content() == TKqpStreamLookupJoinStrategyName
+ || lookupStrategy->Content() == TKqpStreamLookupSemiJoinStrategyName) {
+
if (!EnsureTupleType(node->Pos(), *lookupType, ctx)) {
return TStatus::Error;
}
@@ -1682,7 +1684,9 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext
node->SetTypeAnn(ctx.MakeType<TStreamExprType>(rowType));
- } else if (lookupStrategy.Value() == TKqpStreamLookupJoinStrategyName) {
+ } else if (lookupStrategy.Value() == TKqpStreamLookupJoinStrategyName
+ || lookupStrategy.Value() == TKqpStreamLookupSemiJoinStrategyName) {
+
if (!EnsureTupleType(node->Pos(), *inputItemType, ctx)) {
return TStatus::Error;
}
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
index a723be2aaa..5c46e5f816 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
@@ -395,11 +395,15 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
}
}
+ auto strategy = join.JoinType().Value() == "LeftSemi"
+ ? TKqpStreamLookupSemiJoinStrategyName
+ : TKqpStreamLookupJoinStrategyName;
+
TExprBase lookupJoin = Build<TKqlStreamLookupTable>(ctx, join.Pos())
.Table(rightLookup.MainTable)
.LookupKeys(leftInput)
.Columns(lookupColumns.Cast())
- .LookupStrategy().Build(TKqpStreamLookupJoinStrategyName)
+ .LookupStrategy().Build(strategy)
.Done();
// Stream lookup join output: stream<tuple<left_row_struct, optional<right_row_struct>>>
diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
index 15f65fa957..b3d2cd4930 100644
--- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
@@ -91,6 +91,8 @@ NKqpProto::EStreamLookupStrategy GetStreamLookupStrategy(const std::string_view
lookupStrategy = NKqpProto::EStreamLookupStrategy::LOOKUP;
} else if (strategy == "LookupJoinRows"sv) {
lookupStrategy = NKqpProto::EStreamLookupStrategy::JOIN;
+ } else if (strategy == "LookupSemiJoinRows"sv) {
+ lookupStrategy = NKqpProto::EStreamLookupStrategy::SEMI_JOIN;
}
YQL_ENSURE(lookupStrategy != NKqpProto::EStreamLookupStrategy::UNSPECIFIED,
@@ -1275,7 +1277,8 @@ private:
break;
}
- case NKqpProto::EStreamLookupStrategy::JOIN: {
+ case NKqpProto::EStreamLookupStrategy::JOIN:
+ case NKqpProto::EStreamLookupStrategy::SEMI_JOIN: {
YQL_ENSURE(inputItemType->GetKind() == ETypeAnnotationKind::Tuple);
const auto inputTupleType = inputItemType->Cast<TTupleExprType>();
YQL_ENSURE(inputTupleType->GetSize() == 2);
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
index 4cfd3ea28c..af995daf05 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
@@ -142,7 +142,8 @@ TKqpStreamLookupWorker::TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSetti
, HolderFactory(holderFactory)
, InputDesc(inputDesc)
, TablePath(settings.GetTable().GetPath())
- , TableId(MakeTableId(settings.GetTable())) {
+ , TableId(MakeTableId(settings.GetTable()))
+ , Strategy(settings.GetLookupStrategy()) {
KeyColumns.reserve(settings.GetKeyColumns().size());
i32 keyOrder = 0;
@@ -748,6 +749,11 @@ public:
auto leftRowIt = PendingLeftRowsByKey.find(joinKeyCells);
YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end());
+ if (Strategy == NKqpProto::EStreamLookupStrategy::SEMI_JOIN && leftRowIt->second.RightRowExist) {
+ // Semi join should return one result row per key
+ continue;
+ }
+
TReadResultStats rowStats;
i64 availableSpace = freeSpace - (i64)resultStats.ResultBytesCount;
auto resultRow = TryBuildResultRow(leftRowIt->second, row, rowStats, availableSpace, result.ShardId);
@@ -962,6 +968,7 @@ std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKq
case NKqpProto::EStreamLookupStrategy::LOOKUP:
return std::make_unique<TKqpLookupRows>(std::move(settings), typeEnv, holderFactory, inputDesc);
case NKqpProto::EStreamLookupStrategy::JOIN:
+ case NKqpProto::EStreamLookupStrategy::SEMI_JOIN:
return std::make_unique<TKqpJoinRows>(std::move(settings), typeEnv, holderFactory, inputDesc);
default:
return {};
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
index 46b15745b3..6b9e35a107 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
@@ -71,6 +71,7 @@ protected:
std::unordered_map<TString, TSysTables::TTableColumnInfo> KeyColumns;
std::vector<TSysTables::TTableColumnInfo*> LookupKeyColumns;
std::vector<TSysTables::TTableColumnInfo> Columns;
+ const NKqpProto::EStreamLookupStrategy Strategy;
};
std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings,
diff --git a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
index 55ff9b15d3..4a2e667fe7 100644
--- a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
+++ b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
@@ -1023,7 +1023,7 @@ Y_UNIT_TEST_TWIN(JoinWithComplexCondition, StreamLookupJoin) {
TKikimrSettings serverSettings = TKikimrSettings().SetAppConfig(appConfig);;
serverSettings.SetKqpSettings(settings);
- TKikimrRunner kikimr(settings);
+ TKikimrRunner kikimr(serverSettings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -1182,6 +1182,68 @@ Y_UNIT_TEST_TWIN(JoinWithComplexCondition, StreamLookupJoin) {
}
}
+Y_UNIT_TEST_TWIN(LeftSemiJoinWithDuplicatesInRightTable, StreamLookupJoin) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(StreamLookupJoin);
+ auto settings = TKikimrSettings().SetAppConfig(appConfig);
+ TKikimrRunner kikimr(settings);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ { // create tables
+ const TString query = R"(
+ CREATE TABLE `/Root/Left` (
+ Key1 Int64,
+ Key2 Int64,
+ Value String,
+ PRIMARY KEY (Key1, Key2)
+ );
+
+ CREATE TABLE `/Root/Right` (
+ Key1 Int64,
+ Key2 Int64,
+ Value String,
+ PRIMARY KEY (Key1, Key2)
+ );
+ )";
+ UNIT_ASSERT(session.ExecuteSchemeQuery(query).GetValueSync().IsSuccess());
+ }
+
+ { // fill tables
+ const TString query = R"(
+ REPLACE INTO `/Root/Left` (Key1, Key2, Value) VALUES
+ (1, 10, "value1"),
+ (2, 20, "value2"),
+ (3, 30, "value3");
+
+ REPLACE INTO `/Root/Right` (Key1, Key2, Value) VALUES
+ (10, 100, "value1"),
+ (10, 101, "value1"),
+ (10, 102, "value1"),
+ (20, 200, "value2"),
+ (20, 201, "value2"),
+ (30, 300, "value3");
+ )";
+ UNIT_ASSERT(session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync().IsSuccess());
+ }
+
+ {
+ const TString query = R"(
+ SELECT l.Key1, l.Key2, l.Value
+ FROM `/Root/Left` AS l
+ LEFT SEMI JOIN `/Root/Right` AS r
+ ON l.Key2 = r.Key1 ORDER BY l.Key1, l.Key2, l.Value
+ )";
+
+ auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ CompareYson(R"([
+ [[1];[10];["value1"]];
+ [[2];[20];["value2"]];
+ [[3];[30];["value3"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+}
+
} // suite
} // namespace NKqp
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index 0ec497cca0..9683814708 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -263,6 +263,7 @@ enum EStreamLookupStrategy {
UNSPECIFIED = 0;
LOOKUP = 1;
JOIN = 2;
+ SEMI_JOIN = 3;
};
message TKqpPhyCnStreamLookup {