diff options
author | Igor Makunin <igor.makunin@gmail.com> | 2022-02-11 18:29:11 +0300 |
---|---|---|
committer | Igor Makunin <igor.makunin@gmail.com> | 2022-02-11 18:29:11 +0300 |
commit | 0d99fc6efe15562b8474a702ab18e406ac102cdf (patch) | |
tree | e90a8add5bf64405a8ca2bc9c2e4b2f5b8a009f5 | |
parent | 9239e3a134ea21e9ad69912968146743a467bcca (diff) | |
download | ydb-0d99fc6efe15562b8474a702ab18e406ac102cdf.tar.gz |
KIKIMR-14297: dont extract literals to the precompute stage for items limit
ref:0670aa18020017334f24a52604133d94cf0187a4
-rw-r--r-- | ydb/core/kqp/compile/kqp_compile.cpp | 33 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_data_executer.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_executer_impl.h | 52 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_scan_executer.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/opt/kqp_opt_impl.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp | 29 | ||||
-rw-r--r-- | ydb/core/kqp/prepare/kqp_query_plan.cpp | 12 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_table.cpp | 13 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_newengine_ut.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_query_ut.cpp | 17 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_stats_ut.cpp | 2 | ||||
-rw-r--r-- | ydb/core/protos/kqp_physical.proto | 6 | ||||
-rw-r--r-- | ydb/core/sys_view/ut_kqp.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp_read_table.cpp | 6 |
14 files changed, 116 insertions, 70 deletions
diff --git a/ydb/core/kqp/compile/kqp_compile.cpp b/ydb/core/kqp/compile/kqp_compile.cpp index c7d305ab46..d778aec5be 100644 --- a/ydb/core/kqp/compile/kqp_compile.cpp +++ b/ydb/core/kqp/compile/kqp_compile.cpp @@ -139,8 +139,9 @@ void FillKeyRange(const TKqlKeyRange& range, NKqpProto::TKqpPhyKeyRange& rangePr FillKeyBound(range.To(), *rangeProto.MutableTo()); } -template <typename TReader, typename TProto> -void FillReadRange(const TReader& read, const TKikimrTableMetadata& tableMeta, TProto& readProto) { +void FillReadRange(const TKqpWideReadTable& read, const TKikimrTableMetadata& tableMeta, + NKqpProto::TKqpPhyOpReadRange& readProto) +{ FillKeyRange(read.Range(), *readProto.MutableKeyRange()); auto settings = TKqpReadTableSettings::Parse(read); @@ -154,8 +155,15 @@ void FillReadRange(const TReader& read, const TKikimrTableMetadata& tableMeta, T if (settings.ItemsLimit) { TExprBase expr(settings.ItemsLimit); - if (expr.template Maybe<TCoParameter>()) { - readProto.MutableItemsLimit()->SetParamName(TString(expr.template Cast<TCoParameter>().Name().Value())); + if (expr.Maybe<TCoUint64>()) { + auto* literal = readProto.MutableItemsLimit()->MutableLiteralValue(); + + literal->MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data); + literal->MutableType()->MutableData()->SetScheme(NScheme::NTypeIds::Uint64); + + literal->MutableValue()->SetUint64(FromString<ui64>(expr.Cast<TCoUint64>().Literal().Value())); + } else if (expr.Maybe<TCoParameter>()) { + readProto.MutableItemsLimit()->MutableParamValue()->SetParamName(expr.Cast<TCoParameter>().Name().StringValue()); } else { YQL_ENSURE(false, "Unexpected ItemsLimit callable " << expr.Ref().Content()); } @@ -165,15 +173,12 @@ void FillReadRange(const TReader& read, const TKikimrTableMetadata& tableMeta, T } template <typename TReader, typename TProto> -void FillReadRanges(const TReader& read, const TKikimrTableMetadata& tableMeta, TProto& readProto) -{ - Y_UNUSED(tableMeta); - +void FillReadRanges(const TReader& read, const TKikimrTableMetadata&, TProto& readProto) { auto ranges = read.Ranges().template Maybe<TCoParameter>(); if (ranges.IsValid()) { auto& rangesParam = *readProto.MutableKeyRanges(); - rangesParam.SetParamName(TString(ranges.Cast().Name())); + rangesParam.SetParamName(ranges.Cast().Name().StringValue()); } else { YQL_ENSURE( TCoVoid::Match(read.Ranges().Raw()), @@ -185,9 +190,15 @@ void FillReadRanges(const TReader& read, const TKikimrTableMetadata& tableMeta, if (settings.ItemsLimit) { TExprBase expr(settings.ItemsLimit); + if (expr.template Maybe<TCoUint64>()) { + auto* literal = readProto.MutableItemsLimit()->MutableLiteralValue(); + + literal->MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data); + literal->MutableType()->MutableData()->SetScheme(NScheme::NTypeIds::Uint64); - if (expr.template Maybe<TCoParameter>()) { - readProto.MutableItemsLimit()->SetParamName(TString(expr.template Cast<TCoParameter>().Name().Value())); + literal->MutableValue()->SetUint64(FromString<ui64>(expr.Cast<TCoUint64>().Literal().Value())); + } else if (expr.template Maybe<TCoParameter>()) { + readProto.MutableItemsLimit()->MutableParamValue()->SetParamName(expr.template Cast<TCoParameter>().Name().StringValue()); } else { YQL_ENSURE(false, "Unexpected ItemsLimit callable " << expr.Ref().Content()); } diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp index 003d4ffa20..5ab676b52c 100644 --- a/ydb/core/kqp/executer/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer/kqp_data_executer.cpp @@ -930,8 +930,7 @@ private: } private: - void FillGeneralReadInfo(TTaskMeta& taskMeta, ui64 itemsLimit, bool reverse) - { + void FillGeneralReadInfo(TTaskMeta& taskMeta, ui64 itemsLimit, bool reverse) { if (taskMeta.Reads && !taskMeta.Reads.GetRef().empty()) { // Validate parameters YQL_ENSURE(taskMeta.ReadInfo.ItemsLimit == itemsLimit); @@ -1012,7 +1011,7 @@ private: readInfo.Ranges = std::move(*shardInfo.KeyReadRanges); readInfo.Columns = columns; - if (itemsLimit) { + if (itemsLimitParamName) { task.Meta.Params.emplace(itemsLimitParamName, itemsLimitBytes); task.Meta.ParamTypes.emplace(itemsLimitParamName, itemsLimitType); } diff --git a/ydb/core/kqp/executer/kqp_executer_impl.h b/ydb/core/kqp/executer/kqp_executer_impl.h index c2e872c4e8..e2bd251c77 100644 --- a/ydb/core/kqp/executer/kqp_executer_impl.h +++ b/ydb/core/kqp/executer/kqp_executer_impl.h @@ -449,28 +449,54 @@ protected: meta->SetTableKind((ui32)stageInfo.Meta.TableKind); } - void ExtractItemsLimit(const TStageInfo& stageInfo, const NKqpProto::TKqpPhyParamValue& paramValue, + void ExtractItemsLimit(const TStageInfo& stageInfo, const NKqpProto::TKqpPhyValue& protoItemsLimit, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv, ui64& itemsLimit, TString& itemsLimitParamName, NYql::NDqProto::TData& itemsLimitBytes, NKikimr::NMiniKQL::TType*& itemsLimitType) { - itemsLimitParamName = paramValue.GetParamName(); + switch (protoItemsLimit.GetKindCase()) { + case NKqpProto::TKqpPhyValue::kLiteralValue: { + const auto& literalValue = protoItemsLimit.GetLiteralValue(); - if (!itemsLimitParamName) { - return; - } + auto [type, value] = NMiniKQL::ImportValueFromProto( + literalValue.GetType(), literalValue.GetValue(), typeEnv, holderFactory); + + YQL_ENSURE(type->GetKind() == NMiniKQL::TType::EKind::Data); + itemsLimit = value.Get<ui64>(); + itemsLimitType = type; + + return; + } + + case NKqpProto::TKqpPhyValue::kParamValue: { + itemsLimitParamName = protoItemsLimit.GetParamElementValue().GetParamName(); + if (!itemsLimitParamName) { + return; + } + + auto* itemsLimitParam = stageInfo.Meta.Tx.Params.Values.FindPtr(itemsLimitParamName); + YQL_ENSURE(itemsLimitParam); + + auto [type, value] = NMiniKQL::ImportValueFromProto( + itemsLimitParam->GetType(), itemsLimitParam->GetValue(), typeEnv, holderFactory); - auto* itemsLimitParam = stageInfo.Meta.Tx.Params.Values.FindPtr(itemsLimitParamName); - YQL_ENSURE(itemsLimitParam); + YQL_ENSURE(type->GetKind() == NMiniKQL::TType::EKind::Data); + itemsLimit = value.Get<ui64>(); - auto [type, value] = NMiniKQL::ImportValueFromProto(itemsLimitParam->GetType(), itemsLimitParam->GetValue(), typeEnv, holderFactory); + NYql::NDq::TDqDataSerializer dataSerializer(typeEnv, holderFactory, NYql::NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0); + itemsLimitBytes = dataSerializer.Serialize(value, type); + itemsLimitType = type; - YQL_ENSURE(type->GetKind() == NMiniKQL::TType::EKind::Data); - itemsLimit = value.Get<ui64>(); + return; + } + + case NKqpProto::TKqpPhyValue::kParamElementValue: + case NKqpProto::TKqpPhyValue::kRowsList: + YQL_ENSURE(false, "Unexpected ItemsLimit kind " << protoItemsLimit.DebugString()); - NYql::NDq::TDqDataSerializer dataSerializer(typeEnv, holderFactory, NYql::NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0); - itemsLimitBytes = dataSerializer.Serialize(value, type); - itemsLimitType = type; + case NKqpProto::TKqpPhyValue::KIND_NOT_SET: + return; + } } protected: diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp index 07c29f64e2..d162dcca1c 100644 --- a/ydb/core/kqp/executer/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp @@ -494,7 +494,7 @@ private: .ShardId = shardId, }; - if (itemsLimit && !task.Meta.Params.contains(itemsLimitParamName)) { + if (itemsLimitParamName && !task.Meta.Params.contains(itemsLimitParamName)) { task.Meta.Params.emplace(itemsLimitParamName, itemsLimitBytes); task.Meta.ParamTypes.emplace(itemsLimitParamName, itemsLimitType); } @@ -642,7 +642,7 @@ private: FillReadInfo(task.Meta, itemsLimit, reverse, readRange); - if (itemsLimit) { + if (itemsLimitParamName) { task.Meta.Params.emplace(itemsLimitParamName, itemsLimitBytes); task.Meta.ParamTypes.emplace(itemsLimitParamName, itemsLimitType); } diff --git a/ydb/core/kqp/opt/kqp_opt_impl.h b/ydb/core/kqp/opt/kqp_opt_impl.h index 927aa910e8..459742b12f 100644 --- a/ydb/core/kqp/opt/kqp_opt_impl.h +++ b/ydb/core/kqp/opt/kqp_opt_impl.h @@ -12,7 +12,7 @@ namespace NKikimr::NKqp::NOpt { static inline void DumpAppliedRule(const TString& name, const NYql::TExprNode::TPtr& input, const NYql::TExprNode::TPtr& output, NYql::TExprContext& ctx) { -//#define KQP_ENABLE_DUMP_APPLIED_RULE +// #define KQP_ENABLE_DUMP_APPLIED_RULE #ifdef KQP_ENABLE_DUMP_APPLIED_RULE if (input != output) { auto builder = TStringBuilder() << "Rule applied: " << name << Endl; diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp index e09357ea69..36b928d5fe 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp @@ -38,19 +38,14 @@ TExprBase KqpApplyLimitToReadTable(TExprBase node, TExprContext& ctx, const TKqp } TMaybeNode<TExprBase> limitValue; - if (auto maybeTakeCount = take.Count().Maybe<TCoUint64>()) { - ui64 totalLimit; - ui64 takeValue = FromString<ui64>(maybeTakeCount.Cast().Literal().Value()); + auto maybeTakeCount = take.Count().Maybe<TCoUint64>(); + auto maybeSkipCount = maybeSkip.Count().Maybe<TCoUint64>(); - if (maybeSkip) { - if (auto maybeSkipCount = maybeSkip.Count().Maybe<TCoUint64>()) { - auto skipValue = FromString<ui64>(maybeSkipCount.Cast().Literal().Value()); - totalLimit = takeValue + skipValue; - } else { - return node; // ??? - } - } else { - totalLimit = takeValue; + if (maybeTakeCount && (!maybeSkip || maybeSkipCount)) { + ui64 totalLimit = FromString<ui64>(maybeTakeCount.Cast().Literal().Value()); + + if (maybeSkipCount) { + totalLimit += FromString<ui64>(maybeSkipCount.Cast().Literal().Value()); } limitValue = Build<TCoUint64>(ctx, node.Pos()) @@ -69,9 +64,13 @@ TExprBase KqpApplyLimitToReadTable(TExprBase node, TExprContext& ctx, const TKqp YQL_CLOG(TRACE, ProviderKqp) << "-- set limit items value to " << limitValue.Cast().Ref().Dump(); - settings.SetItemsLimit(Build<TDqPrecompute>(ctx, node.Pos()) - .Input(limitValue.Cast()) - .Done().Ptr()); + if (limitValue.Maybe<TCoUint64>()) { + settings.SetItemsLimit(limitValue.Cast().Ptr()); + } else { + settings.SetItemsLimit(Build<TDqPrecompute>(ctx, node.Pos()) + .Input(limitValue.Cast()) + .Done().Ptr()); + } input = BuildReadNode(node.Pos(), ctx, input, settings); diff --git a/ydb/core/kqp/prepare/kqp_query_plan.cpp b/ydb/core/kqp/prepare/kqp_query_plan.cpp index 346a8b15cb..00911b05e7 100644 --- a/ydb/core/kqp/prepare/kqp_query_plan.cpp +++ b/ydb/core/kqp/prepare/kqp_query_plan.cpp @@ -76,12 +76,16 @@ struct TSerializerCtx { THashMap<ui32, TVector<NKikimrMiniKQL::TResult>> PureTxResults; }; -TString GetExprStr(const TExprBase& scalar) { +TString GetExprStr(const TExprBase& scalar, bool quoteStr = true) { if (auto maybeData = scalar.Maybe<TCoDataCtor>()) { auto literal = TString(maybeData.Cast().Literal()); CollapseText(literal, 32); - return TStringBuilder() << '"' << literal << '"'; + if (quoteStr) { + return TStringBuilder() << '"' << literal << '"'; + } else { + return literal; + } } if (auto maybeParam = scalar.Maybe<TCoParameter>()) { @@ -1049,7 +1053,7 @@ private: auto settings = NYql::TKqpReadTableSettings::Parse(read); if (settings.ItemsLimit && !readInfo.Limit) { - auto limit = GetExprStr(TExprBase(settings.ItemsLimit)); + auto limit = GetExprStr(TExprBase(settings.ItemsLimit), false); if (auto maybeResultBinding = ContainResultBinding(limit)) { const auto [txId, resId] = *maybeResultBinding; if (auto result = GetResult(txId, resId)) { @@ -1180,7 +1184,7 @@ private: auto settings = NYql::TKqpReadTableSettings::Parse(read); if (settings.ItemsLimit && !readInfo.Limit) { - auto limit = GetExprStr(TExprBase(settings.ItemsLimit)); + auto limit = GetExprStr(TExprBase(settings.ItemsLimit), false); if (auto maybeResultBinding = ContainResultBinding(limit)) { const auto [txId, resId] = *maybeResultBinding; if (auto result = GetResult(txId, resId)) { diff --git a/ydb/core/kqp/runtime/kqp_read_table.cpp b/ydb/core/kqp/runtime/kqp_read_table.cpp index 358ebd0a3e..d382795590 100644 --- a/ydb/core/kqp/runtime/kqp_read_table.cpp +++ b/ydb/core/kqp/runtime/kqp_read_table.cpp @@ -175,8 +175,12 @@ TParseReadTableResult ParseWideReadTable(TCallable& callable) { MKQL_ENSURE_S(AS_TYPE(TDataType, AS_TYPE(TCallableType, node->GetType())->GetReturnType())->GetSchemeType() == NUdf::TDataType<ui64>::Id, "ItemsLimit must be () -> ui64"); result.ItemsLimit = node; + } else if (node->GetType()->GetKind() == TType::EKind::Data) { + MKQL_ENSURE_S(AS_TYPE(TDataType, node->GetType())->GetSchemeType() == NUdf::TDataType<ui64>::Id, + "ItemsLimit must be ui64"); + result.ItemsLimit = node; } else { - MKQL_ENSURE_S(node->GetType()->GetKind() == TType::EKind::Null, "ItemsLimit expected to be Callable or Null"); + MKQL_ENSURE_S(node->GetType()->GetKind() == TType::EKind::Null, "ItemsLimit expected to be Callable, Uint64 or Null"); } } @@ -218,10 +222,15 @@ TParseReadTableRangesResult ParseWideReadTableRanges(TCallable& callable) { ); result.ItemsLimit = limitNode; break; + case TType::EKind::Data: + MKQL_ENSURE_S(AS_TYPE(TDataType, limitNode->GetType())->GetSchemeType() == NUdf::TDataType<ui64>::Id, + "ItemsLimit must be ui64"); + result.ItemsLimit = limitNode; + break; case TType::EKind::Null: break; default: - MKQL_ENSURE(false, "ItemsLimit expected to be Callable or Null"); + MKQL_ENSURE(false, "ItemsLimit expected to be Callable, Data or Null"); } result.Reverse = AS_VALUE(TDataLiteral, reverse)->AsValue().Get<bool>(); diff --git a/ydb/core/kqp/ut/kqp_newengine_ut.cpp b/ydb/core/kqp/ut/kqp_newengine_ut.cpp index 3737687d8d..6a239188d5 100644 --- a/ydb/core/kqp/ut/kqp_newengine_ut.cpp +++ b/ydb/core/kqp/ut/kqp_newengine_ut.cpp @@ -1980,7 +1980,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { auto result = session.ExplainDataQuery(query).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, TStringBuilder() << query << Endl << "Failed with: " << result.GetIssues().ToString()); - UNIT_ASSERT_C(result.GetAst().Contains("('('\"ItemsLimit\" %kqp%tx_result_binding_0_0))"), + UNIT_ASSERT_C(result.GetAst().Contains("('('\"ItemsLimit\""), TStringBuilder() << query << Endl << "Failed with AST: " << result.GetAst()); NJson::TJsonValue plan; @@ -1998,7 +1998,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { } auto readLimit = FindPlanNodeByKv(node, "ReadLimit", readLimitValue); - UNIT_ASSERT(readLimit.IsDefined()); + UNIT_ASSERT_C(readLimit.IsDefined(), result.GetPlan()); } } } diff --git a/ydb/core/kqp/ut/kqp_query_ut.cpp b/ydb/core/kqp/ut/kqp_query_ut.cpp index 5ef6d18f38..d2c7c29400 100644 --- a/ydb/core/kqp/ut/kqp_query_ut.cpp +++ b/ydb/core/kqp/ut/kqp_query_ut.cpp @@ -547,19 +547,10 @@ Y_UNIT_TEST_SUITE(KqpQuery) { auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - if (UseNewEngine) { - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 0); - - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).name(), "/Root/Tmp"); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).reads().rows(), 1001); - } else { - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Tmp"); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 1001); - } + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Tmp"); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 1001); } Y_UNIT_TEST(YqlSyntaxV0) { diff --git a/ydb/core/kqp/ut/kqp_stats_ut.cpp b/ydb/core/kqp/ut/kqp_stats_ut.cpp index f7815c016e..0f889f0447 100644 --- a/ydb/core/kqp/ut/kqp_stats_ut.cpp +++ b/ydb/core/kqp/ut/kqp_stats_ut.cpp @@ -173,7 +173,7 @@ Y_UNIT_TEST(DeferredEffects) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); NJson::ReadJsonTree(result.GetQueryPlan(), &plan, true); - UNIT_ASSERT_VALUES_EQUAL(plan.GetMapSafe().at("Plan").GetMapSafe().at("Plans").GetArraySafe().size(), 4); + UNIT_ASSERT_VALUES_EQUAL(plan.GetMapSafe().at("Plan").GetMapSafe().at("Plans").GetArraySafe().size(), 3); auto ru = result.GetResponseMetadata().find(NYdb::YDB_CONSUMED_UNITS_HEADER); UNIT_ASSERT(ru != result.GetResponseMetadata().end()); diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index 44d8f58716..df7b979693 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -109,7 +109,7 @@ message TKqpPhyKeyRange { message TKqpPhyOpReadRange { TKqpPhyKeyRange KeyRange = 1; - TKqpPhyParamValue ItemsLimit = 2; + TKqpPhyValue ItemsLimit = 2; bool Reverse = 3; repeated bool SkipNullKeys = 4; } @@ -132,7 +132,7 @@ message TKqpPhyOpReadOlapRanges { // Where KeyColumns is values of start/end of range for corresponding key column in table TKqpPhyParamValue KeyRanges = 1; // Limit value, shard may stop after reading limit rows - TKqpPhyParamValue ItemsLimit = 2; + TKqpPhyValue ItemsLimit = 2; // Reverse sign, i.e. if user ask ORDER BY ... DESC we need to read table in reverse direction bool Reverse = 3; // Program in NKikimrSSA.TProgram format @@ -152,7 +152,7 @@ message TKqpPhyOpReadRanges { // Also it has special case - empty name. This means full scan. TKqpPhyParamValue KeyRanges = 1; // Limit value, shard may stop after reading limit rows - TKqpPhyParamValue ItemsLimit = 2; + TKqpPhyValue ItemsLimit = 2; // Reverse sign, i.e. if user ask ORDER BY ... DESC we need to read table in reverse direction bool Reverse = 3; } diff --git a/ydb/core/sys_view/ut_kqp.cpp b/ydb/core/sys_view/ut_kqp.cpp index 5e90eb5992..c5f6c5470a 100644 --- a/ydb/core/sys_view/ut_kqp.cpp +++ b/ydb/core/sys_view/ut_kqp.cpp @@ -1155,6 +1155,7 @@ Y_UNIT_TEST_SUITE(SystemView) { TTableClient client(env.GetDriver()); auto session = client.CreateSession().GetValueSync().GetSession(); auto result = session.ExecuteDataQuery( + "PRAGMA Kikimr.UseNewEngine='false'; " "SELECT * from `/Root/.sys/partition_stats`", TTxControl::BeginTx().CommitTx() ).GetValueSync(); diff --git a/ydb/core/tx/datashard/datashard_kqp_read_table.cpp b/ydb/core/tx/datashard/datashard_kqp_read_table.cpp index 9fbd8ef0ad..5952dc15c0 100644 --- a/ydb/core/tx/datashard/datashard_kqp_read_table.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_read_table.cpp @@ -381,6 +381,9 @@ private: void RegisterDependencies() const final { this->FlowDependsOn(FromNode); this->FlowDependsOn(ToNode); + if (ItemsLimit) { + this->FlowDependsOn(ItemsLimit); + } } private: @@ -462,6 +465,9 @@ private: void RegisterDependencies() const final { this->FlowDependsOn(RangesNode); + if (ItemsLimit) { + this->FlowDependsOn(ItemsLimit); + } } TParseReadTableRangesResult ParseResult; |