aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2022-11-23 12:23:42 +0300
committerssmike <ssmike@ydb.tech>2022-11-23 12:23:42 +0300
commit175acac9f8fb4ac933308991e3972ac3ace675ce (patch)
tree3f08f5289c391775c8a1e9b41f8453f91f35f3f6
parent7bb2992dfe38a243f1410d914780a2e6b5d0d741 (diff)
downloadydb-175acac9f8fb4ac933308991e3972ac3ace675ce.tar.gz
support literal ranges in DqReadRangesSource
-rw-r--r--ydb/core/kqp/executer_actor/kqp_partition_helper.cpp3
-rw-r--r--ydb/core/kqp/host/kqp_type_ann.cpp5
-rw-r--r--ydb/core/kqp/opt/kqp_opt_build_txs.cpp7
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp55
-rw-r--r--ydb/core/kqp/query_compiler/kqp_query_compiler.cpp37
-rw-r--r--ydb/core/kqp/ut/kqp_scan_ut.cpp32
6 files changed, 121 insertions, 18 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
index a065cffaeed..e80a98d5d07 100644
--- a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
@@ -602,8 +602,7 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
keyColumnTypes, source.GetRanges(), stageInfo, holderFactory, typeEnv
);
} else if (source.HasKeyRange()) {
- //TODO: support KeyRange
- Y_ENSURE(false);
+ ranges.push_back(MakeKeyRange(keyColumnTypes, source.GetKeyRange(), stageInfo, holderFactory, typeEnv));
} else {
ranges = BuildFullRange(keyColumnTypes);
}
diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp
index 6033b54279a..6c8f2374957 100644
--- a/ydb/core/kqp/host/kqp_type_ann.cpp
+++ b/ydb/core/kqp/host/kqp_type_ann.cpp
@@ -259,12 +259,13 @@ TStatus AnnotateKqpSourceSettings(const TExprNode::TPtr& node, TExprContext& ctx
!TCoParameter::Match(ranges) &&
!TCoRangeFinalize::Match(ranges) &&
!TDqPhyPrecompute::Match(ranges) &&
- !TKqpTxResultBinding::Match(ranges))
+ !TKqpTxResultBinding::Match(ranges) &&
+ !TKqlKeyRange::Match(ranges))
{
ctx.AddError(TIssue(
ctx.GetPosition(ranges->Pos()),
TStringBuilder()
- << "Expected Void, Parameter, Argument or RangeFinalize in ranges, but got: "
+ << "Expected KeyRange, Void, Parameter, Argument or RangeFinalize in ranges, but got: "
<< ranges->Content()
));
return TStatus::Error;
diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
index 67681c51db6..04cf2889bd5 100644
--- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
@@ -274,10 +274,17 @@ private:
bindingsMap.emplace(std::move(paramName), std::move(paramBinding));
+ YQL_ENSURE(!TDqConnection::Match(node.Get()));
return true;
};
VisitExpr(stage.Program().Body().Ptr(), bindingsBuilder);
+ for (ui32 i = 0; i < stage.Inputs().Size(); ++i) {
+ auto input = stage.Inputs().Item(i);
+ if (input.Maybe<TDqSource>()) {
+ VisitExpr(input.Ptr(), bindingsBuilder);
+ }
+ }
TVector<TExprBase> newInputs;
TVector<TCoArgument> newArgs;
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
index da6a009c9f0..9d489cda33f 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
@@ -59,6 +59,7 @@ TExprBase KqpBuildReadTableStage(TExprBase node, TExprContext& ctx, const TKqpOp
return node;
}
const TKqlReadTable& read = node.Cast<TKqlReadTable>();
+ bool useSource = kqpCtx.Config->FeatureFlags.GetEnableKqpScanQuerySourceRead() && kqpCtx.IsScanQuery();
TVector<TExprBase> values;
TNodeOnNodeOwnedMap replaceMap;
@@ -125,26 +126,60 @@ TExprBase KqpBuildReadTableStage(TExprBase node, TExprContext& ctx, const TKqpOp
.Build()
.Done();
- TCoArgument arg{ctx.NewArgument(read.Pos(), TStringBuilder() << "_kqp_pc_arg_0")};
- programArgs.push_back(arg);
- inputs.push_back(precompute);
+ if (useSource) {
+ for (size_t i = 0; i < values.size(); ++i) {
+ auto replace = Build<TCoNth>(ctx, read.Pos())
+ .Tuple(precompute)
+ .Index().Build(ToString(i))
+ .Done()
+ .Ptr();
+
+ rangeReplaces[values[i].Raw()] = replace;
+ }
+ } else {
+ TCoArgument arg{ctx.NewArgument(read.Pos(), TStringBuilder() << "_kqp_pc_arg_0")};
+ programArgs.push_back(arg);
- for (size_t i = 0; i < values.size(); ++i) {
- auto replace = Build<TCoNth>(ctx, read.Pos())
- .Tuple(arg)
- .Index().Build(ToString(i))
- .Done()
- .Ptr();
+ for (size_t i = 0; i < values.size(); ++i) {
+ auto replace = Build<TCoNth>(ctx, read.Pos())
+ .Tuple(arg)
+ .Index().Build(ToString(i))
+ .Done()
+ .Ptr();
- rangeReplaces[values[i].Raw()] = replace;
+ rangeReplaces[values[i].Raw()] = replace;
+ }
+ inputs.push_back(precompute);
}
}
+ if (useSource) {
+ inputs.push_back(
+ Build<TDqSource>(ctx, read.Pos())
+ .Settings<TKqpReadRangesSourceSettings>()
+ .Table(read.Table())
+ .Columns(read.Columns())
+ .Settings(read.Settings())
+ .RangesExpr(ctx.ReplaceNodes(read.Range().Ptr(), rangeReplaces))
+ .Build()
+ .DataSource<TCoDataSource>()
+ .Category<TCoAtom>().Value(KqpReadRangesSourceName).Build()
+ .Build()
+ .Done());
+ }
+
auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, read.Table().Path());
TMaybeNode<TExprBase> phyRead;
switch (tableDesc.Metadata->Kind) {
case EKikimrTableKind::Datashard:
+ if (useSource) {
+ TCoArgument arg{ctx.NewArgument(read.Pos(), TStringBuilder() << "_kqp_source_arg")};
+ programArgs.push_back(arg);
+
+ phyRead = arg;
+ break;
+ }
case EKikimrTableKind::SysView:
phyRead = Build<TKqpReadTable>(ctx, read.Pos())
.Table(read.Table())
diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
index 7e285ff8cf7..0e195bcc184 100644
--- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
@@ -406,6 +406,33 @@ public:
return true;
}
+ const TStructExprType* CollectParameters(const TDqPhyStage& stage, TExprContext& ctx) {
+ TVector<const TItemExprType*> inputsParams;
+ for (size_t i = 0; i < stage.Inputs().Size(); ++i) {
+ auto input = stage.Inputs().Item(i);
+ if (input.Maybe<TDqSource>()) {
+ VisitExpr(input.Ptr(), [&] (const TExprNode::TPtr& node) {
+ if (auto maybeParam = TMaybeNode<TCoParameter>(node)) {
+ auto param = maybeParam.Cast();
+
+ inputsParams.push_back(ctx.MakeType<TItemExprType>(param.Name(), param.Ref().GetTypeAnn()));
+ }
+
+ return true;
+ });
+ }
+ }
+ auto programParams = NDq::CollectParameters(stage.Program(), ctx);
+ if (inputsParams.empty()) {
+ return programParams;
+ } else {
+ for (auto member : programParams->GetItems()) {
+ inputsParams.push_back(member);
+ }
+ return ctx.MakeType<TStructExprType>(inputsParams);
+ }
+ }
+
private:
NKikimr::NMiniKQL::TType* CompileType(TProgramBuilder& pgmBuilder, const TTypeAnnotationNode& inputType) {
TStringStream errorStream;
@@ -534,7 +561,7 @@ private:
stageProto.SetOutputsCount(outputsCount);
- auto paramsType = NDq::CollectParameters(stage.Program(), ctx);
+ auto paramsType = CollectParameters(stage, ctx);
auto programBytecode = NDq::BuildProgram(stage.Program(), *paramsType, *KqlCompiler, TypeEnv, FuncRegistry,
ctx, {});
@@ -668,11 +695,13 @@ private:
if (ranges.IsValid()) {
auto& rangesParam = *readProto.MutableRanges();
rangesParam.SetParamName(ranges.Cast().Name().StringValue());
- } else {
+ } else if (!TCoVoid::Match(settings.RangesExpr().Raw())) {
YQL_ENSURE(
- TCoVoid::Match(settings.RangesExpr().Raw()),
- "Read ranges should be parameter or void, got: " << settings.RangesExpr().Cast().Ptr()->Content()
+ TKqlKeyRange::Match(settings.RangesExpr().Raw()),
+ "Read ranges should be parameter or KqlKeyRange, got: " << settings.RangesExpr().Cast().Ptr()->Content()
);
+
+ FillKeyRange(settings.RangesExpr().Cast<TKqlKeyRange>(), *readProto.MutableKeyRange());
}
if (readSettings.ItemsLimit) {
diff --git a/ydb/core/kqp/ut/kqp_scan_ut.cpp b/ydb/core/kqp/ut/kqp_scan_ut.cpp
index 81455851f99..9ef81036b53 100644
--- a/ydb/core/kqp/ut/kqp_scan_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_scan_ut.cpp
@@ -1980,6 +1980,38 @@ Y_UNIT_TEST_SUITE(KqpScan) {
}
}
+ Y_UNIT_TEST(DqSourceLiteralRange) {
+ TKikimrSettings settings;
+ settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
+ TFeatureFlags flags;
+ flags.SetEnablePredicateExtractForDataQueries(true);
+ flags.SetEnableKqpScanQuerySourceRead(true);
+ settings.SetFeatureFlags(flags);
+ TKikimrRunner kikimr(settings);
+ auto db = kikimr.GetTableClient();
+ CreateSampleTables(kikimr);
+
+ {
+ auto result = db.StreamExecuteScanQuery(R"(
+ SELECT Key, Data FROM `/Root/EightShard` WHERE Key = 101 ORDER BY Key;
+ )").GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ CompareYson(R"([[[101u];[1]]])", StreamResultToYson(result));
+ }
+
+ {
+ auto params = TParamsBuilder().AddParam("$param").Uint64(101).Build().Build();
+
+ auto result = db.StreamExecuteScanQuery(R"(
+ DECLARE $param as Uint64;
+ SELECT Key, Data FROM `/Root/EightShard` WHERE Key = $param ORDER BY Key;
+ )",
+ params).GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ CompareYson(R"([[[101u];[1]]])", StreamResultToYson(result));
+ }
+ }
+
Y_UNIT_TEST(StreamLookup) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetTableClient();