diff options
author | zverevgeny <zverevgeny@ydb.tech> | 2023-09-19 22:22:42 +0300 |
---|---|---|
committer | zverevgeny <zverevgeny@ydb.tech> | 2023-09-19 22:37:56 +0300 |
commit | ec05b76b4a9bbd7f35510298508a6435d7c43500 (patch) | |
tree | 8001f93bdc23ad6e34e0b45139962670078ee0ff | |
parent | 0f1825e5660fc3c4b9f0e618aabcd650eaa8c815 (diff) | |
download | ydb-ec05b76b4a9bbd7f35510298508a6435d7c43500.tar.gz |
YQL-16443 test steaming version of match_recognize on tables
-rw-r--r-- | ydb/library/yql/core/yql_opt_match_recognize.cpp | 14 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_type_annotation.h | 6 | ||||
-rw-r--r-- | ydb/library/yql/providers/config/yql_config_provider.cpp | 17 |
3 files changed, 35 insertions, 2 deletions
diff --git a/ydb/library/yql/core/yql_opt_match_recognize.cpp b/ydb/library/yql/core/yql_opt_match_recognize.cpp index 06522daf0a..10598d0833 100644 --- a/ydb/library/yql/core/yql_opt_match_recognize.cpp +++ b/ydb/library/yql/core/yql_opt_match_recognize.cpp @@ -9,7 +9,16 @@ namespace NYql { using namespace NNodes; namespace { -bool IsStreaming(const TExprNode::TPtr& input) { +bool IsStreaming(const TExprNode::TPtr& input, const TTypeAnnotationContext& typeAnnCtx) { + if (TTypeAnnotationContext::EMatchRecognizeStreamingMode::Disable == typeAnnCtx.MatchRecognizeStreaming){ + return false; + } + if (TTypeAnnotationContext::EMatchRecognizeStreamingMode::Force == typeAnnCtx.MatchRecognizeStreaming){ + return true; + } + + YQL_ENSURE(TTypeAnnotationContext::EMatchRecognizeStreamingMode::Auto == typeAnnCtx.MatchRecognizeStreaming, "Internal logic error"); + bool hasPq = false; NYql::VisitExpr(input, [&hasPq](const TExprNode::TPtr& node){ if (node->IsCallable("DataSource")) { @@ -31,7 +40,8 @@ TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext& const auto& params = node->ChildRef(4); const auto pos = node->Pos(); - const bool isStreaming = IsStreaming(input); + const bool isStreaming = IsStreaming(input, typeAnnCtx); + TExprNode::TPtr settings = AddSetting(*ctx.NewList(pos, {}), pos, "Streaming", ctx.NewAtom(pos, ToString(isStreaming)), ctx); diff --git a/ydb/library/yql/core/yql_type_annotation.h b/ydb/library/yql/core/yql_type_annotation.h index 0285f0d097..cb21c059b4 100644 --- a/ydb/library/yql/core/yql_type_annotation.h +++ b/ydb/library/yql/core/yql_type_annotation.h @@ -238,6 +238,12 @@ struct TTypeAnnotationContext: public TThrRefBase { IArrowResolver::TPtr ArrowResolver; TString CostBasedOptimizerType; bool MatchRecognize = false; + enum class EMatchRecognizeStreamingMode { + Disable, + Auto, + Force, + }; + EMatchRecognizeStreamingMode MatchRecognizeStreaming = EMatchRecognizeStreamingMode::Auto; i64 TimeOrderRecoverDelay = -10'000'000; //microseconds i64 TimeOrderRecoverAhead = 10'000'000; //microseconds ui32 TimeOrderRecoverRowLimit = 1'000'000; diff --git a/ydb/library/yql/providers/config/yql_config_provider.cpp b/ydb/library/yql/providers/config/yql_config_provider.cpp index 129d6f672f..5b531dc2a3 100644 --- a/ydb/library/yql/providers/config/yql_config_provider.cpp +++ b/ydb/library/yql/providers/config/yql_config_provider.cpp @@ -870,6 +870,23 @@ namespace { return false; } } + else if (name == "MatchRecognizeStream") { + if (args.size() != 1) { + ctx.AddError(TIssue(pos, TStringBuilder() << "Expected at most 1 argument, but got " << args.size())); + return false; + } + const auto& arg = args[0]; + if (arg == "disable") { + Types.MatchRecognizeStreaming = TTypeAnnotationContext::EMatchRecognizeStreamingMode::Disable; + } else if (arg == "auto") { + Types.MatchRecognizeStreaming = TTypeAnnotationContext::EMatchRecognizeStreamingMode::Auto; + } else if (arg == "force") { + Types.MatchRecognizeStreaming = TTypeAnnotationContext::EMatchRecognizeStreamingMode::Force; + } else { + ctx.AddError(TIssue(pos, TStringBuilder() << "Expected `disable|auto|force', but got: " << args[0])); + return false; + } + } else { ctx.AddError(TIssue(pos, TStringBuilder() << "Unsupported command: " << name)); return false; |