aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorzverevgeny <zverevgeny@ydb.tech>2023-08-29 20:40:16 +0300
committerzverevgeny <zverevgeny@ydb.tech>2023-08-30 01:54:50 +0300
commit83afbdd473100c991c643469c71299292a82b3f1 (patch)
treef54c279f0df2963fd52a524d992879eba10a7272
parenta81aa11915b2075c4c0436914432e01102271042 (diff)
downloadydb-83afbdd473100c991c643469c71299292a82b3f1.tar.gz
YQL-16325 match_recognize computation skeleton
-rw-r--r--ydb/library/yql/core/sql_types/CMakeLists.darwin-x86_64.txt12
-rw-r--r--ydb/library/yql/core/sql_types/CMakeLists.linux-aarch64.txt12
-rw-r--r--ydb/library/yql/core/sql_types/CMakeLists.linux-x86_64.txt12
-rw-r--r--ydb/library/yql/core/sql_types/CMakeLists.windows-x86_64.txt12
-rw-r--r--ydb/library/yql/core/sql_types/match_recognize.h19
-rw-r--r--ydb/library/yql/core/sql_types/ya.make2
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_match_recognize.cpp14
-rw-r--r--ydb/library/yql/minikql/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/minikql/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp2
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp304
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.h6
-rw-r--r--ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ya.make.inc1
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.cpp121
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.h8
-rw-r--r--ydb/library/yql/minikql/mkql_runtime_version.h2
-rw-r--r--ydb/library/yql/minikql/ya.make1
-rw-r--r--ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp41
-rw-r--r--ydb/library/yql/sql/v1/sql_match_recognize.cpp4
29 files changed, 575 insertions, 10 deletions
diff --git a/ydb/library/yql/core/sql_types/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/core/sql_types/CMakeLists.darwin-x86_64.txt
index 6c1a4cb9ee..a715d1f3c1 100644
--- a/ydb/library/yql/core/sql_types/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/core/sql_types/CMakeLists.darwin-x86_64.txt
@@ -6,12 +6,24 @@
# original buildsystem will not be accepted.
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(yql-core-sql_types)
target_link_libraries(yql-core-sql_types PUBLIC
contrib-libs-cxxsupp
yutil
+ tools-enum_parser-enum_serialization_runtime
)
target_sources(yql-core-sql_types PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/sql_types/simple_types.cpp
)
+generate_enum_serilization(yql-core-sql_types
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/sql_types/match_recognize.h
+ INCLUDE_HEADERS
+ ydb/library/yql/core/sql_types/match_recognize.h
+)
diff --git a/ydb/library/yql/core/sql_types/CMakeLists.linux-aarch64.txt b/ydb/library/yql/core/sql_types/CMakeLists.linux-aarch64.txt
index e4181a0333..54472b4296 100644
--- a/ydb/library/yql/core/sql_types/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/core/sql_types/CMakeLists.linux-aarch64.txt
@@ -6,13 +6,25 @@
# original buildsystem will not be accepted.
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(yql-core-sql_types)
target_link_libraries(yql-core-sql_types PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
+ tools-enum_parser-enum_serialization_runtime
)
target_sources(yql-core-sql_types PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/sql_types/simple_types.cpp
)
+generate_enum_serilization(yql-core-sql_types
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/sql_types/match_recognize.h
+ INCLUDE_HEADERS
+ ydb/library/yql/core/sql_types/match_recognize.h
+)
diff --git a/ydb/library/yql/core/sql_types/CMakeLists.linux-x86_64.txt b/ydb/library/yql/core/sql_types/CMakeLists.linux-x86_64.txt
index e4181a0333..54472b4296 100644
--- a/ydb/library/yql/core/sql_types/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/core/sql_types/CMakeLists.linux-x86_64.txt
@@ -6,13 +6,25 @@
# original buildsystem will not be accepted.
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(yql-core-sql_types)
target_link_libraries(yql-core-sql_types PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
+ tools-enum_parser-enum_serialization_runtime
)
target_sources(yql-core-sql_types PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/sql_types/simple_types.cpp
)
+generate_enum_serilization(yql-core-sql_types
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/sql_types/match_recognize.h
+ INCLUDE_HEADERS
+ ydb/library/yql/core/sql_types/match_recognize.h
+)
diff --git a/ydb/library/yql/core/sql_types/CMakeLists.windows-x86_64.txt b/ydb/library/yql/core/sql_types/CMakeLists.windows-x86_64.txt
index 6c1a4cb9ee..a715d1f3c1 100644
--- a/ydb/library/yql/core/sql_types/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/core/sql_types/CMakeLists.windows-x86_64.txt
@@ -6,12 +6,24 @@
# original buildsystem will not be accepted.
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(yql-core-sql_types)
target_link_libraries(yql-core-sql_types PUBLIC
contrib-libs-cxxsupp
yutil
+ tools-enum_parser-enum_serialization_runtime
)
target_sources(yql-core-sql_types PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/sql_types/simple_types.cpp
)
+generate_enum_serilization(yql-core-sql_types
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/sql_types/match_recognize.h
+ INCLUDE_HEADERS
+ ydb/library/yql/core/sql_types/match_recognize.h
+)
diff --git a/ydb/library/yql/core/sql_types/match_recognize.h b/ydb/library/yql/core/sql_types/match_recognize.h
index 4c0ce6e4f5..fc2d6881ff 100644
--- a/ydb/library/yql/core/sql_types/match_recognize.h
+++ b/ydb/library/yql/core/sql_types/match_recognize.h
@@ -1,8 +1,21 @@
#pragma once
+#include <util/generic/string.h>
+#include <util/string/cast.h>
#include <stddef.h>
-namespace NYql {
+namespace NYql::NMatchRecognize {
-constexpr size_t MaxMatchRecognizePatternNesting = 20; //Limit recursion
+constexpr size_t MaxPatternNesting = 20; //Limit recursion for patterns
-}//namespace NYql
+//Mixin columns for calculating measures
+enum class MeasureInputDataSpecialColumns {
+ Classifier = 0,
+ MatchNumber = 1,
+ Last
+};
+
+inline TString MeasureInputDataSpecialColumnName(MeasureInputDataSpecialColumns c) {
+ return TString("_yql_") + ToString(c);
+}
+
+}//namespace NYql::NMatchRecognize
diff --git a/ydb/library/yql/core/sql_types/ya.make b/ydb/library/yql/core/sql_types/ya.make
index 0c38562ebc..2ccf3b837d 100644
--- a/ydb/library/yql/core/sql_types/ya.make
+++ b/ydb/library/yql/core/sql_types/ya.make
@@ -1,8 +1,10 @@
LIBRARY()
SRCS(
+ match_recognize.h
simple_types.h
simple_types.cpp
)
+GENERATE_ENUM_SERIALIZATION(match_recognize.h)
END()
diff --git a/ydb/library/yql/core/type_ann/type_ann_match_recognize.cpp b/ydb/library/yql/core/type_ann/type_ann_match_recognize.cpp
index 8eecc70c1a..ebf6d9d2c9 100644
--- a/ydb/library/yql/core/type_ann/type_ann_match_recognize.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_match_recognize.cpp
@@ -63,7 +63,7 @@ const std::unordered_set<TString> GetPrimaryVars(const TExprNode::TPtr& pattern,
if (factor->ChildRef(0)->IsAtom()) {
result.insert(TString(factor->ChildRef(0)->Content()));
} else {
- YQL_ENSURE(nestingLevel < MaxMatchRecognizePatternNesting, "To big nesting level in the pattern");
+ YQL_ENSURE(nestingLevel < NYql::NMatchRecognize::MaxPatternNesting, "To big nesting level in the pattern");
auto subExprVars = GetPrimaryVars(factor->ChildRef(0), ctx, ++nestingLevel);
result.insert(subExprVars.begin(), subExprVars.end());
}
@@ -107,9 +107,15 @@ MatchRecognizeMeasuresWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& out
return IGraphTransformer::TStatus::Error;
}
- auto lambdaInputRowColumns = inputRowType->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>()->GetItems();
- lambdaInputRowColumns.push_back(ctx.Expr.MakeType<TItemExprType>("_yql_Classifier", ctx.Expr.MakeType<TDataExprType>(EDataSlot::Utf8)));
- lambdaInputRowColumns.push_back(ctx.Expr.MakeType<TItemExprType>("_yql_MatchNumber", ctx.Expr.MakeType<TDataExprType>(EDataSlot::Int64)));
+ auto lambdaInputRowColumns = inputRowType->GetTypeAnn()
+ ->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>()->GetItems();
+ using NYql::NMatchRecognize::MeasureInputDataSpecialColumns;
+ lambdaInputRowColumns.push_back(ctx.Expr.MakeType<TItemExprType>(
+ MeasureInputDataSpecialColumnName(MeasureInputDataSpecialColumns::Classifier),
+ ctx.Expr.MakeType<TDataExprType>(EDataSlot::Utf8)));
+ lambdaInputRowColumns.push_back(ctx.Expr.MakeType<TItemExprType>(
+ MeasureInputDataSpecialColumnName(MeasureInputDataSpecialColumns::MatchNumber),
+ ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64)));
auto lambdaInputRowType = ctx.Expr.MakeType<TStructExprType>(lambdaInputRowColumns);
const auto& matchedRowsRanges = GetMatchedRowsRangesType(pattern, ctx);
YQL_ENSURE(matchedRowsRanges);
diff --git a/ydb/library/yql/minikql/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/CMakeLists.darwin-x86_64.txt
index 8be56fe7c9..09684df2ae 100644
--- a/ydb/library/yql/minikql/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/minikql/CMakeLists.darwin-x86_64.txt
@@ -42,6 +42,7 @@ target_link_libraries(library-yql-minikql PUBLIC
yql-public-udf
public-udf-tz
library-yql-utils
+ yql-core-sql_types
ydb-library-uuid
public-lib-scheme_types
)
diff --git a/ydb/library/yql/minikql/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/CMakeLists.linux-aarch64.txt
index 569f8b35ff..af687a328e 100644
--- a/ydb/library/yql/minikql/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/minikql/CMakeLists.linux-aarch64.txt
@@ -43,6 +43,7 @@ target_link_libraries(library-yql-minikql PUBLIC
yql-public-udf
public-udf-tz
library-yql-utils
+ yql-core-sql_types
ydb-library-uuid
public-lib-scheme_types
)
diff --git a/ydb/library/yql/minikql/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/CMakeLists.linux-x86_64.txt
index 569f8b35ff..af687a328e 100644
--- a/ydb/library/yql/minikql/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/minikql/CMakeLists.linux-x86_64.txt
@@ -43,6 +43,7 @@ target_link_libraries(library-yql-minikql PUBLIC
yql-public-udf
public-udf-tz
library-yql-utils
+ yql-core-sql_types
ydb-library-uuid
public-lib-scheme_types
)
diff --git a/ydb/library/yql/minikql/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/CMakeLists.windows-x86_64.txt
index 8be56fe7c9..09684df2ae 100644
--- a/ydb/library/yql/minikql/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/minikql/CMakeLists.windows-x86_64.txt
@@ -42,6 +42,7 @@ target_link_libraries(library-yql-minikql PUBLIC
yql-public-udf
public-udf-tz
library-yql-utils
+ yql-core-sql_types
ydb-library-uuid
public-lib-scheme_types
)
diff --git a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.darwin-x86_64.txt
index db7bf3a22f..780ea80c0a 100644
--- a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.darwin-x86_64.txt
@@ -106,6 +106,7 @@ target_sources(minikql-comp_nodes-llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_mapnext.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_next_value.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-aarch64.txt
index 18a9e65c47..f279be85fb 100644
--- a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-aarch64.txt
@@ -107,6 +107,7 @@ target_sources(minikql-comp_nodes-llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_mapnext.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_next_value.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-x86_64.txt
index 18a9e65c47..f279be85fb 100644
--- a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.linux-x86_64.txt
@@ -107,6 +107,7 @@ target_sources(minikql-comp_nodes-llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_mapnext.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_next_value.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.windows-x86_64.txt
index db7bf3a22f..780ea80c0a 100644
--- a/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/llvm/CMakeLists.windows-x86_64.txt
@@ -106,6 +106,7 @@ target_sources(minikql-comp_nodes-llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_mapnext.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_next_value.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
index d882c5f2f6..45ee823be4 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
@@ -63,6 +63,7 @@
#include "mkql_map.h"
#include "mkql_mapnext.h"
#include "mkql_map_join.h"
+#include "mkql_match_recognize.h"
#include "mkql_multimap.h"
#include "mkql_next_value.h"
#include "mkql_nop.h"
@@ -342,6 +343,7 @@ struct TCallableComputationNodeBuilderFuncMapFiller {
{"RoundDown", &WrapRound},
{"NextValue", &WrapNextValue},
{"Nop", &WrapNop},
+ {"MatchRecognizeCore", &WrapMatchRecognizeCore},
};
};
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp
new file mode 100644
index 0000000000..99b5b932be
--- /dev/null
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp
@@ -0,0 +1,304 @@
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_impl.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_pack.h>
+#include <ydb/library/yql/minikql/mkql_node_cast.h>
+#include <ydb/library/yql/minikql/mkql_runtime_version.h>
+#include <ydb/library/yql/core/sql_types/match_recognize.h>
+
+namespace NKikimr::NMiniKQL {
+
+namespace {
+
+enum class EMeasureColumnSource {Classifier = 0, MatchNumber = 1, Input};
+using TMeasureInputColumnOrder = std::vector<std::pair<EMeasureColumnSource, size_t>>;
+
+enum class EOutputColumnSource {PartitionKey, Measure};
+using TOutputColumnOrder = std::vector<std::pair<EOutputColumnSource, size_t>>;
+
+//Process one partition of input data
+struct IProcessMatchRecognize {
+ ///return true if it has output data ready
+ virtual bool ProcessInputRow(NUdf::TUnboxedValue&& row) = 0;
+ virtual NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) = 0;
+ virtual bool ProcessEndOfData() = 0;
+ virtual ~IProcessMatchRecognize(){}
+};
+
+class TStreamMatchRecognize: public IProcessMatchRecognize {
+public:
+ TStreamMatchRecognize(
+ NUdf::TUnboxedValue&& partitionKey,
+ std::vector<IComputationNode*>& measures,
+ const TOutputColumnOrder& outputColumnOrder,
+ const TContainerCacheOnContext& cache
+ )
+ : PartitionKey(std::move(partitionKey))
+ , Measures(measures)
+ , OutputColumnOrder(outputColumnOrder)
+ , Cache(cache)
+ , HasMatch(false)
+ {
+ }
+
+ bool ProcessInputRow(NUdf::TUnboxedValue&& row) override{
+ //Assume match on every row, TODO fixme
+ Y_UNUSED(row);
+ HasMatch = true;
+ return HasMatch;
+ }
+ NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) override {
+ if (!HasMatch)
+ return NUdf::TUnboxedValue::Invalid();
+ HasMatch = false;
+ NUdf::TUnboxedValue *itemsPtr = nullptr;
+ const auto result = Cache.NewArray(ctx, OutputColumnOrder.size(), itemsPtr);
+ for (auto const& c: OutputColumnOrder) {
+ switch(c.first) {
+ case EOutputColumnSource::Measure:
+ *itemsPtr++ = Measures[c.second]->GetValue(ctx);
+ break;
+ case EOutputColumnSource::PartitionKey:
+ *itemsPtr++ = PartitionKey.GetElement(c.second);
+ break;
+ default:
+ MKQL_ENSURE(false, "Internal logic error");
+ }
+ }
+ return result;
+ }
+ bool ProcessEndOfData() override {
+ //TODO
+ return false;
+ }
+private:
+ const NUdf::TUnboxedValue PartitionKey;
+ const std::vector<IComputationNode*>& Measures;
+ const TOutputColumnOrder& OutputColumnOrder;
+ const TContainerCacheOnContext& Cache;
+ bool HasMatch;
+
+};
+
+
+class TMatchRecognizeWrapper : public TStatefulFlowComputationNode<TMatchRecognizeWrapper> {
+ using TBaseComputation = TStatefulFlowComputationNode<TMatchRecognizeWrapper>;
+public:
+ TMatchRecognizeWrapper(TComputationMutables &mutables, EValueRepresentation kind, IComputationNode *inputFlow,
+ IComputationExternalNode *inputRowArg,
+ IComputationNode *partitionKey,
+ TType* partitionKeyType,
+ std::vector<IComputationNode*>&& measures,
+ TOutputColumnOrder&& outputColumnOrder
+ )
+ : TBaseComputation(mutables, inputFlow, kind, EValueRepresentation::Embedded)
+ , InputFlow(inputFlow)
+ , InputRowArg(inputRowArg)
+ , PartitionKey(partitionKey)
+ , PartitionKeyType(partitionKeyType)
+ , Measures(measures)
+ , OutputColumnOrder(outputColumnOrder)
+ , Cache(mutables)
+ {}
+
+ NUdf::TUnboxedValue DoCalculate(NUdf::TUnboxedValue &stateValue, TComputationContext &ctx) const {
+ if (stateValue.IsInvalid()) {
+ stateValue = ctx.HolderFactory.Create<TState>(
+ InputRowArg,
+ PartitionKey,
+ PartitionKeyType,
+ Measures,
+ OutputColumnOrder,
+ Cache
+ );
+ }
+ auto& state = *static_cast<TState *>(stateValue.AsBoxed().Get());
+ while (true) {
+ if (auto output = state.GetOutputIfReady(ctx); !output.IsInvalid()) {
+ return output;
+ }
+ auto item = InputFlow->GetValue(ctx);
+ if (item.IsFinish()) {
+ state.ProcessEndOfData();
+ continue;
+ } else if (item.IsSpecial()) {
+ return item;
+ }
+ state.ProcessInputRow(std::move(item), ctx);
+ }
+ }
+private:
+
+ class TState: public TComputationValue<TState> {
+ using TPartitionMap = std::unordered_map<TString, std::unique_ptr<IProcessMatchRecognize>>;
+ public:
+ TState(
+ TMemoryUsageInfo* memInfo,
+ IComputationExternalNode* inputRowArg,
+ IComputationNode* partitionKey,
+ TType* partitionKeyType,
+ const std::vector<IComputationNode*>& measures,
+ const TOutputColumnOrder& outputColumnOrder,
+ const TContainerCacheOnContext& cache
+ )
+ : TComputationValue<TState>(memInfo)
+ , InputRowArg(inputRowArg)
+ , PartitionKey(partitionKey)
+ , PartitionKeyPacker(true, partitionKeyType)
+ , Measures(measures)
+ , OutputColumnOrder(outputColumnOrder)
+ , Cache(cache)
+ {
+ }
+
+ void ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) {
+ auto partition = GetPartitionHandler(row, ctx);
+ if (partition->second->ProcessInputRow(std::move(row))) {
+ HasReadyOutput.push(partition);
+ }
+ }
+
+ void ProcessEndOfData() {
+ for (auto it = Partitions.begin(); it != Partitions.end(); ++it) {
+ auto b = it->second->ProcessEndOfData();
+ if (b) {
+ HasReadyOutput.push(it);
+ }
+ }
+ Terminating = true;
+ }
+
+ NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) {
+ while (!HasReadyOutput.empty()) {
+ auto r = HasReadyOutput.top()->second->GetOutputIfReady(ctx);
+ if (r.IsInvalid()) {
+ //dried up
+ HasReadyOutput.pop();
+ continue;
+ } else {
+ return r;
+ }
+ }
+ return Terminating ? NUdf::TUnboxedValue::MakeFinish() : NUdf::TUnboxedValue::Invalid();
+ }
+
+ private:
+ TPartitionMap::iterator GetPartitionHandler(const NUdf::TUnboxedValue& row, TComputationContext &ctx) {
+ InputRowArg->SetValue(ctx, NUdf::TUnboxedValue(row));
+ auto partitionKey = PartitionKey->GetValue(ctx);
+ const auto packedKey = PartitionKeyPacker.Pack(partitionKey);
+ if (const auto it = Partitions.find(TString(packedKey)); it != Partitions.end()) {
+ return it;
+ } else {
+ return Partitions.emplace_hint(it, TString(packedKey), std::make_unique<TStreamMatchRecognize>(
+ std::move(partitionKey),
+ Measures,
+ OutputColumnOrder,
+ Cache
+ ));
+ }
+ }
+
+ private:
+ //for this class
+ TPartitionMap Partitions;
+ std::stack<TPartitionMap::iterator> HasReadyOutput;
+ bool Terminating = false;
+
+ IComputationExternalNode* InputRowArg;
+ IComputationNode* PartitionKey;
+ //TODO switch to tuple compare
+ TValuePackerGeneric<false> PartitionKeyPacker;
+
+ //to be passed to partitions
+ std::vector<IComputationNode*> Measures;
+ const TOutputColumnOrder& OutputColumnOrder;
+ const TContainerCacheOnContext& Cache;
+ };
+
+private:
+ void RegisterDependencies() const final {
+ if (const auto flow = FlowDependsOn(InputFlow)) {
+ Own(flow, InputRowArg);
+ }
+ DependsOn(PartitionKey, InputRowArg);
+ }
+
+ IComputationNode* const InputFlow;
+ IComputationExternalNode* const InputRowArg;
+ IComputationNode* const PartitionKey;
+ TType* const PartitionKeyType;
+ std::vector<IComputationNode*> Measures;
+ TOutputColumnOrder OutputColumnOrder;
+ const TContainerCacheOnContext Cache;
+};
+
+TOutputColumnOrder GetOutputColumnOrder(TRuntimeNode partitionKyeColumnsIndexes, TRuntimeNode measureColumnsIndexes) {
+ std::unordered_map<size_t, std::pair<EOutputColumnSource, size_t>> temp;
+ {
+ auto list = AS_VALUE(TListLiteral, partitionKyeColumnsIndexes);
+ for (ui32 i = 0; i != list->GetItemsCount(); ++i) {
+ auto index = AS_VALUE(TDataLiteral, list->GetItems()[i])->AsValue().Get<ui32>();
+ temp[index] = std::make_pair(EOutputColumnSource::PartitionKey, i);
+ }
+ }
+ {
+ auto list = AS_VALUE(TListLiteral, measureColumnsIndexes);
+ for (ui32 i = 0; i != list->GetItemsCount(); ++i) {
+ auto index = AS_VALUE(TDataLiteral, list->GetItems()[i])->AsValue().Get<ui32>();
+ temp[index] = std::make_pair(EOutputColumnSource::Measure, i);
+ }
+ }
+ if (temp.empty())
+ return {};
+ auto outputSize = max_element(temp.cbegin(), temp.cend())->first + 1;
+ TOutputColumnOrder result(outputSize);
+ for (const auto& [i, v]: temp) {
+ result[i] = v;
+ }
+ return result;
+}
+
+std::vector<IComputationNode*> ConvertVectorOfCallables(const std::vector<TRuntimeNode>& v, const TComputationNodeFactoryContext& ctx) {
+ std::vector<IComputationNode*> result;
+ result.reserve(v.size());
+ for (auto& c: v) {
+ result.push_back(LocateNode(ctx.NodeLocator, *c.GetNode()));
+ }
+ return result;
+}
+
+} //namespace
+
+
+IComputationNode* WrapMatchRecognizeCore(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+ size_t inputIndex = 0;
+ const auto& inputFlow = callable.GetInput(inputIndex++);
+ const auto& inputRowArg = callable.GetInput(inputIndex++);
+ const auto& partitionKeySelector = callable.GetInput(inputIndex++);
+ const auto& partitionColumnIndexes = callable.GetInput(inputIndex++);
+ const auto& measureInputDataArg = callable.GetInput(inputIndex++);
+ const auto& measureSpecialColumnIndexes = callable.GetInput(inputIndex++);
+ const auto& inputRowColumnCount = callable.GetInput(inputIndex++);
+ const auto& matchedRangesArg = callable.GetInput(inputIndex++);
+ const auto& measureColumnIndexes = callable.GetInput(inputIndex++);
+ std::vector<TRuntimeNode> measures;
+ for (size_t i = 0; i != AS_VALUE(TListLiteral, measureColumnIndexes)->GetItemsCount(); ++i) {
+ measures.push_back(callable.GetInput(inputIndex++));
+ }
+ MKQL_ENSURE(callable.GetInputsCount() == inputIndex, "Wrong input count");
+ Y_UNUSED(measureInputDataArg);
+ Y_UNUSED(measureSpecialColumnIndexes);
+ Y_UNUSED(inputRowColumnCount);
+ Y_UNUSED(matchedRangesArg);
+
+ return new TMatchRecognizeWrapper(ctx.Mutables, GetValueRepresentation(inputFlow.GetStaticType())
+ , LocateNode(ctx.NodeLocator, *inputFlow.GetNode())
+ , static_cast<IComputationExternalNode*>(LocateNode(ctx.NodeLocator, *inputRowArg.GetNode()))
+ , LocateNode(ctx.NodeLocator, *partitionKeySelector.GetNode())
+ , partitionKeySelector.GetStaticType()
+ , ConvertVectorOfCallables(measures, ctx)
+ , GetOutputColumnOrder(partitionColumnIndexes, measureColumnIndexes)
+ );
+}
+
+} //namespace NKikimr::NMiniKQL
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.h b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.h
new file mode 100644
index 0000000000..858200d144
--- /dev/null
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.h
@@ -0,0 +1,6 @@
+#pragma once
+#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
+
+namespace NKikimr::NMiniKQL {
+ IComputationNode* WrapMatchRecognizeCore(TCallable& callable, const TComputationNodeFactoryContext& ctx);
+}
diff --git a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.darwin-x86_64.txt
index f10e5ccc39..e40064d4a6 100644
--- a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.darwin-x86_64.txt
@@ -102,6 +102,7 @@ target_sources(minikql-comp_nodes-no_llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_mapnext.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_next_value.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-aarch64.txt
index 96b0c048ea..01c73e57af 100644
--- a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-aarch64.txt
@@ -103,6 +103,7 @@ target_sources(minikql-comp_nodes-no_llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_mapnext.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_next_value.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-x86_64.txt
index 96b0c048ea..01c73e57af 100644
--- a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.linux-x86_64.txt
@@ -103,6 +103,7 @@ target_sources(minikql-comp_nodes-no_llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_mapnext.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_next_value.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.windows-x86_64.txt
index f10e5ccc39..e40064d4a6 100644
--- a/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/no_llvm/CMakeLists.windows-x86_64.txt
@@ -102,6 +102,7 @@ target_sources(minikql-comp_nodes-no_llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_mapnext.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_next_value.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/ya.make.inc b/ydb/library/yql/minikql/comp_nodes/ya.make.inc
index 1a84c6e9ba..d4315842e7 100644
--- a/ydb/library/yql/minikql/comp_nodes/ya.make.inc
+++ b/ydb/library/yql/minikql/comp_nodes/ya.make.inc
@@ -73,6 +73,7 @@ SRCS(
mkql_map.cpp
mkql_mapnext.cpp
mkql_map_join.cpp
+ mkql_match_recognize.cpp
mkql_multihopping.cpp
mkql_multimap.cpp
mkql_next_value.cpp
diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp
index 7f19b235d4..27be8ff034 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_program_builder.cpp
@@ -7,6 +7,7 @@
#include "ydb/library/yql/minikql/mkql_function_registry.h"
#include "ydb/library/yql/minikql/mkql_utils.h"
#include "ydb/library/yql/minikql/mkql_type_builder.h"
+#include "ydb/library/yql/core/sql_types/match_recognize.h"
#include <util/string/cast.h>
#include <util/string/printf.h>
@@ -5790,6 +5791,126 @@ TRuntimeNode TProgramBuilder::ScalarApply(const TArrayRef<const TRuntimeNode>& a
return TRuntimeNode(builder.Build(), false);
}
+TRuntimeNode TProgramBuilder::MatchRecognizeCore(
+ TRuntimeNode inputStream,
+ const TUnaryLambda& getPartitionKeySelectorNode,
+ const TArrayRef<TStringBuf>& partitionColumns,
+ const TArrayRef<std::pair<TStringBuf, TBinaryLambda>>& getMeasures
+) {
+ MKQL_ENSURE(RuntimeVersion >= 42, "MatchRecognize is not supported in runtime version " << RuntimeVersion);
+
+ const auto inputRowType = AS_TYPE(TStructType, AS_TYPE(TFlowType, inputStream.GetStaticType())->GetItemType());
+ const auto inputRowArg = Arg(inputRowType);
+ const auto partitionKeySelectorNode = getPartitionKeySelectorNode(inputRowArg);
+
+ TStructTypeBuilder indexRangeTypeBuilder(Env);
+ indexRangeTypeBuilder.Add("From", TDataType::Create(NUdf::TDataType<ui64>::Id, Env));
+ indexRangeTypeBuilder.Add("To", TDataType::Create(NUdf::TDataType<ui64>::Id, Env));
+ const auto& rangeList = TListType::Create(indexRangeTypeBuilder.Build(), Env);
+ TStructTypeBuilder matchedVarsTypeBuilder(Env);
+ //assume simple pattern with one var "A" that always match TODO fixme
+ matchedVarsTypeBuilder.Add("A", rangeList);
+ TRuntimeNode matchedVarsArg = Arg(matchedVarsTypeBuilder.Build());
+
+ //---These vars may be empty in case of no measures
+ TRuntimeNode measureInputDataArg;
+ std::vector<TRuntimeNode> specialColumnIndexesInMeasureInputDataRow;
+ TVector<TRuntimeNode> measures;
+ TVector<TType*> measureTypes;
+ //---
+ if (getMeasures.empty()) {
+ measureInputDataArg = Arg(Env.GetTypeOfVoid());
+ } else {
+ using NYql::NMatchRecognize::MeasureInputDataSpecialColumns;
+ measures.reserve(getMeasures.size());
+ measureTypes.reserve(getMeasures.size());
+ specialColumnIndexesInMeasureInputDataRow.resize(static_cast<size_t>(NYql::NMatchRecognize::MeasureInputDataSpecialColumns::Last));
+ TStructTypeBuilder measureInputDataRowTypeBuilder(Env);
+ for (ui32 i = 0; i != inputRowType->GetMembersCount(); ++i) {
+ measureInputDataRowTypeBuilder.Add(inputRowType->GetMemberName(i), inputRowType->GetMemberType(i));
+ }
+ measureInputDataRowTypeBuilder.Add(
+ MeasureInputDataSpecialColumnName(MeasureInputDataSpecialColumns::Classifier),
+ TDataType::Create(NUdf::TDataType<NYql::NUdf::TUtf8>::Id, Env)
+ );
+ measureInputDataRowTypeBuilder.Add(
+ MeasureInputDataSpecialColumnName(MeasureInputDataSpecialColumns::MatchNumber),
+ TDataType::Create(NUdf::TDataType<ui64>::Id, Env)
+ );
+ const auto measureInputDataRowType = measureInputDataRowTypeBuilder.Build();
+
+ for (ui32 i = 0; i != measureInputDataRowType->GetMembersCount(); ++i) {
+ //assume a few, if grows, it's better to use a lookup table here
+ static_assert(static_cast<size_t>(MeasureInputDataSpecialColumns::Last) < 5);
+ for (size_t j = 0; j != static_cast<size_t>(MeasureInputDataSpecialColumns::Last); ++j) {
+ if (measureInputDataRowType->GetMemberName(i) ==
+ NYql::NMatchRecognize::MeasureInputDataSpecialColumnName(static_cast<MeasureInputDataSpecialColumns>(j)))
+ specialColumnIndexesInMeasureInputDataRow[j] = NewDataLiteral<ui32>(i);
+ }
+ }
+
+ measureInputDataArg = Arg(TListType::Create(measureInputDataRowType, Env));
+ for (size_t i = 0; i != getMeasures.size(); ++i) {
+ measures.push_back(getMeasures[i].second(measureInputDataArg, matchedVarsArg));
+ measureTypes.push_back(measures[i].GetStaticType());
+ }
+ }
+
+ TStructTypeBuilder outputRowTypeBuilder(Env);
+ THashMap<TStringBuf, size_t> partitionColumnLookup;
+ for (size_t i = 0; i != partitionColumns.size(); ++i) {
+ const auto& name = partitionColumns[i];
+ partitionColumnLookup[name] = i;
+ outputRowTypeBuilder.Add(
+ name,
+ AS_TYPE(TTupleType, partitionKeySelectorNode.GetStaticType())->GetElementType(i)
+ );
+ }
+ THashMap<TStringBuf, size_t> measureColumnLookup;
+ for (size_t i = 0; i != measures.size(); ++i) {
+ const auto& name = getMeasures[i].first;
+ measureColumnLookup[name] = i;
+ outputRowTypeBuilder.Add(
+ name,
+ measures[i].GetStaticType()
+ );
+ }
+ auto outputRowType = outputRowTypeBuilder.Build();
+
+ std::vector<TRuntimeNode> partitionColumnIndexes(partitionColumnLookup.size());
+ std::vector<TRuntimeNode> measureColumnIndexes(measureColumnLookup.size());
+ for (ui32 i = 0; i != outputRowType->GetMembersCount(); ++i) {
+ if (auto it = partitionColumnLookup.find(outputRowType->GetMemberName(i)); it != partitionColumnLookup.end()) {
+ partitionColumnIndexes[it->second] = NewDataLiteral<ui32>(i);
+ }
+ else if (auto it = measureColumnLookup.find(outputRowType->GetMemberName(i)); it != measureColumnLookup.end()) {
+ measureColumnIndexes[it->second] = NewDataLiteral<ui32>(i);
+ }
+ }
+ auto outputType = (TType*)TFlowType::Create(outputRowType, Env);
+
+ TCallableBuilder callableBuilder(GetTypeEnvironment(), "MatchRecognizeCore", outputType);
+ auto indexType = TDataType::Create(NUdf::TDataType<ui32>::Id, Env);
+ auto indexListType = TListType::Create(indexType, Env);
+ callableBuilder.Add(inputStream);
+ callableBuilder.Add(inputRowArg);
+ callableBuilder.Add(partitionKeySelectorNode);
+ callableBuilder.Add(TRuntimeNode(TListLiteral::Create(partitionColumnIndexes.data(), partitionColumnIndexes.size(), indexListType, Env), true));
+ callableBuilder.Add(measureInputDataArg);
+ callableBuilder.Add(TRuntimeNode(TListLiteral::Create(
+ specialColumnIndexesInMeasureInputDataRow.data(), specialColumnIndexesInMeasureInputDataRow.size(),
+ indexListType, Env
+ ),
+ true));
+ callableBuilder.Add(NewDataLiteral<ui32>(inputRowType->GetMembersCount()));
+ callableBuilder.Add(matchedVarsArg);
+ callableBuilder.Add(TRuntimeNode(TListLiteral::Create(measureColumnIndexes.data(), measureColumnIndexes.size(), indexListType, Env), true));
+ for (const auto& m: measures) {
+ callableBuilder.Add(m);
+ }
+ return TRuntimeNode(callableBuilder.Build(), false);
+}
+
bool CanExportType(TType* type, const TTypeEnvironment& env) {
if (type->GetKind() == TType::EKind::Type) {
return false; // Type of Type
diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h
index 8514e3c1b3..92aefa416c 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.h
+++ b/ydb/library/yql/minikql/mkql_program_builder.h
@@ -4,6 +4,7 @@
#include "mkql_node.h"
#include "mkql_node_builder.h"
#include <ydb/library/yql/public/udf/udf_value.h>
+#include <ydb/library/yql/core/sql_types/match_recognize.h>
#include <functional>
@@ -695,6 +696,13 @@ public:
TRuntimeNode ScalarApply(const TArrayRef<const TRuntimeNode>& args, const TArrayLambda& handler);
+ TRuntimeNode MatchRecognizeCore(
+ TRuntimeNode inputStream,
+ const TUnaryLambda& getPartitionKeySelectorNode,
+ const TArrayRef<TStringBuf>& partitionColumns,
+ const TArrayRef<std::pair<TStringBuf, TBinaryLambda>>& getMeasures
+ );
+
protected:
TRuntimeNode Invoke(const std::string_view& funcName, TType* resultType, const TArrayRef<const TRuntimeNode>& args);
TRuntimeNode IfPresent(TRuntimeNode optional, const TUnaryLambda& thenBranch, TRuntimeNode elseBranch);
diff --git a/ydb/library/yql/minikql/mkql_runtime_version.h b/ydb/library/yql/minikql/mkql_runtime_version.h
index 5f78ef9581..c263002da3 100644
--- a/ydb/library/yql/minikql/mkql_runtime_version.h
+++ b/ydb/library/yql/minikql/mkql_runtime_version.h
@@ -24,7 +24,7 @@ namespace NMiniKQL {
// 1. Bump this version every time incompatible runtime nodes are introduced.
// 2. Make sure you provide runtime node generation for previous runtime versions.
#ifndef MKQL_RUNTIME_VERSION
-#define MKQL_RUNTIME_VERSION 41U
+#define MKQL_RUNTIME_VERSION 42U
#endif
// History:
diff --git a/ydb/library/yql/minikql/ya.make b/ydb/library/yql/minikql/ya.make
index 8c64d9eb3c..eb336034ce 100644
--- a/ydb/library/yql/minikql/ya.make
+++ b/ydb/library/yql/minikql/ya.make
@@ -69,6 +69,7 @@ PEERDIR(
ydb/library/yql/public/udf
ydb/library/yql/public/udf/tz
ydb/library/yql/utils
+ ydb/library/yql/core/sql_types
ydb/library/uuid
ydb/public/lib/scheme_types
)
diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
index 54309ba558..ce0b02f4da 100644
--- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
+++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
@@ -852,6 +852,47 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
return ctx.ProgramBuilder.Nth(tupleObj, index);
});
+ AddCallable("MatchRecognizeCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
+ const auto& inputStream = node.Child(0);
+ const auto& partitionKeySelector = node.Child(1);
+ const auto& partitionColumns = node.Child(2);
+ const auto& params = node.Child(3);
+
+ //explore params
+ const auto& measures = params->ChildRef(0);
+
+ //explore measures
+ const auto measureNames = measures->ChildRef(2);
+ constexpr size_t FirstMeasureLambdaIndex = 3;
+
+ TVector<TStringBuf> partitionColumnNames;
+ for (const auto& n: partitionColumns->Children()) {
+ partitionColumnNames.push_back(n->Content());
+ }
+
+ TProgramBuilder::TUnaryLambda getPartitionKeySelector = [partitionKeySelector, &ctx](TRuntimeNode inputRowArg){
+ return MkqlBuildLambda(*partitionKeySelector, ctx, {inputRowArg});
+ };
+
+ TVector<std::pair<TStringBuf, TProgramBuilder::TBinaryLambda>> getMeasures(measureNames->ChildrenSize());
+ for (size_t i = 0; i != measureNames->ChildrenSize(); ++i) {
+ getMeasures[i] = std::pair{
+ measureNames->ChildRef(i)->Content(),
+ [i, measures, &ctx](TRuntimeNode data, TRuntimeNode matchedVars) {
+ return MkqlBuildLambda(*measures->ChildRef(FirstMeasureLambdaIndex + i), ctx,
+ {data, matchedVars});
+ }
+ };
+ }
+
+ return ctx.ProgramBuilder.MatchRecognizeCore(
+ MkqlBuildExpr(*inputStream, ctx),
+ getPartitionKeySelector,
+ partitionColumnNames,
+ getMeasures
+ );
+ });
+
AddCallable("Guess", [](const TExprNode& node, TMkqlBuildContext& ctx) {
const auto variantObj = MkqlBuildExpr(node.Head(), ctx);
auto type = node.Head().GetTypeAnn();
diff --git a/ydb/library/yql/sql/v1/sql_match_recognize.cpp b/ydb/library/yql/sql/v1/sql_match_recognize.cpp
index db48964329..3b05ce6409 100644
--- a/ydb/library/yql/sql/v1/sql_match_recognize.cpp
+++ b/ydb/library/yql/sql/v1/sql_match_recognize.cpp
@@ -162,7 +162,7 @@ TNamedLambda TSqlMatchRecognizeClause::ParseOneMeasure(const TRule_row_pattern_m
const auto& name = Id(node.GetRule_an_id3(), *this);
//TODO https://st.yandex-team.ru/YQL-16186
//Each measure must be a lambda, that accepts 2 args:
- // - List<InputTableColumns + _yql_Classifier, _yql_MatchNumber, _yqlMatchRowNumber>
+ // - List<InputTableColumns + _yql_Classifier, _yql_MatchNumber>
// - Struct that maps row pattern variables to ranges in the queue
return {expr, name};
}
@@ -254,7 +254,7 @@ TRowPatternTerm TSqlMatchRecognizeClause::ParsePatternTerm(const TRule_row_patte
Y_ENSURE("^" == std::get<0>(primary));
break;
case TRule_row_pattern_primary::kAltRowPatternPrimary4: {
- if (++PatternNestingLevel <= MaxMatchRecognizePatternNesting) {
+ if (++PatternNestingLevel <= NYql::NMatchRecognize::MaxPatternNesting) {
primary = ParsePattern(primaryVar.GetAlt_row_pattern_primary4().GetBlock2().GetRule_row_pattern1());
} else {
Ctx.Error(TokenPosition(primaryVar.GetAlt_row_pattern_primary4().GetToken1()))