diff options
7 files changed, 312 insertions, 156 deletions
diff --git a/yql/essentials/core/expr_nodes/yql_expr_nodes.json b/yql/essentials/core/expr_nodes/yql_expr_nodes.json index 76e556533f7..09de074b5ed 100644 --- a/yql/essentials/core/expr_nodes/yql_expr_nodes.json +++ b/yql/essentials/core/expr_nodes/yql_expr_nodes.json @@ -1563,10 +1563,10 @@ "Base": "TCoInputBase", "Match": {"Type": "Callable", "Name": "MatchRecognize"}, "Children": [ - {"Index": 1, "Name": "partitionKeySelector", "Type": "TExprBase"}, - {"Index": 2, "Name": "partitionColumns", "Type": "TExprBase"}, - {"Index": 3, "Name": "sortTraits", "Type": "TExprBase"}, - {"Index": 4, "Name": "params", "Type": "TExprBase"} + {"Index": 1, "Name": "PartitionKeySelector", "Type": "TExprBase"}, + {"Index": 2, "Name": "PartitionColumns", "Type": "TExprBase"}, + {"Index": 3, "Name": "SortTraits", "Type": "TExprBase"}, + {"Index": 4, "Name": "Params", "Type": "TExprBase"} ] }, { diff --git a/yql/essentials/core/sql_types/time_order_recover.h b/yql/essentials/core/sql_types/time_order_recover.h index fd6ef0b61a7..590f73b7c0c 100644 --- a/yql/essentials/core/sql_types/time_order_recover.h +++ b/yql/essentials/core/sql_types/time_order_recover.h @@ -1,9 +1,9 @@ #pragma once + #include <string_view> namespace NYql::NTimeOrderRecover { -using namespace std::string_view_literals; -constexpr auto OUT_OF_ORDER_MARKER = "_yql_OutOfOrder"sv; +inline constexpr std::string_view OUT_OF_ORDER_MARKER = "_yql_OutOfOrder"; -}//namespace NYql::NMatchRecognize +} // namespace NYql::NTimeOrderRecover diff --git a/yql/essentials/core/yql_opt_match_recognize.cpp b/yql/essentials/core/yql_opt_match_recognize.cpp index 3efae9e42b4..e8f986fb232 100644 --- a/yql/essentials/core/yql_opt_match_recognize.cpp +++ b/yql/essentials/core/yql_opt_match_recognize.cpp @@ -1,8 +1,11 @@ #include "yql_opt_match_recognize.h" + #include "yql_opt_utils.h" + #include <yql/essentials/core/sql_types/time_order_recover.h> #include <yql/essentials/core/yql_expr_optimize.h> #include <yql/essentials/utils/log/log.h> +#include <ranges> namespace NYql { @@ -11,24 +14,22 @@ using namespace NNodes; namespace { bool IsStreaming(const TExprNode::TPtr& input, const TTypeAnnotationContext& typeAnnCtx) { - if (EMatchRecognizeStreamingMode::Disable == typeAnnCtx.MatchRecognizeStreaming){ - return false; - } - if (EMatchRecognizeStreamingMode::Force == typeAnnCtx.MatchRecognizeStreaming){ - return true; - } - - YQL_ENSURE(EMatchRecognizeStreamingMode::Auto == typeAnnCtx.MatchRecognizeStreaming, "Internal logic error"); - - bool hasPq = false; - NYql::VisitExpr(input, [&hasPq](const TExprNode::TPtr& node){ - if (node->IsCallable("DataSource")) { - YQL_ENSURE(node->ChildrenSize() > 0 and node->Child(0)->IsAtom()); - hasPq = node->Child(0)->Content() == "pq"; + switch (typeAnnCtx.MatchRecognizeStreaming) { + case EMatchRecognizeStreamingMode::Disable: + return false; + case EMatchRecognizeStreamingMode::Force: + return true; + case EMatchRecognizeStreamingMode::Auto: { + bool hasPq = false; + NYql::VisitExpr(input, [&hasPq](const TExprNode::TPtr& node) { + if (auto maybeDataSource = TExprBase(node).Maybe<TCoDataSource>()) { + hasPq = maybeDataSource.Cast().Category().Value() == "pq"; + } + return !hasPq; + }); + return hasPq; } - return !hasPq; - }); - return hasPq; + } } TExprNode::TPtr ExpandMatchRecognizeMeasuresAggregates(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& /* typeAnnCtx */) { @@ -138,16 +139,16 @@ THashSet<TStringBuf> FindUsedVars(const TExprNode::TPtr& params) { const auto defines = params->Child(4); static constexpr size_t defineLambdasStartPos = 3; - for (auto i = defineLambdasStartPos; i < defines->ChildrenSize(); ++i) { - const auto lambda = defines->Child(i); - const auto lambdaArgs = lambda->Child(0); - const auto lambdaBody = lambda->ChildPtr(1); - const auto varsArg = lambdaArgs->Child(1); + for (const auto& define : defines->Children() | std::views::drop(defineLambdasStartPos)) { + const auto lambda = TCoLambda(define); + const auto varsArg = lambda.Args().Arg(1).Ptr(); + const auto lambdaBody = lambda.Body().Ptr(); NYql::VisitExpr( lambdaBody, [varsArg, &result](const TExprNode::TPtr& node) { - if (node->IsCallable("Member") && node->Child(0) == varsArg) { - result.insert(node->Child(1)->Content()); + if (auto maybeMember = TMaybeNode<TCoMember>(node); + maybeMember && maybeMember.Cast().Struct().Ptr() == varsArg) { + result.insert(maybeMember.Cast().Name().Value()); return false; } return true; @@ -158,164 +159,173 @@ THashSet<TStringBuf> FindUsedVars(const TExprNode::TPtr& params) { return result; } -TExprNode::TPtr MarkUnusedPatternVars(const TExprNode::TPtr& node, TExprContext& ctx, const THashSet<TStringBuf>& usedVars, TStringBuf rowsPerMatch) { +TExprNode::TPtr MarkUnusedPatternVars(const TExprNode::TPtr& node, TExprContext& ctx, const THashSet<TStringBuf>& usedVars, const TExprNode::TPtr& rowsPerMatch) { const auto pos = node->Pos(); - if (node->ChildrenSize() != 0 && node->Child(0)->IsAtom()) { + if (node->ChildrenSize() == 6 && node->Child(0)->IsAtom()) { const auto varName = node->Child(0)->Content(); const auto output = FromString<bool>(node->Child(4)->Content()); - const auto varUnused = ("RowsPerMatch_AllRows" != rowsPerMatch || !output) && !usedVars.contains(varName); - return ctx.Builder(pos) - .List() - .Add(0, node->ChildPtr(0)) - .Add(1, node->ChildPtr(1)) - .Add(2, node->ChildPtr(2)) - .Add(3, node->ChildPtr(3)) - .Add(4, node->ChildPtr(4)) - .Add(5, ctx.NewAtom(pos, ToString(varUnused))) - .Seal() - .Build(); - } - TExprNodeList newChildren; - for (size_t chPos = 0; chPos != node->ChildrenSize(); chPos++) { - newChildren.push_back(MarkUnusedPatternVars(node->ChildPtr(chPos), ctx, usedVars, rowsPerMatch)); + const auto varUnused = ("RowsPerMatch_AllRows" != rowsPerMatch->Content() || !output) && !usedVars.contains(varName); + return Build<TExprList>(ctx, pos) + .Add(node->ChildPtr(0)) + .Add(node->ChildPtr(1)) + .Add(node->ChildPtr(2)) + .Add(node->ChildPtr(3)) + .Add(node->ChildPtr(4)) + .Add<TCoAtom>().Build(ToString(varUnused)) + .Done() + .Ptr(); } - if (node->IsCallable()) { - return ctx.Builder(pos).Callable(node->Content()).Add(std::move(newChildren)).Seal().Build(); - } else if (node->IsList()) { - return ctx.Builder(pos).List().Add(std::move(newChildren)).Seal().Build(); - } else { // Atom - return node; + TExprNode::TListType newChildren; + for (const auto& child : node->Children()) { + newChildren.push_back(MarkUnusedPatternVars(child, ctx, usedVars, rowsPerMatch)); } + return ctx.ChangeChildren(*node, std::move(newChildren)); } } // anonymous namespace TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx) { - YQL_ENSURE(node->IsCallable("MatchRecognize")); - const auto input = node->Child(0); - const auto partitionKeySelector = node->Child(1); - const auto partitionColumns = node->Child(2); - const auto sortTraits = node->Child(3); - const auto params = node->Child(4); - const auto pos = node->Pos(); + YQL_CLOG(DEBUG, Core) << "Expand " << node->Content(); + TCoMatchRecognize matchRecognize(node); + const auto input = matchRecognize.Input().Ptr(); + const auto partitionKeySelector = matchRecognize.PartitionKeySelector().Ptr(); + const auto partitionColumns = matchRecognize.PartitionColumns().Ptr(); + const auto sortTraits = matchRecognize.SortTraits().Ptr(); + const auto params = matchRecognize.Params().Ptr(); + const auto pos = matchRecognize.Pos(); - const bool isStreaming = IsStreaming(input, typeAnnCtx); + const auto isStreaming = IsStreaming(input, typeAnnCtx); - TExprNode::TPtr settings = AddSetting(*ctx.NewList(pos, {}), pos, - "Streaming", ctx.NewAtom(pos, ToString(isStreaming)), ctx); - - const auto rowsPerMatch = params->Child(1)->Content(); - auto measures = ExpandMatchRecognizeMeasuresAggregates(params->ChildPtr(0), ctx, typeAnnCtx); - const auto matchRecognize = ctx.Builder(pos) - .Lambda() - .Param("sortedPartition") - .Callable(0, "ForwardList") - .Callable(0, "MatchRecognizeCore") - .Callable(0, "ToFlow") - .Arg(0, "sortedPartition") - .Seal() - .Add(1, partitionKeySelector) - .Add(2, partitionColumns) - .Callable(3, params->Content()) - .Add(0, std::move(measures)) - .Add(1, params->ChildPtr(1)) - .Add(2, params->ChildPtr(2)) - .Add(3, MarkUnusedPatternVars(params->ChildPtr(3), ctx, FindUsedVars(params), rowsPerMatch)) - .Add(4, params->ChildPtr(4)) - .Seal() - .Add(4, settings) - .Seal() - .Seal() - .Seal() - .Build(); + auto newInput = Build<TCoLambda>(ctx, pos) + .Args({"partition"}) + .Body<TCoToFlow>() + .Input("partition") + .Build() + .Done() + .Ptr(); TExprNode::TPtr sortKey; TExprNode::TPtr sortOrder; ExtractSortKeyAndOrder(pos, sortTraits, sortKey, sortOrder, ctx); - TExprNode::TPtr result; - if (isStreaming) { - YQL_ENSURE(sortOrder->ChildrenSize() == 1, "Expect ORDER BY timestamp for MATCH_RECOGNIZE"); - const auto reordered = ctx.Builder(pos) - .Lambda() - .Param("partition") - .Callable("ForwardList") - .Callable(0, "OrderedMap") - .Callable(0, "TimeOrderRecover") - .Callable(0, "ToFlow"). - Arg(0, "partition") + auto timeOrderRecover = [&]() -> TExprNode::TPtr { + if (!isStreaming) { + return newInput; + } + switch (sortOrder->ChildrenSize()) { + case 0: + return newInput; + case 1: { + auto timeOrderRecover = ctx.Builder(pos) + .Lambda() + .Param("partition") + .Callable("TimeOrderRecover") + .Apply(0, std::move(newInput)) + .With(0, "partition") .Seal() .Add(1, sortKey) .Callable(2, "Interval") - .Add(0, ctx.NewAtom(pos, ToString(typeAnnCtx.TimeOrderRecoverDelay))) + .Atom(0, ToString(typeAnnCtx.TimeOrderRecoverDelay)) .Seal() .Callable(3, "Interval") - .Add(0, ctx.NewAtom(pos, ToString(typeAnnCtx.TimeOrderRecoverAhead))) + .Atom(0, ToString(typeAnnCtx.TimeOrderRecoverAhead)) .Seal() .Callable(4, "Uint32") - .Add(0, ctx.NewAtom(pos, ToString(typeAnnCtx.TimeOrderRecoverRowLimit))) - .Seal() - .Seal() - .Lambda(1) - .Param("row") - .Callable("RemoveMember") - .Arg(0, "row") - .Add(1, ctx.NewAtom(pos, NYql::NTimeOrderRecover::OUT_OF_ORDER_MARKER)) + .Atom(0, ToString(typeAnnCtx.TimeOrderRecoverRowLimit)) .Seal() .Seal() .Seal() - .Seal() - .Seal() - .Build(); + .Build(); + return Build<TCoLambda>(ctx, pos) + .Args({"partition"}) + .Body<TCoOrderedMap>() + .Input<TExprApplier>() + .Apply(TCoLambda(timeOrderRecover)) + .With(0, "partition") + .Build() + .Lambda<TCoLambda>() + .Args({"row"}) + .Body<TCoRemoveMember>() + .Struct("row") + .Name<TCoAtom>().Build(NYql::NTimeOrderRecover::OUT_OF_ORDER_MARKER) + .Build() + .Build() + .Build() + .Done() + .Ptr(); + } + default: + ctx.AddError(TIssue(ctx.GetPosition(sortTraits->Pos()), "Expect ORDER BY timestamp for MATCH_RECOGNIZE")); + return {}; + } + }(); - const auto matchRecognizeOnReorderedPartition = ctx.Builder(pos) - .Lambda() - .Param("partition") - .Apply(matchRecognize) - .With(0) - .Apply(reordered) - .With(0) - .Arg("partition") - .Done() - .Seal() - .Done() + auto measures = ExpandMatchRecognizeMeasuresAggregates(params->ChildPtr(0), ctx, typeAnnCtx); + auto rowsPerMatch = params->ChildPtr(1); + const auto usedVars = FindUsedVars(params); + auto pattern = MarkUnusedPatternVars(params->ChildPtr(3), ctx, usedVars, rowsPerMatch); + auto settings = AddSetting(*ctx.NewList(pos, {}), pos, "Streaming", ctx.NewAtom(pos, ToString(isStreaming)), ctx); + auto newMatchRecognize = ctx.Builder(pos) + .Lambda() + .Param("partition") + .Callable("MatchRecognizeCore") + .Apply(0, std::move(timeOrderRecover)) + .With(0, "partition") + .Seal() + .Add(1, partitionKeySelector) + .Add(2, partitionColumns) + .Callable(3, params->Content()) + .Add(0, std::move(measures)) + .Add(1, std::move(rowsPerMatch)) + .Add(2, params->ChildPtr(2)) + .Add(3, std::move(pattern)) + .Add(4, params->ChildPtr(4)) .Seal() + .Add(4, std::move(settings)) .Seal() - .Build(); + .Seal() + .Build(); + + auto lambda = Build<TCoLambda>(ctx, pos) + .Args({"partition"}) + .Body<TCoForwardList>() + .Stream<TExprApplier>() + .Apply(TCoLambda(newMatchRecognize)) + .With(0, "partition") + .Build() + .Build() + .Done() + .Ptr(); + + if (isStreaming) { TExprNode::TPtr keySelector; if (partitionColumns->ChildrenSize() != 0) { - keySelector = partitionKeySelector; + keySelector = std::move(partitionKeySelector); } else { - //Use pseudo partitioning with constant lambda to wrap TimeOrderRecover into DQ stage - //TODO(zverevgeny): fixme - keySelector = ctx.Builder(pos) - .Lambda() - .Param("row") - .Callable("Bool") - .Add(0, ctx.NewAtom(pos, "true")) - .Seal() - .Seal() - .Build(); + // Use pseudo partitioning with constant lambda to wrap TimeOrderRecover into DQ stage + // TODO(zverevgeny): fixme + keySelector = Build<TCoLambda>(ctx, pos) + .Args({"row"}) + .Body(MakeBool<true>(pos, ctx)) + .Done() + .Ptr(); } - result = ctx.Builder(pos) - .Callable("ShuffleByKeys") - .Add(0, input) - .Add(1, keySelector) - .Add(2, matchRecognizeOnReorderedPartition) - .Seal() - .Build(); - } else { //non-streaming - result = ctx.Builder(pos) - .Callable("PartitionsByKeys") - .Add(0, input) - .Add(1, partitionKeySelector) - .Add(2, sortOrder) - .Add(3, sortKey) - .Add(4, matchRecognize) - .Seal() - .Build(); + + return Build<TCoShuffleByKeys>(ctx, pos) + .Input(std::move(input)) + .KeySelectorLambda(std::move(keySelector)) + .ListHandlerLambda(std::move(lambda)) + .Done() + .Ptr(); + } else { // non-streaming + return Build<TCoPartitionsByKeys>(ctx, pos) + .Input(std::move(input)) + .KeySelectorLambda(std::move(partitionKeySelector)) + .SortDirections(std::move(sortOrder)) + .SortKeySelectorLambda(std::move(sortKey)) + .ListHandlerLambda(std::move(lambda)) + .Done() + .Ptr(); } - YQL_CLOG(INFO, Core) << "Expanded MatchRecognize"; - return result; } -} //namespace NYql +} // namespace NYql diff --git a/yql/essentials/tests/sql/minirun/part1/canondata/result.json b/yql/essentials/tests/sql/minirun/part1/canondata/result.json index 74ea11e4a39..e299c4d970d 100644 --- a/yql/essentials/tests/sql/minirun/part1/canondata/result.json +++ b/yql/essentials/tests/sql/minirun/part1/canondata/result.json @@ -734,6 +734,20 @@ "uri": "https://{canondata_backend}/1942278/bea251ec797c6ae6c79a3fa31fd0d3dbee273fa6/resource.tar.gz#test.test_library-library_yqls--Results_/results.txt" } ], + "test.test[match_recognize-alerts_without_order-streaming-default.txt-Debug]": [ + { + "checksum": "da3c36fb5e54fd02cedfd15c0d0b9d0b", + "size": 5896, + "uri": "https://{canondata_backend}/937458/61af1cc3453c6ab9c5a837eeb404bf874f130293/resource.tar.gz#test.test_match_recognize-alerts_without_order-streaming-default.txt-Debug_/opt.yql" + } + ], + "test.test[match_recognize-alerts_without_order-streaming-default.txt-Results]": [ + { + "checksum": "6f125fabed1ec00aaab81efa4ab4a1b3", + "size": 4625, + "uri": "https://{canondata_backend}/937458/61af1cc3453c6ab9c5a837eeb404bf874f130293/resource.tar.gz#test.test_match_recognize-alerts_without_order-streaming-default.txt-Results_/results.txt" + } + ], "test.test[match_recognize-greedy_quantifiers-default.txt-Debug]": [ { "checksum": "66fb0a8ccd3814cb306c356fcecea0d1", diff --git a/yql/essentials/tests/sql/sql2yql/canondata/result.json b/yql/essentials/tests/sql/sql2yql/canondata/result.json index 2328e0f0a6d..8b6291d7191 100644 --- a/yql/essentials/tests/sql/sql2yql/canondata/result.json +++ b/yql/essentials/tests/sql/sql2yql/canondata/result.json @@ -4185,6 +4185,13 @@ "uri": "https://{canondata_backend}/1920236/5e37b541c71c89b1b95dee0463a5a2e9bc5999f4/resource.tar.gz#test_sql2yql.test_match_recognize-alerts_/sql.yql" } ], + "test_sql2yql.test[match_recognize-alerts_without_order-streaming]": [ + { + "checksum": "2544bb720aab6ef9d8d57f909f58ce8f", + "size": 9925, + "uri": "https://{canondata_backend}/1925842/7d0ab953a9979e9baa7ae26ebae2128b1cbe8128/resource.tar.gz#test_sql2yql.test_match_recognize-alerts_without_order-streaming_/sql.yql" + } + ], "test_sql2yql.test[match_recognize-alerts_without_order]": [ { "checksum": "7e6cd1cda9ddc8a2fe0f41ace902517e", @@ -10213,6 +10220,11 @@ "uri": "file://test_sql_format.test_match_recognize-alerts_/formatted.sql" } ], + "test_sql_format.test[match_recognize-alerts_without_order-streaming]": [ + { + "uri": "file://test_sql_format.test_match_recognize-alerts_without_order-streaming_/formatted.sql" + } + ], "test_sql_format.test[match_recognize-alerts_without_order]": [ { "uri": "file://test_sql_format.test_match_recognize-alerts_without_order_/formatted.sql" diff --git a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts_without_order-streaming_/formatted.sql b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts_without_order-streaming_/formatted.sql new file mode 100644 index 00000000000..8ba43bd8299 --- /dev/null +++ b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts_without_order-streaming_/formatted.sql @@ -0,0 +1,56 @@ +$osquery_data = [ + <|dt: 1688910000, host: 'fqdn1', ev_type: 'someEv', ev_status: '', user: '', vpn: FALSE,|>, + <|dt: 1688910050, host: 'fqdn2', ev_type: 'login', ev_status: 'success', user: '', vpn: TRUE,|>, + <|dt: 1688910100, host: 'fqdn1', ev_type: 'login', ev_status: 'success', user: '', vpn: TRUE,|>, + <|dt: 1688910220, host: 'fqdn1', ev_type: 'login', ev_status: 'success', user: '', vpn: FALSE,|>, + <|dt: 1688910300, host: 'fqdn1', ev_type: 'delete_all', ev_status: '', user: '', vpn: FALSE,|>, + <|dt: 1688910400, host: 'fqdn2', ev_type: 'delete_all', ev_status: '', user: '', vpn: FALSE,|>, + <|dt: 1688910500, host: 'fqdn1', ev_type: 'login', ev_status: 'failed', user: 'user1', vpn: FALSE,|>, + <|dt: 1688910500, host: 'fqdn1', ev_type: 'login', ev_status: 'failed', user: 'user2', vpn: FALSE,|>, + <|dt: 1688910600, host: 'fqdn', ev_type: 'someEv', ev_status: '', user: 'user1', vpn: FALSE,|>, + <|dt: 1688910800, host: 'fqdn2', ev_type: 'login', ev_status: 'failed', user: 'user1', vpn: FALSE,|>, + <|dt: 1688910900, host: 'fqdn2', ev_type: 'login', ev_status: 'failed', user: 'user2', vpn: FALSE,|>, + <|dt: 1688911000, host: 'fqdn2', ev_type: 'login', ev_status: 'success', user: 'user1', vpn: FALSE,|>, + <|dt: 1688911001, host: 'fqdn2', ev_type: 'login', ev_status: 'success', user: 'user1', vpn: FALSE,|>, +]; + +PRAGMA FeatureR010 = 'prototype'; +PRAGMA config.flags('MatchRecognizeStream', 'force'); + +SELECT + * +FROM + AS_TABLE($osquery_data) MATCH_RECOGNIZE ( + MEASURES + LAST(LOGIN_SUCCESS_REMOTE.host) AS remote_login_host, + LAST(LOGIN_SUCCESS_REMOTE.user) AS remote_login_user, + LAST(LOGIN_SUCCESS_REMOTE.dt) AS remote_login_dt, + LAST(SUSPICIOUS_ACTION_SOON.dt) AS suspicious_action_dt, + FIRST(LOGIN_FAILED_SAME_USER.dt) AS brutforce_begin, + FIRST(LOGIN_SUCCESS_SAME_USER.dt) AS brutforce_end, + LAST(LOGIN_SUCCESS_SAME_USER.user) AS brutforce_login + ONE ROW PER MATCH + AFTER MATCH SKIP TO NEXT ROW + PATTERN (LOGIN_SUCCESS_REMOTE ANY_ROW1 * SUSPICIOUS_ACTION_SOON | (LOGIN_FAILED_SAME_USER ANY_ROW2 *) {2,} LOGIN_SUCCESS_SAME_USER) + DEFINE + LOGIN_SUCCESS_REMOTE AS LOGIN_SUCCESS_REMOTE.ev_type == 'login' + AND LOGIN_SUCCESS_REMOTE.ev_status == 'success' + AND LOGIN_SUCCESS_REMOTE.vpn == TRUE + AND COALESCE(LOGIN_SUCCESS_REMOTE.dt - FIRST(LOGIN_FAILED_SAME_USER.dt) <= 500, TRUE), + ANY_ROW1 AS COALESCE(ANY_ROW1.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) <= 500, TRUE), + SUSPICIOUS_ACTION_SOON AS SUSPICIOUS_ACTION_SOON.host == LAST(LOGIN_SUCCESS_REMOTE.host) + AND SUSPICIOUS_ACTION_SOON.ev_type == 'delete_all' + AND COALESCE(SUSPICIOUS_ACTION_SOON.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) <= 500, TRUE), + LOGIN_FAILED_SAME_USER AS LOGIN_FAILED_SAME_USER.ev_type == 'login' + AND LOGIN_FAILED_SAME_USER.ev_status != 'success' + AND ( + LAST(LOGIN_FAILED_SAME_USER.user) IS NULL + OR LAST(LOGIN_FAILED_SAME_USER.user) == LOGIN_FAILED_SAME_USER.user + ) AND COALESCE(LOGIN_FAILED_SAME_USER.dt - FIRST(LOGIN_FAILED_SAME_USER.dt) <= 500, TRUE), + ANY_ROW2 AS COALESCE(ANY_ROW2.dt - FIRST(LOGIN_FAILED_SAME_USER.dt) <= 500, TRUE), + LOGIN_SUCCESS_SAME_USER AS LOGIN_SUCCESS_SAME_USER.ev_type == 'login' + AND LOGIN_SUCCESS_SAME_USER.ev_status == 'success' + AND LOGIN_SUCCESS_SAME_USER.user == LAST(LOGIN_FAILED_SAME_USER.user) + AND COALESCE(LOGIN_SUCCESS_SAME_USER.dt - FIRST(LOGIN_FAILED_SAME_USER.dt) <= 500, TRUE) + ) AS MATCHED +; diff --git a/yql/essentials/tests/sql/suites/match_recognize/alerts_without_order-streaming.sql b/yql/essentials/tests/sql/suites/match_recognize/alerts_without_order-streaming.sql new file mode 100644 index 00000000000..8c627a65cde --- /dev/null +++ b/yql/essentials/tests/sql/suites/match_recognize/alerts_without_order-streaming.sql @@ -0,0 +1,64 @@ +$osquery_data = [ +<|dt:1688910000, host:"fqdn1", ev_type:"someEv", ev_status:"", user:"", vpn:false, |>, +<|dt:1688910050, host:"fqdn2", ev_type:"login", ev_status:"success", user:"", vpn:true, |>, +<|dt:1688910100, host:"fqdn1", ev_type:"login", ev_status:"success", user:"", vpn:true, |>, +<|dt:1688910220, host:"fqdn1", ev_type:"login", ev_status:"success", user:"", vpn:false, |>, +<|dt:1688910300, host:"fqdn1", ev_type:"delete_all", ev_status:"", user:"", vpn:false, |>, +<|dt:1688910400, host:"fqdn2", ev_type:"delete_all", ev_status:"", user:"", vpn:false, |>, +<|dt:1688910500, host:"fqdn1", ev_type:"login", ev_status:"failed", user:"user1", vpn:false, |>, +<|dt:1688910500, host:"fqdn1", ev_type:"login", ev_status:"failed", user:"user2", vpn:false, |>, +<|dt:1688910600, host:"fqdn", ev_type:"someEv", ev_status:"", user:"user1", vpn:false, |>, +<|dt:1688910800, host:"fqdn2", ev_type:"login", ev_status:"failed", user:"user1", vpn:false, |>, +<|dt:1688910900, host:"fqdn2", ev_type:"login", ev_status:"failed", user:"user2", vpn:false, |>, +<|dt:1688911000, host:"fqdn2", ev_type:"login", ev_status:"success", user:"user1", vpn:false, |>, +<|dt:1688911001, host:"fqdn2", ev_type:"login", ev_status:"success", user:"user1", vpn:false, |>, +]; + +pragma FeatureR010="prototype"; +pragma config.flags("MatchRecognizeStream", "force"); + +SELECT * +FROM AS_TABLE($osquery_data) MATCH_RECOGNIZE( + MEASURES + LAST(LOGIN_SUCCESS_REMOTE.host) as remote_login_host, + LAST(LOGIN_SUCCESS_REMOTE.user) as remote_login_user, + LAST(LOGIN_SUCCESS_REMOTE.dt) as remote_login_dt, + LAST(SUSPICIOUS_ACTION_SOON.dt) as suspicious_action_dt, + FIRST(LOGIN_FAILED_SAME_USER.dt) as brutforce_begin, + FIRST(LOGIN_SUCCESS_SAME_USER.dt) as brutforce_end, + LAST(LOGIN_SUCCESS_SAME_USER.user) as brutforce_login + + ONE ROW PER MATCH + AFTER MATCH SKIP TO NEXT ROW + PATTERN ( + LOGIN_SUCCESS_REMOTE ANY_ROW1* SUSPICIOUS_ACTION_SOON | + (LOGIN_FAILED_SAME_USER ANY_ROW2*){2,} LOGIN_SUCCESS_SAME_USER + ) + DEFINE + LOGIN_SUCCESS_REMOTE as + LOGIN_SUCCESS_REMOTE.ev_type = "login" and + LOGIN_SUCCESS_REMOTE.ev_status = "success" and + LOGIN_SUCCESS_REMOTE.vpn = true and + COALESCE(LOGIN_SUCCESS_REMOTE.dt - FIRST(LOGIN_FAILED_SAME_USER.dt) <= 500, TRUE), + ANY_ROW1 as + COALESCE(ANY_ROW1.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) <= 500, TRUE), + SUSPICIOUS_ACTION_SOON as + SUSPICIOUS_ACTION_SOON.host = LAST(LOGIN_SUCCESS_REMOTE.host) and + SUSPICIOUS_ACTION_SOON.ev_type = "delete_all" and + COALESCE(SUSPICIOUS_ACTION_SOON.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) <= 500, TRUE), + LOGIN_FAILED_SAME_USER as + LOGIN_FAILED_SAME_USER.ev_type = "login" and + LOGIN_FAILED_SAME_USER.ev_status <> "success" and + (LAST(LOGIN_FAILED_SAME_USER.user) IS NULL + or LAST(LOGIN_FAILED_SAME_USER.user) = LOGIN_FAILED_SAME_USER.user + ) and COALESCE(LOGIN_FAILED_SAME_USER.dt - FIRST(LOGIN_FAILED_SAME_USER.dt) <= 500, TRUE), + ANY_ROW2 as + COALESCE(ANY_ROW2.dt - FIRST(LOGIN_FAILED_SAME_USER.dt) <= 500, TRUE), + LOGIN_SUCCESS_SAME_USER as + LOGIN_SUCCESS_SAME_USER.ev_type = "login" and + LOGIN_SUCCESS_SAME_USER.ev_status = "success" and + LOGIN_SUCCESS_SAME_USER.user = LAST(LOGIN_FAILED_SAME_USER.user) and + COALESCE(LOGIN_SUCCESS_SAME_USER.dt - FIRST(LOGIN_FAILED_SAME_USER.dt) <= 500, TRUE) +) AS MATCHED +; + |
