diff options
author | zverevgeny <zverevgeny@ydb.tech> | 2023-08-29 20:40:16 +0300 |
---|---|---|
committer | zverevgeny <zverevgeny@ydb.tech> | 2023-08-30 01:54:50 +0300 |
commit | 83afbdd473100c991c643469c71299292a82b3f1 (patch) | |
tree | f54c279f0df2963fd52a524d992879eba10a7272 | |
parent | a81aa11915b2075c4c0436914432e01102271042 (diff) | |
download | ydb-83afbdd473100c991c643469c71299292a82b3f1.tar.gz |
YQL-16325 match_recognize computation skeleton
29 files changed, 575 insertions, 10 deletions
diff --git a/ydb/library/yql/core/sql_types/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/core/sql_types/CMakeLists.darwin-x86_64.txt index 6c1a4cb9ee..a715d1f3c1 100644 --- a/ydb/library/yql/core/sql_types/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/core/sql_types/CMakeLists.darwin-x86_64.txt @@ -6,12 +6,24 @@ # original buildsystem will not be accepted. +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) add_library(yql-core-sql_types) target_link_libraries(yql-core-sql_types PUBLIC contrib-libs-cxxsupp yutil + tools-enum_parser-enum_serialization_runtime ) target_sources(yql-core-sql_types PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/sql_types/simple_types.cpp ) +generate_enum_serilization(yql-core-sql_types + ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/sql_types/match_recognize.h + INCLUDE_HEADERS + ydb/library/yql/core/sql_types/match_recognize.h +) diff --git a/ydb/library/yql/core/sql_types/CMakeLists.linux-aarch64.txt b/ydb/library/yql/core/sql_types/CMakeLists.linux-aarch64.txt index e4181a0333..54472b4296 100644 --- a/ydb/library/yql/core/sql_types/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/core/sql_types/CMakeLists.linux-aarch64.txt @@ -6,13 +6,25 @@ # original buildsystem will not be accepted. +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) add_library(yql-core-sql_types) target_link_libraries(yql-core-sql_types PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil + tools-enum_parser-enum_serialization_runtime ) target_sources(yql-core-sql_types PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/sql_types/simple_types.cpp ) +generate_enum_serilization(yql-core-sql_types + ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/sql_types/match_recognize.h + INCLUDE_HEADERS + ydb/library/yql/core/sql_types/match_recognize.h +) diff --git a/ydb/library/yql/core/sql_types/CMakeLists.linux-x86_64.txt b/ydb/library/yql/core/sql_types/CMakeLists.linux-x86_64.txt index e4181a0333..54472b4296 100644 --- a/ydb/library/yql/core/sql_types/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/core/sql_types/CMakeLists.linux-x86_64.txt @@ -6,13 +6,25 @@ # original buildsystem will not be accepted. +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) add_library(yql-core-sql_types) target_link_libraries(yql-core-sql_types PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil + tools-enum_parser-enum_serialization_runtime ) target_sources(yql-core-sql_types PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/sql_types/simple_types.cpp ) +generate_enum_serilization(yql-core-sql_types + ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/sql_types/match_recognize.h + INCLUDE_HEADERS + ydb/library/yql/core/sql_types/match_recognize.h +) diff --git a/ydb/library/yql/core/sql_types/CMakeLists.windows-x86_64.txt b/ydb/library/yql/core/sql_types/CMakeLists.windows-x86_64.txt index 6c1a4cb9ee..a715d1f3c1 100644 --- a/ydb/library/yql/core/sql_types/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/core/sql_types/CMakeLists.windows-x86_64.txt @@ -6,12 +6,24 @@ # original buildsystem will not be accepted. +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) add_library(yql-core-sql_types) target_link_libraries(yql-core-sql_types PUBLIC contrib-libs-cxxsupp yutil + tools-enum_parser-enum_serialization_runtime ) target_sources(yql-core-sql_types PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/sql_types/simple_types.cpp ) +generate_enum_serilization(yql-core-sql_types + ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/sql_types/match_recognize.h + INCLUDE_HEADERS + ydb/library/yql/core/sql_types/match_recognize.h +) diff --git a/ydb/library/yql/core/sql_types/match_recognize.h b/ydb/library/yql/core/sql_types/match_recognize.h index 4c0ce6e4f5..fc2d6881ff 100644 --- a/ydb/library/yql/core/sql_types/match_recognize.h +++ b/ydb/library/yql/core/sql_types/match_recognize.h @@ -1,8 +1,21 @@ #pragma once +#include <util/generic/string.h> +#include <util/string/cast.h> #include <stddef.h> -namespace NYql { +namespace NYql::NMatchRecognize { -constexpr size_t MaxMatchRecognizePatternNesting = 20; //Limit recursion +constexpr size_t MaxPatternNesting = 20; //Limit recursion for patterns -}//namespace NYql +//Mixin columns for calculating measures +enum class MeasureInputDataSpecialColumns { + Classifier = 0, + MatchNumber = 1, + Last +}; + +inline TString MeasureInputDataSpecialColumnName(MeasureInputDataSpecialColumns c) { + return TString("_yql_") + ToString(c); +} + +}//namespace NYql::NMatchRecognize diff --git a/ydb/library/yql/core/sql_types/ya.make b/ydb/library/yql/core/sql_types/ya.make index 0c38562ebc..2ccf3b837d 100644 --- a/ydb/library/yql/core/sql_types/ya.make +++ b/ydb/library/yql/core/sql_types/ya.make @@ -1,8 +1,10 @@ LIBRARY() SRCS( + match_recognize.h simple_types.h simple_types.cpp ) +GENERATE_ENUM_SERIALIZATION(match_recognize.h) END() diff --git a/ydb/library/yql/core/type_ann/type_ann_match_recognize.cpp b/ydb/library/yql/core/type_ann/type_ann_match_recognize.cpp index 8eecc70c1a..ebf6d9d2c9 100644 --- a/ydb/library/yql/core/type_ann/type_ann_match_recognize.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_match_recognize.cpp @@ -63,7 +63,7 @@ const std::unordered_set<TString> GetPrimaryVars(const TExprNode::TPtr& pattern, if (factor->ChildRef(0)->IsAtom()) { result.insert(TString(factor->ChildRef(0)->Content())); } else { - YQL_ENSURE(nestingLevel < MaxMatchRecognizePatternNesting, "To big nesting level in the pattern"); + YQL_ENSURE(nestingLevel < NYql::NMatchRecognize::MaxPatternNesting, "To big nesting level in the pattern"); auto subExprVars = GetPrimaryVars(factor->ChildRef(0), ctx, ++nestingLevel); result.insert(subExprVars.begin(), subExprVars.end()); } @@ -107,9 +107,15 @@ MatchRecognizeMeasuresWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& out return IGraphTransformer::TStatus::Error; } - auto lambdaInputRowColumns = inputRowType->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>()->GetItems(); - lambdaInputRowColumns.push_back(ctx.Expr.MakeType<TItemExprType>("_yql_Classifier", ctx.Expr.MakeType<TDataExprType>(EDataSlot::Utf8))); - lambdaInputRowColumns.push_back(ctx.Expr.MakeType<TItemExprType>("_yql_MatchNumber", ctx.Expr.MakeType<TDataExprType>(EDataSlot::Int64))); + auto lambdaInputRowColumns = inputRowType->GetTypeAnn() + ->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>()->GetItems(); + using NYql::NMatchRecognize::MeasureInputDataSpecialColumns; + lambdaInputRowColumns.push_back(ctx.Expr.MakeType<TItemExprType>( + MeasureInputDataSpecialColumnName(MeasureInputDataSpecialColumns::Classifier), + ctx.Expr.MakeType<TDataExprType>(EDataSlot::Utf8))); + lambdaInputRowColumns.push_back(ctx.Expr.MakeType<TItemExprType>( + MeasureInputDataSpecialColumnName(MeasureInputDataSpecialColumns::MatchNumber), + ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64))); auto lambdaInputRowType = ctx.Expr.MakeType<TStructExprType>(lambdaInputRowColumns); const auto& matchedRowsRanges = GetMatchedRowsRangesType(pattern, ctx); YQL_ENSURE(matchedRowsRanges); diff --git a/ydb/library/yql/minikql/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/CMakeLists.darwin-x86_64.txt index 8be56fe7c9..09684df2ae 100644 --- a/ydb/library/yql/minikql/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/minikql/CMakeLists.darwin-x86_64.txt @@ -42,6 +42,7 @@ target_link_libraries(library-yql-minikql PUBLIC yql-public-udf public-udf-tz library-yql-utils + yql-core-sql_types ydb-library-uuid public-lib-scheme_types ) diff --git a/ydb/library/yql/minikql/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/CMakeLists.linux-aarch64.txt index 569f8b35ff..af687a328e 100644 --- a/ydb/library/yql/minikql/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/minikql/CMakeLists.linux-aarch64.txt @@ -43,6 +43,7 @@ target_link_libraries(library-yql-minikql PUBLIC yql-public-udf public-udf-tz library-yql-utils + yql-core-sql_types ydb-library-uuid public-lib-scheme_types ) diff --git a/ydb/library/yql/minikql/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/CMakeLists.linux-x86_64.txt index 569f8b35ff..af687a328e 100644 --- a/ydb/library/yql/minikql/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/minikql/CMakeLists.linux-x86_64.txt @@ -43,6 +43,7 @@ target_link_libraries(library-yql-minikql PUBLIC yql-public-udf public-udf-tz library-yql-utils + yql-core-sql_types ydb-library-uuid public-lib-scheme_types ) diff --git a/ydb/library/yql/minikql/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/CMakeLists.windows-x86_64.txt index 8be56fe7c9..09684df2ae 100644 --- a/ydb/library/yql/minikql/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/minikql/CMakeLists.windows-x86_64.txt @@ -42,6 +42,7 @@ target_link_libraries(library-yql-minikql PUBLIC yql-public-udf public-udf-tz library-yql-utils + yql-core-sql_types ydb-library-uuid public-lib-scheme_types ) diff --git a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.darwin-x86_64.txt index db7bf3a22f..780ea80c0a 100644 --- a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.darwin-x86_64.txt @@ -106,6 +106,7 @@ target_sources(minikql-comp_nodes-llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_mapnext.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_next_value.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-aarch64.txt index 18a9e65c47..f279be85fb 100644 --- a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-aarch64.txt @@ -107,6 +107,7 @@ target_sources(minikql-comp_nodes-llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_mapnext.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_next_value.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-x86_64.txt index 18a9e65c47..f279be85fb 100644 --- a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-x86_64.txt @@ -107,6 +107,7 @@ target_sources(minikql-comp_nodes-llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_mapnext.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_next_value.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.windows-x86_64.txt index db7bf3a22f..780ea80c0a 100644 --- a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.windows-x86_64.txt @@ -106,6 +106,7 @@ target_sources(minikql-comp_nodes-llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_mapnext.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_next_value.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp index d882c5f2f6..45ee823be4 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp @@ -63,6 +63,7 @@ #include "mkql_map.h" #include "mkql_mapnext.h" #include "mkql_map_join.h" +#include "mkql_match_recognize.h" #include "mkql_multimap.h" #include "mkql_next_value.h" #include "mkql_nop.h" @@ -342,6 +343,7 @@ struct TCallableComputationNodeBuilderFuncMapFiller { {"RoundDown", &WrapRound}, {"NextValue", &WrapNextValue}, {"Nop", &WrapNop}, + {"MatchRecognizeCore", &WrapMatchRecognizeCore}, }; }; diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp new file mode 100644 index 0000000000..99b5b932be --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp @@ -0,0 +1,304 @@ +#include <ydb/library/yql/minikql/computation/mkql_computation_node_impl.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_pack.h> +#include <ydb/library/yql/minikql/mkql_node_cast.h> +#include <ydb/library/yql/minikql/mkql_runtime_version.h> +#include <ydb/library/yql/core/sql_types/match_recognize.h> + +namespace NKikimr::NMiniKQL { + +namespace { + +enum class EMeasureColumnSource {Classifier = 0, MatchNumber = 1, Input}; +using TMeasureInputColumnOrder = std::vector<std::pair<EMeasureColumnSource, size_t>>; + +enum class EOutputColumnSource {PartitionKey, Measure}; +using TOutputColumnOrder = std::vector<std::pair<EOutputColumnSource, size_t>>; + +//Process one partition of input data +struct IProcessMatchRecognize { + ///return true if it has output data ready + virtual bool ProcessInputRow(NUdf::TUnboxedValue&& row) = 0; + virtual NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) = 0; + virtual bool ProcessEndOfData() = 0; + virtual ~IProcessMatchRecognize(){} +}; + +class TStreamMatchRecognize: public IProcessMatchRecognize { +public: + TStreamMatchRecognize( + NUdf::TUnboxedValue&& partitionKey, + std::vector<IComputationNode*>& measures, + const TOutputColumnOrder& outputColumnOrder, + const TContainerCacheOnContext& cache + ) + : PartitionKey(std::move(partitionKey)) + , Measures(measures) + , OutputColumnOrder(outputColumnOrder) + , Cache(cache) + , HasMatch(false) + { + } + + bool ProcessInputRow(NUdf::TUnboxedValue&& row) override{ + //Assume match on every row, TODO fixme + Y_UNUSED(row); + HasMatch = true; + return HasMatch; + } + NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) override { + if (!HasMatch) + return NUdf::TUnboxedValue::Invalid(); + HasMatch = false; + NUdf::TUnboxedValue *itemsPtr = nullptr; + const auto result = Cache.NewArray(ctx, OutputColumnOrder.size(), itemsPtr); + for (auto const& c: OutputColumnOrder) { + switch(c.first) { + case EOutputColumnSource::Measure: + *itemsPtr++ = Measures[c.second]->GetValue(ctx); + break; + case EOutputColumnSource::PartitionKey: + *itemsPtr++ = PartitionKey.GetElement(c.second); + break; + default: + MKQL_ENSURE(false, "Internal logic error"); + } + } + return result; + } + bool ProcessEndOfData() override { + //TODO + return false; + } +private: + const NUdf::TUnboxedValue PartitionKey; + const std::vector<IComputationNode*>& Measures; + const TOutputColumnOrder& OutputColumnOrder; + const TContainerCacheOnContext& Cache; + bool HasMatch; + +}; + + +class TMatchRecognizeWrapper : public TStatefulFlowComputationNode<TMatchRecognizeWrapper> { + using TBaseComputation = TStatefulFlowComputationNode<TMatchRecognizeWrapper>; +public: + TMatchRecognizeWrapper(TComputationMutables &mutables, EValueRepresentation kind, IComputationNode *inputFlow, + IComputationExternalNode *inputRowArg, + IComputationNode *partitionKey, + TType* partitionKeyType, + std::vector<IComputationNode*>&& measures, + TOutputColumnOrder&& outputColumnOrder + ) + : TBaseComputation(mutables, inputFlow, kind, EValueRepresentation::Embedded) + , InputFlow(inputFlow) + , InputRowArg(inputRowArg) + , PartitionKey(partitionKey) + , PartitionKeyType(partitionKeyType) + , Measures(measures) + , OutputColumnOrder(outputColumnOrder) + , Cache(mutables) + {} + + NUdf::TUnboxedValue DoCalculate(NUdf::TUnboxedValue &stateValue, TComputationContext &ctx) const { + if (stateValue.IsInvalid()) { + stateValue = ctx.HolderFactory.Create<TState>( + InputRowArg, + PartitionKey, + PartitionKeyType, + Measures, + OutputColumnOrder, + Cache + ); + } + auto& state = *static_cast<TState *>(stateValue.AsBoxed().Get()); + while (true) { + if (auto output = state.GetOutputIfReady(ctx); !output.IsInvalid()) { + return output; + } + auto item = InputFlow->GetValue(ctx); + if (item.IsFinish()) { + state.ProcessEndOfData(); + continue; + } else if (item.IsSpecial()) { + return item; + } + state.ProcessInputRow(std::move(item), ctx); + } + } +private: + + class TState: public TComputationValue<TState> { + using TPartitionMap = std::unordered_map<TString, std::unique_ptr<IProcessMatchRecognize>>; + public: + TState( + TMemoryUsageInfo* memInfo, + IComputationExternalNode* inputRowArg, + IComputationNode* partitionKey, + TType* partitionKeyType, + const std::vector<IComputationNode*>& measures, + const TOutputColumnOrder& outputColumnOrder, + const TContainerCacheOnContext& cache + ) + : TComputationValue<TState>(memInfo) + , InputRowArg(inputRowArg) + , PartitionKey(partitionKey) + , PartitionKeyPacker(true, partitionKeyType) + , Measures(measures) + , OutputColumnOrder(outputColumnOrder) + , Cache(cache) + { + } + + void ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) { + auto partition = GetPartitionHandler(row, ctx); + if (partition->second->ProcessInputRow(std::move(row))) { + HasReadyOutput.push(partition); + } + } + + void ProcessEndOfData() { + for (auto it = Partitions.begin(); it != Partitions.end(); ++it) { + auto b = it->second->ProcessEndOfData(); + if (b) { + HasReadyOutput.push(it); + } + } + Terminating = true; + } + + NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) { + while (!HasReadyOutput.empty()) { + auto r = HasReadyOutput.top()->second->GetOutputIfReady(ctx); + if (r.IsInvalid()) { + //dried up + HasReadyOutput.pop(); + continue; + } else { + return r; + } + } + return Terminating ? NUdf::TUnboxedValue::MakeFinish() : NUdf::TUnboxedValue::Invalid(); + } + + private: + TPartitionMap::iterator GetPartitionHandler(const NUdf::TUnboxedValue& row, TComputationContext &ctx) { + InputRowArg->SetValue(ctx, NUdf::TUnboxedValue(row)); + auto partitionKey = PartitionKey->GetValue(ctx); + const auto packedKey = PartitionKeyPacker.Pack(partitionKey); + if (const auto it = Partitions.find(TString(packedKey)); it != Partitions.end()) { + return it; + } else { + return Partitions.emplace_hint(it, TString(packedKey), std::make_unique<TStreamMatchRecognize>( + std::move(partitionKey), + Measures, + OutputColumnOrder, + Cache + )); + } + } + + private: + //for this class + TPartitionMap Partitions; + std::stack<TPartitionMap::iterator> HasReadyOutput; + bool Terminating = false; + + IComputationExternalNode* InputRowArg; + IComputationNode* PartitionKey; + //TODO switch to tuple compare + TValuePackerGeneric<false> PartitionKeyPacker; + + //to be passed to partitions + std::vector<IComputationNode*> Measures; + const TOutputColumnOrder& OutputColumnOrder; + const TContainerCacheOnContext& Cache; + }; + +private: + void RegisterDependencies() const final { + if (const auto flow = FlowDependsOn(InputFlow)) { + Own(flow, InputRowArg); + } + DependsOn(PartitionKey, InputRowArg); + } + + IComputationNode* const InputFlow; + IComputationExternalNode* const InputRowArg; + IComputationNode* const PartitionKey; + TType* const PartitionKeyType; + std::vector<IComputationNode*> Measures; + TOutputColumnOrder OutputColumnOrder; + const TContainerCacheOnContext Cache; +}; + +TOutputColumnOrder GetOutputColumnOrder(TRuntimeNode partitionKyeColumnsIndexes, TRuntimeNode measureColumnsIndexes) { + std::unordered_map<size_t, std::pair<EOutputColumnSource, size_t>> temp; + { + auto list = AS_VALUE(TListLiteral, partitionKyeColumnsIndexes); + for (ui32 i = 0; i != list->GetItemsCount(); ++i) { + auto index = AS_VALUE(TDataLiteral, list->GetItems()[i])->AsValue().Get<ui32>(); + temp[index] = std::make_pair(EOutputColumnSource::PartitionKey, i); + } + } + { + auto list = AS_VALUE(TListLiteral, measureColumnsIndexes); + for (ui32 i = 0; i != list->GetItemsCount(); ++i) { + auto index = AS_VALUE(TDataLiteral, list->GetItems()[i])->AsValue().Get<ui32>(); + temp[index] = std::make_pair(EOutputColumnSource::Measure, i); + } + } + if (temp.empty()) + return {}; + auto outputSize = max_element(temp.cbegin(), temp.cend())->first + 1; + TOutputColumnOrder result(outputSize); + for (const auto& [i, v]: temp) { + result[i] = v; + } + return result; +} + +std::vector<IComputationNode*> ConvertVectorOfCallables(const std::vector<TRuntimeNode>& v, const TComputationNodeFactoryContext& ctx) { + std::vector<IComputationNode*> result; + result.reserve(v.size()); + for (auto& c: v) { + result.push_back(LocateNode(ctx.NodeLocator, *c.GetNode())); + } + return result; +} + +} //namespace + + +IComputationNode* WrapMatchRecognizeCore(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + size_t inputIndex = 0; + const auto& inputFlow = callable.GetInput(inputIndex++); + const auto& inputRowArg = callable.GetInput(inputIndex++); + const auto& partitionKeySelector = callable.GetInput(inputIndex++); + const auto& partitionColumnIndexes = callable.GetInput(inputIndex++); + const auto& measureInputDataArg = callable.GetInput(inputIndex++); + const auto& measureSpecialColumnIndexes = callable.GetInput(inputIndex++); + const auto& inputRowColumnCount = callable.GetInput(inputIndex++); + const auto& matchedRangesArg = callable.GetInput(inputIndex++); + const auto& measureColumnIndexes = callable.GetInput(inputIndex++); + std::vector<TRuntimeNode> measures; + for (size_t i = 0; i != AS_VALUE(TListLiteral, measureColumnIndexes)->GetItemsCount(); ++i) { + measures.push_back(callable.GetInput(inputIndex++)); + } + MKQL_ENSURE(callable.GetInputsCount() == inputIndex, "Wrong input count"); + Y_UNUSED(measureInputDataArg); + Y_UNUSED(measureSpecialColumnIndexes); + Y_UNUSED(inputRowColumnCount); + Y_UNUSED(matchedRangesArg); + + return new TMatchRecognizeWrapper(ctx.Mutables, GetValueRepresentation(inputFlow.GetStaticType()) + , LocateNode(ctx.NodeLocator, *inputFlow.GetNode()) + , static_cast<IComputationExternalNode*>(LocateNode(ctx.NodeLocator, *inputRowArg.GetNode())) + , LocateNode(ctx.NodeLocator, *partitionKeySelector.GetNode()) + , partitionKeySelector.GetStaticType() + , ConvertVectorOfCallables(measures, ctx) + , GetOutputColumnOrder(partitionColumnIndexes, measureColumnIndexes) + ); +} + +} //namespace NKikimr::NMiniKQL diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.h b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.h new file mode 100644 index 0000000000..858200d144 --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.h @@ -0,0 +1,6 @@ +#pragma once +#include <ydb/library/yql/minikql/computation/mkql_computation_node.h> + +namespace NKikimr::NMiniKQL { + IComputationNode* WrapMatchRecognizeCore(TCallable& callable, const TComputationNodeFactoryContext& ctx); +} diff --git a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.darwin-x86_64.txt index f10e5ccc39..e40064d4a6 100644 --- a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.darwin-x86_64.txt @@ -102,6 +102,7 @@ target_sources(minikql-comp_nodes-no_llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_mapnext.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_next_value.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-aarch64.txt index 96b0c048ea..01c73e57af 100644 --- a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-aarch64.txt @@ -103,6 +103,7 @@ target_sources(minikql-comp_nodes-no_llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_mapnext.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_next_value.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-x86_64.txt index 96b0c048ea..01c73e57af 100644 --- a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-x86_64.txt @@ -103,6 +103,7 @@ target_sources(minikql-comp_nodes-no_llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_mapnext.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_next_value.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.windows-x86_64.txt index f10e5ccc39..e40064d4a6 100644 --- a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.windows-x86_64.txt @@ -102,6 +102,7 @@ target_sources(minikql-comp_nodes-no_llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_mapnext.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_next_value.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/ya.make.inc b/ydb/library/yql/minikql/comp_nodes/ya.make.inc index 1a84c6e9ba..d4315842e7 100644 --- a/ydb/library/yql/minikql/comp_nodes/ya.make.inc +++ b/ydb/library/yql/minikql/comp_nodes/ya.make.inc @@ -73,6 +73,7 @@ SRCS( mkql_map.cpp mkql_mapnext.cpp mkql_map_join.cpp + mkql_match_recognize.cpp mkql_multihopping.cpp mkql_multimap.cpp mkql_next_value.cpp diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index 7f19b235d4..27be8ff034 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -7,6 +7,7 @@ #include "ydb/library/yql/minikql/mkql_function_registry.h" #include "ydb/library/yql/minikql/mkql_utils.h" #include "ydb/library/yql/minikql/mkql_type_builder.h" +#include "ydb/library/yql/core/sql_types/match_recognize.h" #include <util/string/cast.h> #include <util/string/printf.h> @@ -5790,6 +5791,126 @@ TRuntimeNode TProgramBuilder::ScalarApply(const TArrayRef<const TRuntimeNode>& a return TRuntimeNode(builder.Build(), false); } +TRuntimeNode TProgramBuilder::MatchRecognizeCore( + TRuntimeNode inputStream, + const TUnaryLambda& getPartitionKeySelectorNode, + const TArrayRef<TStringBuf>& partitionColumns, + const TArrayRef<std::pair<TStringBuf, TBinaryLambda>>& getMeasures +) { + MKQL_ENSURE(RuntimeVersion >= 42, "MatchRecognize is not supported in runtime version " << RuntimeVersion); + + const auto inputRowType = AS_TYPE(TStructType, AS_TYPE(TFlowType, inputStream.GetStaticType())->GetItemType()); + const auto inputRowArg = Arg(inputRowType); + const auto partitionKeySelectorNode = getPartitionKeySelectorNode(inputRowArg); + + TStructTypeBuilder indexRangeTypeBuilder(Env); + indexRangeTypeBuilder.Add("From", TDataType::Create(NUdf::TDataType<ui64>::Id, Env)); + indexRangeTypeBuilder.Add("To", TDataType::Create(NUdf::TDataType<ui64>::Id, Env)); + const auto& rangeList = TListType::Create(indexRangeTypeBuilder.Build(), Env); + TStructTypeBuilder matchedVarsTypeBuilder(Env); + //assume simple pattern with one var "A" that always match TODO fixme + matchedVarsTypeBuilder.Add("A", rangeList); + TRuntimeNode matchedVarsArg = Arg(matchedVarsTypeBuilder.Build()); + + //---These vars may be empty in case of no measures + TRuntimeNode measureInputDataArg; + std::vector<TRuntimeNode> specialColumnIndexesInMeasureInputDataRow; + TVector<TRuntimeNode> measures; + TVector<TType*> measureTypes; + //--- + if (getMeasures.empty()) { + measureInputDataArg = Arg(Env.GetTypeOfVoid()); + } else { + using NYql::NMatchRecognize::MeasureInputDataSpecialColumns; + measures.reserve(getMeasures.size()); + measureTypes.reserve(getMeasures.size()); + specialColumnIndexesInMeasureInputDataRow.resize(static_cast<size_t>(NYql::NMatchRecognize::MeasureInputDataSpecialColumns::Last)); + TStructTypeBuilder measureInputDataRowTypeBuilder(Env); + for (ui32 i = 0; i != inputRowType->GetMembersCount(); ++i) { + measureInputDataRowTypeBuilder.Add(inputRowType->GetMemberName(i), inputRowType->GetMemberType(i)); + } + measureInputDataRowTypeBuilder.Add( + MeasureInputDataSpecialColumnName(MeasureInputDataSpecialColumns::Classifier), + TDataType::Create(NUdf::TDataType<NYql::NUdf::TUtf8>::Id, Env) + ); + measureInputDataRowTypeBuilder.Add( + MeasureInputDataSpecialColumnName(MeasureInputDataSpecialColumns::MatchNumber), + TDataType::Create(NUdf::TDataType<ui64>::Id, Env) + ); + const auto measureInputDataRowType = measureInputDataRowTypeBuilder.Build(); + + for (ui32 i = 0; i != measureInputDataRowType->GetMembersCount(); ++i) { + //assume a few, if grows, it's better to use a lookup table here + static_assert(static_cast<size_t>(MeasureInputDataSpecialColumns::Last) < 5); + for (size_t j = 0; j != static_cast<size_t>(MeasureInputDataSpecialColumns::Last); ++j) { + if (measureInputDataRowType->GetMemberName(i) == + NYql::NMatchRecognize::MeasureInputDataSpecialColumnName(static_cast<MeasureInputDataSpecialColumns>(j))) + specialColumnIndexesInMeasureInputDataRow[j] = NewDataLiteral<ui32>(i); + } + } + + measureInputDataArg = Arg(TListType::Create(measureInputDataRowType, Env)); + for (size_t i = 0; i != getMeasures.size(); ++i) { + measures.push_back(getMeasures[i].second(measureInputDataArg, matchedVarsArg)); + measureTypes.push_back(measures[i].GetStaticType()); + } + } + + TStructTypeBuilder outputRowTypeBuilder(Env); + THashMap<TStringBuf, size_t> partitionColumnLookup; + for (size_t i = 0; i != partitionColumns.size(); ++i) { + const auto& name = partitionColumns[i]; + partitionColumnLookup[name] = i; + outputRowTypeBuilder.Add( + name, + AS_TYPE(TTupleType, partitionKeySelectorNode.GetStaticType())->GetElementType(i) + ); + } + THashMap<TStringBuf, size_t> measureColumnLookup; + for (size_t i = 0; i != measures.size(); ++i) { + const auto& name = getMeasures[i].first; + measureColumnLookup[name] = i; + outputRowTypeBuilder.Add( + name, + measures[i].GetStaticType() + ); + } + auto outputRowType = outputRowTypeBuilder.Build(); + + std::vector<TRuntimeNode> partitionColumnIndexes(partitionColumnLookup.size()); + std::vector<TRuntimeNode> measureColumnIndexes(measureColumnLookup.size()); + for (ui32 i = 0; i != outputRowType->GetMembersCount(); ++i) { + if (auto it = partitionColumnLookup.find(outputRowType->GetMemberName(i)); it != partitionColumnLookup.end()) { + partitionColumnIndexes[it->second] = NewDataLiteral<ui32>(i); + } + else if (auto it = measureColumnLookup.find(outputRowType->GetMemberName(i)); it != measureColumnLookup.end()) { + measureColumnIndexes[it->second] = NewDataLiteral<ui32>(i); + } + } + auto outputType = (TType*)TFlowType::Create(outputRowType, Env); + + TCallableBuilder callableBuilder(GetTypeEnvironment(), "MatchRecognizeCore", outputType); + auto indexType = TDataType::Create(NUdf::TDataType<ui32>::Id, Env); + auto indexListType = TListType::Create(indexType, Env); + callableBuilder.Add(inputStream); + callableBuilder.Add(inputRowArg); + callableBuilder.Add(partitionKeySelectorNode); + callableBuilder.Add(TRuntimeNode(TListLiteral::Create(partitionColumnIndexes.data(), partitionColumnIndexes.size(), indexListType, Env), true)); + callableBuilder.Add(measureInputDataArg); + callableBuilder.Add(TRuntimeNode(TListLiteral::Create( + specialColumnIndexesInMeasureInputDataRow.data(), specialColumnIndexesInMeasureInputDataRow.size(), + indexListType, Env + ), + true)); + callableBuilder.Add(NewDataLiteral<ui32>(inputRowType->GetMembersCount())); + callableBuilder.Add(matchedVarsArg); + callableBuilder.Add(TRuntimeNode(TListLiteral::Create(measureColumnIndexes.data(), measureColumnIndexes.size(), indexListType, Env), true)); + for (const auto& m: measures) { + callableBuilder.Add(m); + } + return TRuntimeNode(callableBuilder.Build(), false); +} + bool CanExportType(TType* type, const TTypeEnvironment& env) { if (type->GetKind() == TType::EKind::Type) { return false; // Type of Type diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h index 8514e3c1b3..92aefa416c 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.h +++ b/ydb/library/yql/minikql/mkql_program_builder.h @@ -4,6 +4,7 @@ #include "mkql_node.h" #include "mkql_node_builder.h" #include <ydb/library/yql/public/udf/udf_value.h> +#include <ydb/library/yql/core/sql_types/match_recognize.h> #include <functional> @@ -695,6 +696,13 @@ public: TRuntimeNode ScalarApply(const TArrayRef<const TRuntimeNode>& args, const TArrayLambda& handler); + TRuntimeNode MatchRecognizeCore( + TRuntimeNode inputStream, + const TUnaryLambda& getPartitionKeySelectorNode, + const TArrayRef<TStringBuf>& partitionColumns, + const TArrayRef<std::pair<TStringBuf, TBinaryLambda>>& getMeasures + ); + protected: TRuntimeNode Invoke(const std::string_view& funcName, TType* resultType, const TArrayRef<const TRuntimeNode>& args); TRuntimeNode IfPresent(TRuntimeNode optional, const TUnaryLambda& thenBranch, TRuntimeNode elseBranch); diff --git a/ydb/library/yql/minikql/mkql_runtime_version.h b/ydb/library/yql/minikql/mkql_runtime_version.h index 5f78ef9581..c263002da3 100644 --- a/ydb/library/yql/minikql/mkql_runtime_version.h +++ b/ydb/library/yql/minikql/mkql_runtime_version.h @@ -24,7 +24,7 @@ namespace NMiniKQL { // 1. Bump this version every time incompatible runtime nodes are introduced. // 2. Make sure you provide runtime node generation for previous runtime versions. #ifndef MKQL_RUNTIME_VERSION -#define MKQL_RUNTIME_VERSION 41U +#define MKQL_RUNTIME_VERSION 42U #endif // History: diff --git a/ydb/library/yql/minikql/ya.make b/ydb/library/yql/minikql/ya.make index 8c64d9eb3c..eb336034ce 100644 --- a/ydb/library/yql/minikql/ya.make +++ b/ydb/library/yql/minikql/ya.make @@ -69,6 +69,7 @@ PEERDIR( ydb/library/yql/public/udf ydb/library/yql/public/udf/tz ydb/library/yql/utils + ydb/library/yql/core/sql_types ydb/library/uuid ydb/public/lib/scheme_types ) diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp index 54309ba558..ce0b02f4da 100644 --- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp +++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp @@ -852,6 +852,47 @@ TMkqlCommonCallableCompiler::TShared::TShared() { return ctx.ProgramBuilder.Nth(tupleObj, index); }); + AddCallable("MatchRecognizeCore", [](const TExprNode& node, TMkqlBuildContext& ctx) { + const auto& inputStream = node.Child(0); + const auto& partitionKeySelector = node.Child(1); + const auto& partitionColumns = node.Child(2); + const auto& params = node.Child(3); + + //explore params + const auto& measures = params->ChildRef(0); + + //explore measures + const auto measureNames = measures->ChildRef(2); + constexpr size_t FirstMeasureLambdaIndex = 3; + + TVector<TStringBuf> partitionColumnNames; + for (const auto& n: partitionColumns->Children()) { + partitionColumnNames.push_back(n->Content()); + } + + TProgramBuilder::TUnaryLambda getPartitionKeySelector = [partitionKeySelector, &ctx](TRuntimeNode inputRowArg){ + return MkqlBuildLambda(*partitionKeySelector, ctx, {inputRowArg}); + }; + + TVector<std::pair<TStringBuf, TProgramBuilder::TBinaryLambda>> getMeasures(measureNames->ChildrenSize()); + for (size_t i = 0; i != measureNames->ChildrenSize(); ++i) { + getMeasures[i] = std::pair{ + measureNames->ChildRef(i)->Content(), + [i, measures, &ctx](TRuntimeNode data, TRuntimeNode matchedVars) { + return MkqlBuildLambda(*measures->ChildRef(FirstMeasureLambdaIndex + i), ctx, + {data, matchedVars}); + } + }; + } + + return ctx.ProgramBuilder.MatchRecognizeCore( + MkqlBuildExpr(*inputStream, ctx), + getPartitionKeySelector, + partitionColumnNames, + getMeasures + ); + }); + AddCallable("Guess", [](const TExprNode& node, TMkqlBuildContext& ctx) { const auto variantObj = MkqlBuildExpr(node.Head(), ctx); auto type = node.Head().GetTypeAnn(); diff --git a/ydb/library/yql/sql/v1/sql_match_recognize.cpp b/ydb/library/yql/sql/v1/sql_match_recognize.cpp index db48964329..3b05ce6409 100644 --- a/ydb/library/yql/sql/v1/sql_match_recognize.cpp +++ b/ydb/library/yql/sql/v1/sql_match_recognize.cpp @@ -162,7 +162,7 @@ TNamedLambda TSqlMatchRecognizeClause::ParseOneMeasure(const TRule_row_pattern_m const auto& name = Id(node.GetRule_an_id3(), *this); //TODO https://st.yandex-team.ru/YQL-16186 //Each measure must be a lambda, that accepts 2 args: - // - List<InputTableColumns + _yql_Classifier, _yql_MatchNumber, _yqlMatchRowNumber> + // - List<InputTableColumns + _yql_Classifier, _yql_MatchNumber> // - Struct that maps row pattern variables to ranges in the queue return {expr, name}; } @@ -254,7 +254,7 @@ TRowPatternTerm TSqlMatchRecognizeClause::ParsePatternTerm(const TRule_row_patte Y_ENSURE("^" == std::get<0>(primary)); break; case TRule_row_pattern_primary::kAltRowPatternPrimary4: { - if (++PatternNestingLevel <= MaxMatchRecognizePatternNesting) { + if (++PatternNestingLevel <= NYql::NMatchRecognize::MaxPatternNesting) { primary = ParsePattern(primaryVar.GetAlt_row_pattern_primary4().GetBlock2().GetRule_row_pattern1()); } else { Ctx.Error(TokenPosition(primaryVar.GetAlt_row_pattern_primary4().GetToken1())) |