aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorzverevgeny <zverevgeny@ydb.tech>2023-09-19 22:22:42 +0300
committerzverevgeny <zverevgeny@ydb.tech>2023-09-19 22:37:56 +0300
commitec05b76b4a9bbd7f35510298508a6435d7c43500 (patch)
tree8001f93bdc23ad6e34e0b45139962670078ee0ff
parent0f1825e5660fc3c4b9f0e618aabcd650eaa8c815 (diff)
downloadydb-ec05b76b4a9bbd7f35510298508a6435d7c43500.tar.gz
YQL-16443 test steaming version of match_recognize on tables
-rw-r--r--ydb/library/yql/core/yql_opt_match_recognize.cpp14
-rw-r--r--ydb/library/yql/core/yql_type_annotation.h6
-rw-r--r--ydb/library/yql/providers/config/yql_config_provider.cpp17
3 files changed, 35 insertions, 2 deletions
diff --git a/ydb/library/yql/core/yql_opt_match_recognize.cpp b/ydb/library/yql/core/yql_opt_match_recognize.cpp
index 06522daf0a..10598d0833 100644
--- a/ydb/library/yql/core/yql_opt_match_recognize.cpp
+++ b/ydb/library/yql/core/yql_opt_match_recognize.cpp
@@ -9,7 +9,16 @@ namespace NYql {
using namespace NNodes;
namespace {
-bool IsStreaming(const TExprNode::TPtr& input) {
+bool IsStreaming(const TExprNode::TPtr& input, const TTypeAnnotationContext& typeAnnCtx) {
+ if (TTypeAnnotationContext::EMatchRecognizeStreamingMode::Disable == typeAnnCtx.MatchRecognizeStreaming){
+ return false;
+ }
+ if (TTypeAnnotationContext::EMatchRecognizeStreamingMode::Force == typeAnnCtx.MatchRecognizeStreaming){
+ return true;
+ }
+
+ YQL_ENSURE(TTypeAnnotationContext::EMatchRecognizeStreamingMode::Auto == typeAnnCtx.MatchRecognizeStreaming, "Internal logic error");
+
bool hasPq = false;
NYql::VisitExpr(input, [&hasPq](const TExprNode::TPtr& node){
if (node->IsCallable("DataSource")) {
@@ -31,7 +40,8 @@ TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext&
const auto& params = node->ChildRef(4);
const auto pos = node->Pos();
- const bool isStreaming = IsStreaming(input);
+ const bool isStreaming = IsStreaming(input, typeAnnCtx);
+
TExprNode::TPtr settings = AddSetting(*ctx.NewList(pos, {}), pos,
"Streaming", ctx.NewAtom(pos, ToString(isStreaming)), ctx);
diff --git a/ydb/library/yql/core/yql_type_annotation.h b/ydb/library/yql/core/yql_type_annotation.h
index 0285f0d097..cb21c059b4 100644
--- a/ydb/library/yql/core/yql_type_annotation.h
+++ b/ydb/library/yql/core/yql_type_annotation.h
@@ -238,6 +238,12 @@ struct TTypeAnnotationContext: public TThrRefBase {
IArrowResolver::TPtr ArrowResolver;
TString CostBasedOptimizerType;
bool MatchRecognize = false;
+ enum class EMatchRecognizeStreamingMode {
+ Disable,
+ Auto,
+ Force,
+ };
+ EMatchRecognizeStreamingMode MatchRecognizeStreaming = EMatchRecognizeStreamingMode::Auto;
i64 TimeOrderRecoverDelay = -10'000'000; //microseconds
i64 TimeOrderRecoverAhead = 10'000'000; //microseconds
ui32 TimeOrderRecoverRowLimit = 1'000'000;
diff --git a/ydb/library/yql/providers/config/yql_config_provider.cpp b/ydb/library/yql/providers/config/yql_config_provider.cpp
index 129d6f672f..5b531dc2a3 100644
--- a/ydb/library/yql/providers/config/yql_config_provider.cpp
+++ b/ydb/library/yql/providers/config/yql_config_provider.cpp
@@ -870,6 +870,23 @@ namespace {
return false;
}
}
+ else if (name == "MatchRecognizeStream") {
+ if (args.size() != 1) {
+ ctx.AddError(TIssue(pos, TStringBuilder() << "Expected at most 1 argument, but got " << args.size()));
+ return false;
+ }
+ const auto& arg = args[0];
+ if (arg == "disable") {
+ Types.MatchRecognizeStreaming = TTypeAnnotationContext::EMatchRecognizeStreamingMode::Disable;
+ } else if (arg == "auto") {
+ Types.MatchRecognizeStreaming = TTypeAnnotationContext::EMatchRecognizeStreamingMode::Auto;
+ } else if (arg == "force") {
+ Types.MatchRecognizeStreaming = TTypeAnnotationContext::EMatchRecognizeStreamingMode::Force;
+ } else {
+ ctx.AddError(TIssue(pos, TStringBuilder() << "Expected `disable|auto|force', but got: " << args[0]));
+ return false;
+ }
+ }
else {
ctx.AddError(TIssue(pos, TStringBuilder() << "Unsupported command: " << name));
return false;