diff options
author | ssmike <ssmike@ydb.tech> | 2023-01-30 21:22:25 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-01-30 21:22:25 +0300 |
commit | 6e0c0bbc2c14b56205ce2293a951c7edd06d7587 (patch) | |
tree | 73a1f4fef6e01f1d6998225e28066eb2d66f1ee0 | |
parent | f68f245e2281c24c14d352eff67c6e639faa150e (diff) | |
download | ydb-6e0c0bbc2c14b56205ce2293a951c7edd06d7587.tar.gz |
Support reverse and skipnullkeys for source-based reads
-rw-r--r-- | ydb/core/kqp/common/kqp_yql.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 8 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy.cpp | 17 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp | 88 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp | 19 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h | 7 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp | 119 | ||||
-rw-r--r-- | ydb/core/kqp/query_compiler/kqp_query_compiler.cpp | 33 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 89 | ||||
-rw-r--r-- | ydb/library/yql/core/common_opt/yql_co_extr_members.cpp | 2 |
11 files changed, 326 insertions, 59 deletions
diff --git a/ydb/core/kqp/common/kqp_yql.h b/ydb/core/kqp/common/kqp_yql.h index eb2d22948e..0f5236461d 100644 --- a/ydb/core/kqp/common/kqp_yql.h +++ b/ydb/core/kqp/common/kqp_yql.h @@ -59,6 +59,8 @@ struct TKqpReadTableSettings { void SetReverse() { Reverse = true; } void SetSorted() { Sorted = true; } + bool operator == (const TKqpReadTableSettings&) const = default; + static TKqpReadTableSettings Parse(const NNodes::TKqlReadTableBase& node); static TKqpReadTableSettings Parse(const NNodes::TKqlReadTableRangesBase& node); static TKqpReadTableSettings Parse(const NNodes::TCoNameValueTupleList& node); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 91e5a335be..34dea52606 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -670,15 +670,12 @@ protected: auto columns = BuildKqpColumns(source, table); auto partitions = PrunePartitions(TableKeys, source, stageInfo, HolderFactory(), TypeEnv()); - bool reverse = false; ui64 itemsLimit = 0; TString itemsLimitParamName; NYql::NDqProto::TData itemsLimitBytes; NKikimr::NMiniKQL::TType* itemsLimitType = nullptr; - YQL_ENSURE(!source.GetReverse(), "reverse not supported yet"); - for (auto& [shardId, shardInfo] : partitions) { YQL_ENSURE(!shardInfo.KeyWriteRanges); @@ -696,9 +693,6 @@ protected: NKikimrTxDataShard::TKqpReadRangesSourceSettings settings; FillTableMeta(stageInfo, settings.MutableTable()); - for (auto& key : source.GetSkipNullKeys()) { - settings.AddSkipNullKeys(key); - } for (auto& keyColumn : keyTypes) { settings.AddKeyColumnTypes(static_cast<ui32>(keyColumn.GetTypeId())); @@ -727,7 +721,7 @@ protected: } shardInfo.KeyReadRanges->SerializeTo(&settings); - settings.SetReverse(reverse); + settings.SetReverse(source.GetReverse()); settings.SetSorted(source.GetSorted()); settings.SetShardIdHint(shardId); diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp index d9a1ba27e3..90ec709c7a 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp @@ -34,6 +34,7 @@ public: AddHandler(0, &TKqlStreamLookupTable::Match, HNDL(BuildStreamLookupTableStages)); AddHandler(0, [](const TExprNode* node) { return TCoSort::Match(node) || TCoTopSort::Match(node); }, HNDL(RemoveRedundantSortByPk)); + AddHandler(0, &TDqStage::Match, HNDL(RemoveRedundantSortByPkOverSource)); AddHandler(0, &TCoTake::Match, HNDL(ApplyLimitToReadTable)); AddHandler(0, &TCoFlatMap::Match, HNDL(PushOlapFilter)); AddHandler(0, &TCoAggregateCombine::Match, HNDL(PushAggregateCombineToStage)); @@ -106,6 +107,8 @@ public: AddHandler(1, &TCoAsList::Match, HNDL(PropagatePrecomuteScalarRowset<true>)); AddHandler(1, &TCoTake::Match, HNDL(PropagatePrecomuteTake<true>)); AddHandler(1, &TCoFlatMap::Match, HNDL(PropagatePrecomuteFlatmap<true>)); + + AddHandler(2, &TDqStage::Match, HNDL(ExpandNullMembersForReadTableSource)); #undef HNDL SetGlobal(1u); @@ -137,12 +140,24 @@ protected: return output; } + TMaybeNode<TExprBase> RemoveRedundantSortByPkOverSource(TExprBase node, TExprContext& ctx) { + TExprBase output = KqpRemoveRedundantSortByPkOverSource(node, ctx, KqpCtx); + DumpAppliedRule("RemoveRedundantSortByPkOverSource", node.Ptr(), output.Ptr(), ctx); + return output; + } + TMaybeNode<TExprBase> RemoveRedundantSortByPk(TExprBase node, TExprContext& ctx) { TExprBase output = KqpRemoveRedundantSortByPk(node, ctx, KqpCtx); DumpAppliedRule("RemoveRedundantSortByPk", node.Ptr(), output.Ptr(), ctx); return output; } + TMaybeNode<TExprBase> ExpandNullMembersForReadTableSource(TExprBase node, TExprContext& ctx) { + TExprBase output = ExpandSkipNullMembersForReadTableSource(node, ctx, KqpCtx); + DumpAppliedRule("ExpandSkipNullMembersForReadTableSource", node.Ptr(), output.Ptr(), ctx); + return output; + } + TMaybeNode<TExprBase> ApplyLimitToReadTableSource(TExprBase node, TExprContext& ctx) { TExprBase output = KqpApplyLimitToReadTableSource(node, ctx, KqpCtx); DumpAppliedRule("ApplyLimitToReadTableSource", node.Ptr(), output.Ptr(), ctx); @@ -329,7 +344,7 @@ protected: TMaybeNode<TExprBase> BuildJoin(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { - TExprBase output = DqBuildJoin(node, ctx, optCtx, *getParents(), IsGlobal, /*pushLeftStage =*/ !KqpCtx.IsDataQuery()); + TExprBase output = DqBuildJoin(node, ctx, optCtx, *getParents(), IsGlobal, /*pushLeftStage =*/ !KqpCtx.IsDataQuery() && AllowFuseJoinInputs(node)); DumpAppliedRule("BuildJoin", node.Ptr(), output.Ptr(), ctx); return output; } diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp index c463db658a..7a9d8cd8f7 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp @@ -54,6 +54,94 @@ TMaybeNode<TDqPhyPrecompute> BuildLookupKeysPrecompute(const TExprBase& input, T .Done(); } +// ReadRangesSource can't deal with skipnullkeys, so we should expand it to (ExtractMembers (SkipNullKeys)) +//FIXME: simplify KIKIMR-16987 +NYql::NNodes::TExprBase ExpandSkipNullMembersForReadTableSource(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext&) { + auto stage = node.Cast<TDqStage>(); + TMaybe<size_t> tableSourceIndex; + for (size_t i = 0; i < stage.Inputs().Size(); ++i) { + auto input = stage.Inputs().Item(i); + if (input.Maybe<TDqSource>() && input.Cast<TDqSource>().Settings().Maybe<TKqpReadRangesSourceSettings>()) { + tableSourceIndex = i; + } + } + if (!tableSourceIndex) { + return node; + } + + auto source = stage.Inputs().Item(*tableSourceIndex).Cast<TDqSource>(); + auto readRangesSource = source.Settings().Cast<TKqpReadRangesSourceSettings>(); + auto settings = TKqpReadTableSettings::Parse(readRangesSource.Settings()); + + if (settings.SkipNullKeys.empty()) { + return node; + } + + auto sourceArg = stage.Program().Args().Arg(*tableSourceIndex); + + THashSet<TString> seenColumns; + TVector<TCoAtom> columns; + TVector<TCoAtom> skipNullColumns; + for (size_t i = 0; i < readRangesSource.Columns().Size(); ++i) { + auto atom = readRangesSource.Columns().Item(i); + auto column = atom.StringValue(); + if (seenColumns.insert(column).second) { + columns.push_back(atom); + } + } + for (auto& column : settings.SkipNullKeys) { + TCoAtom atom(ctx.NewAtom(readRangesSource.Settings().Pos(), column)); + skipNullColumns.push_back(atom); + if (seenColumns.insert(column).second) { + columns.push_back(atom); + } + } + + TCoArgument replaceArg{ctx.NewArgument(sourceArg.Pos(), TStringBuilder() << "_kqp_source_arg_0")}; + NYql::TNodeOnNodeOwnedMap bodyReplaces; + + bodyReplaces[sourceArg.Raw()] = + Build<TCoExtractMembers>(ctx, node.Pos()) + .Members(readRangesSource.Columns()) + .Input<TCoSkipNullMembers>() + .Input(replaceArg) + .Members().Add(skipNullColumns).Build() + .Build() + .Done().Ptr(); + + NYql::TNodeOnNodeOwnedMap inputsReplaces; + settings.SkipNullKeys.clear(); + + auto newSource = Build<TKqpReadRangesSourceSettings>(ctx, source.Pos()) + .Table(readRangesSource.Table()) + .Columns().Add(columns).Build() + .Settings(settings.BuildNode(ctx, source.Settings().Pos())) + .RangesExpr(readRangesSource.RangesExpr()) + .ExplainPrompt(readRangesSource.ExplainPrompt()) + .Done(); + inputsReplaces[readRangesSource.Raw()] = newSource.Ptr(); + + TVector<TCoArgument> args; + for (auto arg : stage.Program().Args()) { + if (arg.Raw() == sourceArg.Raw()) { + args.push_back(replaceArg); + } else { + args.push_back(arg); + } + } + + return Build<TDqStage>(ctx, node.Pos()) + .Settings(stage.Settings()) + .Inputs(TExprList(ctx.ReplaceNodes(stage.Inputs().Ptr(), inputsReplaces))) + .Outputs(stage.Outputs()) + .Program<TCoLambda>() + .Args(args) + .Body(ctx.ReplaceNodes(stage.Program().Body().Ptr(), bodyReplaces)) + .Build() + .Done(); +} + +//FIXME: simplify KIKIMR-16987 TExprBase KqpBuildReadTableStage(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { if (!node.Maybe<TKqlReadTable>()) { return node; diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp index bdb7029c64..a29f7df2bc 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp @@ -84,4 +84,23 @@ TCoAtomList BuildColumnsList(const TVector<TString>& columns, TPositionHandle po return BuildColumnsListImpl(columns, pos, ctx); } +bool AllowFuseJoinInputs(TExprBase node) { + if (!node.Maybe<TDqJoin>()) { + return false; + } + auto join = node.Cast<TDqJoin>(); + for (auto& input : {join.LeftInput(), join.RightInput()}) { + if (auto conn = input.Maybe<TDqConnection>()) { + auto stage = conn.Cast().Output().Stage(); + for (size_t i = 0; i < stage.Inputs().Size(); ++i) { + auto input = stage.Inputs().Item(i); + if (input.Maybe<TDqSource>() && input.Cast<TDqSource>().Settings().Maybe<TKqpReadRangesSourceSettings>()) { + return false; + } + } + } + } + return true; +} + } // namespace NKikimr::NKqp::NOpt diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp index f7b22dbb31..c3579fdbbb 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp @@ -8,6 +8,7 @@ namespace NKikimr::NKqp::NOpt { using namespace NYql; using namespace NYql::NNodes; +//FIXME: simplify KIKIMR-16987 TExprBase KqpApplyLimitToReadTableSource(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { auto stage = node.Cast<TDqStage>(); TMaybe<size_t> tableSourceIndex; diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h index 4e0a239bd2..512251ddd9 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h @@ -24,11 +24,16 @@ NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase NYql::NNodes::TExprBase KqpRemoveRedundantSortByPk(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); +NYql::NNodes::TExprBase KqpRemoveRedundantSortByPkOverSource(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, + const TKqpOptimizeContext& kqpCtx); + NYql::NNodes::TExprBase KqpApplyLimitToReadTableSource(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); NYql::NNodes::TExprBase KqpApplyLimitToReadTable(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); +NYql::NNodes::TExprBase ExpandSkipNullMembersForReadTableSource(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); + NYql::NNodes::TExprBase KqpPushOlapFilter(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx, NYql::TTypeAnnotationContext& typesCtx); @@ -43,4 +48,6 @@ NYql::NNodes::TExprBase KqpFloatUpStage(NYql::NNodes::TExprBase node, NYql::TExp NYql::NNodes::TExprBase KqpPropagatePrecomuteScalarRowset(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, NYql::IOptimizationContext& optCtx, const NYql::TParentsMap& parentsMap, bool allowStageMultiUsage); +bool AllowFuseJoinInputs(NYql::NNodes::TExprBase node); + } // NKikimr::NKqp::NOpt diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp index 17fa9d1813..71b1d565cc 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp @@ -13,7 +13,16 @@ using namespace NYql::NNodes; // Temporary solution, should be replaced with constraints // copy-past from old engine algo: https://a.yandex-team.ru/arc_vcs/yql/providers/kikimr/yql_kikimr_opt.cpp?rev=e592a5a9509952f1c29f1ec02343dd4c05fe426d#L122 -TExprBase KqpRemoveRedundantSortByPk(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { + +using TTableData = std::pair<const NYql::TKikimrTableDescription*, NYql::TKqpReadTableSettings>; + +TExprBase KqpRemoveRedundantSortByPkBase( + TExprBase node, + TExprContext& ctx, + const TKqpOptimizeContext& kqpCtx, + std::function<TMaybe<TTableData>(TExprBase)> tableAccessor, + std::function<TExprBase(TExprBase, NYql::TKqpReadTableSettings)> rebuildInput) +{ auto maybeSort = node.Maybe<TCoSort>(); auto maybeTopSort = node.Maybe<TCoTopSort>(); @@ -38,13 +47,6 @@ TExprBase KqpRemoveRedundantSortByPk(TExprBase node, TExprContext& ctx, const TK input = flatmap.Input(); } - bool isReadTable = input.Maybe<TKqpReadTable>().IsValid(); - bool isReadTableRanges = input.Maybe<TKqlReadTableRangesBase>().IsValid(); - - if (!isReadTable && !isReadTableRanges) { - return node; - } - enum : ui32 { SortDirectionNone = 0, SortDirectionForward = 1, @@ -80,7 +82,12 @@ TExprBase KqpRemoveRedundantSortByPk(TExprBase node, TExprContext& ctx, const TK } } - auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, GetReadTablePath(input, isReadTableRanges)); + auto tableData = tableAccessor(input); + if (!tableData) { + return node; + } + auto& tableDesc = *tableData->first; + auto settings = tableData->second; auto checkKey = [keySelector, &tableDesc, &passthroughFields] (TExprBase key, ui32 index) { if (!key.Maybe<TCoMember>()) { @@ -125,8 +132,6 @@ TExprBase KqpRemoveRedundantSortByPk(TExprBase node, TExprContext& ctx, const TK return node; } - auto settings = GetReadTableSettings(input, isReadTableRanges); - if (settings.Reverse) { return node; } @@ -134,12 +139,11 @@ TExprBase KqpRemoveRedundantSortByPk(TExprBase node, TExprContext& ctx, const TK settings.SetReverse(); settings.SetSorted(); - input = BuildReadNode(input.Pos(), ctx, input, settings); + input = rebuildInput(input, settings); } else if (direction == SortDirectionForward) { if (olapTable) { - auto settings = GetReadTableSettings(input, isReadTableRanges); settings.SetSorted(); - input = BuildReadNode(input.Pos(), ctx, input, settings); + input = rebuildInput(input, settings); } } @@ -160,5 +164,92 @@ TExprBase KqpRemoveRedundantSortByPk(TExprBase node, TExprContext& ctx, const TK } } +TExprBase KqpRemoveRedundantSortByPk(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { + return KqpRemoveRedundantSortByPkBase(node, ctx, kqpCtx, + [&](TExprBase input) -> TMaybe<TTableData> { + bool isReadTable = input.Maybe<TKqpReadTable>().IsValid(); + bool isReadTableRanges = input.Maybe<TKqlReadTableRangesBase>().IsValid(); + if (!isReadTable && !isReadTableRanges) { + return Nothing(); + } + auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, GetReadTablePath(input, isReadTableRanges)); + auto settings = GetReadTableSettings(input, isReadTableRanges); + return TTableData{&tableDesc, settings}; + }, + [&](TExprBase input, NYql::TKqpReadTableSettings settings) { + return BuildReadNode(input.Pos(), ctx, input, settings); + }); +} + +//FIXME: simplify KIKIMR-16987 +NYql::NNodes::TExprBase KqpRemoveRedundantSortByPkOverSource( + NYql::NNodes::TExprBase node, NYql::TExprContext& exprCtx, const TKqpOptimizeContext& kqpCtx) +{ + auto stage = node.Cast<TDqStage>(); + TMaybe<size_t> tableSourceIndex; + for (size_t i = 0; i < stage.Inputs().Size(); ++i) { + auto input = stage.Inputs().Item(i); + if (input.Maybe<TDqSource>() && input.Cast<TDqSource>().Settings().Maybe<TKqpReadRangesSourceSettings>()) { + tableSourceIndex = i; + } + } + if (!tableSourceIndex) { + return node; + } + + auto source = stage.Inputs().Item(*tableSourceIndex).Cast<TDqSource>(); + auto readRangesSource = source.Settings().Cast<TKqpReadRangesSourceSettings>(); + auto settings = TKqpReadTableSettings::Parse(readRangesSource.Settings()); + + auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, readRangesSource.Table().Path()); + + TVector<NYql::TKqpReadTableSettings> newSettings; + NYql::TNodeOnNodeOwnedMap replaces; + auto sourceArg = stage.Program().Args().Arg(*tableSourceIndex); + VisitExpr(stage.Program().Body().Ptr(), + [&](const TExprNode::TPtr& exprPtr) -> bool { + TExprBase expr(exprPtr); + if (expr.Maybe<TDqConnection>() || expr.Maybe<TDqPrecompute>() || expr.Maybe<TDqPhyPrecompute>()) { + return false; + } + if (TCoSort::Match(expr.Raw()) || TCoTopSort::Match(expr.Raw())) { + auto newExpr = KqpRemoveRedundantSortByPkBase(expr, exprCtx, kqpCtx, + [&](TExprBase node) -> TMaybe<TTableData> { + if (node.Ptr() != node.Ptr()) { + return Nothing(); + } + return TTableData{&tableDesc, settings}; + }, + [&](TExprBase input, NYql::TKqpReadTableSettings settings) { + newSettings.push_back(settings); + return input; + }); + if (newExpr.Ptr() != expr.Ptr()) { + replaces[expr.Raw()] = newExpr.Ptr(); + } + } + return true; + }); + + if (newSettings) { + for (size_t i = 1; i < newSettings.size(); ++i) { + if (newSettings[0] != newSettings[i]) { + return node; + } + } + + auto newSource = Build<TKqpReadRangesSourceSettings>(exprCtx, source.Pos()) + .Table(readRangesSource.Table()) + .Columns(readRangesSource.Columns()) + .Settings(newSettings[0].BuildNode(exprCtx, source.Settings().Pos())) + .RangesExpr(readRangesSource.RangesExpr()) + .ExplainPrompt(readRangesSource.ExplainPrompt()) + .Done(); + replaces[readRangesSource.Raw()] = newSource.Ptr(); + } + + return TExprBase(exprCtx.ReplaceNodes(node.Ptr(), replaces)); +} + } // namespace NKikimr::NKqp::NOpt diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index ab44faba02..942c6d47b9 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -156,8 +156,8 @@ void FillTable(const TKikimrTableMetadata& tableMeta, THashSet<TStringBuf>&& col } } -template <typename TProto> -void FillColumns(const TCoAtomList& columns, const TKikimrTableMetadata& tableMeta, +template <typename TProto, typename TContainer> +void FillColumns(const TContainer& columns, const TKikimrTableMetadata& tableMeta, TProto& opProto, bool allowSystemColumns) { for (const auto& columnNode : columns) { @@ -518,6 +518,17 @@ public: for (auto member : programParams->GetItems()) { inputsParams.push_back(member); } + + std::sort(inputsParams.begin(), inputsParams.end(), + [](const TItemExprType* first, const TItemExprType* second) { + return first->GetName() < second->GetName(); + }); + inputsParams.erase(std::unique(inputsParams.begin(), inputsParams.end(), + [](const TItemExprType* first, const TItemExprType* second) { + return first->GetName() == second->GetName(); + }), + inputsParams.end()); + return ctx.MakeType<TStructExprType>(inputsParams); } } @@ -790,14 +801,24 @@ private: auto tableMeta = TablesData->ExistingTable(Cluster, settings.Table().Cast().Path()).Metadata; YQL_ENSURE(tableMeta); - FillColumns(settings.Columns().Cast(), *tableMeta, readProto, allowSystemColumns); + { + + THashMap<TString, const TExprNode*> columnsMap; + for (auto item : settings.Columns().Cast()) { + columnsMap[item.StringValue()] = item.Raw(); + } + TVector<TCoAtom> columns; + auto type = settings.Raw()->GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TStructExprType>(); + for (auto item : type->GetItems()) { + columns.push_back(TCoAtom(columnsMap.at(item->GetName()))); + } + FillColumns(columns, *tableMeta, readProto, allowSystemColumns); + } auto readSettings = TKqpReadTableSettings::Parse(settings.Settings().Cast()); readProto.SetReverse(readSettings.Reverse); readProto.SetSorted(readSettings.Sorted); - for (auto&& key : readSettings.SkipNullKeys) { - readProto.AddSkipNullKeys(key); - } + YQL_ENSURE(readSettings.SkipNullKeys.empty()); auto ranges = settings.RangesExpr().template Maybe<TCoParameter>(); if (ranges.IsValid()) { diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 5b227e0ec3..65a2a00801 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -48,7 +48,7 @@ public: TSmallVec<TSerializedCellVec> Points; TOwnedCellVec LastKey; - ui32 FirstUnprocessedRequest = 0; + TMaybe<ui32> FirstUnprocessedRequest; TMaybe<ui32> ReadId; ui64 TabletId; @@ -57,9 +57,9 @@ public: bool NeedResolve = false; - void CopyContinuationToken(TShardState* state) { + void AssignContinuationToken(TShardState* state) { if (state->LastKey.DataSize() != 0) { - LastKey = state->LastKey; + LastKey = std::move(state->LastKey); } FirstUnprocessedRequest = state->FirstUnprocessedRequest; } @@ -136,7 +136,8 @@ public: void FillUnprocessedRanges( TVector<TSerializedTableRange>& result, - TConstArrayRef<NScheme::TTypeInfo> keyTypes) const + TConstArrayRef<NScheme::TTypeInfo> keyTypes, + bool reverse) const { // Form new vector. Skip ranges already read. bool lastKeyEmpty = LastKey.DataSize() == 0; @@ -145,32 +146,50 @@ public: YQL_ENSURE(keyTypes.size() == LastKey.size(), "Key columns size != last key"); } - auto rangeIt = Ranges.begin() + FirstUnprocessedRequest; + if (reverse) { + auto rangeIt = Ranges.begin() + FirstUnprocessedRequest.GetOrElse(Ranges.size()); - if (!lastKeyEmpty) { - // It is range, where read was interrupted. Restart operation from last read key. - result.emplace_back(std::move(TSerializedTableRange( - TSerializedCellVec::Serialize(LastKey), rangeIt->To.GetBuffer(), false, rangeIt->ToInclusive - ))); - ++rangeIt; - } + if (!lastKeyEmpty) { + // It is range, where read was interrupted. Restart operation from last read key. + result.emplace_back(std::move(TSerializedTableRange( + rangeIt->From.GetBuffer(), TSerializedCellVec::Serialize(LastKey), rangeIt->ToInclusive, false + ))); + } + result.insert(result.begin(), Ranges.begin(), rangeIt); + } else { + auto rangeIt = Ranges.begin() + FirstUnprocessedRequest.GetOrElse(0); + + if (!lastKeyEmpty) { + // It is range, where read was interrupted. Restart operation from last read key. + result.emplace_back(std::move(TSerializedTableRange( + TSerializedCellVec::Serialize(LastKey), rangeIt->To.GetBuffer(), false, rangeIt->ToInclusive + ))); + ++rangeIt; + } - // And push all others - result.insert(result.end(), rangeIt, Ranges.end()); + // And push all others + result.insert(result.end(), rangeIt, Ranges.end()); + } for (auto& range : result) { MakePrefixRange(range, keyTypes.size()); } } - void FillUnprocessedPoints(TVector<TSerializedCellVec>& result) const { - result.insert(result.begin(), Points.begin() + FirstUnprocessedRequest, Points.end()); + void FillUnprocessedPoints(TVector<TSerializedCellVec>& result, bool reverse) const { + if (reverse) { + auto it = FirstUnprocessedRequest ? Points.begin() + *FirstUnprocessedRequest + 1 : Points.end(); + result.insert(result.begin(), Points.begin(), it); + } else { + auto it = FirstUnprocessedRequest ? Points.begin() + *FirstUnprocessedRequest : Points.begin(); + result.insert(result.begin(), it, Points.end()); + } } - void FillEvRead(TEvDataShard::TEvRead& ev, TConstArrayRef<NScheme::TTypeInfo> keyTypes) { + void FillEvRead(TEvDataShard::TEvRead& ev, TConstArrayRef<NScheme::TTypeInfo> keyTypes, bool reversed) { if (Ranges.empty()) { - FillUnprocessedPoints(ev.Keys); + FillUnprocessedPoints(ev.Keys, reversed); } else { - FillUnprocessedRanges(ev.Ranges, keyTypes); + FillUnprocessedRanges(ev.Ranges, keyTypes, reversed); } } @@ -300,9 +319,15 @@ public: CA_LOG_D("BEFORE: " << PendingShards.Size() << "." << RunningReads()); isFirst = false; } - auto state = THolder<TShardState>(PendingShards.PopFront()); - InFlightShards.PushFront(state.Get()); - StartRead(state.Release()); + if (Settings.GetReverse()) { + auto state = THolder<TShardState>(PendingShards.PopBack()); + InFlightShards.PushBack(state.Get()); + StartRead(state.Release()); + } else { + auto state = THolder<TShardState>(PendingShards.PopFront()); + InFlightShards.PushFront(state.Get()); + StartRead(state.Release()); + } } if (!isFirst) { CA_LOG_D("AFTER: " << PendingShards.Size() << "." << RunningReads()); @@ -434,8 +459,8 @@ public: auto newShard = MakeHolder<TShardState>(partition.ShardId); - if (idx == 0 && state) { - newShard->CopyContinuationToken(state.Get()); + if (((!Settings.GetReverse() && idx == 0) || (Settings.GetReverse() && idx + 1 == keyDesc->GetPartitions().size())) && state) { + newShard->AssignContinuationToken(state.Get()); } for (ui64 j = i; j < state->Ranges.size(); ++j) { @@ -464,12 +489,14 @@ public: } YQL_ENSURE(!newShards.empty()); - for (int i = newShards.ysize() - 1; i >= 0; --i) { - PendingShards.PushFront(newShards[i].Release()); - } - - if (!state->LastKey.empty()) { - PendingShards.Front()->LastKey = std::move(state->LastKey); + if (Settings.GetReverse()) { + for (size_t i = 0; i < newShards.size(); ++i) { + PendingShards.PushBack(newShards[i].Release()); + } + } else { + for (int i = newShards.ysize() - 1; i >= 0; --i) { + PendingShards.PushFront(newShards[i].Release()); + } } if (IsDebugLogEnabled(TlsActivationContext->ActorSystem(), NKikimrServices::KQP_COMPUTE) @@ -541,7 +568,7 @@ public: THolder<TEvDataShard::TEvRead> ev(new TEvDataShard::TEvRead()); auto& record = ev->Record; - state->FillEvRead(*ev, KeyColumnTypes); + state->FillEvRead(*ev, KeyColumnTypes, Settings.GetReverse()); for (const auto& column : Settings.GetColumns()) { if (!IsSystemColumn(column.GetId())) { record.AddColumns(column.GetId()); diff --git a/ydb/library/yql/core/common_opt/yql_co_extr_members.cpp b/ydb/library/yql/core/common_opt/yql_co_extr_members.cpp index df940da9d0..f4155d060c 100644 --- a/ydb/library/yql/core/common_opt/yql_co_extr_members.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_extr_members.cpp @@ -70,6 +70,8 @@ TExprNode::TPtr ApplyExtractMembersToSkipNullMembers(const TExprNode::TPtr& node if (hasMember) { filteredMembers.push_back(x.Ptr()); + } else { + return nullptr; } } |