diff options
author | zverevgeny <zverevgeny@ydb.tech> | 2023-08-25 18:58:02 +0300 |
---|---|---|
committer | zverevgeny <zverevgeny@ydb.tech> | 2023-08-25 19:12:46 +0300 |
commit | f36aa51c663ece905630866ce7eff969024c7274 (patch) | |
tree | ef58d0561ad2704ac8b7387656d0c80e05c91ed3 | |
parent | b5b002248a04fdffdd1f6c0fa26f9551bc9ed7f5 (diff) | |
download | ydb-f36aa51c663ece905630866ce7eff969024c7274.tar.gz |
YQL-16325 initial optimizer for MATCH_RECOGNIZE
16 files changed, 162 insertions, 3 deletions
diff --git a/ydb/library/yql/core/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/core/CMakeLists.darwin-x86_64.txt index 151c88bb9d..1101ee7d42 100644 --- a/ydb/library/yql/core/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/core/CMakeLists.darwin-x86_64.txt @@ -94,6 +94,7 @@ target_sources(library-yql-core PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_holding_file_storage.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_join.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_library_compiler.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_opt_match_recognize.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_opt_proposed_by_data.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_opt_range.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_opt_rewrite_io.cpp diff --git a/ydb/library/yql/core/CMakeLists.linux-aarch64.txt b/ydb/library/yql/core/CMakeLists.linux-aarch64.txt index bcf8ef8a85..e70610f312 100644 --- a/ydb/library/yql/core/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/core/CMakeLists.linux-aarch64.txt @@ -95,6 +95,7 @@ target_sources(library-yql-core PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_holding_file_storage.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_join.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_library_compiler.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_opt_match_recognize.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_opt_proposed_by_data.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_opt_range.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_opt_rewrite_io.cpp diff --git a/ydb/library/yql/core/CMakeLists.linux-x86_64.txt b/ydb/library/yql/core/CMakeLists.linux-x86_64.txt index bcf8ef8a85..e70610f312 100644 --- a/ydb/library/yql/core/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/core/CMakeLists.linux-x86_64.txt @@ -95,6 +95,7 @@ target_sources(library-yql-core PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_holding_file_storage.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_join.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_library_compiler.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_opt_match_recognize.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_opt_proposed_by_data.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_opt_range.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_opt_rewrite_io.cpp diff --git a/ydb/library/yql/core/CMakeLists.windows-x86_64.txt b/ydb/library/yql/core/CMakeLists.windows-x86_64.txt index 151c88bb9d..1101ee7d42 100644 --- a/ydb/library/yql/core/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/core/CMakeLists.windows-x86_64.txt @@ -94,6 +94,7 @@ target_sources(library-yql-core PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_holding_file_storage.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_join.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_library_compiler.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_opt_match_recognize.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_opt_proposed_by_data.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_opt_range.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_opt_rewrite_io.cpp diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json index fd2f837316..9b3b819794 100644 --- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json +++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json @@ -1472,6 +1472,17 @@ ] }, { + "Name": "TCoMatchRecognize", + "Base": "TCoInputBase", + "Match": {"Type": "Callable", "Name": "MatchRecognize"}, + "Children": [ + {"Index": 1, "Name": "partitionKeySelector", "Type": "TExprBase"}, + {"Index": 2, "Name": "partitionColumns", "Type": "TExprBase"}, + {"Index": 3, "Name": "sortTraits", "Type": "TExprBase"}, + {"Index": 4, "Name": "params", "Type": "TExprBase"} + ] + }, + { "Name": "TCoEquiJoinTuple", "Base": "TExprBase", "Match": {"Type": "Tuple"}, diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp index e612cd70f6..62b4c057df 100644 --- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp +++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp @@ -8,6 +8,7 @@ #include <ydb/library/yql/core/yql_opt_range.h> #include <ydb/library/yql/core/yql_opt_utils.h> #include <ydb/library/yql/core/yql_opt_window.h> +#include <ydb/library/yql/core/yql_opt_match_recognize.h> #include <ydb/library/yql/core/yql_join.h> #include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> #include <ydb/library/yql/core/common_opt/yql_co_transformer.h> @@ -7240,7 +7241,8 @@ struct TPeepHoleRules { {"CheckedMod", &ExpandCheckedMod}, {"CheckedMinus", &ExpandCheckedMinus}, {"JsonValue", &ExpandJsonValue}, - {"JsonExists", &ExpandJsonExists} + {"JsonExists", &ExpandJsonExists}, + {"MatchRecognize", &ExpandMatchRecognize} }; static constexpr std::initializer_list<TExtPeepHoleOptimizerMap::value_type> CommonStageExtRulesInit = { diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp index bbef3d080d..205f5be3e3 100644 --- a/ydb/library/yql/core/type_ann/type_ann_core.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp @@ -12107,6 +12107,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> Functions["MatchRecognizeMeasures"] = &MatchRecognizeMeasuresWrapper; Functions["MatchRecognizePattern"] = &MatchRecognizePatternWrapper; Functions["MatchRecognizeDefines"] = &MatchRecognizeDefinesWrapper; + Functions["MatchRecognizeCore"] = &MatchRecognizeCoreWrapper; Functions["FromPg"] = &FromPgWrapper; Functions["ToPg"] = &ToPgWrapper; diff --git a/ydb/library/yql/core/type_ann/type_ann_match_recognize.cpp b/ydb/library/yql/core/type_ann/type_ann_match_recognize.cpp index 6de8f30140..5d8b74692d 100644 --- a/ydb/library/yql/core/type_ann/type_ann_match_recognize.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_match_recognize.cpp @@ -64,8 +64,8 @@ const std::unordered_set<TString> GetPrimaryVars(const TExprNode::TPtr& pattern, result.insert(TString(factor->ChildRef(0)->Content())); } else { YQL_ENSURE(nestingLevel < MaxMatchRecognizePatternNesting, "To big nesting level in the pattern"); - auto subExprVars = GetPrimaryVars(factor->ChildRef(0), ctx, ++nestingLevel); - result.insert(subExprVars.begin(), subExprVars.end()); + auto subExprVars = GetPrimaryVars(factor->ChildRef(0), ctx, ++nestingLevel); + result.insert(subExprVars.begin(), subExprVars.end()); } } } @@ -200,4 +200,35 @@ MatchRecognizeDefinesWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& outp return IGraphTransformer::TStatus::Ok; } +IGraphTransformer::TStatus +MatchRecognizeCoreWrapper(const TExprNode::TPtr &input, TExprNode::TPtr &output, TContext &ctx) { + Y_UNUSED(output); + if (!EnsureArgsCount(*input, 4, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + const auto& source = input->ChildRef(0); + const auto& partitionKeySelector = input->ChildRef(1); + const auto& partitionColumns = input->ChildRef(2); + const auto& params = input->ChildRef(3); + + YQL_ENSURE(source->GetTypeAnn()->Cast<TFlowExprType>() != NULL, "Internal logic error. Flow expected"); + const auto& define = params->ChildRef(4); + YQL_ENSURE(GetSeqItemType(source->GetTypeAnn())->Equals(*define->ChildRef(0)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()), + "Internal logic error. Expected the same input type as for DEFINE"); + + const auto& partitionKeySelectorType = partitionKeySelector->GetTypeAnn(); + const auto& partitionKeySelectorItemTypes = partitionKeySelectorType->Cast<TTupleExprType>()->GetItems(); + + auto outputTableColumns = params->GetTypeAnn()->Cast<TStructExprType>()->GetItems(); + for (size_t i = 0; i != partitionColumns->ChildrenSize(); ++i) { + outputTableColumns.push_back(ctx.Expr.MakeType<TItemExprType>( + partitionColumns->ChildRef(i)->Content(), + partitionKeySelectorItemTypes[i] + )); + } + const auto outputTableRowType = ctx.Expr.MakeType<TStructExprType>(outputTableColumns); + input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputTableRowType)); + return IGraphTransformer::TStatus::Ok; +} + } // namespace NYql::NTypeAnnImpl diff --git a/ydb/library/yql/core/type_ann/type_ann_match_recognize.h b/ydb/library/yql/core/type_ann/type_ann_match_recognize.h index 5c2be3cf61..ddb870c33f 100644 --- a/ydb/library/yql/core/type_ann/type_ann_match_recognize.h +++ b/ydb/library/yql/core/type_ann/type_ann_match_recognize.h @@ -14,5 +14,6 @@ namespace NYql { IGraphTransformer::TStatus MatchRecognizeMeasuresWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus MatchRecognizePatternWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus MatchRecognizeDefinesWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); + IGraphTransformer::TStatus MatchRecognizeCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); } } diff --git a/ydb/library/yql/core/ya.make b/ydb/library/yql/core/ya.make index 7665acc9da..ec7b793d99 100644 --- a/ydb/library/yql/core/ya.make +++ b/ydb/library/yql/core/ya.make @@ -27,6 +27,8 @@ SRCS( yql_join.cpp yql_join.h yql_library_compiler.cpp + yql_opt_match_recognize.cpp + yql_opt_match_recognize.h yql_opt_proposed_by_data.cpp yql_opt_proposed_by_data.h yql_opt_range.cpp diff --git a/ydb/library/yql/core/yql_opt_match_recognize.cpp b/ydb/library/yql/core/yql_opt_match_recognize.cpp new file mode 100644 index 0000000000..f7cc70ca3d --- /dev/null +++ b/ydb/library/yql/core/yql_opt_match_recognize.cpp @@ -0,0 +1,75 @@ +#include "yql_opt_match_recognize.h" +#include "yql_opt_utils.h" + +#include <ydb/library/yql/utils/log/log.h> + +namespace NYql { + +using namespace NNodes; + +TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext& ctx) { + YQL_ENSURE(node->IsCallable({"MatchRecognize"})); + const auto& input = node->ChildRef(0); + const auto& partitionKeySelector = node->ChildRef(1); + const auto& partitionColumns = node->ChildRef(2); + const auto& sortTraits = node->ChildRef(3); + const auto& params = node->ChildRef(4); + + const auto pos = node->Pos(); + + const auto matchRecognize = ctx.Builder(pos) + .Lambda() + .Param("sortedPartition") + .Callable(0, "ForwardList") + .Callable(0, "MatchRecognizeCore") + .Callable(0, "ToFlow") + .Arg(0, "sortedPartition") + .Seal() + .Add(1, partitionKeySelector) + .Add(2, partitionColumns) + .Add(3, params) + .Seal() + .Seal() + .Seal() + .Build(); + + TExprNode::TPtr sortKey; + TExprNode::TPtr sortOrder; + ExtractSortKeyAndOrder(pos, sortTraits, sortKey, sortOrder, ctx); + + const auto matchRecognizeOnSortedPartition = sortOrder->ChildrenSize() != 0 ? + ctx.Builder(pos) + .Lambda() + .Param("partition") + .Apply(matchRecognize) + .With(0) + .Callable("Sort") + .Arg(0, "partition") + .Add(1, sortOrder) + .Add(2, sortKey) + .Seal() + .Done() + .Seal() + .Seal() + .Build() : + matchRecognize; + + const auto result = partitionColumns->ChildrenSize() != 0 ? + ctx.Builder(pos) + .Callable("ShuffleByKeys") + .Add(0, input) + .Add(1, partitionKeySelector) + .Add(2, matchRecognizeOnSortedPartition) + .Seal() + .Build() : + ctx.Builder(pos) + .Apply(matchRecognizeOnSortedPartition) + .With(0, input) + .Seal() + .Build(); + YQL_CLOG(INFO, Core) << "Expanded MatchRecognize"; + return result; +} + + +} //namespace NYql diff --git a/ydb/library/yql/core/yql_opt_match_recognize.h b/ydb/library/yql/core/yql_opt_match_recognize.h new file mode 100644 index 0000000000..934a783a61 --- /dev/null +++ b/ydb/library/yql/core/yql_opt_match_recognize.h @@ -0,0 +1,8 @@ +#pragma once +#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> + +namespace NYql { + +TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr &node, TExprContext &ctx); + +} //namespace NYql
\ No newline at end of file diff --git a/ydb/library/yql/dq/opt/dq_opt_log.cpp b/ydb/library/yql/dq/opt/dq_opt_log.cpp index 4787cc1b11..9e5809d1c4 100644 --- a/ydb/library/yql/dq/opt/dq_opt_log.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_log.cpp @@ -6,6 +6,7 @@ #include <ydb/library/yql/core/yql_aggregate_expander.h> #include <ydb/library/yql/core/yql_expr_optimize.h> #include <ydb/library/yql/core/yql_opt_window.h> +#include <ydb/library/yql/core/yql_opt_match_recognize.h> #include <ydb/library/yql/core/yql_opt_utils.h> #include <ydb/library/yql/core/yql_type_annotation.h> #include <ydb/library/yql/dq/integration/yql_dq_integration.h> @@ -292,4 +293,10 @@ IGraphTransformer::TStatus DqWrapRead(const TExprNode::TPtr& input, TExprNode::T return status; } +TExprBase DqExpandMatchRecognize(TExprBase node, TExprContext& ctx) { + if (node.Maybe<TCoMatchRecognize>()) + return TExprBase(ExpandMatchRecognize(node.Ptr(), ctx)); + return node; +} + } diff --git a/ydb/library/yql/dq/opt/dq_opt_log.h b/ydb/library/yql/dq/opt/dq_opt_log.h index 6a0dd1b4d8..820d247da0 100644 --- a/ydb/library/yql/dq/opt/dq_opt_log.h +++ b/ydb/library/yql/dq/opt/dq_opt_log.h @@ -34,4 +34,6 @@ NNodes::TExprBase DqSqlInDropCompact(NNodes::TExprBase node, TExprContext& ctx); IGraphTransformer::TStatus DqWrapRead(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const TDqSettings& config); +NNodes::TExprBase DqExpandMatchRecognize(NNodes::TExprBase node, TExprContext& ctx); + } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp index cbabdbc158..c1b827f88b 100644 --- a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp @@ -58,6 +58,7 @@ public: AddHandler(0, &TCoEquiJoin::Match, HNDL(RewriteEquiJoin)); AddHandler(0, &TCoCalcOverWindowBase::Match, HNDL(ExpandWindowFunctions)); AddHandler(0, &TCoCalcOverWindowGroup::Match, HNDL(ExpandWindowFunctions)); + AddHandler(0, &TCoMatchRecognize::Match, HNDL(ExpandMatchRecognize)); AddHandler(0, &TCoFlatMapBase::Match, HNDL(FlatMapOverExtend)); AddHandler(0, &TDqQuery::Match, HNDL(MergeQueriesWithSinks)); AddHandler(0, &TCoSqlIn::Match, HNDL(SqlInDropCompact)); @@ -135,6 +136,13 @@ protected: return node; } + TMaybeNode<TExprBase> ExpandMatchRecognize(TExprBase node, TExprContext& ctx) { + if (node.Cast<TCoInputBase>().Input().Maybe<TDqConnection>()) { + return DqExpandMatchRecognize(node, ctx); + } + return node; + } + TMaybeNode<TExprBase> MergeQueriesWithSinks(TExprBase node, TExprContext& ctx) { return DqMergeQueriesWithSinks(node, ctx); } diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_logical_optimize.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_logical_optimize.cpp index 5eb55995c9..59cc56ce1a 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_logical_optimize.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_logical_optimize.cpp @@ -15,6 +15,7 @@ #include <ydb/library/yql/core/yql_expr_optimize.h> #include <ydb/library/yql/core/yql_opt_window.h> #include <ydb/library/yql/core/yql_opt_utils.h> +#include <ydb/library/yql/core/yql_opt_match_recognize.h> #include <ydb/library/yql/core/yql_join.h> #include <ydb/library/yql/core/yql_type_helpers.h> #include <ydb/library/yql/utils/log/log.h> @@ -77,6 +78,8 @@ public: AddHandler(2, &TCoEquiJoin::Match, HNDL(ConvertToCommonTypeForForcedMergeJoin)); AddHandler(2, &TCoShuffleByKeys::Match, HNDL(ShuffleByKeys)); + + AddHandler(0, &TCoMatchRecognize::Match, HNDL(MatchRecognize)); #undef HNDL } @@ -2655,6 +2658,10 @@ protected: .Build() .Done(); } + + TMaybeNode<TExprBase> MatchRecognize(TExprBase node, TExprContext& ctx) { + return ExpandMatchRecognize(node.Ptr(), ctx); + } private: TYtState::TPtr State_; }; |