aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorulya-sidorina <yulia@ydb.tech>2022-07-18 12:51:44 +0300
committerulya-sidorina <yulia@ydb.tech>2022-07-18 12:51:44 +0300
commit543fd67e51dddae7293ce0a2af54a3a86b063c52 (patch)
tree7adb72ee07af1367edbce34e3b0378ab8d0c99a7
parent14c177383c8fba3709cfc2f6114d1788184f1159 (diff)
downloadydb-543fd67e51dddae7293ce0a2af54a3a86b063c52.tar.gz
add stream lookup node to query plan
feature(kqp_query_plan): add stream lookup to query plan
-rw-r--r--ydb/core/kqp/prepare/kqp_query_plan.cpp87
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.cpp1
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.h1
-rw-r--r--ydb/core/kqp/ut/kqp_explain_ut.cpp20
-rw-r--r--ydb/core/kqp/ut/kqp_scan_ut.cpp20
-rw-r--r--ydb/core/kqp/ut/kqp_stats_ut.cpp12
6 files changed, 88 insertions, 53 deletions
diff --git a/ydb/core/kqp/prepare/kqp_query_plan.cpp b/ydb/core/kqp/prepare/kqp_query_plan.cpp
index 9d4c6a515d9..a363e5fe77d 100644
--- a/ydb/core/kqp/prepare/kqp_query_plan.cpp
+++ b/ydb/core/kqp/prepare/kqp_query_plan.cpp
@@ -641,6 +641,67 @@ private:
return str;
}
+ void FillConnectionPlanNode(const TDqConnection& connection, TQueryPlanNode& planNode) {
+ planNode.Type = EPlanNodeType::Connection;
+
+ if (connection.Maybe<TDqCnUnionAll>()) {
+ planNode.TypeName = "UnionAll";
+ } else if (connection.Maybe<TDqCnBroadcast>()) {
+ planNode.TypeName = "Broadcast";
+ } else if (connection.Maybe<TDqCnMap>()) {
+ planNode.TypeName = "Map";
+ } else if (auto hashShuffle = connection.Maybe<TDqCnHashShuffle>()) {
+ planNode.TypeName = "HashShuffle";
+ auto& keyColumns = planNode.NodeInfo["KeyColumns"];
+ for (const auto& column : hashShuffle.Cast().KeyColumns()) {
+ keyColumns.AppendValue(TString(column.Value()));
+ }
+ } else if (auto merge = connection.Maybe<TDqCnMerge>()) {
+ planNode.TypeName = "Merge";
+ auto& sortColumns = planNode.NodeInfo["SortColumns"];
+ for (const auto& sortColumn : merge.Cast().SortColumns()) {
+ TStringBuilder sortColumnDesc;
+ sortColumnDesc << sortColumn.Column().Value() << " ("
+ << sortColumn.SortDirection().Value() << ")";
+
+ sortColumns.AppendValue(sortColumnDesc);
+ }
+ } else if (auto maybeTableLookup = connection.Maybe<TKqpCnStreamLookup>()) {
+ auto tableLookup = maybeTableLookup.Cast();
+
+ TTableRead readInfo;
+ readInfo.Type = ETableReadType::Lookup;
+ planNode.TypeName = "TableLookup";
+ TString table(tableLookup.Table().Path().Value());
+ auto& tableData = SerializerCtx.TablesData->GetTable(SerializerCtx.Cluster, table);
+ planNode.NodeInfo["Table"] = tableData.RelativePath ? *tableData.RelativePath : table;
+
+ readInfo.Columns.reserve(tableLookup.Columns().Size());
+ auto& columns = planNode.NodeInfo["Columns"];
+ for (const auto& column : tableLookup.Columns()) {
+ columns.AppendValue(column.Value());
+ readInfo.Columns.push_back(TString(column.Value()));
+ }
+
+ const auto lookupKeysType = tableLookup.LookupKeysType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType();
+ YQL_ENSURE(lookupKeysType);
+ YQL_ENSURE(lookupKeysType->GetKind() == ETypeAnnotationKind::List);
+ const auto lookupKeysItemType = lookupKeysType->Cast<TListExprType>()->GetItemType();
+ YQL_ENSURE(lookupKeysItemType->GetKind() == ETypeAnnotationKind::Struct);
+ const auto& lookupKeyColumnsStruct = lookupKeysItemType->Cast<TStructExprType>()->GetItems();
+ readInfo.LookupBy.reserve(lookupKeyColumnsStruct.size());
+ auto& lookupKeyColumns = planNode.NodeInfo["LookupKeyColumns"];
+ for (const auto keyColumn : lookupKeyColumnsStruct) {
+ lookupKeyColumns.AppendValue(keyColumn->GetName());
+ readInfo.LookupBy.push_back(TString(keyColumn->GetName()));
+ }
+
+ SerializerCtx.Tables[table].Reads.push_back(std::move(readInfo));
+ } else {
+ planNode.TypeName = connection.Ref().Content();
+ }
+ }
+
void Visit(const TExprBase& expr, TQueryPlanNode& planNode) {
if (expr.Maybe<TDqPhyStage>()) {
auto stageGuid = NDq::TDqStageSettings::Parse(expr.Cast<TDqPhyStage>()).Id;
@@ -684,31 +745,7 @@ private:
auto inputCn = input.Cast<TDqConnection>();
auto& inputPlanNode = AddPlanNode(stagePlanNode);
- inputPlanNode.Type = EPlanNodeType::Connection;
-
- if (inputCn.Maybe<TDqCnUnionAll>()) {
- inputPlanNode.TypeName = "UnionAll";
- } else if (inputCn.Maybe<TDqCnBroadcast>()) {
- inputPlanNode.TypeName = "Broadcast";
- } else if (auto hashShuffle = inputCn.Maybe<TDqCnHashShuffle>()) {
- inputPlanNode.TypeName = "HashShuffle";
- auto& keyColumns = inputPlanNode.NodeInfo["KeyColumns"];
- for (const auto& column : hashShuffle.Cast().KeyColumns()) {
- keyColumns.AppendValue(TString(column.Value()));
- }
- } else if (auto merge = inputCn.Maybe<TDqCnMerge>()) {
- inputPlanNode.TypeName = "Merge";
- auto& sortColumns = inputPlanNode.NodeInfo["SortColumns"];
- for (const auto& sortColumn : merge.Cast().SortColumns()) {
- TStringBuilder sortColumnDesc;
- sortColumnDesc << sortColumn.Column().Value() << " ("
- << sortColumn.SortDirection().Value() << ")";
-
- sortColumns.AppendValue(sortColumnDesc);
- }
- } else {
- inputPlanNode.TypeName = inputCn.Ref().Content();
- }
+ FillConnectionPlanNode(inputCn, inputPlanNode);
Visit(inputCn.Output().Stage(), inputPlanNode);
}
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
index 4402dca0371..b39fb6bba47 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
@@ -111,7 +111,6 @@ TKikimrRunner::TKikimrRunner(const TKikimrSettings& settings) {
ServerSettings->SetKeepSnapshotTimeout(settings.KeepSnapshotTimeout);
ServerSettings->SetFrFactory(&UdfFrFactory);
ServerSettings->SetEnableNotNullColumns(true);
- ServerSettings->SetEnableKqpScanQueryStreamLookup(false);
ServerSettings->SetEnableMoveIndex(true);
if (settings.LogStream)
ServerSettings->SetLogBackend(new TStreamLogBackend(settings.LogStream));
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h
index 2f13a31353a..b1b8a33996c 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.h
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.h
@@ -87,6 +87,7 @@ struct TKikimrSettings: public TTestFeatureFlagsHolder<TKikimrSettings> {
{
// default value for tests, can be overwritten by SetFeatureFlags()
this->SetEnableKqpSessionActor(false);
+ this->SetEnableKqpScanQueryStreamLookup(true);
}
TKikimrSettings& SetAppConfig(const NKikimrConfig::TAppConfig& value) { AppConfig = value; return *this; }
diff --git a/ydb/core/kqp/ut/kqp_explain_ut.cpp b/ydb/core/kqp/ut/kqp_explain_ut.cpp
index cd926b6aeb8..949e96167e4 100644
--- a/ydb/core/kqp/ut/kqp_explain_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_explain_ut.cpp
@@ -65,11 +65,13 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
NJson::TJsonValue plan;
NJson::ReadJsonTree(*res.PlanJson, &plan, true);
- auto join = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter-TableFullScan");
+ auto join = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter");
UNIT_ASSERT(join.IsDefined());
auto left = FindPlanNodeByKv(join, "Table", "EightShard");
UNIT_ASSERT(left.IsDefined());
- auto right = FindPlanNodeByKv(join, "Table", "KeyValue");
+ auto lookup = FindPlanNodeByKv(join, "Node Type", "TableLookup");
+ UNIT_ASSERT(lookup.IsDefined());
+ auto right = FindPlanNodeByKv(lookup, "Table", "KeyValue");
UNIT_ASSERT(right.IsDefined());
}
@@ -91,11 +93,13 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
NJson::TJsonValue plan;
NJson::ReadJsonTree(*res.PlanJson, &plan, true);
- auto join = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter-TableFullScan");
+ auto join = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter");
UNIT_ASSERT(join.IsDefined());
auto left = FindPlanNodeByKv(join, "Table", "EightShard");
UNIT_ASSERT(left.IsDefined());
- auto right = FindPlanNodeByKv(join, "Table", "KeyValue");
+ auto lookup = FindPlanNodeByKv(join, "Node Type", "TableLookup");
+ UNIT_ASSERT(lookup.IsDefined());
+ auto right = FindPlanNodeByKv(lookup, "Table", "KeyValue");
UNIT_ASSERT(right.IsDefined());
}
@@ -179,12 +183,14 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
auto join = FindPlanNodeByKv(
plan,
"Node Type",
- "Aggregate-InnerJoin (MapJoin)-Filter-TableFullScan"
+ "Aggregate-InnerJoin (MapJoin)-Filter"
);
UNIT_ASSERT(join.IsDefined());
auto left = FindPlanNodeByKv(join, "Table", "EightShard");
UNIT_ASSERT(left.IsDefined());
- auto right = FindPlanNodeByKv(join, "Table", "FourShard");
+ auto lookup = FindPlanNodeByKv(join, "Node Type", "TableLookup");
+ UNIT_ASSERT(lookup.IsDefined());
+ auto right = FindPlanNodeByKv(lookup, "Table", "FourShard");
UNIT_ASSERT(right.IsDefined());
}
@@ -319,7 +325,7 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
NJson::TJsonValue plan;
NJson::ReadJsonTree(*res.PlanJson, &plan, true);
- auto join1 = FindPlanNodeByKv(plan, "Node Type", "Sort-InnerJoin (MapJoin)-Filter-Aggregate");
+ auto join1 = FindPlanNodeByKv(plan, "Node Type", "Sort-InnerJoin (MapJoin)-Filter");
UNIT_ASSERT(join1.IsDefined());
auto join2 = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter");
UNIT_ASSERT(join2.IsDefined());
diff --git a/ydb/core/kqp/ut/kqp_scan_ut.cpp b/ydb/core/kqp/ut/kqp_scan_ut.cpp
index 6c326a5dee4..62c9e64cd0d 100644
--- a/ydb/core/kqp/ut/kqp_scan_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_scan_ut.cpp
@@ -1234,7 +1234,7 @@ Y_UNIT_TEST_SUITE(KqpScan) {
auto part = it.ReadNext().GetValueSync();
UNIT_ASSERT_EQUAL_C(part.GetStatus(), EStatus::PRECONDITION_FAILED, part.GetStatus());
- UNIT_ASSERT_STRINGS_EQUAL(part.GetIssues().back().GetSubIssues().back()->Message, "Requested too many execution units: 12");
+ UNIT_ASSERT_STRINGS_EQUAL(part.GetIssues().back().GetSubIssues().back()->Message, "Requested too many execution units: 32");
part = it.ReadNext().GetValueSync();
UNIT_ASSERT(part.EOS());
@@ -1680,11 +1680,7 @@ Y_UNIT_TEST_SUITE(KqpScan) {
}
Y_UNIT_TEST_TWIN(SecondaryIndex, UseSessionActor) {
- auto settings = TKikimrSettings()
- .SetEnableKqpSessionActor(UseSessionActor)
- .SetEnableKqpScanQueryStreamLookup(true);
-
- TKikimrRunner kikimr(settings);
+ auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -1966,11 +1962,7 @@ Y_UNIT_TEST_SUITE(KqpScan) {
}
Y_UNIT_TEST_TWIN(StreamLookup, UseSessionActor) {
- auto settings = TKikimrSettings()
- .SetEnableKqpSessionActor(UseSessionActor)
- .SetEnableKqpScanQueryStreamLookup(true);
-
- TKikimrRunner kikimr(settings);
+ auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
auto db = kikimr.GetTableClient();
CreateSampleTables(kikimr);
@@ -1999,11 +1991,7 @@ Y_UNIT_TEST_SUITE(KqpScan) {
}
Y_UNIT_TEST_TWIN(StreamLookupByPkPrefix, UseSessionActor) {
- auto settings = TKikimrSettings()
- .SetEnableKqpSessionActor(UseSessionActor)
- .SetEnableKqpScanQueryStreamLookup(true);
-
- TKikimrRunner kikimr(settings);
+ auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
auto db = kikimr.GetTableClient();
CreateSampleTables(kikimr);
diff --git a/ydb/core/kqp/ut/kqp_stats_ut.cpp b/ydb/core/kqp/ut/kqp_stats_ut.cpp
index 6202b0ba114..c330a42bf3a 100644
--- a/ydb/core/kqp/ut/kqp_stats_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_stats_ut.cpp
@@ -58,14 +58,18 @@ Y_UNIT_TEST_TWIN(JoinNoStats, UseSessionActor) {
}
Y_UNIT_TEST_TWIN(JoinStatsBasic, UseSessionActor) {
- auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
+ auto settings = TKikimrSettings()
+ .SetEnableKqpSessionActor(UseSessionActor)
+ .SetEnableKqpScanQueryStreamLookup(false); // TODO: enable stream lookup KIKIMR-14294
+
+ TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
- TStreamExecScanQuerySettings settings;
- settings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ TStreamExecScanQuerySettings querySettings;
+ querySettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
auto it = db.StreamExecuteScanQuery(R"(
SELECT count(*) FROM `/Root/EightShard` AS t JOIN `/Root/KeyValue` AS kv ON t.Data = kv.Key;
- )", settings).GetValueSync();
+ )", querySettings).GetValueSync();
auto res = CollectStreamResult(it);
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());