diff options
author | ulya-sidorina <yulia@ydb.tech> | 2022-07-18 12:51:44 +0300 |
---|---|---|
committer | ulya-sidorina <yulia@ydb.tech> | 2022-07-18 12:51:44 +0300 |
commit | 543fd67e51dddae7293ce0a2af54a3a86b063c52 (patch) | |
tree | 7adb72ee07af1367edbce34e3b0378ab8d0c99a7 | |
parent | 14c177383c8fba3709cfc2f6114d1788184f1159 (diff) | |
download | ydb-543fd67e51dddae7293ce0a2af54a3a86b063c52.tar.gz |
add stream lookup node to query plan
feature(kqp_query_plan): add stream lookup to query plan
-rw-r--r-- | ydb/core/kqp/prepare/kqp_query_plan.cpp | 87 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_explain_ut.cpp | 20 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_scan_ut.cpp | 20 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_stats_ut.cpp | 12 |
6 files changed, 88 insertions, 53 deletions
diff --git a/ydb/core/kqp/prepare/kqp_query_plan.cpp b/ydb/core/kqp/prepare/kqp_query_plan.cpp index 9d4c6a515d9..a363e5fe77d 100644 --- a/ydb/core/kqp/prepare/kqp_query_plan.cpp +++ b/ydb/core/kqp/prepare/kqp_query_plan.cpp @@ -641,6 +641,67 @@ private: return str; } + void FillConnectionPlanNode(const TDqConnection& connection, TQueryPlanNode& planNode) { + planNode.Type = EPlanNodeType::Connection; + + if (connection.Maybe<TDqCnUnionAll>()) { + planNode.TypeName = "UnionAll"; + } else if (connection.Maybe<TDqCnBroadcast>()) { + planNode.TypeName = "Broadcast"; + } else if (connection.Maybe<TDqCnMap>()) { + planNode.TypeName = "Map"; + } else if (auto hashShuffle = connection.Maybe<TDqCnHashShuffle>()) { + planNode.TypeName = "HashShuffle"; + auto& keyColumns = planNode.NodeInfo["KeyColumns"]; + for (const auto& column : hashShuffle.Cast().KeyColumns()) { + keyColumns.AppendValue(TString(column.Value())); + } + } else if (auto merge = connection.Maybe<TDqCnMerge>()) { + planNode.TypeName = "Merge"; + auto& sortColumns = planNode.NodeInfo["SortColumns"]; + for (const auto& sortColumn : merge.Cast().SortColumns()) { + TStringBuilder sortColumnDesc; + sortColumnDesc << sortColumn.Column().Value() << " (" + << sortColumn.SortDirection().Value() << ")"; + + sortColumns.AppendValue(sortColumnDesc); + } + } else if (auto maybeTableLookup = connection.Maybe<TKqpCnStreamLookup>()) { + auto tableLookup = maybeTableLookup.Cast(); + + TTableRead readInfo; + readInfo.Type = ETableReadType::Lookup; + planNode.TypeName = "TableLookup"; + TString table(tableLookup.Table().Path().Value()); + auto& tableData = SerializerCtx.TablesData->GetTable(SerializerCtx.Cluster, table); + planNode.NodeInfo["Table"] = tableData.RelativePath ? *tableData.RelativePath : table; + + readInfo.Columns.reserve(tableLookup.Columns().Size()); + auto& columns = planNode.NodeInfo["Columns"]; + for (const auto& column : tableLookup.Columns()) { + columns.AppendValue(column.Value()); + readInfo.Columns.push_back(TString(column.Value())); + } + + const auto lookupKeysType = tableLookup.LookupKeysType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + YQL_ENSURE(lookupKeysType); + YQL_ENSURE(lookupKeysType->GetKind() == ETypeAnnotationKind::List); + const auto lookupKeysItemType = lookupKeysType->Cast<TListExprType>()->GetItemType(); + YQL_ENSURE(lookupKeysItemType->GetKind() == ETypeAnnotationKind::Struct); + const auto& lookupKeyColumnsStruct = lookupKeysItemType->Cast<TStructExprType>()->GetItems(); + readInfo.LookupBy.reserve(lookupKeyColumnsStruct.size()); + auto& lookupKeyColumns = planNode.NodeInfo["LookupKeyColumns"]; + for (const auto keyColumn : lookupKeyColumnsStruct) { + lookupKeyColumns.AppendValue(keyColumn->GetName()); + readInfo.LookupBy.push_back(TString(keyColumn->GetName())); + } + + SerializerCtx.Tables[table].Reads.push_back(std::move(readInfo)); + } else { + planNode.TypeName = connection.Ref().Content(); + } + } + void Visit(const TExprBase& expr, TQueryPlanNode& planNode) { if (expr.Maybe<TDqPhyStage>()) { auto stageGuid = NDq::TDqStageSettings::Parse(expr.Cast<TDqPhyStage>()).Id; @@ -684,31 +745,7 @@ private: auto inputCn = input.Cast<TDqConnection>(); auto& inputPlanNode = AddPlanNode(stagePlanNode); - inputPlanNode.Type = EPlanNodeType::Connection; - - if (inputCn.Maybe<TDqCnUnionAll>()) { - inputPlanNode.TypeName = "UnionAll"; - } else if (inputCn.Maybe<TDqCnBroadcast>()) { - inputPlanNode.TypeName = "Broadcast"; - } else if (auto hashShuffle = inputCn.Maybe<TDqCnHashShuffle>()) { - inputPlanNode.TypeName = "HashShuffle"; - auto& keyColumns = inputPlanNode.NodeInfo["KeyColumns"]; - for (const auto& column : hashShuffle.Cast().KeyColumns()) { - keyColumns.AppendValue(TString(column.Value())); - } - } else if (auto merge = inputCn.Maybe<TDqCnMerge>()) { - inputPlanNode.TypeName = "Merge"; - auto& sortColumns = inputPlanNode.NodeInfo["SortColumns"]; - for (const auto& sortColumn : merge.Cast().SortColumns()) { - TStringBuilder sortColumnDesc; - sortColumnDesc << sortColumn.Column().Value() << " (" - << sortColumn.SortDirection().Value() << ")"; - - sortColumns.AppendValue(sortColumnDesc); - } - } else { - inputPlanNode.TypeName = inputCn.Ref().Content(); - } + FillConnectionPlanNode(inputCn, inputPlanNode); Visit(inputCn.Output().Stage(), inputPlanNode); } diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index 4402dca0371..b39fb6bba47 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -111,7 +111,6 @@ TKikimrRunner::TKikimrRunner(const TKikimrSettings& settings) { ServerSettings->SetKeepSnapshotTimeout(settings.KeepSnapshotTimeout); ServerSettings->SetFrFactory(&UdfFrFactory); ServerSettings->SetEnableNotNullColumns(true); - ServerSettings->SetEnableKqpScanQueryStreamLookup(false); ServerSettings->SetEnableMoveIndex(true); if (settings.LogStream) ServerSettings->SetLogBackend(new TStreamLogBackend(settings.LogStream)); diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h index 2f13a31353a..b1b8a33996c 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.h +++ b/ydb/core/kqp/ut/common/kqp_ut_common.h @@ -87,6 +87,7 @@ struct TKikimrSettings: public TTestFeatureFlagsHolder<TKikimrSettings> { { // default value for tests, can be overwritten by SetFeatureFlags() this->SetEnableKqpSessionActor(false); + this->SetEnableKqpScanQueryStreamLookup(true); } TKikimrSettings& SetAppConfig(const NKikimrConfig::TAppConfig& value) { AppConfig = value; return *this; } diff --git a/ydb/core/kqp/ut/kqp_explain_ut.cpp b/ydb/core/kqp/ut/kqp_explain_ut.cpp index cd926b6aeb8..949e96167e4 100644 --- a/ydb/core/kqp/ut/kqp_explain_ut.cpp +++ b/ydb/core/kqp/ut/kqp_explain_ut.cpp @@ -65,11 +65,13 @@ Y_UNIT_TEST_SUITE(KqpExplain) { NJson::TJsonValue plan; NJson::ReadJsonTree(*res.PlanJson, &plan, true); - auto join = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter-TableFullScan"); + auto join = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter"); UNIT_ASSERT(join.IsDefined()); auto left = FindPlanNodeByKv(join, "Table", "EightShard"); UNIT_ASSERT(left.IsDefined()); - auto right = FindPlanNodeByKv(join, "Table", "KeyValue"); + auto lookup = FindPlanNodeByKv(join, "Node Type", "TableLookup"); + UNIT_ASSERT(lookup.IsDefined()); + auto right = FindPlanNodeByKv(lookup, "Table", "KeyValue"); UNIT_ASSERT(right.IsDefined()); } @@ -91,11 +93,13 @@ Y_UNIT_TEST_SUITE(KqpExplain) { NJson::TJsonValue plan; NJson::ReadJsonTree(*res.PlanJson, &plan, true); - auto join = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter-TableFullScan"); + auto join = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter"); UNIT_ASSERT(join.IsDefined()); auto left = FindPlanNodeByKv(join, "Table", "EightShard"); UNIT_ASSERT(left.IsDefined()); - auto right = FindPlanNodeByKv(join, "Table", "KeyValue"); + auto lookup = FindPlanNodeByKv(join, "Node Type", "TableLookup"); + UNIT_ASSERT(lookup.IsDefined()); + auto right = FindPlanNodeByKv(lookup, "Table", "KeyValue"); UNIT_ASSERT(right.IsDefined()); } @@ -179,12 +183,14 @@ Y_UNIT_TEST_SUITE(KqpExplain) { auto join = FindPlanNodeByKv( plan, "Node Type", - "Aggregate-InnerJoin (MapJoin)-Filter-TableFullScan" + "Aggregate-InnerJoin (MapJoin)-Filter" ); UNIT_ASSERT(join.IsDefined()); auto left = FindPlanNodeByKv(join, "Table", "EightShard"); UNIT_ASSERT(left.IsDefined()); - auto right = FindPlanNodeByKv(join, "Table", "FourShard"); + auto lookup = FindPlanNodeByKv(join, "Node Type", "TableLookup"); + UNIT_ASSERT(lookup.IsDefined()); + auto right = FindPlanNodeByKv(lookup, "Table", "FourShard"); UNIT_ASSERT(right.IsDefined()); } @@ -319,7 +325,7 @@ Y_UNIT_TEST_SUITE(KqpExplain) { NJson::TJsonValue plan; NJson::ReadJsonTree(*res.PlanJson, &plan, true); - auto join1 = FindPlanNodeByKv(plan, "Node Type", "Sort-InnerJoin (MapJoin)-Filter-Aggregate"); + auto join1 = FindPlanNodeByKv(plan, "Node Type", "Sort-InnerJoin (MapJoin)-Filter"); UNIT_ASSERT(join1.IsDefined()); auto join2 = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter"); UNIT_ASSERT(join2.IsDefined()); diff --git a/ydb/core/kqp/ut/kqp_scan_ut.cpp b/ydb/core/kqp/ut/kqp_scan_ut.cpp index 6c326a5dee4..62c9e64cd0d 100644 --- a/ydb/core/kqp/ut/kqp_scan_ut.cpp +++ b/ydb/core/kqp/ut/kqp_scan_ut.cpp @@ -1234,7 +1234,7 @@ Y_UNIT_TEST_SUITE(KqpScan) { auto part = it.ReadNext().GetValueSync(); UNIT_ASSERT_EQUAL_C(part.GetStatus(), EStatus::PRECONDITION_FAILED, part.GetStatus()); - UNIT_ASSERT_STRINGS_EQUAL(part.GetIssues().back().GetSubIssues().back()->Message, "Requested too many execution units: 12"); + UNIT_ASSERT_STRINGS_EQUAL(part.GetIssues().back().GetSubIssues().back()->Message, "Requested too many execution units: 32"); part = it.ReadNext().GetValueSync(); UNIT_ASSERT(part.EOS()); @@ -1680,11 +1680,7 @@ Y_UNIT_TEST_SUITE(KqpScan) { } Y_UNIT_TEST_TWIN(SecondaryIndex, UseSessionActor) { - auto settings = TKikimrSettings() - .SetEnableKqpSessionActor(UseSessionActor) - .SetEnableKqpScanQueryStreamLookup(true); - - TKikimrRunner kikimr(settings); + auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1966,11 +1962,7 @@ Y_UNIT_TEST_SUITE(KqpScan) { } Y_UNIT_TEST_TWIN(StreamLookup, UseSessionActor) { - auto settings = TKikimrSettings() - .SetEnableKqpSessionActor(UseSessionActor) - .SetEnableKqpScanQueryStreamLookup(true); - - TKikimrRunner kikimr(settings); + auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor); auto db = kikimr.GetTableClient(); CreateSampleTables(kikimr); @@ -1999,11 +1991,7 @@ Y_UNIT_TEST_SUITE(KqpScan) { } Y_UNIT_TEST_TWIN(StreamLookupByPkPrefix, UseSessionActor) { - auto settings = TKikimrSettings() - .SetEnableKqpSessionActor(UseSessionActor) - .SetEnableKqpScanQueryStreamLookup(true); - - TKikimrRunner kikimr(settings); + auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor); auto db = kikimr.GetTableClient(); CreateSampleTables(kikimr); diff --git a/ydb/core/kqp/ut/kqp_stats_ut.cpp b/ydb/core/kqp/ut/kqp_stats_ut.cpp index 6202b0ba114..c330a42bf3a 100644 --- a/ydb/core/kqp/ut/kqp_stats_ut.cpp +++ b/ydb/core/kqp/ut/kqp_stats_ut.cpp @@ -58,14 +58,18 @@ Y_UNIT_TEST_TWIN(JoinNoStats, UseSessionActor) { } Y_UNIT_TEST_TWIN(JoinStatsBasic, UseSessionActor) { - auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor); + auto settings = TKikimrSettings() + .SetEnableKqpSessionActor(UseSessionActor) + .SetEnableKqpScanQueryStreamLookup(false); // TODO: enable stream lookup KIKIMR-14294 + + TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); - TStreamExecScanQuerySettings settings; - settings.CollectQueryStats(ECollectQueryStatsMode::Basic); + TStreamExecScanQuerySettings querySettings; + querySettings.CollectQueryStats(ECollectQueryStatsMode::Basic); auto it = db.StreamExecuteScanQuery(R"( SELECT count(*) FROM `/Root/EightShard` AS t JOIN `/Root/KeyValue` AS kv ON t.Data = kv.Key; - )", settings).GetValueSync(); + )", querySettings).GetValueSync(); auto res = CollectStreamResult(it); UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); |