diff options
author | Vitalii Gridnev <gridnevvvit@gmail.com> | 2022-06-02 22:45:33 +0300 |
---|---|---|
committer | Vitalii Gridnev <gridnevvvit@gmail.com> | 2022-06-02 22:45:33 +0300 |
commit | 723dc9b7f7c452d7dcb7b3ae01338a7431a98dc2 (patch) | |
tree | a49c41ede6b38b6a26e5883d0d1060311a03428f | |
parent | b3fbf80af4a86528c65131030667348a89a3a856 (diff) | |
download | ydb-723dc9b7f7c452d7dcb7b3ae01338a7431a98dc2.tar.gz |
[kqp][optimizer] add SortedDataConstraintSettingName to ReadRanges KIKIMR-14818
ref:d7f387edfd36de4d3e8f39480fb5d00cef81463e
-rw-r--r-- | ydb/core/kqp/common/kqp_yql.cpp | 10 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp_yql.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/compile/kqp_compile.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_scan_executer.cpp | 3 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_tasks_graph.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp | 12 | ||||
-rw-r--r-- | ydb/core/protos/kqp_physical.proto | 3 | ||||
-rw-r--r-- | ydb/core/protos/tx_datashard.proto | 1 |
8 files changed, 33 insertions, 4 deletions
diff --git a/ydb/core/kqp/common/kqp_yql.cpp b/ydb/core/kqp/common/kqp_yql.cpp index 4cd92ee10e1..e98b1ea63c1 100644 --- a/ydb/core/kqp/common/kqp_yql.cpp +++ b/ydb/core/kqp/common/kqp_yql.cpp @@ -145,6 +145,8 @@ TKqpReadTableSettings ParseInternal(const TKqlReadOperation& node) { } else if (name == TKqpReadTableSettings::ReverseSettingName) { YQL_ENSURE(tuple.Ref().ChildrenSize() == 1); settings.Reverse = true; + } else if (name == TKqpReadTableSettings::SortedSettingName) { + YQL_ENSURE(tuple.Ref().ChildrenSize() == 1); } else { YQL_ENSURE(false, "Unknown KqpReadTable setting name '" << name << "'"); } @@ -201,6 +203,14 @@ NNodes::TCoNameValueTupleList TKqpReadTableSettings::BuildNode(TExprContext& ctx .Done()); } + if (Sorted) { + settings.emplace_back( + Build<TCoNameValueTuple>(ctx, pos) + .Name() + .Build(SortedSettingName) + .Done()); + } + return Build<TCoNameValueTupleList>(ctx, pos) .Add(settings) .Done(); diff --git a/ydb/core/kqp/common/kqp_yql.h b/ydb/core/kqp/common/kqp_yql.h index 7de6cffe397..1f75891bd7c 100644 --- a/ydb/core/kqp/common/kqp_yql.h +++ b/ydb/core/kqp/common/kqp_yql.h @@ -44,14 +44,17 @@ struct TKqpReadTableSettings { static constexpr TStringBuf SkipNullKeysSettingName = "SkipNullKeys"; static constexpr TStringBuf ItemsLimitSettingName = "ItemsLimit"; static constexpr TStringBuf ReverseSettingName = "Reverse"; + static constexpr TStringBuf SortedSettingName = "Sorted"; TVector<TString> SkipNullKeys; TExprNode::TPtr ItemsLimit; bool Reverse = false; + bool Sorted = false; void AddSkipNullKey(const TString& key); void SetItemsLimit(const TExprNode::TPtr& expr) { ItemsLimit = expr; } void SetReverse() { Reverse = true; } + void SetSorted() { Sorted = true; } static TKqpReadTableSettings Parse(const NNodes::TKqlReadTableBase& node); static TKqpReadTableSettings Parse(const NNodes::TKqlReadTableRangesBase& node); diff --git a/ydb/core/kqp/compile/kqp_compile.cpp b/ydb/core/kqp/compile/kqp_compile.cpp index 133034043a9..7836fff3573 100644 --- a/ydb/core/kqp/compile/kqp_compile.cpp +++ b/ydb/core/kqp/compile/kqp_compile.cpp @@ -268,6 +268,10 @@ void FillReadRanges(const TReader& read, const TKikimrTableMetadata&, TProto& re } } + if constexpr (std::is_same_v<TProto, NKqpProto::TKqpPhyOpReadOlapRanges>) { + readProto.SetSorted(settings.Sorted); + } + readProto.SetReverse(settings.Reverse); } diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp index a3964f788fe..02c5db30640 100644 --- a/ydb/core/kqp/executer/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp @@ -462,6 +462,7 @@ private: bool reverse = false; ui64 itemsLimit = 0; + bool sortedScanFlag = true; TString itemsLimitParamName; NDqProto::TData itemsLimitBytes; NKikimr::NMiniKQL::TType* itemsLimitType = nullptr; @@ -477,6 +478,7 @@ private: stageInfo.Meta.SkipNullKeys.assign(op.GetReadRange().GetSkipNullKeys().begin(), op.GetReadRange().GetSkipNullKeys().end()); } else if (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange) { + sortedScanFlag = op.GetReadOlapRange().GetSorted(); reverse = op.GetReadOlapRange().GetReverse(); ExtractItemsLimit(stageInfo, op.GetReadOlapRange().GetItemsLimit(), holderFactory, typeEnv, itemsLimit, itemsLimitParamName, itemsLimitBytes, itemsLimitType); @@ -731,6 +733,7 @@ private: if (!task.Meta.Reads->empty()) { protoTaskMeta.SetReverse(task.Meta.ReadInfo.Reverse); protoTaskMeta.SetItemsLimit(task.Meta.ReadInfo.ItemsLimit); + protoTaskMeta.SetSorted(task.Meta.ReadInfo.Sorted); if (tableInfo.TableKind == ETableKind::Olap) { auto* olapProgram = protoTaskMeta.MutableOlapProgram(); diff --git a/ydb/core/kqp/executer/kqp_tasks_graph.h b/ydb/core/kqp/executer/kqp_tasks_graph.h index 7c356ab623f..582be0f0397 100644 --- a/ydb/core/kqp/executer/kqp_tasks_graph.h +++ b/ydb/core/kqp/executer/kqp_tasks_graph.h @@ -123,6 +123,7 @@ struct TTaskMeta { struct TReadInfo { ui64 ItemsLimit = 0; bool Reverse = false; + bool Sorted = false; TKqpOlapProgram OlapProgram; }; diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp index 97b416f7a8f..68073db5921 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp @@ -119,11 +119,8 @@ TExprBase KqpRemoveRedundantSortByPk(TExprBase node, TExprContext& ctx, const TK } } + bool olapTable = tableDesc.Metadata->Kind == EKikimrTableKind::Olap; if (direction == SortDirectionReverse) { -// if (!config.AllowReverseRange()) { -// return node; -// } - bool olapTable = tableDesc.Metadata->Kind == EKikimrTableKind::Olap; if (!olapTable && kqpCtx.IsScanQuery()) { return node; } @@ -135,8 +132,15 @@ TExprBase KqpRemoveRedundantSortByPk(TExprBase node, TExprContext& ctx, const TK } settings.SetReverse(); + settings.SetSorted(); input = BuildReadNode(input.Pos(), ctx, input, settings); + } else if (SortDirectionForward) { + if (olapTable) { + auto settings = GetReadTableSettings(input, isReadTableRanges); + settings.SetSorted(); + input = BuildReadNode(input.Pos(), ctx, input, settings); + } } if (maybeFlatmap) { diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index df7b9796939..cae8f32e05e 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -143,6 +143,9 @@ message TKqpPhyOpReadOlapRanges { * it is done a bit later in executor, so we need separate field for parameter names. */ repeated string OlapProgramParameterNames = 5; + // Sorted sign which indicates that read operation should return a sorted data out of the + // dedicated actors or we can relax constraints here and return unsorted data. + bool Sorted = 6; } message TKqpPhyOpReadRanges { diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index b43410e07a7..4c4179fef3f 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -213,6 +213,7 @@ message TKqpTransaction { reserved 8; // optional bytes ProcessProgram = 8; optional EScanDataFormat DataFormat = 9; optional NKikimrSSA.TOlapProgram OlapProgram = 10; // Currently only for OLAP tables + optional bool Sorted = 11; } optional EKqpTransactionType Type = 1; |