aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2023-01-30 21:22:25 +0300
committerssmike <ssmike@ydb.tech>2023-01-30 21:22:25 +0300
commit6e0c0bbc2c14b56205ce2293a951c7edd06d7587 (patch)
tree73a1f4fef6e01f1d6998225e28066eb2d66f1ee0
parentf68f245e2281c24c14d352eff67c6e639faa150e (diff)
downloadydb-6e0c0bbc2c14b56205ce2293a951c7edd06d7587.tar.gz
Support reverse and skipnullkeys for source-based reads
-rw-r--r--ydb/core/kqp/common/kqp_yql.h2
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h8
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy.cpp17
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp88
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp19
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp1
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h7
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp119
-rw-r--r--ydb/core/kqp/query_compiler/kqp_query_compiler.cpp33
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp89
-rw-r--r--ydb/library/yql/core/common_opt/yql_co_extr_members.cpp2
11 files changed, 326 insertions, 59 deletions
diff --git a/ydb/core/kqp/common/kqp_yql.h b/ydb/core/kqp/common/kqp_yql.h
index eb2d22948e..0f5236461d 100644
--- a/ydb/core/kqp/common/kqp_yql.h
+++ b/ydb/core/kqp/common/kqp_yql.h
@@ -59,6 +59,8 @@ struct TKqpReadTableSettings {
void SetReverse() { Reverse = true; }
void SetSorted() { Sorted = true; }
+ bool operator == (const TKqpReadTableSettings&) const = default;
+
static TKqpReadTableSettings Parse(const NNodes::TKqlReadTableBase& node);
static TKqpReadTableSettings Parse(const NNodes::TKqlReadTableRangesBase& node);
static TKqpReadTableSettings Parse(const NNodes::TCoNameValueTupleList& node);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 91e5a335be..34dea52606 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -670,15 +670,12 @@ protected:
auto columns = BuildKqpColumns(source, table);
auto partitions = PrunePartitions(TableKeys, source, stageInfo, HolderFactory(), TypeEnv());
- bool reverse = false;
ui64 itemsLimit = 0;
TString itemsLimitParamName;
NYql::NDqProto::TData itemsLimitBytes;
NKikimr::NMiniKQL::TType* itemsLimitType = nullptr;
- YQL_ENSURE(!source.GetReverse(), "reverse not supported yet");
-
for (auto& [shardId, shardInfo] : partitions) {
YQL_ENSURE(!shardInfo.KeyWriteRanges);
@@ -696,9 +693,6 @@ protected:
NKikimrTxDataShard::TKqpReadRangesSourceSettings settings;
FillTableMeta(stageInfo, settings.MutableTable());
- for (auto& key : source.GetSkipNullKeys()) {
- settings.AddSkipNullKeys(key);
- }
for (auto& keyColumn : keyTypes) {
settings.AddKeyColumnTypes(static_cast<ui32>(keyColumn.GetTypeId()));
@@ -727,7 +721,7 @@ protected:
}
shardInfo.KeyReadRanges->SerializeTo(&settings);
- settings.SetReverse(reverse);
+ settings.SetReverse(source.GetReverse());
settings.SetSorted(source.GetSorted());
settings.SetShardIdHint(shardId);
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
index d9a1ba27e3..90ec709c7a 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
@@ -34,6 +34,7 @@ public:
AddHandler(0, &TKqlStreamLookupTable::Match, HNDL(BuildStreamLookupTableStages));
AddHandler(0, [](const TExprNode* node) { return TCoSort::Match(node) || TCoTopSort::Match(node); },
HNDL(RemoveRedundantSortByPk));
+ AddHandler(0, &TDqStage::Match, HNDL(RemoveRedundantSortByPkOverSource));
AddHandler(0, &TCoTake::Match, HNDL(ApplyLimitToReadTable));
AddHandler(0, &TCoFlatMap::Match, HNDL(PushOlapFilter));
AddHandler(0, &TCoAggregateCombine::Match, HNDL(PushAggregateCombineToStage));
@@ -106,6 +107,8 @@ public:
AddHandler(1, &TCoAsList::Match, HNDL(PropagatePrecomuteScalarRowset<true>));
AddHandler(1, &TCoTake::Match, HNDL(PropagatePrecomuteTake<true>));
AddHandler(1, &TCoFlatMap::Match, HNDL(PropagatePrecomuteFlatmap<true>));
+
+ AddHandler(2, &TDqStage::Match, HNDL(ExpandNullMembersForReadTableSource));
#undef HNDL
SetGlobal(1u);
@@ -137,12 +140,24 @@ protected:
return output;
}
+ TMaybeNode<TExprBase> RemoveRedundantSortByPkOverSource(TExprBase node, TExprContext& ctx) {
+ TExprBase output = KqpRemoveRedundantSortByPkOverSource(node, ctx, KqpCtx);
+ DumpAppliedRule("RemoveRedundantSortByPkOverSource", node.Ptr(), output.Ptr(), ctx);
+ return output;
+ }
+
TMaybeNode<TExprBase> RemoveRedundantSortByPk(TExprBase node, TExprContext& ctx) {
TExprBase output = KqpRemoveRedundantSortByPk(node, ctx, KqpCtx);
DumpAppliedRule("RemoveRedundantSortByPk", node.Ptr(), output.Ptr(), ctx);
return output;
}
+ TMaybeNode<TExprBase> ExpandNullMembersForReadTableSource(TExprBase node, TExprContext& ctx) {
+ TExprBase output = ExpandSkipNullMembersForReadTableSource(node, ctx, KqpCtx);
+ DumpAppliedRule("ExpandSkipNullMembersForReadTableSource", node.Ptr(), output.Ptr(), ctx);
+ return output;
+ }
+
TMaybeNode<TExprBase> ApplyLimitToReadTableSource(TExprBase node, TExprContext& ctx) {
TExprBase output = KqpApplyLimitToReadTableSource(node, ctx, KqpCtx);
DumpAppliedRule("ApplyLimitToReadTableSource", node.Ptr(), output.Ptr(), ctx);
@@ -329,7 +344,7 @@ protected:
TMaybeNode<TExprBase> BuildJoin(TExprBase node, TExprContext& ctx,
IOptimizationContext& optCtx, const TGetParents& getParents)
{
- TExprBase output = DqBuildJoin(node, ctx, optCtx, *getParents(), IsGlobal, /*pushLeftStage =*/ !KqpCtx.IsDataQuery());
+ TExprBase output = DqBuildJoin(node, ctx, optCtx, *getParents(), IsGlobal, /*pushLeftStage =*/ !KqpCtx.IsDataQuery() && AllowFuseJoinInputs(node));
DumpAppliedRule("BuildJoin", node.Ptr(), output.Ptr(), ctx);
return output;
}
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
index c463db658a..7a9d8cd8f7 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
@@ -54,6 +54,94 @@ TMaybeNode<TDqPhyPrecompute> BuildLookupKeysPrecompute(const TExprBase& input, T
.Done();
}
+// ReadRangesSource can't deal with skipnullkeys, so we should expand it to (ExtractMembers (SkipNullKeys))
+//FIXME: simplify KIKIMR-16987
+NYql::NNodes::TExprBase ExpandSkipNullMembersForReadTableSource(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext&) {
+ auto stage = node.Cast<TDqStage>();
+ TMaybe<size_t> tableSourceIndex;
+ for (size_t i = 0; i < stage.Inputs().Size(); ++i) {
+ auto input = stage.Inputs().Item(i);
+ if (input.Maybe<TDqSource>() && input.Cast<TDqSource>().Settings().Maybe<TKqpReadRangesSourceSettings>()) {
+ tableSourceIndex = i;
+ }
+ }
+ if (!tableSourceIndex) {
+ return node;
+ }
+
+ auto source = stage.Inputs().Item(*tableSourceIndex).Cast<TDqSource>();
+ auto readRangesSource = source.Settings().Cast<TKqpReadRangesSourceSettings>();
+ auto settings = TKqpReadTableSettings::Parse(readRangesSource.Settings());
+
+ if (settings.SkipNullKeys.empty()) {
+ return node;
+ }
+
+ auto sourceArg = stage.Program().Args().Arg(*tableSourceIndex);
+
+ THashSet<TString> seenColumns;
+ TVector<TCoAtom> columns;
+ TVector<TCoAtom> skipNullColumns;
+ for (size_t i = 0; i < readRangesSource.Columns().Size(); ++i) {
+ auto atom = readRangesSource.Columns().Item(i);
+ auto column = atom.StringValue();
+ if (seenColumns.insert(column).second) {
+ columns.push_back(atom);
+ }
+ }
+ for (auto& column : settings.SkipNullKeys) {
+ TCoAtom atom(ctx.NewAtom(readRangesSource.Settings().Pos(), column));
+ skipNullColumns.push_back(atom);
+ if (seenColumns.insert(column).second) {
+ columns.push_back(atom);
+ }
+ }
+
+ TCoArgument replaceArg{ctx.NewArgument(sourceArg.Pos(), TStringBuilder() << "_kqp_source_arg_0")};
+ NYql::TNodeOnNodeOwnedMap bodyReplaces;
+
+ bodyReplaces[sourceArg.Raw()] =
+ Build<TCoExtractMembers>(ctx, node.Pos())
+ .Members(readRangesSource.Columns())
+ .Input<TCoSkipNullMembers>()
+ .Input(replaceArg)
+ .Members().Add(skipNullColumns).Build()
+ .Build()
+ .Done().Ptr();
+
+ NYql::TNodeOnNodeOwnedMap inputsReplaces;
+ settings.SkipNullKeys.clear();
+
+ auto newSource = Build<TKqpReadRangesSourceSettings>(ctx, source.Pos())
+ .Table(readRangesSource.Table())
+ .Columns().Add(columns).Build()
+ .Settings(settings.BuildNode(ctx, source.Settings().Pos()))
+ .RangesExpr(readRangesSource.RangesExpr())
+ .ExplainPrompt(readRangesSource.ExplainPrompt())
+ .Done();
+ inputsReplaces[readRangesSource.Raw()] = newSource.Ptr();
+
+ TVector<TCoArgument> args;
+ for (auto arg : stage.Program().Args()) {
+ if (arg.Raw() == sourceArg.Raw()) {
+ args.push_back(replaceArg);
+ } else {
+ args.push_back(arg);
+ }
+ }
+
+ return Build<TDqStage>(ctx, node.Pos())
+ .Settings(stage.Settings())
+ .Inputs(TExprList(ctx.ReplaceNodes(stage.Inputs().Ptr(), inputsReplaces)))
+ .Outputs(stage.Outputs())
+ .Program<TCoLambda>()
+ .Args(args)
+ .Body(ctx.ReplaceNodes(stage.Program().Body().Ptr(), bodyReplaces))
+ .Build()
+ .Done();
+}
+
+//FIXME: simplify KIKIMR-16987
TExprBase KqpBuildReadTableStage(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
if (!node.Maybe<TKqlReadTable>()) {
return node;
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp
index bdb7029c64..a29f7df2bc 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp
@@ -84,4 +84,23 @@ TCoAtomList BuildColumnsList(const TVector<TString>& columns, TPositionHandle po
return BuildColumnsListImpl(columns, pos, ctx);
}
+bool AllowFuseJoinInputs(TExprBase node) {
+ if (!node.Maybe<TDqJoin>()) {
+ return false;
+ }
+ auto join = node.Cast<TDqJoin>();
+ for (auto& input : {join.LeftInput(), join.RightInput()}) {
+ if (auto conn = input.Maybe<TDqConnection>()) {
+ auto stage = conn.Cast().Output().Stage();
+ for (size_t i = 0; i < stage.Inputs().Size(); ++i) {
+ auto input = stage.Inputs().Item(i);
+ if (input.Maybe<TDqSource>() && input.Cast<TDqSource>().Settings().Maybe<TKqpReadRangesSourceSettings>()) {
+ return false;
+ }
+ }
+ }
+ }
+ return true;
+}
+
} // namespace NKikimr::NKqp::NOpt
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
index f7b22dbb31..c3579fdbbb 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
@@ -8,6 +8,7 @@ namespace NKikimr::NKqp::NOpt {
using namespace NYql;
using namespace NYql::NNodes;
+//FIXME: simplify KIKIMR-16987
TExprBase KqpApplyLimitToReadTableSource(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
auto stage = node.Cast<TDqStage>();
TMaybe<size_t> tableSourceIndex;
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h
index 4e0a239bd2..512251ddd9 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h
@@ -24,11 +24,16 @@ NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase
NYql::NNodes::TExprBase KqpRemoveRedundantSortByPk(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);
+NYql::NNodes::TExprBase KqpRemoveRedundantSortByPkOverSource(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
+ const TKqpOptimizeContext& kqpCtx);
+
NYql::NNodes::TExprBase KqpApplyLimitToReadTableSource(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx);
NYql::NNodes::TExprBase KqpApplyLimitToReadTable(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);
+NYql::NNodes::TExprBase ExpandSkipNullMembersForReadTableSource(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx);
+
NYql::NNodes::TExprBase KqpPushOlapFilter(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx, NYql::TTypeAnnotationContext& typesCtx);
@@ -43,4 +48,6 @@ NYql::NNodes::TExprBase KqpFloatUpStage(NYql::NNodes::TExprBase node, NYql::TExp
NYql::NNodes::TExprBase KqpPropagatePrecomuteScalarRowset(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
NYql::IOptimizationContext& optCtx, const NYql::TParentsMap& parentsMap, bool allowStageMultiUsage);
+bool AllowFuseJoinInputs(NYql::NNodes::TExprBase node);
+
} // NKikimr::NKqp::NOpt
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
index 17fa9d1813..71b1d565cc 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
@@ -13,7 +13,16 @@ using namespace NYql::NNodes;
// Temporary solution, should be replaced with constraints
// copy-past from old engine algo: https://a.yandex-team.ru/arc_vcs/yql/providers/kikimr/yql_kikimr_opt.cpp?rev=e592a5a9509952f1c29f1ec02343dd4c05fe426d#L122
-TExprBase KqpRemoveRedundantSortByPk(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
+
+using TTableData = std::pair<const NYql::TKikimrTableDescription*, NYql::TKqpReadTableSettings>;
+
+TExprBase KqpRemoveRedundantSortByPkBase(
+ TExprBase node,
+ TExprContext& ctx,
+ const TKqpOptimizeContext& kqpCtx,
+ std::function<TMaybe<TTableData>(TExprBase)> tableAccessor,
+ std::function<TExprBase(TExprBase, NYql::TKqpReadTableSettings)> rebuildInput)
+{
auto maybeSort = node.Maybe<TCoSort>();
auto maybeTopSort = node.Maybe<TCoTopSort>();
@@ -38,13 +47,6 @@ TExprBase KqpRemoveRedundantSortByPk(TExprBase node, TExprContext& ctx, const TK
input = flatmap.Input();
}
- bool isReadTable = input.Maybe<TKqpReadTable>().IsValid();
- bool isReadTableRanges = input.Maybe<TKqlReadTableRangesBase>().IsValid();
-
- if (!isReadTable && !isReadTableRanges) {
- return node;
- }
-
enum : ui32 {
SortDirectionNone = 0,
SortDirectionForward = 1,
@@ -80,7 +82,12 @@ TExprBase KqpRemoveRedundantSortByPk(TExprBase node, TExprContext& ctx, const TK
}
}
- auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, GetReadTablePath(input, isReadTableRanges));
+ auto tableData = tableAccessor(input);
+ if (!tableData) {
+ return node;
+ }
+ auto& tableDesc = *tableData->first;
+ auto settings = tableData->second;
auto checkKey = [keySelector, &tableDesc, &passthroughFields] (TExprBase key, ui32 index) {
if (!key.Maybe<TCoMember>()) {
@@ -125,8 +132,6 @@ TExprBase KqpRemoveRedundantSortByPk(TExprBase node, TExprContext& ctx, const TK
return node;
}
- auto settings = GetReadTableSettings(input, isReadTableRanges);
-
if (settings.Reverse) {
return node;
}
@@ -134,12 +139,11 @@ TExprBase KqpRemoveRedundantSortByPk(TExprBase node, TExprContext& ctx, const TK
settings.SetReverse();
settings.SetSorted();
- input = BuildReadNode(input.Pos(), ctx, input, settings);
+ input = rebuildInput(input, settings);
} else if (direction == SortDirectionForward) {
if (olapTable) {
- auto settings = GetReadTableSettings(input, isReadTableRanges);
settings.SetSorted();
- input = BuildReadNode(input.Pos(), ctx, input, settings);
+ input = rebuildInput(input, settings);
}
}
@@ -160,5 +164,92 @@ TExprBase KqpRemoveRedundantSortByPk(TExprBase node, TExprContext& ctx, const TK
}
}
+TExprBase KqpRemoveRedundantSortByPk(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
+ return KqpRemoveRedundantSortByPkBase(node, ctx, kqpCtx,
+ [&](TExprBase input) -> TMaybe<TTableData> {
+ bool isReadTable = input.Maybe<TKqpReadTable>().IsValid();
+ bool isReadTableRanges = input.Maybe<TKqlReadTableRangesBase>().IsValid();
+ if (!isReadTable && !isReadTableRanges) {
+ return Nothing();
+ }
+ auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, GetReadTablePath(input, isReadTableRanges));
+ auto settings = GetReadTableSettings(input, isReadTableRanges);
+ return TTableData{&tableDesc, settings};
+ },
+ [&](TExprBase input, NYql::TKqpReadTableSettings settings) {
+ return BuildReadNode(input.Pos(), ctx, input, settings);
+ });
+}
+
+//FIXME: simplify KIKIMR-16987
+NYql::NNodes::TExprBase KqpRemoveRedundantSortByPkOverSource(
+ NYql::NNodes::TExprBase node, NYql::TExprContext& exprCtx, const TKqpOptimizeContext& kqpCtx)
+{
+ auto stage = node.Cast<TDqStage>();
+ TMaybe<size_t> tableSourceIndex;
+ for (size_t i = 0; i < stage.Inputs().Size(); ++i) {
+ auto input = stage.Inputs().Item(i);
+ if (input.Maybe<TDqSource>() && input.Cast<TDqSource>().Settings().Maybe<TKqpReadRangesSourceSettings>()) {
+ tableSourceIndex = i;
+ }
+ }
+ if (!tableSourceIndex) {
+ return node;
+ }
+
+ auto source = stage.Inputs().Item(*tableSourceIndex).Cast<TDqSource>();
+ auto readRangesSource = source.Settings().Cast<TKqpReadRangesSourceSettings>();
+ auto settings = TKqpReadTableSettings::Parse(readRangesSource.Settings());
+
+ auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, readRangesSource.Table().Path());
+
+ TVector<NYql::TKqpReadTableSettings> newSettings;
+ NYql::TNodeOnNodeOwnedMap replaces;
+ auto sourceArg = stage.Program().Args().Arg(*tableSourceIndex);
+ VisitExpr(stage.Program().Body().Ptr(),
+ [&](const TExprNode::TPtr& exprPtr) -> bool {
+ TExprBase expr(exprPtr);
+ if (expr.Maybe<TDqConnection>() || expr.Maybe<TDqPrecompute>() || expr.Maybe<TDqPhyPrecompute>()) {
+ return false;
+ }
+ if (TCoSort::Match(expr.Raw()) || TCoTopSort::Match(expr.Raw())) {
+ auto newExpr = KqpRemoveRedundantSortByPkBase(expr, exprCtx, kqpCtx,
+ [&](TExprBase node) -> TMaybe<TTableData> {
+ if (node.Ptr() != node.Ptr()) {
+ return Nothing();
+ }
+ return TTableData{&tableDesc, settings};
+ },
+ [&](TExprBase input, NYql::TKqpReadTableSettings settings) {
+ newSettings.push_back(settings);
+ return input;
+ });
+ if (newExpr.Ptr() != expr.Ptr()) {
+ replaces[expr.Raw()] = newExpr.Ptr();
+ }
+ }
+ return true;
+ });
+
+ if (newSettings) {
+ for (size_t i = 1; i < newSettings.size(); ++i) {
+ if (newSettings[0] != newSettings[i]) {
+ return node;
+ }
+ }
+
+ auto newSource = Build<TKqpReadRangesSourceSettings>(exprCtx, source.Pos())
+ .Table(readRangesSource.Table())
+ .Columns(readRangesSource.Columns())
+ .Settings(newSettings[0].BuildNode(exprCtx, source.Settings().Pos()))
+ .RangesExpr(readRangesSource.RangesExpr())
+ .ExplainPrompt(readRangesSource.ExplainPrompt())
+ .Done();
+ replaces[readRangesSource.Raw()] = newSource.Ptr();
+ }
+
+ return TExprBase(exprCtx.ReplaceNodes(node.Ptr(), replaces));
+}
+
} // namespace NKikimr::NKqp::NOpt
diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
index ab44faba02..942c6d47b9 100644
--- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
@@ -156,8 +156,8 @@ void FillTable(const TKikimrTableMetadata& tableMeta, THashSet<TStringBuf>&& col
}
}
-template <typename TProto>
-void FillColumns(const TCoAtomList& columns, const TKikimrTableMetadata& tableMeta,
+template <typename TProto, typename TContainer>
+void FillColumns(const TContainer& columns, const TKikimrTableMetadata& tableMeta,
TProto& opProto, bool allowSystemColumns)
{
for (const auto& columnNode : columns) {
@@ -518,6 +518,17 @@ public:
for (auto member : programParams->GetItems()) {
inputsParams.push_back(member);
}
+
+ std::sort(inputsParams.begin(), inputsParams.end(),
+ [](const TItemExprType* first, const TItemExprType* second) {
+ return first->GetName() < second->GetName();
+ });
+ inputsParams.erase(std::unique(inputsParams.begin(), inputsParams.end(),
+ [](const TItemExprType* first, const TItemExprType* second) {
+ return first->GetName() == second->GetName();
+ }),
+ inputsParams.end());
+
return ctx.MakeType<TStructExprType>(inputsParams);
}
}
@@ -790,14 +801,24 @@ private:
auto tableMeta = TablesData->ExistingTable(Cluster, settings.Table().Cast().Path()).Metadata;
YQL_ENSURE(tableMeta);
- FillColumns(settings.Columns().Cast(), *tableMeta, readProto, allowSystemColumns);
+ {
+
+ THashMap<TString, const TExprNode*> columnsMap;
+ for (auto item : settings.Columns().Cast()) {
+ columnsMap[item.StringValue()] = item.Raw();
+ }
+ TVector<TCoAtom> columns;
+ auto type = settings.Raw()->GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TStructExprType>();
+ for (auto item : type->GetItems()) {
+ columns.push_back(TCoAtom(columnsMap.at(item->GetName())));
+ }
+ FillColumns(columns, *tableMeta, readProto, allowSystemColumns);
+ }
auto readSettings = TKqpReadTableSettings::Parse(settings.Settings().Cast());
readProto.SetReverse(readSettings.Reverse);
readProto.SetSorted(readSettings.Sorted);
- for (auto&& key : readSettings.SkipNullKeys) {
- readProto.AddSkipNullKeys(key);
- }
+ YQL_ENSURE(readSettings.SkipNullKeys.empty());
auto ranges = settings.RangesExpr().template Maybe<TCoParameter>();
if (ranges.IsValid()) {
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 5b227e0ec3..65a2a00801 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -48,7 +48,7 @@ public:
TSmallVec<TSerializedCellVec> Points;
TOwnedCellVec LastKey;
- ui32 FirstUnprocessedRequest = 0;
+ TMaybe<ui32> FirstUnprocessedRequest;
TMaybe<ui32> ReadId;
ui64 TabletId;
@@ -57,9 +57,9 @@ public:
bool NeedResolve = false;
- void CopyContinuationToken(TShardState* state) {
+ void AssignContinuationToken(TShardState* state) {
if (state->LastKey.DataSize() != 0) {
- LastKey = state->LastKey;
+ LastKey = std::move(state->LastKey);
}
FirstUnprocessedRequest = state->FirstUnprocessedRequest;
}
@@ -136,7 +136,8 @@ public:
void FillUnprocessedRanges(
TVector<TSerializedTableRange>& result,
- TConstArrayRef<NScheme::TTypeInfo> keyTypes) const
+ TConstArrayRef<NScheme::TTypeInfo> keyTypes,
+ bool reverse) const
{
// Form new vector. Skip ranges already read.
bool lastKeyEmpty = LastKey.DataSize() == 0;
@@ -145,32 +146,50 @@ public:
YQL_ENSURE(keyTypes.size() == LastKey.size(), "Key columns size != last key");
}
- auto rangeIt = Ranges.begin() + FirstUnprocessedRequest;
+ if (reverse) {
+ auto rangeIt = Ranges.begin() + FirstUnprocessedRequest.GetOrElse(Ranges.size());
- if (!lastKeyEmpty) {
- // It is range, where read was interrupted. Restart operation from last read key.
- result.emplace_back(std::move(TSerializedTableRange(
- TSerializedCellVec::Serialize(LastKey), rangeIt->To.GetBuffer(), false, rangeIt->ToInclusive
- )));
- ++rangeIt;
- }
+ if (!lastKeyEmpty) {
+ // It is range, where read was interrupted. Restart operation from last read key.
+ result.emplace_back(std::move(TSerializedTableRange(
+ rangeIt->From.GetBuffer(), TSerializedCellVec::Serialize(LastKey), rangeIt->ToInclusive, false
+ )));
+ }
+ result.insert(result.begin(), Ranges.begin(), rangeIt);
+ } else {
+ auto rangeIt = Ranges.begin() + FirstUnprocessedRequest.GetOrElse(0);
+
+ if (!lastKeyEmpty) {
+ // It is range, where read was interrupted. Restart operation from last read key.
+ result.emplace_back(std::move(TSerializedTableRange(
+ TSerializedCellVec::Serialize(LastKey), rangeIt->To.GetBuffer(), false, rangeIt->ToInclusive
+ )));
+ ++rangeIt;
+ }
- // And push all others
- result.insert(result.end(), rangeIt, Ranges.end());
+ // And push all others
+ result.insert(result.end(), rangeIt, Ranges.end());
+ }
for (auto& range : result) {
MakePrefixRange(range, keyTypes.size());
}
}
- void FillUnprocessedPoints(TVector<TSerializedCellVec>& result) const {
- result.insert(result.begin(), Points.begin() + FirstUnprocessedRequest, Points.end());
+ void FillUnprocessedPoints(TVector<TSerializedCellVec>& result, bool reverse) const {
+ if (reverse) {
+ auto it = FirstUnprocessedRequest ? Points.begin() + *FirstUnprocessedRequest + 1 : Points.end();
+ result.insert(result.begin(), Points.begin(), it);
+ } else {
+ auto it = FirstUnprocessedRequest ? Points.begin() + *FirstUnprocessedRequest : Points.begin();
+ result.insert(result.begin(), it, Points.end());
+ }
}
- void FillEvRead(TEvDataShard::TEvRead& ev, TConstArrayRef<NScheme::TTypeInfo> keyTypes) {
+ void FillEvRead(TEvDataShard::TEvRead& ev, TConstArrayRef<NScheme::TTypeInfo> keyTypes, bool reversed) {
if (Ranges.empty()) {
- FillUnprocessedPoints(ev.Keys);
+ FillUnprocessedPoints(ev.Keys, reversed);
} else {
- FillUnprocessedRanges(ev.Ranges, keyTypes);
+ FillUnprocessedRanges(ev.Ranges, keyTypes, reversed);
}
}
@@ -300,9 +319,15 @@ public:
CA_LOG_D("BEFORE: " << PendingShards.Size() << "." << RunningReads());
isFirst = false;
}
- auto state = THolder<TShardState>(PendingShards.PopFront());
- InFlightShards.PushFront(state.Get());
- StartRead(state.Release());
+ if (Settings.GetReverse()) {
+ auto state = THolder<TShardState>(PendingShards.PopBack());
+ InFlightShards.PushBack(state.Get());
+ StartRead(state.Release());
+ } else {
+ auto state = THolder<TShardState>(PendingShards.PopFront());
+ InFlightShards.PushFront(state.Get());
+ StartRead(state.Release());
+ }
}
if (!isFirst) {
CA_LOG_D("AFTER: " << PendingShards.Size() << "." << RunningReads());
@@ -434,8 +459,8 @@ public:
auto newShard = MakeHolder<TShardState>(partition.ShardId);
- if (idx == 0 && state) {
- newShard->CopyContinuationToken(state.Get());
+ if (((!Settings.GetReverse() && idx == 0) || (Settings.GetReverse() && idx + 1 == keyDesc->GetPartitions().size())) && state) {
+ newShard->AssignContinuationToken(state.Get());
}
for (ui64 j = i; j < state->Ranges.size(); ++j) {
@@ -464,12 +489,14 @@ public:
}
YQL_ENSURE(!newShards.empty());
- for (int i = newShards.ysize() - 1; i >= 0; --i) {
- PendingShards.PushFront(newShards[i].Release());
- }
-
- if (!state->LastKey.empty()) {
- PendingShards.Front()->LastKey = std::move(state->LastKey);
+ if (Settings.GetReverse()) {
+ for (size_t i = 0; i < newShards.size(); ++i) {
+ PendingShards.PushBack(newShards[i].Release());
+ }
+ } else {
+ for (int i = newShards.ysize() - 1; i >= 0; --i) {
+ PendingShards.PushFront(newShards[i].Release());
+ }
}
if (IsDebugLogEnabled(TlsActivationContext->ActorSystem(), NKikimrServices::KQP_COMPUTE)
@@ -541,7 +568,7 @@ public:
THolder<TEvDataShard::TEvRead> ev(new TEvDataShard::TEvRead());
auto& record = ev->Record;
- state->FillEvRead(*ev, KeyColumnTypes);
+ state->FillEvRead(*ev, KeyColumnTypes, Settings.GetReverse());
for (const auto& column : Settings.GetColumns()) {
if (!IsSystemColumn(column.GetId())) {
record.AddColumns(column.GetId());
diff --git a/ydb/library/yql/core/common_opt/yql_co_extr_members.cpp b/ydb/library/yql/core/common_opt/yql_co_extr_members.cpp
index df940da9d0..f4155d060c 100644
--- a/ydb/library/yql/core/common_opt/yql_co_extr_members.cpp
+++ b/ydb/library/yql/core/common_opt/yql_co_extr_members.cpp
@@ -70,6 +70,8 @@ TExprNode::TPtr ApplyExtractMembersToSkipNullMembers(const TExprNode::TPtr& node
if (hasMember) {
filteredMembers.push_back(x.Ptr());
+ } else {
+ return nullptr;
}
}