aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIgor Makunin <igor.makunin@gmail.com>2022-02-11 18:29:11 +0300
committerIgor Makunin <igor.makunin@gmail.com>2022-02-11 18:29:11 +0300
commit0d99fc6efe15562b8474a702ab18e406ac102cdf (patch)
treee90a8add5bf64405a8ca2bc9c2e4b2f5b8a009f5
parent9239e3a134ea21e9ad69912968146743a467bcca (diff)
downloadydb-0d99fc6efe15562b8474a702ab18e406ac102cdf.tar.gz
KIKIMR-14297: dont extract literals to the precompute stage for items limit
ref:0670aa18020017334f24a52604133d94cf0187a4
-rw-r--r--ydb/core/kqp/compile/kqp_compile.cpp33
-rw-r--r--ydb/core/kqp/executer/kqp_data_executer.cpp5
-rw-r--r--ydb/core/kqp/executer/kqp_executer_impl.h52
-rw-r--r--ydb/core/kqp/executer/kqp_scan_executer.cpp4
-rw-r--r--ydb/core/kqp/opt/kqp_opt_impl.h2
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp29
-rw-r--r--ydb/core/kqp/prepare/kqp_query_plan.cpp12
-rw-r--r--ydb/core/kqp/runtime/kqp_read_table.cpp13
-rw-r--r--ydb/core/kqp/ut/kqp_newengine_ut.cpp4
-rw-r--r--ydb/core/kqp/ut/kqp_query_ut.cpp17
-rw-r--r--ydb/core/kqp/ut/kqp_stats_ut.cpp2
-rw-r--r--ydb/core/protos/kqp_physical.proto6
-rw-r--r--ydb/core/sys_view/ut_kqp.cpp1
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_read_table.cpp6
14 files changed, 116 insertions, 70 deletions
diff --git a/ydb/core/kqp/compile/kqp_compile.cpp b/ydb/core/kqp/compile/kqp_compile.cpp
index c7d305ab46..d778aec5be 100644
--- a/ydb/core/kqp/compile/kqp_compile.cpp
+++ b/ydb/core/kqp/compile/kqp_compile.cpp
@@ -139,8 +139,9 @@ void FillKeyRange(const TKqlKeyRange& range, NKqpProto::TKqpPhyKeyRange& rangePr
FillKeyBound(range.To(), *rangeProto.MutableTo());
}
-template <typename TReader, typename TProto>
-void FillReadRange(const TReader& read, const TKikimrTableMetadata& tableMeta, TProto& readProto) {
+void FillReadRange(const TKqpWideReadTable& read, const TKikimrTableMetadata& tableMeta,
+ NKqpProto::TKqpPhyOpReadRange& readProto)
+{
FillKeyRange(read.Range(), *readProto.MutableKeyRange());
auto settings = TKqpReadTableSettings::Parse(read);
@@ -154,8 +155,15 @@ void FillReadRange(const TReader& read, const TKikimrTableMetadata& tableMeta, T
if (settings.ItemsLimit) {
TExprBase expr(settings.ItemsLimit);
- if (expr.template Maybe<TCoParameter>()) {
- readProto.MutableItemsLimit()->SetParamName(TString(expr.template Cast<TCoParameter>().Name().Value()));
+ if (expr.Maybe<TCoUint64>()) {
+ auto* literal = readProto.MutableItemsLimit()->MutableLiteralValue();
+
+ literal->MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data);
+ literal->MutableType()->MutableData()->SetScheme(NScheme::NTypeIds::Uint64);
+
+ literal->MutableValue()->SetUint64(FromString<ui64>(expr.Cast<TCoUint64>().Literal().Value()));
+ } else if (expr.Maybe<TCoParameter>()) {
+ readProto.MutableItemsLimit()->MutableParamValue()->SetParamName(expr.Cast<TCoParameter>().Name().StringValue());
} else {
YQL_ENSURE(false, "Unexpected ItemsLimit callable " << expr.Ref().Content());
}
@@ -165,15 +173,12 @@ void FillReadRange(const TReader& read, const TKikimrTableMetadata& tableMeta, T
}
template <typename TReader, typename TProto>
-void FillReadRanges(const TReader& read, const TKikimrTableMetadata& tableMeta, TProto& readProto)
-{
- Y_UNUSED(tableMeta);
-
+void FillReadRanges(const TReader& read, const TKikimrTableMetadata&, TProto& readProto) {
auto ranges = read.Ranges().template Maybe<TCoParameter>();
if (ranges.IsValid()) {
auto& rangesParam = *readProto.MutableKeyRanges();
- rangesParam.SetParamName(TString(ranges.Cast().Name()));
+ rangesParam.SetParamName(ranges.Cast().Name().StringValue());
} else {
YQL_ENSURE(
TCoVoid::Match(read.Ranges().Raw()),
@@ -185,9 +190,15 @@ void FillReadRanges(const TReader& read, const TKikimrTableMetadata& tableMeta,
if (settings.ItemsLimit) {
TExprBase expr(settings.ItemsLimit);
+ if (expr.template Maybe<TCoUint64>()) {
+ auto* literal = readProto.MutableItemsLimit()->MutableLiteralValue();
+
+ literal->MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data);
+ literal->MutableType()->MutableData()->SetScheme(NScheme::NTypeIds::Uint64);
- if (expr.template Maybe<TCoParameter>()) {
- readProto.MutableItemsLimit()->SetParamName(TString(expr.template Cast<TCoParameter>().Name().Value()));
+ literal->MutableValue()->SetUint64(FromString<ui64>(expr.Cast<TCoUint64>().Literal().Value()));
+ } else if (expr.template Maybe<TCoParameter>()) {
+ readProto.MutableItemsLimit()->MutableParamValue()->SetParamName(expr.template Cast<TCoParameter>().Name().StringValue());
} else {
YQL_ENSURE(false, "Unexpected ItemsLimit callable " << expr.Ref().Content());
}
diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp
index 003d4ffa20..5ab676b52c 100644
--- a/ydb/core/kqp/executer/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_data_executer.cpp
@@ -930,8 +930,7 @@ private:
}
private:
- void FillGeneralReadInfo(TTaskMeta& taskMeta, ui64 itemsLimit, bool reverse)
- {
+ void FillGeneralReadInfo(TTaskMeta& taskMeta, ui64 itemsLimit, bool reverse) {
if (taskMeta.Reads && !taskMeta.Reads.GetRef().empty()) {
// Validate parameters
YQL_ENSURE(taskMeta.ReadInfo.ItemsLimit == itemsLimit);
@@ -1012,7 +1011,7 @@ private:
readInfo.Ranges = std::move(*shardInfo.KeyReadRanges);
readInfo.Columns = columns;
- if (itemsLimit) {
+ if (itemsLimitParamName) {
task.Meta.Params.emplace(itemsLimitParamName, itemsLimitBytes);
task.Meta.ParamTypes.emplace(itemsLimitParamName, itemsLimitType);
}
diff --git a/ydb/core/kqp/executer/kqp_executer_impl.h b/ydb/core/kqp/executer/kqp_executer_impl.h
index c2e872c4e8..e2bd251c77 100644
--- a/ydb/core/kqp/executer/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer/kqp_executer_impl.h
@@ -449,28 +449,54 @@ protected:
meta->SetTableKind((ui32)stageInfo.Meta.TableKind);
}
- void ExtractItemsLimit(const TStageInfo& stageInfo, const NKqpProto::TKqpPhyParamValue& paramValue,
+ void ExtractItemsLimit(const TStageInfo& stageInfo, const NKqpProto::TKqpPhyValue& protoItemsLimit,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv,
ui64& itemsLimit, TString& itemsLimitParamName, NYql::NDqProto::TData& itemsLimitBytes,
NKikimr::NMiniKQL::TType*& itemsLimitType)
{
- itemsLimitParamName = paramValue.GetParamName();
+ switch (protoItemsLimit.GetKindCase()) {
+ case NKqpProto::TKqpPhyValue::kLiteralValue: {
+ const auto& literalValue = protoItemsLimit.GetLiteralValue();
- if (!itemsLimitParamName) {
- return;
- }
+ auto [type, value] = NMiniKQL::ImportValueFromProto(
+ literalValue.GetType(), literalValue.GetValue(), typeEnv, holderFactory);
+
+ YQL_ENSURE(type->GetKind() == NMiniKQL::TType::EKind::Data);
+ itemsLimit = value.Get<ui64>();
+ itemsLimitType = type;
+
+ return;
+ }
+
+ case NKqpProto::TKqpPhyValue::kParamValue: {
+ itemsLimitParamName = protoItemsLimit.GetParamElementValue().GetParamName();
+ if (!itemsLimitParamName) {
+ return;
+ }
+
+ auto* itemsLimitParam = stageInfo.Meta.Tx.Params.Values.FindPtr(itemsLimitParamName);
+ YQL_ENSURE(itemsLimitParam);
+
+ auto [type, value] = NMiniKQL::ImportValueFromProto(
+ itemsLimitParam->GetType(), itemsLimitParam->GetValue(), typeEnv, holderFactory);
- auto* itemsLimitParam = stageInfo.Meta.Tx.Params.Values.FindPtr(itemsLimitParamName);
- YQL_ENSURE(itemsLimitParam);
+ YQL_ENSURE(type->GetKind() == NMiniKQL::TType::EKind::Data);
+ itemsLimit = value.Get<ui64>();
- auto [type, value] = NMiniKQL::ImportValueFromProto(itemsLimitParam->GetType(), itemsLimitParam->GetValue(), typeEnv, holderFactory);
+ NYql::NDq::TDqDataSerializer dataSerializer(typeEnv, holderFactory, NYql::NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0);
+ itemsLimitBytes = dataSerializer.Serialize(value, type);
+ itemsLimitType = type;
- YQL_ENSURE(type->GetKind() == NMiniKQL::TType::EKind::Data);
- itemsLimit = value.Get<ui64>();
+ return;
+ }
+
+ case NKqpProto::TKqpPhyValue::kParamElementValue:
+ case NKqpProto::TKqpPhyValue::kRowsList:
+ YQL_ENSURE(false, "Unexpected ItemsLimit kind " << protoItemsLimit.DebugString());
- NYql::NDq::TDqDataSerializer dataSerializer(typeEnv, holderFactory, NYql::NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0);
- itemsLimitBytes = dataSerializer.Serialize(value, type);
- itemsLimitType = type;
+ case NKqpProto::TKqpPhyValue::KIND_NOT_SET:
+ return;
+ }
}
protected:
diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp
index 07c29f64e2..d162dcca1c 100644
--- a/ydb/core/kqp/executer/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp
@@ -494,7 +494,7 @@ private:
.ShardId = shardId,
};
- if (itemsLimit && !task.Meta.Params.contains(itemsLimitParamName)) {
+ if (itemsLimitParamName && !task.Meta.Params.contains(itemsLimitParamName)) {
task.Meta.Params.emplace(itemsLimitParamName, itemsLimitBytes);
task.Meta.ParamTypes.emplace(itemsLimitParamName, itemsLimitType);
}
@@ -642,7 +642,7 @@ private:
FillReadInfo(task.Meta, itemsLimit, reverse, readRange);
- if (itemsLimit) {
+ if (itemsLimitParamName) {
task.Meta.Params.emplace(itemsLimitParamName, itemsLimitBytes);
task.Meta.ParamTypes.emplace(itemsLimitParamName, itemsLimitType);
}
diff --git a/ydb/core/kqp/opt/kqp_opt_impl.h b/ydb/core/kqp/opt/kqp_opt_impl.h
index 927aa910e8..459742b12f 100644
--- a/ydb/core/kqp/opt/kqp_opt_impl.h
+++ b/ydb/core/kqp/opt/kqp_opt_impl.h
@@ -12,7 +12,7 @@ namespace NKikimr::NKqp::NOpt {
static inline void DumpAppliedRule(const TString& name, const NYql::TExprNode::TPtr& input,
const NYql::TExprNode::TPtr& output, NYql::TExprContext& ctx)
{
-//#define KQP_ENABLE_DUMP_APPLIED_RULE
+// #define KQP_ENABLE_DUMP_APPLIED_RULE
#ifdef KQP_ENABLE_DUMP_APPLIED_RULE
if (input != output) {
auto builder = TStringBuilder() << "Rule applied: " << name << Endl;
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
index e09357ea69..36b928d5fe 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
@@ -38,19 +38,14 @@ TExprBase KqpApplyLimitToReadTable(TExprBase node, TExprContext& ctx, const TKqp
}
TMaybeNode<TExprBase> limitValue;
- if (auto maybeTakeCount = take.Count().Maybe<TCoUint64>()) {
- ui64 totalLimit;
- ui64 takeValue = FromString<ui64>(maybeTakeCount.Cast().Literal().Value());
+ auto maybeTakeCount = take.Count().Maybe<TCoUint64>();
+ auto maybeSkipCount = maybeSkip.Count().Maybe<TCoUint64>();
- if (maybeSkip) {
- if (auto maybeSkipCount = maybeSkip.Count().Maybe<TCoUint64>()) {
- auto skipValue = FromString<ui64>(maybeSkipCount.Cast().Literal().Value());
- totalLimit = takeValue + skipValue;
- } else {
- return node; // ???
- }
- } else {
- totalLimit = takeValue;
+ if (maybeTakeCount && (!maybeSkip || maybeSkipCount)) {
+ ui64 totalLimit = FromString<ui64>(maybeTakeCount.Cast().Literal().Value());
+
+ if (maybeSkipCount) {
+ totalLimit += FromString<ui64>(maybeSkipCount.Cast().Literal().Value());
}
limitValue = Build<TCoUint64>(ctx, node.Pos())
@@ -69,9 +64,13 @@ TExprBase KqpApplyLimitToReadTable(TExprBase node, TExprContext& ctx, const TKqp
YQL_CLOG(TRACE, ProviderKqp) << "-- set limit items value to " << limitValue.Cast().Ref().Dump();
- settings.SetItemsLimit(Build<TDqPrecompute>(ctx, node.Pos())
- .Input(limitValue.Cast())
- .Done().Ptr());
+ if (limitValue.Maybe<TCoUint64>()) {
+ settings.SetItemsLimit(limitValue.Cast().Ptr());
+ } else {
+ settings.SetItemsLimit(Build<TDqPrecompute>(ctx, node.Pos())
+ .Input(limitValue.Cast())
+ .Done().Ptr());
+ }
input = BuildReadNode(node.Pos(), ctx, input, settings);
diff --git a/ydb/core/kqp/prepare/kqp_query_plan.cpp b/ydb/core/kqp/prepare/kqp_query_plan.cpp
index 346a8b15cb..00911b05e7 100644
--- a/ydb/core/kqp/prepare/kqp_query_plan.cpp
+++ b/ydb/core/kqp/prepare/kqp_query_plan.cpp
@@ -76,12 +76,16 @@ struct TSerializerCtx {
THashMap<ui32, TVector<NKikimrMiniKQL::TResult>> PureTxResults;
};
-TString GetExprStr(const TExprBase& scalar) {
+TString GetExprStr(const TExprBase& scalar, bool quoteStr = true) {
if (auto maybeData = scalar.Maybe<TCoDataCtor>()) {
auto literal = TString(maybeData.Cast().Literal());
CollapseText(literal, 32);
- return TStringBuilder() << '"' << literal << '"';
+ if (quoteStr) {
+ return TStringBuilder() << '"' << literal << '"';
+ } else {
+ return literal;
+ }
}
if (auto maybeParam = scalar.Maybe<TCoParameter>()) {
@@ -1049,7 +1053,7 @@ private:
auto settings = NYql::TKqpReadTableSettings::Parse(read);
if (settings.ItemsLimit && !readInfo.Limit) {
- auto limit = GetExprStr(TExprBase(settings.ItemsLimit));
+ auto limit = GetExprStr(TExprBase(settings.ItemsLimit), false);
if (auto maybeResultBinding = ContainResultBinding(limit)) {
const auto [txId, resId] = *maybeResultBinding;
if (auto result = GetResult(txId, resId)) {
@@ -1180,7 +1184,7 @@ private:
auto settings = NYql::TKqpReadTableSettings::Parse(read);
if (settings.ItemsLimit && !readInfo.Limit) {
- auto limit = GetExprStr(TExprBase(settings.ItemsLimit));
+ auto limit = GetExprStr(TExprBase(settings.ItemsLimit), false);
if (auto maybeResultBinding = ContainResultBinding(limit)) {
const auto [txId, resId] = *maybeResultBinding;
if (auto result = GetResult(txId, resId)) {
diff --git a/ydb/core/kqp/runtime/kqp_read_table.cpp b/ydb/core/kqp/runtime/kqp_read_table.cpp
index 358ebd0a3e..d382795590 100644
--- a/ydb/core/kqp/runtime/kqp_read_table.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_table.cpp
@@ -175,8 +175,12 @@ TParseReadTableResult ParseWideReadTable(TCallable& callable) {
MKQL_ENSURE_S(AS_TYPE(TDataType, AS_TYPE(TCallableType, node->GetType())->GetReturnType())->GetSchemeType()
== NUdf::TDataType<ui64>::Id, "ItemsLimit must be () -> ui64");
result.ItemsLimit = node;
+ } else if (node->GetType()->GetKind() == TType::EKind::Data) {
+ MKQL_ENSURE_S(AS_TYPE(TDataType, node->GetType())->GetSchemeType() == NUdf::TDataType<ui64>::Id,
+ "ItemsLimit must be ui64");
+ result.ItemsLimit = node;
} else {
- MKQL_ENSURE_S(node->GetType()->GetKind() == TType::EKind::Null, "ItemsLimit expected to be Callable or Null");
+ MKQL_ENSURE_S(node->GetType()->GetKind() == TType::EKind::Null, "ItemsLimit expected to be Callable, Uint64 or Null");
}
}
@@ -218,10 +222,15 @@ TParseReadTableRangesResult ParseWideReadTableRanges(TCallable& callable) {
);
result.ItemsLimit = limitNode;
break;
+ case TType::EKind::Data:
+ MKQL_ENSURE_S(AS_TYPE(TDataType, limitNode->GetType())->GetSchemeType() == NUdf::TDataType<ui64>::Id,
+ "ItemsLimit must be ui64");
+ result.ItemsLimit = limitNode;
+ break;
case TType::EKind::Null:
break;
default:
- MKQL_ENSURE(false, "ItemsLimit expected to be Callable or Null");
+ MKQL_ENSURE(false, "ItemsLimit expected to be Callable, Data or Null");
}
result.Reverse = AS_VALUE(TDataLiteral, reverse)->AsValue().Get<bool>();
diff --git a/ydb/core/kqp/ut/kqp_newengine_ut.cpp b/ydb/core/kqp/ut/kqp_newengine_ut.cpp
index 3737687d8d..6a239188d5 100644
--- a/ydb/core/kqp/ut/kqp_newengine_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_newengine_ut.cpp
@@ -1980,7 +1980,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
auto result = session.ExplainDataQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS,
TStringBuilder() << query << Endl << "Failed with: " << result.GetIssues().ToString());
- UNIT_ASSERT_C(result.GetAst().Contains("('('\"ItemsLimit\" %kqp%tx_result_binding_0_0))"),
+ UNIT_ASSERT_C(result.GetAst().Contains("('('\"ItemsLimit\""),
TStringBuilder() << query << Endl << "Failed with AST: " << result.GetAst());
NJson::TJsonValue plan;
@@ -1998,7 +1998,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
}
auto readLimit = FindPlanNodeByKv(node, "ReadLimit", readLimitValue);
- UNIT_ASSERT(readLimit.IsDefined());
+ UNIT_ASSERT_C(readLimit.IsDefined(), result.GetPlan());
}
}
}
diff --git a/ydb/core/kqp/ut/kqp_query_ut.cpp b/ydb/core/kqp/ut/kqp_query_ut.cpp
index 5ef6d18f38..d2c7c29400 100644
--- a/ydb/core/kqp/ut/kqp_query_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_query_ut.cpp
@@ -547,19 +547,10 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
- if (UseNewEngine) {
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 0);
-
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).name(), "/Root/Tmp");
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).reads().rows(), 1001);
- } else {
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Tmp");
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 1001);
- }
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Tmp");
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 1001);
}
Y_UNIT_TEST(YqlSyntaxV0) {
diff --git a/ydb/core/kqp/ut/kqp_stats_ut.cpp b/ydb/core/kqp/ut/kqp_stats_ut.cpp
index f7815c016e..0f889f0447 100644
--- a/ydb/core/kqp/ut/kqp_stats_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_stats_ut.cpp
@@ -173,7 +173,7 @@ Y_UNIT_TEST(DeferredEffects) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
NJson::ReadJsonTree(result.GetQueryPlan(), &plan, true);
- UNIT_ASSERT_VALUES_EQUAL(plan.GetMapSafe().at("Plan").GetMapSafe().at("Plans").GetArraySafe().size(), 4);
+ UNIT_ASSERT_VALUES_EQUAL(plan.GetMapSafe().at("Plan").GetMapSafe().at("Plans").GetArraySafe().size(), 3);
auto ru = result.GetResponseMetadata().find(NYdb::YDB_CONSUMED_UNITS_HEADER);
UNIT_ASSERT(ru != result.GetResponseMetadata().end());
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index 44d8f58716..df7b979693 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -109,7 +109,7 @@ message TKqpPhyKeyRange {
message TKqpPhyOpReadRange {
TKqpPhyKeyRange KeyRange = 1;
- TKqpPhyParamValue ItemsLimit = 2;
+ TKqpPhyValue ItemsLimit = 2;
bool Reverse = 3;
repeated bool SkipNullKeys = 4;
}
@@ -132,7 +132,7 @@ message TKqpPhyOpReadOlapRanges {
// Where KeyColumns is values of start/end of range for corresponding key column in table
TKqpPhyParamValue KeyRanges = 1;
// Limit value, shard may stop after reading limit rows
- TKqpPhyParamValue ItemsLimit = 2;
+ TKqpPhyValue ItemsLimit = 2;
// Reverse sign, i.e. if user ask ORDER BY ... DESC we need to read table in reverse direction
bool Reverse = 3;
// Program in NKikimrSSA.TProgram format
@@ -152,7 +152,7 @@ message TKqpPhyOpReadRanges {
// Also it has special case - empty name. This means full scan.
TKqpPhyParamValue KeyRanges = 1;
// Limit value, shard may stop after reading limit rows
- TKqpPhyParamValue ItemsLimit = 2;
+ TKqpPhyValue ItemsLimit = 2;
// Reverse sign, i.e. if user ask ORDER BY ... DESC we need to read table in reverse direction
bool Reverse = 3;
}
diff --git a/ydb/core/sys_view/ut_kqp.cpp b/ydb/core/sys_view/ut_kqp.cpp
index 5e90eb5992..c5f6c5470a 100644
--- a/ydb/core/sys_view/ut_kqp.cpp
+++ b/ydb/core/sys_view/ut_kqp.cpp
@@ -1155,6 +1155,7 @@ Y_UNIT_TEST_SUITE(SystemView) {
TTableClient client(env.GetDriver());
auto session = client.CreateSession().GetValueSync().GetSession();
auto result = session.ExecuteDataQuery(
+ "PRAGMA Kikimr.UseNewEngine='false'; "
"SELECT * from `/Root/.sys/partition_stats`", TTxControl::BeginTx().CommitTx()
).GetValueSync();
diff --git a/ydb/core/tx/datashard/datashard_kqp_read_table.cpp b/ydb/core/tx/datashard/datashard_kqp_read_table.cpp
index 9fbd8ef0ad..5952dc15c0 100644
--- a/ydb/core/tx/datashard/datashard_kqp_read_table.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp_read_table.cpp
@@ -381,6 +381,9 @@ private:
void RegisterDependencies() const final {
this->FlowDependsOn(FromNode);
this->FlowDependsOn(ToNode);
+ if (ItemsLimit) {
+ this->FlowDependsOn(ItemsLimit);
+ }
}
private:
@@ -462,6 +465,9 @@ private:
void RegisterDependencies() const final {
this->FlowDependsOn(RangesNode);
+ if (ItemsLimit) {
+ this->FlowDependsOn(ItemsLimit);
+ }
}
TParseReadTableRangesResult ParseResult;