aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitalii Gridnev <gridnevvvit@gmail.com>2022-06-02 22:45:33 +0300
committerVitalii Gridnev <gridnevvvit@gmail.com>2022-06-02 22:45:33 +0300
commit723dc9b7f7c452d7dcb7b3ae01338a7431a98dc2 (patch)
treea49c41ede6b38b6a26e5883d0d1060311a03428f
parentb3fbf80af4a86528c65131030667348a89a3a856 (diff)
downloadydb-723dc9b7f7c452d7dcb7b3ae01338a7431a98dc2.tar.gz
[kqp][optimizer] add SortedDataConstraintSettingName to ReadRanges KIKIMR-14818
ref:d7f387edfd36de4d3e8f39480fb5d00cef81463e
-rw-r--r--ydb/core/kqp/common/kqp_yql.cpp10
-rw-r--r--ydb/core/kqp/common/kqp_yql.h3
-rw-r--r--ydb/core/kqp/compile/kqp_compile.cpp4
-rw-r--r--ydb/core/kqp/executer/kqp_scan_executer.cpp3
-rw-r--r--ydb/core/kqp/executer/kqp_tasks_graph.h1
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp12
-rw-r--r--ydb/core/protos/kqp_physical.proto3
-rw-r--r--ydb/core/protos/tx_datashard.proto1
8 files changed, 33 insertions, 4 deletions
diff --git a/ydb/core/kqp/common/kqp_yql.cpp b/ydb/core/kqp/common/kqp_yql.cpp
index 4cd92ee10e1..e98b1ea63c1 100644
--- a/ydb/core/kqp/common/kqp_yql.cpp
+++ b/ydb/core/kqp/common/kqp_yql.cpp
@@ -145,6 +145,8 @@ TKqpReadTableSettings ParseInternal(const TKqlReadOperation& node) {
} else if (name == TKqpReadTableSettings::ReverseSettingName) {
YQL_ENSURE(tuple.Ref().ChildrenSize() == 1);
settings.Reverse = true;
+ } else if (name == TKqpReadTableSettings::SortedSettingName) {
+ YQL_ENSURE(tuple.Ref().ChildrenSize() == 1);
} else {
YQL_ENSURE(false, "Unknown KqpReadTable setting name '" << name << "'");
}
@@ -201,6 +203,14 @@ NNodes::TCoNameValueTupleList TKqpReadTableSettings::BuildNode(TExprContext& ctx
.Done());
}
+ if (Sorted) {
+ settings.emplace_back(
+ Build<TCoNameValueTuple>(ctx, pos)
+ .Name()
+ .Build(SortedSettingName)
+ .Done());
+ }
+
return Build<TCoNameValueTupleList>(ctx, pos)
.Add(settings)
.Done();
diff --git a/ydb/core/kqp/common/kqp_yql.h b/ydb/core/kqp/common/kqp_yql.h
index 7de6cffe397..1f75891bd7c 100644
--- a/ydb/core/kqp/common/kqp_yql.h
+++ b/ydb/core/kqp/common/kqp_yql.h
@@ -44,14 +44,17 @@ struct TKqpReadTableSettings {
static constexpr TStringBuf SkipNullKeysSettingName = "SkipNullKeys";
static constexpr TStringBuf ItemsLimitSettingName = "ItemsLimit";
static constexpr TStringBuf ReverseSettingName = "Reverse";
+ static constexpr TStringBuf SortedSettingName = "Sorted";
TVector<TString> SkipNullKeys;
TExprNode::TPtr ItemsLimit;
bool Reverse = false;
+ bool Sorted = false;
void AddSkipNullKey(const TString& key);
void SetItemsLimit(const TExprNode::TPtr& expr) { ItemsLimit = expr; }
void SetReverse() { Reverse = true; }
+ void SetSorted() { Sorted = true; }
static TKqpReadTableSettings Parse(const NNodes::TKqlReadTableBase& node);
static TKqpReadTableSettings Parse(const NNodes::TKqlReadTableRangesBase& node);
diff --git a/ydb/core/kqp/compile/kqp_compile.cpp b/ydb/core/kqp/compile/kqp_compile.cpp
index 133034043a9..7836fff3573 100644
--- a/ydb/core/kqp/compile/kqp_compile.cpp
+++ b/ydb/core/kqp/compile/kqp_compile.cpp
@@ -268,6 +268,10 @@ void FillReadRanges(const TReader& read, const TKikimrTableMetadata&, TProto& re
}
}
+ if constexpr (std::is_same_v<TProto, NKqpProto::TKqpPhyOpReadOlapRanges>) {
+ readProto.SetSorted(settings.Sorted);
+ }
+
readProto.SetReverse(settings.Reverse);
}
diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp
index a3964f788fe..02c5db30640 100644
--- a/ydb/core/kqp/executer/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp
@@ -462,6 +462,7 @@ private:
bool reverse = false;
ui64 itemsLimit = 0;
+ bool sortedScanFlag = true;
TString itemsLimitParamName;
NDqProto::TData itemsLimitBytes;
NKikimr::NMiniKQL::TType* itemsLimitType = nullptr;
@@ -477,6 +478,7 @@ private:
stageInfo.Meta.SkipNullKeys.assign(op.GetReadRange().GetSkipNullKeys().begin(),
op.GetReadRange().GetSkipNullKeys().end());
} else if (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange) {
+ sortedScanFlag = op.GetReadOlapRange().GetSorted();
reverse = op.GetReadOlapRange().GetReverse();
ExtractItemsLimit(stageInfo, op.GetReadOlapRange().GetItemsLimit(), holderFactory, typeEnv,
itemsLimit, itemsLimitParamName, itemsLimitBytes, itemsLimitType);
@@ -731,6 +733,7 @@ private:
if (!task.Meta.Reads->empty()) {
protoTaskMeta.SetReverse(task.Meta.ReadInfo.Reverse);
protoTaskMeta.SetItemsLimit(task.Meta.ReadInfo.ItemsLimit);
+ protoTaskMeta.SetSorted(task.Meta.ReadInfo.Sorted);
if (tableInfo.TableKind == ETableKind::Olap) {
auto* olapProgram = protoTaskMeta.MutableOlapProgram();
diff --git a/ydb/core/kqp/executer/kqp_tasks_graph.h b/ydb/core/kqp/executer/kqp_tasks_graph.h
index 7c356ab623f..582be0f0397 100644
--- a/ydb/core/kqp/executer/kqp_tasks_graph.h
+++ b/ydb/core/kqp/executer/kqp_tasks_graph.h
@@ -123,6 +123,7 @@ struct TTaskMeta {
struct TReadInfo {
ui64 ItemsLimit = 0;
bool Reverse = false;
+ bool Sorted = false;
TKqpOlapProgram OlapProgram;
};
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
index 97b416f7a8f..68073db5921 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
@@ -119,11 +119,8 @@ TExprBase KqpRemoveRedundantSortByPk(TExprBase node, TExprContext& ctx, const TK
}
}
+ bool olapTable = tableDesc.Metadata->Kind == EKikimrTableKind::Olap;
if (direction == SortDirectionReverse) {
-// if (!config.AllowReverseRange()) {
-// return node;
-// }
- bool olapTable = tableDesc.Metadata->Kind == EKikimrTableKind::Olap;
if (!olapTable && kqpCtx.IsScanQuery()) {
return node;
}
@@ -135,8 +132,15 @@ TExprBase KqpRemoveRedundantSortByPk(TExprBase node, TExprContext& ctx, const TK
}
settings.SetReverse();
+ settings.SetSorted();
input = BuildReadNode(input.Pos(), ctx, input, settings);
+ } else if (SortDirectionForward) {
+ if (olapTable) {
+ auto settings = GetReadTableSettings(input, isReadTableRanges);
+ settings.SetSorted();
+ input = BuildReadNode(input.Pos(), ctx, input, settings);
+ }
}
if (maybeFlatmap) {
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index df7b9796939..cae8f32e05e 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -143,6 +143,9 @@ message TKqpPhyOpReadOlapRanges {
* it is done a bit later in executor, so we need separate field for parameter names.
*/
repeated string OlapProgramParameterNames = 5;
+ // Sorted sign which indicates that read operation should return a sorted data out of the
+ // dedicated actors or we can relax constraints here and return unsorted data.
+ bool Sorted = 6;
}
message TKqpPhyOpReadRanges {
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index b43410e07a7..4c4179fef3f 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -213,6 +213,7 @@ message TKqpTransaction {
reserved 8; // optional bytes ProcessProgram = 8;
optional EScanDataFormat DataFormat = 9;
optional NKikimrSSA.TOlapProgram OlapProgram = 10; // Currently only for OLAP tables
+ optional bool Sorted = 11;
}
optional EKqpTransactionType Type = 1;