about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author zverevgeny <zverevgeny@ydb.tech> 2023-09-19 19:12:51 +0300
committer zverevgeny <zverevgeny@ydb.tech> 2023-09-19 19:34:04 +0300
commit 6e4b3cea694724389e8ceaea57de19fba857f3bf (patch)
tree 248ab22050cf4ffd8da4352ce0a32ff604d74a72
parent 7b0fc6ec5239bf02080f3f3578371aec67d2e039 (diff)
download ydb-6e4b3cea694724389e8ceaea57de19fba857f3bf.tar.gz
YQL-16443 use TimeOrderRecover for streams
-rw-r--r-- ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp 2
-rw-r--r-- ydb/library/yql/core/yql_opt_match_recognize.cpp 76
-rw-r--r-- ydb/library/yql/core/yql_opt_match_recognize.h 5
-rw-r--r-- ydb/library/yql/core/yql_type_annotation.h 4
-rw-r--r-- ydb/library/yql/dq/opt/dq_opt_log.cpp 7
-rw-r--r-- ydb/library/yql/dq/opt/dq_opt_log.h 2
-rw-r--r-- ydb/library/yql/providers/config/yql_config_provider.cpp 42
-rw-r--r-- ydb/library/yql/providers/dq/opt/logical_optimize.cpp 6
-rw-r--r-- ydb/library/yql/providers/yt/provider/yql_yt_logical_optimize.cpp 2
9 files changed, 125 insertions, 21 deletions
diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
index c9fa795fd4..460f5b776b 100644
--- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
+++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
@@ -7368,7 +7368,7 @@ struct TPeepHoleRules {
{"CheckedMinus", &ExpandCheckedMinus},
{"JsonValue", &ExpandJsonValue},
{"JsonExists", &ExpandJsonExists},
- {"MatchRecognize", &ExpandMatchRecognize}
+ //TODO(zverevgeny): add me {"MatchRecognize", &ExpandMatchRecognize}
};
static constexpr std::initializer_list<TExtPeepHoleOptimizerMap::value_type> CommonStageExtRulesInit = {
diff --git a/ydb/library/yql/core/yql_opt_match_recognize.cpp b/ydb/library/yql/core/yql_opt_match_recognize.cpp
index d5e0bd4ca9..06522daf0a 100644
--- a/ydb/library/yql/core/yql_opt_match_recognize.cpp
+++ b/ydb/library/yql/core/yql_opt_match_recognize.cpp
@@ -1,5 +1,6 @@
#include "yql_opt_match_recognize.h"
#include "yql_opt_utils.h"
+#include <ydb/library/yql/core/sql_types/time_order_recover.h>
#include <ydb/library/yql/core/yql_expr_optimize.h>
#include <ydb/library/yql/utils/log/log.h>
@@ -21,7 +22,7 @@ bool IsStreaming(const TExprNode::TPtr& input) {
}
} //namespace
-TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext& ctx) {
+TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext& ctx, const TTypeAnnotationContext& typeAnnCtx) {
YQL_ENSURE(node->IsCallable({"MatchRecognize"}));
const auto& input = node->ChildRef(0);
const auto& partitionKeySelector = node->ChildRef(1);
@@ -56,18 +57,75 @@ TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext&
ExtractSortKeyAndOrder(pos, sortTraits, sortKey, sortOrder, ctx);
TExprNode::TPtr result;
if (isStreaming) {
- //TODO use TimeOrderRecover
+ YQL_ENSURE(sortOrder->ChildrenSize() == 1, "Expect ORDER BY timestamp for MATCH_RECOGNIZE on streams");
+ const auto reordered = ctx.Builder(pos)
+ .Lambda()
+ .Param("partition")
+ .Callable("ForwardList")
+ .Callable(0, "OrderedMap")
+ .Callable(0, "TimeOrderRecover")
+ .Callable(0, "ToFlow").
+ Arg(0, "partition")
+ .Seal()
+ .Add(1, sortKey)
+ .Callable(2, "Interval")
+ .Add(0, ctx.NewAtom(pos, ToString(typeAnnCtx.TimeOrderRecoverDelay)))
+ .Seal()
+ .Callable(3, "Interval")
+ .Add(0, ctx.NewAtom(pos, ToString(typeAnnCtx.TimeOrderRecoverAhead)))
+ .Seal()
+ .Callable(4, "Uint32")
+ .Add(0, ctx.NewAtom(pos, ToString(typeAnnCtx.TimeOrderRecoverRowLimit)))
+ .Seal()
+ .Seal()
+ .Lambda(1)
+ .Param("row")
+ .Callable("RemoveMember")
+ .Arg(0, "row")
+ .Add(1, ctx.NewAtom(pos, NYql::NTimeOrderRecover::OUT_OF_ORDER_MARKER))
+ .Seal()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+
+ const auto matchRecognizeOnReorderedPartition = ctx.Builder(pos)
+ .Lambda()
+ .Param("partition")
+ .Apply(matchRecognize)
+ .With(0)
+ .Apply(reordered)
+ .With(0)
+ .Arg("partition")
+ .Done()
+ .Seal()
+ .Done()
+ .Seal()
+ .Seal()
+ .Build();
+ TExprNode::TPtr keySelector;
if (partitionColumns->ChildrenSize() != 0) {
- result = ctx.Builder(pos)
- .Callable("ShuffleByKeys")
- .Add(0, input)
- .Add(1, partitionKeySelector)
- .Add(2, matchRecognize)
+ keySelector = partitionKeySelector;
+ } else {
+ //Use pseudo partitioning with constant lambda to wrap TimeOrderRecover into DQ stage
+ //TODO(zverevgeny): fixme
+ keySelector = ctx.Builder(pos)
+ .Lambda()
+ .Param("row")
+ .Callable("Bool")
+ .Add(0, ctx.NewAtom(pos, "true"))
+ .Seal()
.Seal()
.Build();
- } else {
- result = matchRecognize;
}
+ result = ctx.Builder(pos)
+ .Callable("ShuffleByKeys")
+ .Add(0, input)
+ .Add(1, keySelector)
+ .Add(2, matchRecognizeOnReorderedPartition)
+ .Seal()
+ .Build();
} else { //non-streaming
if (partitionColumns->ChildrenSize() != 0) {
result = ctx.Builder(pos)
diff --git a/ydb/library/yql/core/yql_opt_match_recognize.h b/ydb/library/yql/core/yql_opt_match_recognize.h
index 934a783a61..0eed26ecc9 100644
--- a/ydb/library/yql/core/yql_opt_match_recognize.h
+++ b/ydb/library/yql/core/yql_opt_match_recognize.h
@@ -1,8 +1,9 @@
#pragma once
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
+#include <ydb/library/yql/core/yql_type_annotation.h>
namespace NYql {
-TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr &node, TExprContext &ctx);
+TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr &node, TExprContext &ctx, const TTypeAnnotationContext& typeAnnCtx);
-} //namespace NYql
\ No newline at end of file
+} //namespace NYql
diff --git a/ydb/library/yql/core/yql_type_annotation.h b/ydb/library/yql/core/yql_type_annotation.h
index 41f8b4e8d5..0285f0d097 100644
--- a/ydb/library/yql/core/yql_type_annotation.h
+++ b/ydb/library/yql/core/yql_type_annotation.h
@@ -238,7 +238,9 @@ struct TTypeAnnotationContext: public TThrRefBase {
IArrowResolver::TPtr ArrowResolver;
TString CostBasedOptimizerType;
bool MatchRecognize = false;
-
+ i64 TimeOrderRecoverDelay = -10'000'000; //microseconds
+ i64 TimeOrderRecoverAhead = 10'000'000; //microseconds
+ ui32 TimeOrderRecoverRowLimit = 1'000'000;
// compatibility with v0 or raw s-expression code
bool OrderedColumns = false;
TColumnOrderStorage::TPtr ColumnOrderStorage = new TColumnOrderStorage;
diff --git a/ydb/library/yql/dq/opt/dq_opt_log.cpp b/ydb/library/yql/dq/opt/dq_opt_log.cpp
index 9e5809d1c4..079a0f116b 100644
--- a/ydb/library/yql/dq/opt/dq_opt_log.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_log.cpp
@@ -293,10 +293,9 @@ IGraphTransformer::TStatus DqWrapRead(const TExprNode::TPtr& input, TExprNode::T
return status;
}
-TExprBase DqExpandMatchRecognize(TExprBase node, TExprContext& ctx) {
- if (node.Maybe<TCoMatchRecognize>())
- return TExprBase(ExpandMatchRecognize(node.Ptr(), ctx));
- return node;
+TExprBase DqExpandMatchRecognize(TExprBase node, TExprContext& ctx, const TTypeAnnotationContext& typeAnnCtx) {
+ YQL_ENSURE(node.Maybe<TCoMatchRecognize>(), "Expected MatchRecognize");
+ return TExprBase(ExpandMatchRecognize(node.Ptr(), ctx, typeAnnCtx));
}
}
diff --git a/ydb/library/yql/dq/opt/dq_opt_log.h b/ydb/library/yql/dq/opt/dq_opt_log.h
index 820d247da0..622bf92c72 100644
--- a/ydb/library/yql/dq/opt/dq_opt_log.h
+++ b/ydb/library/yql/dq/opt/dq_opt_log.h
@@ -34,6 +34,6 @@ NNodes::TExprBase DqSqlInDropCompact(NNodes::TExprBase node, TExprContext& ctx);
IGraphTransformer::TStatus DqWrapRead(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const TDqSettings& config);
-NNodes::TExprBase DqExpandMatchRecognize(NNodes::TExprBase node, TExprContext& ctx);
+NNodes::TExprBase DqExpandMatchRecognize(NNodes::TExprBase node, TExprContext& ctx, const TTypeAnnotationContext& typeAnnCtx);
} // namespace NYql::NDq
diff --git a/ydb/library/yql/providers/config/yql_config_provider.cpp b/ydb/library/yql/providers/config/yql_config_provider.cpp
index 54e6111267..129d6f672f 100644
--- a/ydb/library/yql/providers/config/yql_config_provider.cpp
+++ b/ydb/library/yql/providers/config/yql_config_provider.cpp
@@ -828,6 +828,48 @@ namespace {
}
Types.MatchRecognize = name == "_EnableMatchRecognize";
}
+ else if (name == "TimeOrderRecoverDelay") {
+ if (args.size() != 1) {
+ ctx.AddError(TIssue(pos, TStringBuilder() << "Expected one argument, but got " << args.size()));
+ return false;
+ }
+ if (!TryFromString(args[0], Types.TimeOrderRecoverDelay)) {
+ ctx.AddError(TIssue(pos, TStringBuilder() << "Expected integer, but got: " << args[0]));
+ return false;
+ }
+ if (Types.TimeOrderRecoverDelay >= 0) {
+ ctx.AddError(TIssue(pos, TStringBuilder() << "Expected negative value, but got: " << args[0]));
+ return false;
+ }
+ }
+ else if (name == "TimeOrderRecoverAhead") {
+ if (args.size() != 1) {
+ ctx.AddError(TIssue(pos, TStringBuilder() << "Expected one argument, but got " << args.size()));
+ return false;
+ }
+ if (!TryFromString(args[0], Types.TimeOrderRecoverAhead)) {
+ ctx.AddError(TIssue(pos, TStringBuilder() << "Expected integer, but got: " << args[0]));
+ return false;
+ }
+ if (Types.TimeOrderRecoverAhead <= 0) {
+ ctx.AddError(TIssue(pos, TStringBuilder() << "Expected positive value, but got: " << args[0]));
+ return false;
+ }
+ }
+ else if (name == "TimeOrderRecoverRowLimit") {
+ if (args.size() != 1) {
+ ctx.AddError(TIssue(pos, TStringBuilder() << "Expected one argument, but got " << args.size()));
+ return false;
+ }
+ if (!TryFromString(args[0], Types.TimeOrderRecoverRowLimit)) {
+ ctx.AddError(TIssue(pos, TStringBuilder() << "Expected integer, but got: " << args[0]));
+ return false;
+ }
+ if (Types.TimeOrderRecoverRowLimit == 0) {
+ ctx.AddError(TIssue(pos, TStringBuilder() << "Expected positive value, but got: " << args[0]));
+ return false;
+ }
+ }
else {
ctx.AddError(TIssue(pos, TStringBuilder() << "Unsupported command: " << name));
return false;
diff --git a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp
index 8187676776..1b91310822 100644
--- a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp
+++ b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp
@@ -146,8 +146,10 @@ protected:
}
TMaybeNode<TExprBase> ExpandMatchRecognize(TExprBase node, TExprContext& ctx) {
- if (node.Cast<TCoInputBase>().Input().Maybe<TDqConnection>()) {
- return DqExpandMatchRecognize(node, ctx);
+ if (node.Maybe<TCoMatchRecognize>() &&
+ node.Cast<TCoInputBase>().Input().Maybe<TDqConnection>()
+ ) {
+ return DqExpandMatchRecognize(node, ctx, TypesCtx);
}
return node;
}
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_logical_optimize.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_logical_optimize.cpp
index 59cc56ce1a..13b0525e8e 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_logical_optimize.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_logical_optimize.cpp
@@ -2660,7 +2660,7 @@ protected:
}
TMaybeNode<TExprBase> MatchRecognize(TExprBase node, TExprContext& ctx) {
- return ExpandMatchRecognize(node.Ptr(), ctx);
+ return ExpandMatchRecognize(node.Ptr(), ctx, *Types);
}
private:
TYtState::TPtr State_;