diff options
author | ssmike <ssmike@ydb.tech> | 2022-11-23 12:23:42 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2022-11-23 12:23:42 +0300 |
commit | 175acac9f8fb4ac933308991e3972ac3ace675ce (patch) | |
tree | 3f08f5289c391775c8a1e9b41f8453f91f35f3f6 | |
parent | 7bb2992dfe38a243f1410d914780a2e6b5d0d741 (diff) | |
download | ydb-175acac9f8fb4ac933308991e3972ac3ace675ce.tar.gz |
support literal ranges in DqReadRangesSource
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_partition_helper.cpp | 3 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_type_ann.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/opt/kqp_opt_build_txs.cpp | 7 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp | 55 | ||||
-rw-r--r-- | ydb/core/kqp/query_compiler/kqp_query_compiler.cpp | 37 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_scan_ut.cpp | 32 |
6 files changed, 121 insertions, 18 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp index a065cffaeed..e80a98d5d07 100644 --- a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp +++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp @@ -602,8 +602,7 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, keyColumnTypes, source.GetRanges(), stageInfo, holderFactory, typeEnv ); } else if (source.HasKeyRange()) { - //TODO: support KeyRange - Y_ENSURE(false); + ranges.push_back(MakeKeyRange(keyColumnTypes, source.GetKeyRange(), stageInfo, holderFactory, typeEnv)); } else { ranges = BuildFullRange(keyColumnTypes); } diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp index 6033b54279a..6c8f2374957 100644 --- a/ydb/core/kqp/host/kqp_type_ann.cpp +++ b/ydb/core/kqp/host/kqp_type_ann.cpp @@ -259,12 +259,13 @@ TStatus AnnotateKqpSourceSettings(const TExprNode::TPtr& node, TExprContext& ctx !TCoParameter::Match(ranges) && !TCoRangeFinalize::Match(ranges) && !TDqPhyPrecompute::Match(ranges) && - !TKqpTxResultBinding::Match(ranges)) + !TKqpTxResultBinding::Match(ranges) && + !TKqlKeyRange::Match(ranges)) { ctx.AddError(TIssue( ctx.GetPosition(ranges->Pos()), TStringBuilder() - << "Expected Void, Parameter, Argument or RangeFinalize in ranges, but got: " + << "Expected KeyRange, Void, Parameter, Argument or RangeFinalize in ranges, but got: " << ranges->Content() )); return TStatus::Error; diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp index 67681c51db6..04cf2889bd5 100644 --- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp +++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp @@ -274,10 +274,17 @@ private: bindingsMap.emplace(std::move(paramName), std::move(paramBinding)); + YQL_ENSURE(!TDqConnection::Match(node.Get())); return true; }; VisitExpr(stage.Program().Body().Ptr(), bindingsBuilder); + for (ui32 i = 0; i < stage.Inputs().Size(); ++i) { + auto input = stage.Inputs().Item(i); + if (input.Maybe<TDqSource>()) { + VisitExpr(input.Ptr(), bindingsBuilder); + } + } TVector<TExprBase> newInputs; TVector<TCoArgument> newArgs; diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp index da6a009c9f0..9d489cda33f 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp @@ -59,6 +59,7 @@ TExprBase KqpBuildReadTableStage(TExprBase node, TExprContext& ctx, const TKqpOp return node; } const TKqlReadTable& read = node.Cast<TKqlReadTable>(); + bool useSource = kqpCtx.Config->FeatureFlags.GetEnableKqpScanQuerySourceRead() && kqpCtx.IsScanQuery(); TVector<TExprBase> values; TNodeOnNodeOwnedMap replaceMap; @@ -125,26 +126,60 @@ TExprBase KqpBuildReadTableStage(TExprBase node, TExprContext& ctx, const TKqpOp .Build() .Done(); - TCoArgument arg{ctx.NewArgument(read.Pos(), TStringBuilder() << "_kqp_pc_arg_0")}; - programArgs.push_back(arg); - inputs.push_back(precompute); + if (useSource) { + for (size_t i = 0; i < values.size(); ++i) { + auto replace = Build<TCoNth>(ctx, read.Pos()) + .Tuple(precompute) + .Index().Build(ToString(i)) + .Done() + .Ptr(); + + rangeReplaces[values[i].Raw()] = replace; + } + } else { + TCoArgument arg{ctx.NewArgument(read.Pos(), TStringBuilder() << "_kqp_pc_arg_0")}; + programArgs.push_back(arg); - for (size_t i = 0; i < values.size(); ++i) { - auto replace = Build<TCoNth>(ctx, read.Pos()) - .Tuple(arg) - .Index().Build(ToString(i)) - .Done() - .Ptr(); + for (size_t i = 0; i < values.size(); ++i) { + auto replace = Build<TCoNth>(ctx, read.Pos()) + .Tuple(arg) + .Index().Build(ToString(i)) + .Done() + .Ptr(); - rangeReplaces[values[i].Raw()] = replace; + rangeReplaces[values[i].Raw()] = replace; + } + inputs.push_back(precompute); } } + if (useSource) { + inputs.push_back( + Build<TDqSource>(ctx, read.Pos()) + .Settings<TKqpReadRangesSourceSettings>() + .Table(read.Table()) + .Columns(read.Columns()) + .Settings(read.Settings()) + .RangesExpr(ctx.ReplaceNodes(read.Range().Ptr(), rangeReplaces)) + .Build() + .DataSource<TCoDataSource>() + .Category<TCoAtom>().Value(KqpReadRangesSourceName).Build() + .Build() + .Done()); + } + auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, read.Table().Path()); TMaybeNode<TExprBase> phyRead; switch (tableDesc.Metadata->Kind) { case EKikimrTableKind::Datashard: + if (useSource) { + TCoArgument arg{ctx.NewArgument(read.Pos(), TStringBuilder() << "_kqp_source_arg")}; + programArgs.push_back(arg); + + phyRead = arg; + break; + } case EKikimrTableKind::SysView: phyRead = Build<TKqpReadTable>(ctx, read.Pos()) .Table(read.Table()) diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index 7e285ff8cf7..0e195bcc184 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -406,6 +406,33 @@ public: return true; } + const TStructExprType* CollectParameters(const TDqPhyStage& stage, TExprContext& ctx) { + TVector<const TItemExprType*> inputsParams; + for (size_t i = 0; i < stage.Inputs().Size(); ++i) { + auto input = stage.Inputs().Item(i); + if (input.Maybe<TDqSource>()) { + VisitExpr(input.Ptr(), [&] (const TExprNode::TPtr& node) { + if (auto maybeParam = TMaybeNode<TCoParameter>(node)) { + auto param = maybeParam.Cast(); + + inputsParams.push_back(ctx.MakeType<TItemExprType>(param.Name(), param.Ref().GetTypeAnn())); + } + + return true; + }); + } + } + auto programParams = NDq::CollectParameters(stage.Program(), ctx); + if (inputsParams.empty()) { + return programParams; + } else { + for (auto member : programParams->GetItems()) { + inputsParams.push_back(member); + } + return ctx.MakeType<TStructExprType>(inputsParams); + } + } + private: NKikimr::NMiniKQL::TType* CompileType(TProgramBuilder& pgmBuilder, const TTypeAnnotationNode& inputType) { TStringStream errorStream; @@ -534,7 +561,7 @@ private: stageProto.SetOutputsCount(outputsCount); - auto paramsType = NDq::CollectParameters(stage.Program(), ctx); + auto paramsType = CollectParameters(stage, ctx); auto programBytecode = NDq::BuildProgram(stage.Program(), *paramsType, *KqlCompiler, TypeEnv, FuncRegistry, ctx, {}); @@ -668,11 +695,13 @@ private: if (ranges.IsValid()) { auto& rangesParam = *readProto.MutableRanges(); rangesParam.SetParamName(ranges.Cast().Name().StringValue()); - } else { + } else if (!TCoVoid::Match(settings.RangesExpr().Raw())) { YQL_ENSURE( - TCoVoid::Match(settings.RangesExpr().Raw()), - "Read ranges should be parameter or void, got: " << settings.RangesExpr().Cast().Ptr()->Content() + TKqlKeyRange::Match(settings.RangesExpr().Raw()), + "Read ranges should be parameter or KqlKeyRange, got: " << settings.RangesExpr().Cast().Ptr()->Content() ); + + FillKeyRange(settings.RangesExpr().Cast<TKqlKeyRange>(), *readProto.MutableKeyRange()); } if (readSettings.ItemsLimit) { diff --git a/ydb/core/kqp/ut/kqp_scan_ut.cpp b/ydb/core/kqp/ut/kqp_scan_ut.cpp index 81455851f99..9ef81036b53 100644 --- a/ydb/core/kqp/ut/kqp_scan_ut.cpp +++ b/ydb/core/kqp/ut/kqp_scan_ut.cpp @@ -1980,6 +1980,38 @@ Y_UNIT_TEST_SUITE(KqpScan) { } } + Y_UNIT_TEST(DqSourceLiteralRange) { + TKikimrSettings settings; + settings.SetDomainRoot(KikimrDefaultUtDomainRoot); + TFeatureFlags flags; + flags.SetEnablePredicateExtractForDataQueries(true); + flags.SetEnableKqpScanQuerySourceRead(true); + settings.SetFeatureFlags(flags); + TKikimrRunner kikimr(settings); + auto db = kikimr.GetTableClient(); + CreateSampleTables(kikimr); + + { + auto result = db.StreamExecuteScanQuery(R"( + SELECT Key, Data FROM `/Root/EightShard` WHERE Key = 101 ORDER BY Key; + )").GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson(R"([[[101u];[1]]])", StreamResultToYson(result)); + } + + { + auto params = TParamsBuilder().AddParam("$param").Uint64(101).Build().Build(); + + auto result = db.StreamExecuteScanQuery(R"( + DECLARE $param as Uint64; + SELECT Key, Data FROM `/Root/EightShard` WHERE Key = $param ORDER BY Key; + )", + params).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson(R"([[[101u];[1]]])", StreamResultToYson(result)); + } + } + Y_UNIT_TEST(StreamLookup) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetTableClient(); |