summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--yql/essentials/core/expr_nodes/yql_expr_nodes.json8
-rw-r--r--yql/essentials/core/sql_types/time_order_recover.h6
-rw-r--r--yql/essentials/core/yql_opt_match_recognize.cpp308
-rw-r--r--yql/essentials/tests/sql/minirun/part1/canondata/result.json14
-rw-r--r--yql/essentials/tests/sql/sql2yql/canondata/result.json12
-rw-r--r--yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts_without_order-streaming_/formatted.sql56
-rw-r--r--yql/essentials/tests/sql/suites/match_recognize/alerts_without_order-streaming.sql64
7 files changed, 312 insertions, 156 deletions
diff --git a/yql/essentials/core/expr_nodes/yql_expr_nodes.json b/yql/essentials/core/expr_nodes/yql_expr_nodes.json
index 76e556533f7..09de074b5ed 100644
--- a/yql/essentials/core/expr_nodes/yql_expr_nodes.json
+++ b/yql/essentials/core/expr_nodes/yql_expr_nodes.json
@@ -1563,10 +1563,10 @@
"Base": "TCoInputBase",
"Match": {"Type": "Callable", "Name": "MatchRecognize"},
"Children": [
- {"Index": 1, "Name": "partitionKeySelector", "Type": "TExprBase"},
- {"Index": 2, "Name": "partitionColumns", "Type": "TExprBase"},
- {"Index": 3, "Name": "sortTraits", "Type": "TExprBase"},
- {"Index": 4, "Name": "params", "Type": "TExprBase"}
+ {"Index": 1, "Name": "PartitionKeySelector", "Type": "TExprBase"},
+ {"Index": 2, "Name": "PartitionColumns", "Type": "TExprBase"},
+ {"Index": 3, "Name": "SortTraits", "Type": "TExprBase"},
+ {"Index": 4, "Name": "Params", "Type": "TExprBase"}
]
},
{
diff --git a/yql/essentials/core/sql_types/time_order_recover.h b/yql/essentials/core/sql_types/time_order_recover.h
index fd6ef0b61a7..590f73b7c0c 100644
--- a/yql/essentials/core/sql_types/time_order_recover.h
+++ b/yql/essentials/core/sql_types/time_order_recover.h
@@ -1,9 +1,9 @@
#pragma once
+
#include <string_view>
namespace NYql::NTimeOrderRecover {
-using namespace std::string_view_literals;
-constexpr auto OUT_OF_ORDER_MARKER = "_yql_OutOfOrder"sv;
+inline constexpr std::string_view OUT_OF_ORDER_MARKER = "_yql_OutOfOrder";
-}//namespace NYql::NMatchRecognize
+} // namespace NYql::NTimeOrderRecover
diff --git a/yql/essentials/core/yql_opt_match_recognize.cpp b/yql/essentials/core/yql_opt_match_recognize.cpp
index 3efae9e42b4..e8f986fb232 100644
--- a/yql/essentials/core/yql_opt_match_recognize.cpp
+++ b/yql/essentials/core/yql_opt_match_recognize.cpp
@@ -1,8 +1,11 @@
#include "yql_opt_match_recognize.h"
+
#include "yql_opt_utils.h"
+
#include <yql/essentials/core/sql_types/time_order_recover.h>
#include <yql/essentials/core/yql_expr_optimize.h>
#include <yql/essentials/utils/log/log.h>
+#include <ranges>
namespace NYql {
@@ -11,24 +14,22 @@ using namespace NNodes;
namespace {
bool IsStreaming(const TExprNode::TPtr& input, const TTypeAnnotationContext& typeAnnCtx) {
- if (EMatchRecognizeStreamingMode::Disable == typeAnnCtx.MatchRecognizeStreaming){
- return false;
- }
- if (EMatchRecognizeStreamingMode::Force == typeAnnCtx.MatchRecognizeStreaming){
- return true;
- }
-
- YQL_ENSURE(EMatchRecognizeStreamingMode::Auto == typeAnnCtx.MatchRecognizeStreaming, "Internal logic error");
-
- bool hasPq = false;
- NYql::VisitExpr(input, [&hasPq](const TExprNode::TPtr& node){
- if (node->IsCallable("DataSource")) {
- YQL_ENSURE(node->ChildrenSize() > 0 and node->Child(0)->IsAtom());
- hasPq = node->Child(0)->Content() == "pq";
+ switch (typeAnnCtx.MatchRecognizeStreaming) {
+ case EMatchRecognizeStreamingMode::Disable:
+ return false;
+ case EMatchRecognizeStreamingMode::Force:
+ return true;
+ case EMatchRecognizeStreamingMode::Auto: {
+ bool hasPq = false;
+ NYql::VisitExpr(input, [&hasPq](const TExprNode::TPtr& node) {
+ if (auto maybeDataSource = TExprBase(node).Maybe<TCoDataSource>()) {
+ hasPq = maybeDataSource.Cast().Category().Value() == "pq";
+ }
+ return !hasPq;
+ });
+ return hasPq;
}
- return !hasPq;
- });
- return hasPq;
+ }
}
TExprNode::TPtr ExpandMatchRecognizeMeasuresAggregates(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& /* typeAnnCtx */) {
@@ -138,16 +139,16 @@ THashSet<TStringBuf> FindUsedVars(const TExprNode::TPtr& params) {
const auto defines = params->Child(4);
static constexpr size_t defineLambdasStartPos = 3;
- for (auto i = defineLambdasStartPos; i < defines->ChildrenSize(); ++i) {
- const auto lambda = defines->Child(i);
- const auto lambdaArgs = lambda->Child(0);
- const auto lambdaBody = lambda->ChildPtr(1);
- const auto varsArg = lambdaArgs->Child(1);
+ for (const auto& define : defines->Children() | std::views::drop(defineLambdasStartPos)) {
+ const auto lambda = TCoLambda(define);
+ const auto varsArg = lambda.Args().Arg(1).Ptr();
+ const auto lambdaBody = lambda.Body().Ptr();
NYql::VisitExpr(
lambdaBody,
[varsArg, &result](const TExprNode::TPtr& node) {
- if (node->IsCallable("Member") && node->Child(0) == varsArg) {
- result.insert(node->Child(1)->Content());
+ if (auto maybeMember = TMaybeNode<TCoMember>(node);
+ maybeMember && maybeMember.Cast().Struct().Ptr() == varsArg) {
+ result.insert(maybeMember.Cast().Name().Value());
return false;
}
return true;
@@ -158,164 +159,173 @@ THashSet<TStringBuf> FindUsedVars(const TExprNode::TPtr& params) {
return result;
}
-TExprNode::TPtr MarkUnusedPatternVars(const TExprNode::TPtr& node, TExprContext& ctx, const THashSet<TStringBuf>& usedVars, TStringBuf rowsPerMatch) {
+TExprNode::TPtr MarkUnusedPatternVars(const TExprNode::TPtr& node, TExprContext& ctx, const THashSet<TStringBuf>& usedVars, const TExprNode::TPtr& rowsPerMatch) {
const auto pos = node->Pos();
- if (node->ChildrenSize() != 0 && node->Child(0)->IsAtom()) {
+ if (node->ChildrenSize() == 6 && node->Child(0)->IsAtom()) {
const auto varName = node->Child(0)->Content();
const auto output = FromString<bool>(node->Child(4)->Content());
- const auto varUnused = ("RowsPerMatch_AllRows" != rowsPerMatch || !output) && !usedVars.contains(varName);
- return ctx.Builder(pos)
- .List()
- .Add(0, node->ChildPtr(0))
- .Add(1, node->ChildPtr(1))
- .Add(2, node->ChildPtr(2))
- .Add(3, node->ChildPtr(3))
- .Add(4, node->ChildPtr(4))
- .Add(5, ctx.NewAtom(pos, ToString(varUnused)))
- .Seal()
- .Build();
- }
- TExprNodeList newChildren;
- for (size_t chPos = 0; chPos != node->ChildrenSize(); chPos++) {
- newChildren.push_back(MarkUnusedPatternVars(node->ChildPtr(chPos), ctx, usedVars, rowsPerMatch));
+ const auto varUnused = ("RowsPerMatch_AllRows" != rowsPerMatch->Content() || !output) && !usedVars.contains(varName);
+ return Build<TExprList>(ctx, pos)
+ .Add(node->ChildPtr(0))
+ .Add(node->ChildPtr(1))
+ .Add(node->ChildPtr(2))
+ .Add(node->ChildPtr(3))
+ .Add(node->ChildPtr(4))
+ .Add<TCoAtom>().Build(ToString(varUnused))
+ .Done()
+ .Ptr();
}
- if (node->IsCallable()) {
- return ctx.Builder(pos).Callable(node->Content()).Add(std::move(newChildren)).Seal().Build();
- } else if (node->IsList()) {
- return ctx.Builder(pos).List().Add(std::move(newChildren)).Seal().Build();
- } else { // Atom
- return node;
+ TExprNode::TListType newChildren;
+ for (const auto& child : node->Children()) {
+ newChildren.push_back(MarkUnusedPatternVars(child, ctx, usedVars, rowsPerMatch));
}
+ return ctx.ChangeChildren(*node, std::move(newChildren));
}
} // anonymous namespace
TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx) {
- YQL_ENSURE(node->IsCallable("MatchRecognize"));
- const auto input = node->Child(0);
- const auto partitionKeySelector = node->Child(1);
- const auto partitionColumns = node->Child(2);
- const auto sortTraits = node->Child(3);
- const auto params = node->Child(4);
- const auto pos = node->Pos();
+ YQL_CLOG(DEBUG, Core) << "Expand " << node->Content();
+ TCoMatchRecognize matchRecognize(node);
+ const auto input = matchRecognize.Input().Ptr();
+ const auto partitionKeySelector = matchRecognize.PartitionKeySelector().Ptr();
+ const auto partitionColumns = matchRecognize.PartitionColumns().Ptr();
+ const auto sortTraits = matchRecognize.SortTraits().Ptr();
+ const auto params = matchRecognize.Params().Ptr();
+ const auto pos = matchRecognize.Pos();
- const bool isStreaming = IsStreaming(input, typeAnnCtx);
+ const auto isStreaming = IsStreaming(input, typeAnnCtx);
- TExprNode::TPtr settings = AddSetting(*ctx.NewList(pos, {}), pos,
- "Streaming", ctx.NewAtom(pos, ToString(isStreaming)), ctx);
-
- const auto rowsPerMatch = params->Child(1)->Content();
- auto measures = ExpandMatchRecognizeMeasuresAggregates(params->ChildPtr(0), ctx, typeAnnCtx);
- const auto matchRecognize = ctx.Builder(pos)
- .Lambda()
- .Param("sortedPartition")
- .Callable(0, "ForwardList")
- .Callable(0, "MatchRecognizeCore")
- .Callable(0, "ToFlow")
- .Arg(0, "sortedPartition")
- .Seal()
- .Add(1, partitionKeySelector)
- .Add(2, partitionColumns)
- .Callable(3, params->Content())
- .Add(0, std::move(measures))
- .Add(1, params->ChildPtr(1))
- .Add(2, params->ChildPtr(2))
- .Add(3, MarkUnusedPatternVars(params->ChildPtr(3), ctx, FindUsedVars(params), rowsPerMatch))
- .Add(4, params->ChildPtr(4))
- .Seal()
- .Add(4, settings)
- .Seal()
- .Seal()
- .Seal()
- .Build();
+ auto newInput = Build<TCoLambda>(ctx, pos)
+ .Args({"partition"})
+ .Body<TCoToFlow>()
+ .Input("partition")
+ .Build()
+ .Done()
+ .Ptr();
TExprNode::TPtr sortKey;
TExprNode::TPtr sortOrder;
ExtractSortKeyAndOrder(pos, sortTraits, sortKey, sortOrder, ctx);
- TExprNode::TPtr result;
- if (isStreaming) {
- YQL_ENSURE(sortOrder->ChildrenSize() == 1, "Expect ORDER BY timestamp for MATCH_RECOGNIZE");
- const auto reordered = ctx.Builder(pos)
- .Lambda()
- .Param("partition")
- .Callable("ForwardList")
- .Callable(0, "OrderedMap")
- .Callable(0, "TimeOrderRecover")
- .Callable(0, "ToFlow").
- Arg(0, "partition")
+ auto timeOrderRecover = [&]() -> TExprNode::TPtr {
+ if (!isStreaming) {
+ return newInput;
+ }
+ switch (sortOrder->ChildrenSize()) {
+ case 0:
+ return newInput;
+ case 1: {
+ auto timeOrderRecover = ctx.Builder(pos)
+ .Lambda()
+ .Param("partition")
+ .Callable("TimeOrderRecover")
+ .Apply(0, std::move(newInput))
+ .With(0, "partition")
.Seal()
.Add(1, sortKey)
.Callable(2, "Interval")
- .Add(0, ctx.NewAtom(pos, ToString(typeAnnCtx.TimeOrderRecoverDelay)))
+ .Atom(0, ToString(typeAnnCtx.TimeOrderRecoverDelay))
.Seal()
.Callable(3, "Interval")
- .Add(0, ctx.NewAtom(pos, ToString(typeAnnCtx.TimeOrderRecoverAhead)))
+ .Atom(0, ToString(typeAnnCtx.TimeOrderRecoverAhead))
.Seal()
.Callable(4, "Uint32")
- .Add(0, ctx.NewAtom(pos, ToString(typeAnnCtx.TimeOrderRecoverRowLimit)))
- .Seal()
- .Seal()
- .Lambda(1)
- .Param("row")
- .Callable("RemoveMember")
- .Arg(0, "row")
- .Add(1, ctx.NewAtom(pos, NYql::NTimeOrderRecover::OUT_OF_ORDER_MARKER))
+ .Atom(0, ToString(typeAnnCtx.TimeOrderRecoverRowLimit))
.Seal()
.Seal()
.Seal()
- .Seal()
- .Seal()
- .Build();
+ .Build();
+ return Build<TCoLambda>(ctx, pos)
+ .Args({"partition"})
+ .Body<TCoOrderedMap>()
+ .Input<TExprApplier>()
+ .Apply(TCoLambda(timeOrderRecover))
+ .With(0, "partition")
+ .Build()
+ .Lambda<TCoLambda>()
+ .Args({"row"})
+ .Body<TCoRemoveMember>()
+ .Struct("row")
+ .Name<TCoAtom>().Build(NYql::NTimeOrderRecover::OUT_OF_ORDER_MARKER)
+ .Build()
+ .Build()
+ .Build()
+ .Done()
+ .Ptr();
+ }
+ default:
+ ctx.AddError(TIssue(ctx.GetPosition(sortTraits->Pos()), "Expect ORDER BY timestamp for MATCH_RECOGNIZE"));
+ return {};
+ }
+ }();
- const auto matchRecognizeOnReorderedPartition = ctx.Builder(pos)
- .Lambda()
- .Param("partition")
- .Apply(matchRecognize)
- .With(0)
- .Apply(reordered)
- .With(0)
- .Arg("partition")
- .Done()
- .Seal()
- .Done()
+ auto measures = ExpandMatchRecognizeMeasuresAggregates(params->ChildPtr(0), ctx, typeAnnCtx);
+ auto rowsPerMatch = params->ChildPtr(1);
+ const auto usedVars = FindUsedVars(params);
+ auto pattern = MarkUnusedPatternVars(params->ChildPtr(3), ctx, usedVars, rowsPerMatch);
+ auto settings = AddSetting(*ctx.NewList(pos, {}), pos, "Streaming", ctx.NewAtom(pos, ToString(isStreaming)), ctx);
+ auto newMatchRecognize = ctx.Builder(pos)
+ .Lambda()
+ .Param("partition")
+ .Callable("MatchRecognizeCore")
+ .Apply(0, std::move(timeOrderRecover))
+ .With(0, "partition")
+ .Seal()
+ .Add(1, partitionKeySelector)
+ .Add(2, partitionColumns)
+ .Callable(3, params->Content())
+ .Add(0, std::move(measures))
+ .Add(1, std::move(rowsPerMatch))
+ .Add(2, params->ChildPtr(2))
+ .Add(3, std::move(pattern))
+ .Add(4, params->ChildPtr(4))
.Seal()
+ .Add(4, std::move(settings))
.Seal()
- .Build();
+ .Seal()
+ .Build();
+
+ auto lambda = Build<TCoLambda>(ctx, pos)
+ .Args({"partition"})
+ .Body<TCoForwardList>()
+ .Stream<TExprApplier>()
+ .Apply(TCoLambda(newMatchRecognize))
+ .With(0, "partition")
+ .Build()
+ .Build()
+ .Done()
+ .Ptr();
+
+ if (isStreaming) {
TExprNode::TPtr keySelector;
if (partitionColumns->ChildrenSize() != 0) {
- keySelector = partitionKeySelector;
+ keySelector = std::move(partitionKeySelector);
} else {
- //Use pseudo partitioning with constant lambda to wrap TimeOrderRecover into DQ stage
- //TODO(zverevgeny): fixme
- keySelector = ctx.Builder(pos)
- .Lambda()
- .Param("row")
- .Callable("Bool")
- .Add(0, ctx.NewAtom(pos, "true"))
- .Seal()
- .Seal()
- .Build();
+ // Use pseudo partitioning with constant lambda to wrap TimeOrderRecover into DQ stage
+ // TODO(zverevgeny): fixme
+ keySelector = Build<TCoLambda>(ctx, pos)
+ .Args({"row"})
+ .Body(MakeBool<true>(pos, ctx))
+ .Done()
+ .Ptr();
}
- result = ctx.Builder(pos)
- .Callable("ShuffleByKeys")
- .Add(0, input)
- .Add(1, keySelector)
- .Add(2, matchRecognizeOnReorderedPartition)
- .Seal()
- .Build();
- } else { //non-streaming
- result = ctx.Builder(pos)
- .Callable("PartitionsByKeys")
- .Add(0, input)
- .Add(1, partitionKeySelector)
- .Add(2, sortOrder)
- .Add(3, sortKey)
- .Add(4, matchRecognize)
- .Seal()
- .Build();
+
+ return Build<TCoShuffleByKeys>(ctx, pos)
+ .Input(std::move(input))
+ .KeySelectorLambda(std::move(keySelector))
+ .ListHandlerLambda(std::move(lambda))
+ .Done()
+ .Ptr();
+ } else { // non-streaming
+ return Build<TCoPartitionsByKeys>(ctx, pos)
+ .Input(std::move(input))
+ .KeySelectorLambda(std::move(partitionKeySelector))
+ .SortDirections(std::move(sortOrder))
+ .SortKeySelectorLambda(std::move(sortKey))
+ .ListHandlerLambda(std::move(lambda))
+ .Done()
+ .Ptr();
}
- YQL_CLOG(INFO, Core) << "Expanded MatchRecognize";
- return result;
}
-} //namespace NYql
+} // namespace NYql
diff --git a/yql/essentials/tests/sql/minirun/part1/canondata/result.json b/yql/essentials/tests/sql/minirun/part1/canondata/result.json
index 74ea11e4a39..e299c4d970d 100644
--- a/yql/essentials/tests/sql/minirun/part1/canondata/result.json
+++ b/yql/essentials/tests/sql/minirun/part1/canondata/result.json
@@ -734,6 +734,20 @@
"uri": "https://{canondata_backend}/1942278/bea251ec797c6ae6c79a3fa31fd0d3dbee273fa6/resource.tar.gz#test.test_library-library_yqls--Results_/results.txt"
}
],
+ "test.test[match_recognize-alerts_without_order-streaming-default.txt-Debug]": [
+ {
+ "checksum": "da3c36fb5e54fd02cedfd15c0d0b9d0b",
+ "size": 5896,
+ "uri": "https://{canondata_backend}/937458/61af1cc3453c6ab9c5a837eeb404bf874f130293/resource.tar.gz#test.test_match_recognize-alerts_without_order-streaming-default.txt-Debug_/opt.yql"
+ }
+ ],
+ "test.test[match_recognize-alerts_without_order-streaming-default.txt-Results]": [
+ {
+ "checksum": "6f125fabed1ec00aaab81efa4ab4a1b3",
+ "size": 4625,
+ "uri": "https://{canondata_backend}/937458/61af1cc3453c6ab9c5a837eeb404bf874f130293/resource.tar.gz#test.test_match_recognize-alerts_without_order-streaming-default.txt-Results_/results.txt"
+ }
+ ],
"test.test[match_recognize-greedy_quantifiers-default.txt-Debug]": [
{
"checksum": "66fb0a8ccd3814cb306c356fcecea0d1",
diff --git a/yql/essentials/tests/sql/sql2yql/canondata/result.json b/yql/essentials/tests/sql/sql2yql/canondata/result.json
index 2328e0f0a6d..8b6291d7191 100644
--- a/yql/essentials/tests/sql/sql2yql/canondata/result.json
+++ b/yql/essentials/tests/sql/sql2yql/canondata/result.json
@@ -4185,6 +4185,13 @@
"uri": "https://{canondata_backend}/1920236/5e37b541c71c89b1b95dee0463a5a2e9bc5999f4/resource.tar.gz#test_sql2yql.test_match_recognize-alerts_/sql.yql"
}
],
+ "test_sql2yql.test[match_recognize-alerts_without_order-streaming]": [
+ {
+ "checksum": "2544bb720aab6ef9d8d57f909f58ce8f",
+ "size": 9925,
+ "uri": "https://{canondata_backend}/1925842/7d0ab953a9979e9baa7ae26ebae2128b1cbe8128/resource.tar.gz#test_sql2yql.test_match_recognize-alerts_without_order-streaming_/sql.yql"
+ }
+ ],
"test_sql2yql.test[match_recognize-alerts_without_order]": [
{
"checksum": "7e6cd1cda9ddc8a2fe0f41ace902517e",
@@ -10213,6 +10220,11 @@
"uri": "file://test_sql_format.test_match_recognize-alerts_/formatted.sql"
}
],
+ "test_sql_format.test[match_recognize-alerts_without_order-streaming]": [
+ {
+ "uri": "file://test_sql_format.test_match_recognize-alerts_without_order-streaming_/formatted.sql"
+ }
+ ],
"test_sql_format.test[match_recognize-alerts_without_order]": [
{
"uri": "file://test_sql_format.test_match_recognize-alerts_without_order_/formatted.sql"
diff --git a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts_without_order-streaming_/formatted.sql b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts_without_order-streaming_/formatted.sql
new file mode 100644
index 00000000000..8ba43bd8299
--- /dev/null
+++ b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts_without_order-streaming_/formatted.sql
@@ -0,0 +1,56 @@
+$osquery_data = [
+ <|dt: 1688910000, host: 'fqdn1', ev_type: 'someEv', ev_status: '', user: '', vpn: FALSE,|>,
+ <|dt: 1688910050, host: 'fqdn2', ev_type: 'login', ev_status: 'success', user: '', vpn: TRUE,|>,
+ <|dt: 1688910100, host: 'fqdn1', ev_type: 'login', ev_status: 'success', user: '', vpn: TRUE,|>,
+ <|dt: 1688910220, host: 'fqdn1', ev_type: 'login', ev_status: 'success', user: '', vpn: FALSE,|>,
+ <|dt: 1688910300, host: 'fqdn1', ev_type: 'delete_all', ev_status: '', user: '', vpn: FALSE,|>,
+ <|dt: 1688910400, host: 'fqdn2', ev_type: 'delete_all', ev_status: '', user: '', vpn: FALSE,|>,
+ <|dt: 1688910500, host: 'fqdn1', ev_type: 'login', ev_status: 'failed', user: 'user1', vpn: FALSE,|>,
+ <|dt: 1688910500, host: 'fqdn1', ev_type: 'login', ev_status: 'failed', user: 'user2', vpn: FALSE,|>,
+ <|dt: 1688910600, host: 'fqdn', ev_type: 'someEv', ev_status: '', user: 'user1', vpn: FALSE,|>,
+ <|dt: 1688910800, host: 'fqdn2', ev_type: 'login', ev_status: 'failed', user: 'user1', vpn: FALSE,|>,
+ <|dt: 1688910900, host: 'fqdn2', ev_type: 'login', ev_status: 'failed', user: 'user2', vpn: FALSE,|>,
+ <|dt: 1688911000, host: 'fqdn2', ev_type: 'login', ev_status: 'success', user: 'user1', vpn: FALSE,|>,
+ <|dt: 1688911001, host: 'fqdn2', ev_type: 'login', ev_status: 'success', user: 'user1', vpn: FALSE,|>,
+];
+
+PRAGMA FeatureR010 = 'prototype';
+PRAGMA config.flags('MatchRecognizeStream', 'force');
+
+SELECT
+ *
+FROM
+ AS_TABLE($osquery_data) MATCH_RECOGNIZE (
+ MEASURES
+ LAST(LOGIN_SUCCESS_REMOTE.host) AS remote_login_host,
+ LAST(LOGIN_SUCCESS_REMOTE.user) AS remote_login_user,
+ LAST(LOGIN_SUCCESS_REMOTE.dt) AS remote_login_dt,
+ LAST(SUSPICIOUS_ACTION_SOON.dt) AS suspicious_action_dt,
+ FIRST(LOGIN_FAILED_SAME_USER.dt) AS brutforce_begin,
+ FIRST(LOGIN_SUCCESS_SAME_USER.dt) AS brutforce_end,
+ LAST(LOGIN_SUCCESS_SAME_USER.user) AS brutforce_login
+ ONE ROW PER MATCH
+ AFTER MATCH SKIP TO NEXT ROW
+ PATTERN (LOGIN_SUCCESS_REMOTE ANY_ROW1 * SUSPICIOUS_ACTION_SOON | (LOGIN_FAILED_SAME_USER ANY_ROW2 *) {2,} LOGIN_SUCCESS_SAME_USER)
+ DEFINE
+ LOGIN_SUCCESS_REMOTE AS LOGIN_SUCCESS_REMOTE.ev_type == 'login'
+ AND LOGIN_SUCCESS_REMOTE.ev_status == 'success'
+ AND LOGIN_SUCCESS_REMOTE.vpn == TRUE
+ AND COALESCE(LOGIN_SUCCESS_REMOTE.dt - FIRST(LOGIN_FAILED_SAME_USER.dt) <= 500, TRUE),
+ ANY_ROW1 AS COALESCE(ANY_ROW1.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) <= 500, TRUE),
+ SUSPICIOUS_ACTION_SOON AS SUSPICIOUS_ACTION_SOON.host == LAST(LOGIN_SUCCESS_REMOTE.host)
+ AND SUSPICIOUS_ACTION_SOON.ev_type == 'delete_all'
+ AND COALESCE(SUSPICIOUS_ACTION_SOON.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) <= 500, TRUE),
+ LOGIN_FAILED_SAME_USER AS LOGIN_FAILED_SAME_USER.ev_type == 'login'
+ AND LOGIN_FAILED_SAME_USER.ev_status != 'success'
+ AND (
+ LAST(LOGIN_FAILED_SAME_USER.user) IS NULL
+ OR LAST(LOGIN_FAILED_SAME_USER.user) == LOGIN_FAILED_SAME_USER.user
+ ) AND COALESCE(LOGIN_FAILED_SAME_USER.dt - FIRST(LOGIN_FAILED_SAME_USER.dt) <= 500, TRUE),
+ ANY_ROW2 AS COALESCE(ANY_ROW2.dt - FIRST(LOGIN_FAILED_SAME_USER.dt) <= 500, TRUE),
+ LOGIN_SUCCESS_SAME_USER AS LOGIN_SUCCESS_SAME_USER.ev_type == 'login'
+ AND LOGIN_SUCCESS_SAME_USER.ev_status == 'success'
+ AND LOGIN_SUCCESS_SAME_USER.user == LAST(LOGIN_FAILED_SAME_USER.user)
+ AND COALESCE(LOGIN_SUCCESS_SAME_USER.dt - FIRST(LOGIN_FAILED_SAME_USER.dt) <= 500, TRUE)
+ ) AS MATCHED
+;
diff --git a/yql/essentials/tests/sql/suites/match_recognize/alerts_without_order-streaming.sql b/yql/essentials/tests/sql/suites/match_recognize/alerts_without_order-streaming.sql
new file mode 100644
index 00000000000..8c627a65cde
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/match_recognize/alerts_without_order-streaming.sql
@@ -0,0 +1,64 @@
+$osquery_data = [
+<|dt:1688910000, host:"fqdn1", ev_type:"someEv", ev_status:"", user:"", vpn:false, |>,
+<|dt:1688910050, host:"fqdn2", ev_type:"login", ev_status:"success", user:"", vpn:true, |>,
+<|dt:1688910100, host:"fqdn1", ev_type:"login", ev_status:"success", user:"", vpn:true, |>,
+<|dt:1688910220, host:"fqdn1", ev_type:"login", ev_status:"success", user:"", vpn:false, |>,
+<|dt:1688910300, host:"fqdn1", ev_type:"delete_all", ev_status:"", user:"", vpn:false, |>,
+<|dt:1688910400, host:"fqdn2", ev_type:"delete_all", ev_status:"", user:"", vpn:false, |>,
+<|dt:1688910500, host:"fqdn1", ev_type:"login", ev_status:"failed", user:"user1", vpn:false, |>,
+<|dt:1688910500, host:"fqdn1", ev_type:"login", ev_status:"failed", user:"user2", vpn:false, |>,
+<|dt:1688910600, host:"fqdn", ev_type:"someEv", ev_status:"", user:"user1", vpn:false, |>,
+<|dt:1688910800, host:"fqdn2", ev_type:"login", ev_status:"failed", user:"user1", vpn:false, |>,
+<|dt:1688910900, host:"fqdn2", ev_type:"login", ev_status:"failed", user:"user2", vpn:false, |>,
+<|dt:1688911000, host:"fqdn2", ev_type:"login", ev_status:"success", user:"user1", vpn:false, |>,
+<|dt:1688911001, host:"fqdn2", ev_type:"login", ev_status:"success", user:"user1", vpn:false, |>,
+];
+
+pragma FeatureR010="prototype";
+pragma config.flags("MatchRecognizeStream", "force");
+
+SELECT *
+FROM AS_TABLE($osquery_data) MATCH_RECOGNIZE(
+ MEASURES
+ LAST(LOGIN_SUCCESS_REMOTE.host) as remote_login_host,
+ LAST(LOGIN_SUCCESS_REMOTE.user) as remote_login_user,
+ LAST(LOGIN_SUCCESS_REMOTE.dt) as remote_login_dt,
+ LAST(SUSPICIOUS_ACTION_SOON.dt) as suspicious_action_dt,
+ FIRST(LOGIN_FAILED_SAME_USER.dt) as brutforce_begin,
+ FIRST(LOGIN_SUCCESS_SAME_USER.dt) as brutforce_end,
+ LAST(LOGIN_SUCCESS_SAME_USER.user) as brutforce_login
+
+ ONE ROW PER MATCH
+ AFTER MATCH SKIP TO NEXT ROW
+ PATTERN (
+ LOGIN_SUCCESS_REMOTE ANY_ROW1* SUSPICIOUS_ACTION_SOON |
+ (LOGIN_FAILED_SAME_USER ANY_ROW2*){2,} LOGIN_SUCCESS_SAME_USER
+ )
+ DEFINE
+ LOGIN_SUCCESS_REMOTE as
+ LOGIN_SUCCESS_REMOTE.ev_type = "login" and
+ LOGIN_SUCCESS_REMOTE.ev_status = "success" and
+ LOGIN_SUCCESS_REMOTE.vpn = true and
+ COALESCE(LOGIN_SUCCESS_REMOTE.dt - FIRST(LOGIN_FAILED_SAME_USER.dt) <= 500, TRUE),
+ ANY_ROW1 as
+ COALESCE(ANY_ROW1.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) <= 500, TRUE),
+ SUSPICIOUS_ACTION_SOON as
+ SUSPICIOUS_ACTION_SOON.host = LAST(LOGIN_SUCCESS_REMOTE.host) and
+ SUSPICIOUS_ACTION_SOON.ev_type = "delete_all" and
+ COALESCE(SUSPICIOUS_ACTION_SOON.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) <= 500, TRUE),
+ LOGIN_FAILED_SAME_USER as
+ LOGIN_FAILED_SAME_USER.ev_type = "login" and
+ LOGIN_FAILED_SAME_USER.ev_status <> "success" and
+ (LAST(LOGIN_FAILED_SAME_USER.user) IS NULL
+ or LAST(LOGIN_FAILED_SAME_USER.user) = LOGIN_FAILED_SAME_USER.user
+ ) and COALESCE(LOGIN_FAILED_SAME_USER.dt - FIRST(LOGIN_FAILED_SAME_USER.dt) <= 500, TRUE),
+ ANY_ROW2 as
+ COALESCE(ANY_ROW2.dt - FIRST(LOGIN_FAILED_SAME_USER.dt) <= 500, TRUE),
+ LOGIN_SUCCESS_SAME_USER as
+ LOGIN_SUCCESS_SAME_USER.ev_type = "login" and
+ LOGIN_SUCCESS_SAME_USER.ev_status = "success" and
+ LOGIN_SUCCESS_SAME_USER.user = LAST(LOGIN_FAILED_SAME_USER.user) and
+ COALESCE(LOGIN_SUCCESS_SAME_USER.dt - FIRST(LOGIN_FAILED_SAME_USER.dt) <= 500, TRUE)
+) AS MATCHED
+;
+