diff options
author | zverevgeny <zverevgeny@ydb.tech> | 2023-09-19 19:12:51 +0300 |
---|---|---|
committer | zverevgeny <zverevgeny@ydb.tech> | 2023-09-19 19:34:04 +0300 |
commit | 6e4b3cea694724389e8ceaea57de19fba857f3bf (patch) | |
tree | 248ab22050cf4ffd8da4352ce0a32ff604d74a72 | |
parent | 7b0fc6ec5239bf02080f3f3578371aec67d2e039 (diff) | |
download | ydb-6e4b3cea694724389e8ceaea57de19fba857f3bf.tar.gz |
YQL-16443 use TimeOrderRecover for streams
9 files changed, 125 insertions, 21 deletions
diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp index c9fa795fd4..460f5b776b 100644 --- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp +++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp @@ -7368,7 +7368,7 @@ struct TPeepHoleRules { {"CheckedMinus", &ExpandCheckedMinus}, {"JsonValue", &ExpandJsonValue}, {"JsonExists", &ExpandJsonExists}, - {"MatchRecognize", &ExpandMatchRecognize} + //TODO(zverevgeny): add me {"MatchRecognize", &ExpandMatchRecognize} }; static constexpr std::initializer_list<TExtPeepHoleOptimizerMap::value_type> CommonStageExtRulesInit = { diff --git a/ydb/library/yql/core/yql_opt_match_recognize.cpp b/ydb/library/yql/core/yql_opt_match_recognize.cpp index d5e0bd4ca9..06522daf0a 100644 --- a/ydb/library/yql/core/yql_opt_match_recognize.cpp +++ b/ydb/library/yql/core/yql_opt_match_recognize.cpp @@ -1,5 +1,6 @@ #include "yql_opt_match_recognize.h" #include "yql_opt_utils.h" +#include <ydb/library/yql/core/sql_types/time_order_recover.h> #include <ydb/library/yql/core/yql_expr_optimize.h> #include <ydb/library/yql/utils/log/log.h> @@ -21,7 +22,7 @@ bool IsStreaming(const TExprNode::TPtr& input) { } } //namespace -TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext& ctx) { +TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext& ctx, const TTypeAnnotationContext& typeAnnCtx) { YQL_ENSURE(node->IsCallable({"MatchRecognize"})); const auto& input = node->ChildRef(0); const auto& partitionKeySelector = node->ChildRef(1); @@ -56,18 +57,75 @@ TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext& ExtractSortKeyAndOrder(pos, sortTraits, sortKey, sortOrder, ctx); TExprNode::TPtr result; if (isStreaming) { - //TODO use TimeOrderRecover + YQL_ENSURE(sortOrder->ChildrenSize() == 1, "Expect ORDER BY timestamp for MATCH_RECOGNIZE on streams"); + const auto reordered = ctx.Builder(pos) + .Lambda() + .Param("partition") + .Callable("ForwardList") + .Callable(0, "OrderedMap") + .Callable(0, "TimeOrderRecover") + .Callable(0, "ToFlow"). + Arg(0, "partition") + .Seal() + .Add(1, sortKey) + .Callable(2, "Interval") + .Add(0, ctx.NewAtom(pos, ToString(typeAnnCtx.TimeOrderRecoverDelay))) + .Seal() + .Callable(3, "Interval") + .Add(0, ctx.NewAtom(pos, ToString(typeAnnCtx.TimeOrderRecoverAhead))) + .Seal() + .Callable(4, "Uint32") + .Add(0, ctx.NewAtom(pos, ToString(typeAnnCtx.TimeOrderRecoverRowLimit))) + .Seal() + .Seal() + .Lambda(1) + .Param("row") + .Callable("RemoveMember") + .Arg(0, "row") + .Add(1, ctx.NewAtom(pos, NYql::NTimeOrderRecover::OUT_OF_ORDER_MARKER)) + .Seal() + .Seal() + .Seal() + .Seal() + .Seal() + .Build(); + + const auto matchRecognizeOnReorderedPartition = ctx.Builder(pos) + .Lambda() + .Param("partition") + .Apply(matchRecognize) + .With(0) + .Apply(reordered) + .With(0) + .Arg("partition") + .Done() + .Seal() + .Done() + .Seal() + .Seal() + .Build(); + TExprNode::TPtr keySelector; if (partitionColumns->ChildrenSize() != 0) { - result = ctx.Builder(pos) - .Callable("ShuffleByKeys") - .Add(0, input) - .Add(1, partitionKeySelector) - .Add(2, matchRecognize) + keySelector = partitionKeySelector; + } else { + //Use pseudo partitioning with constant lambda to wrap TimeOrderRecover into DQ stage + //TODO(zverevgeny): fixme + keySelector = ctx.Builder(pos) + .Lambda() + .Param("row") + .Callable("Bool") + .Add(0, ctx.NewAtom(pos, "true")) + .Seal() .Seal() .Build(); - } else { - result = matchRecognize; } + result = ctx.Builder(pos) + .Callable("ShuffleByKeys") + .Add(0, input) + .Add(1, keySelector) + .Add(2, matchRecognizeOnReorderedPartition) + .Seal() + .Build(); } else { //non-streaming if (partitionColumns->ChildrenSize() != 0) { result = ctx.Builder(pos) diff --git a/ydb/library/yql/core/yql_opt_match_recognize.h b/ydb/library/yql/core/yql_opt_match_recognize.h index 934a783a61..0eed26ecc9 100644 --- a/ydb/library/yql/core/yql_opt_match_recognize.h +++ b/ydb/library/yql/core/yql_opt_match_recognize.h @@ -1,8 +1,9 @@ #pragma once #include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> +#include <ydb/library/yql/core/yql_type_annotation.h> namespace NYql { -TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr &node, TExprContext &ctx); +TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr &node, TExprContext &ctx, const TTypeAnnotationContext& typeAnnCtx); -} //namespace NYql
\ No newline at end of file +} //namespace NYql diff --git a/ydb/library/yql/core/yql_type_annotation.h b/ydb/library/yql/core/yql_type_annotation.h index 41f8b4e8d5..0285f0d097 100644 --- a/ydb/library/yql/core/yql_type_annotation.h +++ b/ydb/library/yql/core/yql_type_annotation.h @@ -238,7 +238,9 @@ struct TTypeAnnotationContext: public TThrRefBase { IArrowResolver::TPtr ArrowResolver; TString CostBasedOptimizerType; bool MatchRecognize = false; - + i64 TimeOrderRecoverDelay = -10'000'000; //microseconds + i64 TimeOrderRecoverAhead = 10'000'000; //microseconds + ui32 TimeOrderRecoverRowLimit = 1'000'000; // compatibility with v0 or raw s-expression code bool OrderedColumns = false; TColumnOrderStorage::TPtr ColumnOrderStorage = new TColumnOrderStorage; diff --git a/ydb/library/yql/dq/opt/dq_opt_log.cpp b/ydb/library/yql/dq/opt/dq_opt_log.cpp index 9e5809d1c4..079a0f116b 100644 --- a/ydb/library/yql/dq/opt/dq_opt_log.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_log.cpp @@ -293,10 +293,9 @@ IGraphTransformer::TStatus DqWrapRead(const TExprNode::TPtr& input, TExprNode::T return status; } -TExprBase DqExpandMatchRecognize(TExprBase node, TExprContext& ctx) { - if (node.Maybe<TCoMatchRecognize>()) - return TExprBase(ExpandMatchRecognize(node.Ptr(), ctx)); - return node; +TExprBase DqExpandMatchRecognize(TExprBase node, TExprContext& ctx, const TTypeAnnotationContext& typeAnnCtx) { + YQL_ENSURE(node.Maybe<TCoMatchRecognize>(), "Expected MatchRecognize"); + return TExprBase(ExpandMatchRecognize(node.Ptr(), ctx, typeAnnCtx)); } } diff --git a/ydb/library/yql/dq/opt/dq_opt_log.h b/ydb/library/yql/dq/opt/dq_opt_log.h index 820d247da0..622bf92c72 100644 --- a/ydb/library/yql/dq/opt/dq_opt_log.h +++ b/ydb/library/yql/dq/opt/dq_opt_log.h @@ -34,6 +34,6 @@ NNodes::TExprBase DqSqlInDropCompact(NNodes::TExprBase node, TExprContext& ctx); IGraphTransformer::TStatus DqWrapRead(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const TDqSettings& config); -NNodes::TExprBase DqExpandMatchRecognize(NNodes::TExprBase node, TExprContext& ctx); +NNodes::TExprBase DqExpandMatchRecognize(NNodes::TExprBase node, TExprContext& ctx, const TTypeAnnotationContext& typeAnnCtx); } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/config/yql_config_provider.cpp b/ydb/library/yql/providers/config/yql_config_provider.cpp index 54e6111267..129d6f672f 100644 --- a/ydb/library/yql/providers/config/yql_config_provider.cpp +++ b/ydb/library/yql/providers/config/yql_config_provider.cpp @@ -828,6 +828,48 @@ namespace { } Types.MatchRecognize = name == "_EnableMatchRecognize"; } + else if (name == "TimeOrderRecoverDelay") { + if (args.size() != 1) { + ctx.AddError(TIssue(pos, TStringBuilder() << "Expected one argument, but got " << args.size())); + return false; + } + if (!TryFromString(args[0], Types.TimeOrderRecoverDelay)) { + ctx.AddError(TIssue(pos, TStringBuilder() << "Expected integer, but got: " << args[0])); + return false; + } + if (Types.TimeOrderRecoverDelay >= 0) { + ctx.AddError(TIssue(pos, TStringBuilder() << "Expected negative value, but got: " << args[0])); + return false; + } + } + else if (name == "TimeOrderRecoverAhead") { + if (args.size() != 1) { + ctx.AddError(TIssue(pos, TStringBuilder() << "Expected one argument, but got " << args.size())); + return false; + } + if (!TryFromString(args[0], Types.TimeOrderRecoverAhead)) { + ctx.AddError(TIssue(pos, TStringBuilder() << "Expected integer, but got: " << args[0])); + return false; + } + if (Types.TimeOrderRecoverAhead <= 0) { + ctx.AddError(TIssue(pos, TStringBuilder() << "Expected positive value, but got: " << args[0])); + return false; + } + } + else if (name == "TimeOrderRecoverRowLimit") { + if (args.size() != 1) { + ctx.AddError(TIssue(pos, TStringBuilder() << "Expected one argument, but got " << args.size())); + return false; + } + if (!TryFromString(args[0], Types.TimeOrderRecoverRowLimit)) { + ctx.AddError(TIssue(pos, TStringBuilder() << "Expected integer, but got: " << args[0])); + return false; + } + if (Types.TimeOrderRecoverRowLimit == 0) { + ctx.AddError(TIssue(pos, TStringBuilder() << "Expected positive value, but got: " << args[0])); + return false; + } + } else { ctx.AddError(TIssue(pos, TStringBuilder() << "Unsupported command: " << name)); return false; diff --git a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp index 8187676776..1b91310822 100644 --- a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp @@ -146,8 +146,10 @@ protected: } TMaybeNode<TExprBase> ExpandMatchRecognize(TExprBase node, TExprContext& ctx) { - if (node.Cast<TCoInputBase>().Input().Maybe<TDqConnection>()) { - return DqExpandMatchRecognize(node, ctx); + if (node.Maybe<TCoMatchRecognize>() && + node.Cast<TCoInputBase>().Input().Maybe<TDqConnection>() + ) { + return DqExpandMatchRecognize(node, ctx, TypesCtx); } return node; } diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_logical_optimize.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_logical_optimize.cpp index 59cc56ce1a..13b0525e8e 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_logical_optimize.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_logical_optimize.cpp @@ -2660,7 +2660,7 @@ protected: } TMaybeNode<TExprBase> MatchRecognize(TExprBase node, TExprContext& ctx) { - return ExpandMatchRecognize(node.Ptr(), ctx); + return ExpandMatchRecognize(node.Ptr(), ctx, *Types); } private: TYtState::TPtr State_; |