diff options
| author | grigoriypisar <[email protected]> | 2025-08-28 16:56:17 +0300 |
|---|---|---|
| committer | grigoriypisar <[email protected]> | 2025-08-28 17:21:58 +0300 |
| commit | 45c5e8ac55972ded21c846f9e05a3d61d197e3a4 (patch) | |
| tree | 058b5d184adcc94e57a8262c9dcde768d5589eb7 /yql/essentials/sql | |
| parent | 9be8744bb96ab00cbf25afb60ab54db5cbad95dc (diff) | |
added sql syntax for CREATE / ALTER DROP STREAMING QUERY
Добавлен синтаксис (переиспользован код для inline action):
```
CREATE [OR REPLACE] STREAMING QUERY [IF NOT EXISTS] query_name [WITH (
key = value
...
)] AS DO BEGIN
...
END DO;
ALTER STREAMING QUERY [IF EXISTS] query_name [SET(
key = value
...
)] [AS DO BEGIN
...
END DO];
DROP STREAMING QUERY [IF EXISTS] query_name;
```
commit_hash:29fa6aa7e61ecf45112480fe3c1df8fab542354e
Diffstat (limited to 'yql/essentials/sql')
| -rw-r--r-- | yql/essentials/sql/v1/SQLv1.g.in | 33 | ||||
| -rw-r--r-- | yql/essentials/sql/v1/SQLv1Antlr4.g.in | 33 | ||||
| -rw-r--r-- | yql/essentials/sql/v1/complete/sql_complete_ut.cpp | 3 | ||||
| -rw-r--r-- | yql/essentials/sql/v1/context.cpp | 23 | ||||
| -rw-r--r-- | yql/essentials/sql/v1/context.h | 2 | ||||
| -rw-r--r-- | yql/essentials/sql/v1/format/sql_format.cpp | 43 | ||||
| -rw-r--r-- | yql/essentials/sql/v1/format/sql_format_ut.h | 54 | ||||
| -rw-r--r-- | yql/essentials/sql/v1/node.h | 10 | ||||
| -rw-r--r-- | yql/essentials/sql/v1/object_processing.cpp | 121 | ||||
| -rw-r--r-- | yql/essentials/sql/v1/object_processing.h | 116 | ||||
| -rw-r--r-- | yql/essentials/sql/v1/query.cpp | 17 | ||||
| -rw-r--r-- | yql/essentials/sql/v1/sql.cpp | 3 | ||||
| -rw-r--r-- | yql/essentials/sql/v1/sql_query.cpp | 118 | ||||
| -rw-r--r-- | yql/essentials/sql/v1/sql_translation.cpp | 182 | ||||
| -rw-r--r-- | yql/essentials/sql/v1/sql_translation.h | 6 | ||||
| -rw-r--r-- | yql/essentials/sql/v1/sql_ut_common.h | 396 |
16 files changed, 1054 insertions, 106 deletions
diff --git a/yql/essentials/sql/v1/SQLv1.g.in b/yql/essentials/sql/v1/SQLv1.g.in index 55b650d796a..1d86d4eefe4 100644 --- a/yql/essentials/sql/v1/SQLv1.g.in +++ b/yql/essentials/sql/v1/SQLv1.g.in @@ -81,6 +81,9 @@ sql_stmt_core: | drop_transfer_stmt | alter_database_stmt | show_create_table_stmt + | create_streaming_query_stmt + | alter_streaming_query_stmt + | drop_streaming_query_stmt ; expr: @@ -613,6 +616,33 @@ alter_external_data_source_action: drop_external_data_source_stmt: DROP EXTERNAL DATA SOURCE (IF EXISTS)? object_ref; +create_streaming_query_stmt: CREATE (OR REPLACE)? STREAMING QUERY (IF NOT EXISTS)? object_ref + create_streaming_query_features? + streaming_query_definition +; +create_streaming_query_features: WITH streaming_query_settings; + +alter_streaming_query_stmt: ALTER STREAMING QUERY (IF EXISTS)? object_ref + alter_streaming_query_action +; +alter_streaming_query_action: + alter_streaming_query_set_settings + | alter_streaming_query_set_settings? streaming_query_definition +; +alter_streaming_query_set_settings: SET streaming_query_settings; + +streaming_query_settings: LPAREN streaming_query_setting (COMMA streaming_query_setting)* COMMA? RPAREN; +streaming_query_setting: an_id_or_type EQUALS streaming_query_setting_value; +streaming_query_setting_value: + id_or_type + | STRING_VALUE + | bool_value +; + +streaming_query_definition: AS DO inline_action; + +drop_streaming_query_stmt: DROP STREAMING QUERY (IF EXISTS)? object_ref; + create_view_stmt: CREATE VIEW (IF NOT EXISTS)? object_ref create_object_features? AS select_stmt @@ -1509,6 +1539,7 @@ keyword_as_compat: | SEQUENCE | SOURCE | START + | STREAMING | SUBQUERY | SUBSET | SYMBOLS @@ -1739,6 +1770,7 @@ keyword_compat: ( | SEQUENCE | SOURCE | START + | STREAMING | SUBQUERY | SUBSET | SYMBOLS @@ -2118,6 +2150,7 @@ SEQUENCE: S E Q U E N C E; SOURCE: S O U R C E; START: S T A R T; STREAM: S T R E A M; +STREAMING: S T R E A M I N G; STRUCT: S T R U C T; SUBQUERY: S U B Q U E R Y; SUBSET: S U B S E T; diff --git a/yql/essentials/sql/v1/SQLv1Antlr4.g.in b/yql/essentials/sql/v1/SQLv1Antlr4.g.in index fa26fd74415..e03f6b556f3 100644 --- a/yql/essentials/sql/v1/SQLv1Antlr4.g.in +++ b/yql/essentials/sql/v1/SQLv1Antlr4.g.in @@ -80,6 +80,9 @@ sql_stmt_core: | drop_transfer_stmt | alter_database_stmt | show_create_table_stmt + | create_streaming_query_stmt + | alter_streaming_query_stmt + | drop_streaming_query_stmt ; expr: @@ -612,6 +615,33 @@ alter_external_data_source_action: drop_external_data_source_stmt: DROP EXTERNAL DATA SOURCE (IF EXISTS)? object_ref; +create_streaming_query_stmt: CREATE (OR REPLACE)? STREAMING QUERY (IF NOT EXISTS)? object_ref + create_streaming_query_features? + streaming_query_definition +; +create_streaming_query_features: WITH streaming_query_settings; + +alter_streaming_query_stmt: ALTER STREAMING QUERY (IF EXISTS)? object_ref + alter_streaming_query_action +; +alter_streaming_query_action: + alter_streaming_query_set_settings + | alter_streaming_query_set_settings? streaming_query_definition +; +alter_streaming_query_set_settings: SET streaming_query_settings; + +streaming_query_settings: LPAREN streaming_query_setting (COMMA streaming_query_setting)* COMMA? RPAREN; +streaming_query_setting: an_id_or_type EQUALS streaming_query_setting_value; +streaming_query_setting_value: + id_or_type + | STRING_VALUE + | bool_value +; + +streaming_query_definition: AS DO inline_action; + +drop_streaming_query_stmt: DROP STREAMING QUERY (IF EXISTS)? object_ref; + create_view_stmt: CREATE VIEW (IF NOT EXISTS)? object_ref create_object_features? AS select_stmt @@ -1509,6 +1539,7 @@ keyword_as_compat: | SEQUENCE | SOURCE | START + | STREAMING | SUBQUERY | SUBSET | SYMBOLS @@ -1739,6 +1770,7 @@ keyword_compat: ( | SEQUENCE | SOURCE | START + | STREAMING | SUBQUERY | SUBSET | SYMBOLS @@ -2120,6 +2152,7 @@ SEQUENCE: S E Q U E N C E; SOURCE: S O U R C E; START: S T A R T; STREAM: S T R E A M; +STREAMING: S T R E A M I N G; STRUCT: S T R U C T; SUBQUERY: S U B Q U E R Y; SUBSET: S U B S E T; diff --git a/yql/essentials/sql/v1/complete/sql_complete_ut.cpp b/yql/essentials/sql/v1/complete/sql_complete_ut.cpp index 52eb2033717..faab7508f85 100644 --- a/yql/essentials/sql/v1/complete/sql_complete_ut.cpp +++ b/yql/essentials/sql/v1/complete/sql_complete_ut.cpp @@ -297,6 +297,7 @@ Y_UNIT_TEST_SUITE(SqlCompleteTests) { {Keyword, "OBJECT"}, {Keyword, "RESOURCE POOL"}, {Keyword, "SEQUENCE"}, + {Keyword, "STREAMING QUERY"}, {Keyword, "TABLE"}, {Keyword, "TABLESTORE"}, {Keyword, "TOPIC"}, @@ -317,6 +318,7 @@ Y_UNIT_TEST_SUITE(SqlCompleteTests) { {Keyword, "OBJECT"}, {Keyword, "OR REPLACE"}, {Keyword, "RESOURCE POOL"}, + {Keyword, "STREAMING QUERY"}, {Keyword, "TABLE"}, {Keyword, "TABLESTORE"}, {Keyword, "TEMP TABLE"}, @@ -371,6 +373,7 @@ Y_UNIT_TEST_SUITE(SqlCompleteTests) { {Keyword, "GROUP"}, {Keyword, "OBJECT"}, {Keyword, "RESOURCE POOL"}, + {Keyword, "STREAMING QUERY"}, {Keyword, "TABLE"}, {Keyword, "TABLESTORE"}, {Keyword, "TOPIC"}, diff --git a/yql/essentials/sql/v1/context.cpp b/yql/essentials/sql/v1/context.cpp index b95a559cac5..8d412ba2477 100644 --- a/yql/essentials/sql/v1/context.cpp +++ b/yql/essentials/sql/v1/context.cpp @@ -121,15 +121,8 @@ TContext::TContext(const TLexers& lexers, const TParsers& parsers, Libraries.emplace(lib, TLibraryStuff()); } - Scoped = MakeIntrusive<TScopedState>(); + Scoped = CreateScopedState(); AllScopes.push_back(Scoped); - Scoped->UnicodeLiterals = settings.UnicodeLiterals; - if (settings.DefaultCluster) { - Scoped->CurrCluster = TDeferredAtom({}, settings.DefaultCluster); - auto provider = GetClusterProvider(settings.DefaultCluster); - YQL_ENSURE(provider); - Scoped->CurrService = *provider; - } Position_.File = settings.File; @@ -491,6 +484,20 @@ bool TContext::UseUnordered(const TTableRef& table) const { return YtProviderName == table.Service; } +TScopedStatePtr TContext::CreateScopedState() const { + auto state = MakeIntrusive<TScopedState>(); + state->UnicodeLiterals = Settings.UnicodeLiterals; + + if (Settings.DefaultCluster) { + state->CurrCluster = TDeferredAtom({}, Settings.DefaultCluster); + + const auto provider = GetClusterProvider(Settings.DefaultCluster); + YQL_ENSURE(provider); + state->CurrService = *provider; + } + + return state; +} TMaybe<EColumnRefState> GetFunctionArgColumnStatus(TContext& ctx, const TString& module, const TString& func, size_t argIndex) { static const TSet<TStringBuf> denyForAllArgs = { diff --git a/yql/essentials/sql/v1/context.h b/yql/essentials/sql/v1/context.h index 59ef9353915..aa30982c35c 100644 --- a/yql/essentials/sql/v1/context.h +++ b/yql/essentials/sql/v1/context.h @@ -245,6 +245,8 @@ namespace NSQLTranslationV1 { TVector<NSQLTranslation::TSQLHint> PullHintForToken(NYql::TPosition tokenPos); void WarnUnusedHints(); + TScopedStatePtr CreateScopedState() const; + private: IOutputStream& MakeIssue(NYql::ESeverity severity, NYql::TIssueCode code, NYql::TPosition pos); diff --git a/yql/essentials/sql/v1/format/sql_format.cpp b/yql/essentials/sql/v1/format/sql_format.cpp index e45adc280d8..845a5091986 100644 --- a/yql/essentials/sql/v1/format/sql_format.cpp +++ b/yql/essentials/sql/v1/format/sql_format.cpp @@ -696,7 +696,10 @@ private: case TRule_sql_stmt_core::kAltSqlStmtCore14: // export case TRule_sql_stmt_core::kAltSqlStmtCore32: // drop external data source case TRule_sql_stmt_core::kAltSqlStmtCore34: // drop replication + case TRule_sql_stmt_core::kAltSqlStmtCore47: // drop resource pool + case TRule_sql_stmt_core::kAltSqlStmtCore54: // drop resource pool classifier case TRule_sql_stmt_core::kAltSqlStmtCore60: // drop transfer + case TRule_sql_stmt_core::kAltSqlStmtCore65: // drop streaming query return true; case TRule_sql_stmt_core::kAltSqlStmtCore3: { // named nodes const auto& stmt = msg.GetAlt_sql_stmt_core3().GetRule_named_nodes_stmt1(); @@ -1605,6 +1608,42 @@ private: VisitAllFields(TRule_drop_resource_pool_classifier_stmt::GetDescriptor(), msg); } + void VisitStreamingQuerySettings(const TRule_streaming_query_settings& msg) { + VisitKeyword(msg.GetToken1()); + NewLine(); + PushCurrentIndent(); + + Visit(msg.GetRule_streaming_query_setting2()); + for (const auto& setting : msg.GetBlock3()) { + Visit(setting.GetToken1()); + NewLine(); + Visit(setting.GetRule_streaming_query_setting2()); + } + + if (msg.HasBlock4()) { + TokenIndex_++; + } + + PopCurrentIndent(); + NewLine(); + VisitKeyword(msg.GetToken5()); + } + + void VisitCreateStreamingQuery(const TRule_create_streaming_query_stmt& msg) { + NewLine(); + VisitAllFields(TRule_create_streaming_query_stmt::GetDescriptor(), msg); + } + + void VisitAlterStreamingQuery(const TRule_alter_streaming_query_stmt& msg) { + NewLine(); + VisitAllFields(TRule_alter_streaming_query_stmt::GetDescriptor(), msg); + } + + void VisitDropStreamingQuery(const TRule_drop_streaming_query_stmt& msg) { + NewLine(); + VisitAllFields(TRule_drop_streaming_query_stmt::GetDescriptor(), msg); + } + void VisitAllFields(const NProtoBuf::Descriptor* descr, const NProtoBuf::Message& msg) { VisitAllFieldsImpl<TPrettyVisitor, &TPrettyVisitor::Visit>(this, descr, msg); } @@ -3097,6 +3136,10 @@ TStaticData::TStaticData() {TRule_alter_sequence_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitAlterSequence)}, {TRule_alter_database_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitAlterDatabase)}, {TRule_show_create_table_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitShowCreateTable)}, + {TRule_streaming_query_settings::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitStreamingQuerySettings)}, + {TRule_create_streaming_query_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitCreateStreamingQuery)}, + {TRule_alter_streaming_query_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitAlterStreamingQuery)}, + {TRule_drop_streaming_query_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitDropStreamingQuery)}, }) , ObfuscatingVisitDispatch({ {TToken::GetDescriptor(), MakeObfuscatingFunctor(&TObfuscatingVisitor::VisitToken)}, diff --git a/yql/essentials/sql/v1/format/sql_format_ut.h b/yql/essentials/sql/v1/format/sql_format_ut.h index 0e2924d1272..8c710ed78dd 100644 --- a/yql/essentials/sql/v1/format/sql_format_ut.h +++ b/yql/essentials/sql/v1/format/sql_format_ut.h @@ -1920,3 +1920,57 @@ Y_UNIT_TEST(ValueConstructor) { TSetup setup; setup.Run(cases); } + +Y_UNIT_TEST(CreateStreamingQuery) { + TCases cases = {{ + "creAte sTReaMing qUErY TheQuery As dO BeGin ;;\n\nInSeRT iNTo TheTable SELect 1;; eNd Do", + "CREATE STREAMING QUERY TheQuery AS DO BEGIN\nINSERT INTO TheTable\nSELECT\n\t1\n;\nEND DO;\n" + }, { + "creAte sTReaMing qUErY If Not ExIsTs TheQuery As dO BeGin ;;\n\nInSeRT iNTo TheTable SELect 1;; eNd Do", + "CREATE STREAMING QUERY IF NOT EXISTS TheQuery AS DO BEGIN\nINSERT INTO TheTable\nSELECT\n\t1\n;\nEND DO;\n" + }, { + "creAte oR ReplAce sTReaMing qUErY TheQuery As dO BeGin ;;\n\nInSeRT iNTo TheTable SELect 1;; eNd Do", + "CREATE OR REPLACE STREAMING QUERY TheQuery AS DO BEGIN\nINSERT INTO TheTable\nSELECT\n\t1\n;\nEND DO;\n" + }, { + "creAte sTReaMing qUErY TheQuery wiTh (option = tRuE) As dO BeGin ;;\n\nInSeRT iNTo TheTable SELect 1;; eNd Do", + "CREATE STREAMING QUERY TheQuery WITH (\n\toption = TRUE\n) AS DO BEGIN\nINSERT INTO TheTable\nSELECT\n\t1\n;\nEND DO;\n" + } + }; + + TSetup setup; + setup.Run(cases); +} + +Y_UNIT_TEST(AlterStreamingQuery) { + TCases cases = {{ + "aLTer sTReaMing qUErY TheQuery As dO BeGin ;;\n\nInSeRT iNTo TheTable SELect 1;; eNd Do", + "ALTER STREAMING QUERY TheQuery AS DO BEGIN\nINSERT INTO TheTable\nSELECT\n\t1\n;\nEND DO;\n" + }, { + "aLTer sTReaMing qUErY If ExIsTs TheQuery As dO BeGin ;;\n\nInSeRT iNTo TheTable SELect 1;; eNd Do", + "ALTER STREAMING QUERY IF EXISTS TheQuery AS DO BEGIN\nINSERT INTO TheTable\nSELECT\n\t1\n;\nEND DO;\n" + }, { + "aLTer sTReaMing qUErY TheQuery sEt (option = tRuE)", + "ALTER STREAMING QUERY TheQuery SET (\n\toption = TRUE\n);\n" + }, { + "aLTer sTReaMing qUErY TheQuery sEt (option = tRuE) As dO BeGin ;;\n\nInSeRT iNTo TheTable SELect 1;; eNd Do", + "ALTER STREAMING QUERY TheQuery SET (\n\toption = TRUE\n) AS DO BEGIN\nINSERT INTO TheTable\nSELECT\n\t1\n;\nEND DO;\n" + } + }; + + TSetup setup; + setup.Run(cases); +} + +Y_UNIT_TEST(DropStreamingQuery) { + TCases cases = {{ + "dRop sTReaMing qUErY TheQuery", + "DROP STREAMING QUERY TheQuery;\n" + } , { + "dRop sTReaMing qUErY If ExIsTs TheQuery", + "DROP STREAMING QUERY IF EXISTS TheQuery;\n" + } + }; + + TSetup setup; + setup.Run(cases); +} diff --git a/yql/essentials/sql/v1/node.h b/yql/essentials/sql/v1/node.h index 10c2afcabe5..3d0b79f18aa 100644 --- a/yql/essentials/sql/v1/node.h +++ b/yql/essentials/sql/v1/node.h @@ -1452,6 +1452,14 @@ namespace NSQLTranslationV1 { TString At; }; + struct TStreamingQuerySettings { + inline static constexpr char RESERVED_FEATURE_PREFIX[] = "__"; + inline static constexpr char QUERY_TEXT_FEATURE[] = "__query_text"; + inline static constexpr char QUERY_AST_FEATURE[] = "__query_ast"; + + std::map<TString, TDeferredAtom> Features; + }; + TString IdContent(TContext& ctx, const TString& str); TString IdContentFromString(TContext& ctx, const TString& str); TTableHints GetContextHints(TContext& ctx); @@ -1567,7 +1575,7 @@ namespace NSQLTranslationV1 { TNodePtr BuildCreateObjectOperation(TPosition pos, const TString& objectId, const TString& typeId, bool existingOk, bool replaceIfExists, std::map<TString, TDeferredAtom>&& features, const TObjectOperatorContext& context); TNodePtr BuildAlterObjectOperation(TPosition pos, const TString& secretId, const TString& typeId, - std::map<TString, TDeferredAtom>&& features, std::set<TString>&& featuresToReset, const TObjectOperatorContext& context); + bool missingOk, std::map<TString, TDeferredAtom>&& features, std::set<TString>&& featuresToReset, const TObjectOperatorContext& context); TNodePtr BuildDropObjectOperation(TPosition pos, const TString& secretId, const TString& typeId, bool missingOk, std::map<TString, TDeferredAtom>&& options, const TObjectOperatorContext& context); TNodePtr BuildCreateAsyncReplication(TPosition pos, const TString& id, diff --git a/yql/essentials/sql/v1/object_processing.cpp b/yql/essentials/sql/v1/object_processing.cpp index 64b46d612d4..f9028b01b84 100644 --- a/yql/essentials/sql/v1/object_processing.cpp +++ b/yql/essentials/sql/v1/object_processing.cpp @@ -2,9 +2,30 @@ #include <yql/essentials/core/sql_types/yql_callable_names.h> -namespace NSQLTranslationV1 { using namespace NYql; +namespace NSQLTranslationV1 { + +namespace { + +bool InitFeatures(TContext& ctx, ISource* src, const std::map<TString, TDeferredAtom>& features) { + for (const auto& [key, value] : features) { + if (value.HasNode() && !value.Build()->Init(ctx, src)) { + return false; + } + } + + return true; +} + +} // anonymous namespace + +TObjectOperatorContext::TObjectOperatorContext(TScopedStatePtr scoped) + : Scoped_(scoped) + , ServiceId(Scoped_->CurrService) + , Cluster(Scoped_->CurrCluster) +{} + INode::TPtr TObjectProcessorImpl::BuildKeys() const { auto keys = Y("Key"); keys = L(keys, Q(Y(Q("objectId"), Y("String", BuildQuotedAtom(Pos_, ObjectId_))))); @@ -17,12 +38,9 @@ TObjectProcessorImpl::TObjectProcessorImpl(TPosition pos, const TString& objectI , TObjectOperatorContext(context) , ObjectId_(objectId) , TypeId_(typeId) -{ - -} +{} bool TObjectProcessorImpl::DoInit(TContext& ctx, ISource* src) { - Y_UNUSED(src); Scoped_->UseCluster(ServiceId, Cluster); auto options = FillFeatures(BuildOptions()); auto keys = BuildKeys(); @@ -35,7 +53,17 @@ bool TObjectProcessorImpl::DoInit(TContext& ctx, ISource* src) { return TAstListNode::DoInit(ctx, src); } -INode::TPtr TCreateObject::FillFeatures(INode::TPtr options) const { +TObjectProcessorImpl::TPtr TObjectProcessorImpl::DoClone() const { + return {}; +} + +TObjectProcessorWithFeatures::TObjectProcessorWithFeatures(TPosition pos, const TString& objectId, const TString& typeId, const TObjectOperatorContext& context, + TFeatureMap&& features) + : TBase(pos, objectId, typeId, context) + , Features_(std::move(features)) +{} + +INode::TPtr TObjectProcessorWithFeatures::FillFeatures(INode::TPtr options) const { if (!Features_.empty()) { auto features = Y(); for (auto&& i : Features_) { @@ -47,42 +75,75 @@ INode::TPtr TCreateObject::FillFeatures(INode::TPtr options) const { } options->Add(Q(Y(Q("features"), Q(features)))); } - if (!FeaturesToReset_.empty()) { - auto reset = Y(); - for (const auto& featureName : FeaturesToReset_) { - reset->Add(BuildQuotedAtom(Pos_, featureName)); - } - options->Add(Q(Y(Q("resetFeatures"), Q(reset)))); - } + return options; } -namespace { +bool TObjectProcessorWithFeatures::DoInit(TContext& ctx, ISource* src) { + if (!InitFeatures(ctx, src, Features_)) { + return false; + } -bool InitFeatures(TContext& ctx, ISource* src, const std::map<TString, TDeferredAtom>& features) { - for (auto& [key, value] : features) { - if (value.HasNode() && !value.Build()->Init(ctx, src)) { - return false; - } + return TObjectProcessorImpl::DoInit(ctx, src); +} + +INode::TPtr TCreateObject::BuildOptions() const { + TString mode; + + if (ExistingOk_) { + mode = "createObjectIfNotExists"; + } else if (ReplaceIfExists_) { + mode = "createObjectOrReplace"; + } else { + mode = "createObject"; } - return true; + + return Y(Q(Y(Q("mode"), Q(mode)))); } +TCreateObject::TCreateObject(TPosition pos, const TString& objectId, const TString& typeId, const TObjectOperatorContext& context, + TFeatureMap&& features, bool existingOk, bool replaceIfExists) + : TBase(pos, objectId, typeId, context, std::move(features)) + , ExistingOk_(existingOk) + , ReplaceIfExists_(replaceIfExists) +{} + +INode::TPtr TUpsertObject::BuildOptions() const { + return Y(Q(Y(Q("mode"), Q("upsertObject")))); } -bool TCreateObject::DoInit(TContext& ctx, ISource* src) { - if (!InitFeatures(ctx, src, Features_)) { - return false; +TAlterObject::TAlterObject(TPosition pos, const TString& objectId, const TString& typeId, const TObjectOperatorContext& context, + TFeatureMap&& features, std::set<TString>&& featuresToReset, bool missingOk) + : TBase(pos, objectId, typeId, context, std::move(features)) + , FeaturesToReset_(std::move(featuresToReset)) + , MissingOk_(missingOk) +{} + +INode::TPtr TAlterObject::FillFeatures(INode::TPtr options) const { + options = TBase::FillFeatures(options); + + if (!FeaturesToReset_.empty()) { + auto reset = Y(); + for (const auto& featureName : FeaturesToReset_) { + reset->Add(BuildQuotedAtom(Pos_, featureName)); + } + options->Add(Q(Y(Q("resetFeatures"), Q(reset)))); } - return TObjectProcessorImpl::DoInit(ctx, src); + return options; } -TObjectOperatorContext::TObjectOperatorContext(TScopedStatePtr scoped) - : Scoped_(scoped) - , ServiceId(Scoped_->CurrService) - , Cluster(Scoped_->CurrCluster) -{ - +INode::TPtr TAlterObject::BuildOptions() const { + return Y(Q(Y(Q("mode"), Q(MissingOk_ ? "alterObjectIfExists" : "alterObject")))); } +TDropObject::TDropObject(TPosition pos, const TString& objectId, const TString& typeId, const TObjectOperatorContext& context, + TFeatureMap&& features, bool missingOk) + : TBase(pos, objectId, typeId, context, std::move(features)) + , MissingOk_(missingOk) +{} + +INode::TPtr TDropObject::BuildOptions() const { + return Y(Q(Y(Q("mode"), Q(MissingOk_ ? "dropObjectIfExists" : "dropObject")))); } + +} // NSQLTranslationV1 diff --git a/yql/essentials/sql/v1/object_processing.h b/yql/essentials/sql/v1/object_processing.h index df3f6d13975..e3ab60a4ab9 100644 --- a/yql/essentials/sql/v1/object_processing.h +++ b/yql/essentials/sql/v1/object_processing.h @@ -1,4 +1,5 @@ #pragma once + #include "node.h" #include "context.h" @@ -7,100 +8,107 @@ namespace NSQLTranslationV1 { class TObjectOperatorContext { protected: TScopedStatePtr Scoped_; + public: TString ServiceId; TDeferredAtom Cluster; + TObjectOperatorContext(const TObjectOperatorContext& baseItem) = default; + TObjectOperatorContext(TScopedStatePtr scoped); }; class TObjectProcessorImpl: public TAstListNode, public TObjectOperatorContext { -protected: using TBase = TAstListNode; + TString ObjectId_; TString TypeId_; + INode::TPtr BuildKeys() const; + +protected: virtual INode::TPtr BuildOptions() const = 0; + virtual INode::TPtr FillFeatures(INode::TPtr options) const = 0; - INode::TPtr BuildKeys() const; + public: TObjectProcessorImpl(TPosition pos, const TString& objectId, const TString& typeId, const TObjectOperatorContext& context); bool DoInit(TContext& ctx, ISource* src) override; - TPtr DoClone() const final { - return {}; - } + TPtr DoClone() const final; }; -class TCreateObject: public TObjectProcessorImpl { +class TObjectProcessorWithFeatures: public TObjectProcessorImpl { +protected: + using TFeatureMap = std::map<TString, TDeferredAtom>; + private: using TBase = TObjectProcessorImpl; - std::map<TString, TDeferredAtom> Features_; - std::set<TString> FeaturesToReset_; + + TFeatureMap Features_; + protected: + INode::TPtr FillFeatures(INode::TPtr options) const override; + +public: + bool DoInit(TContext& ctx, ISource* src) override; + + TObjectProcessorWithFeatures(TPosition pos, const TString& objectId, const TString& typeId, const TObjectOperatorContext& context, + TFeatureMap&& features); +}; + +class TCreateObject final: public TObjectProcessorWithFeatures { + using TBase = TObjectProcessorWithFeatures; + bool ExistingOk_ = false; bool ReplaceIfExists_ = false; + protected: - virtual INode::TPtr BuildOptions() const override { - TString mode; - if (ExistingOk_) { - mode = "createObjectIfNotExists"; - } else if (ReplaceIfExists_) { - mode = "createObjectOrReplace"; - } else { - mode = "createObject"; - } - - return Y(Q(Y(Q("mode"), Q(mode)))); - } - virtual INode::TPtr FillFeatures(INode::TPtr options) const override; - bool DoInit(TContext& ctx, ISource* src) override; + INode::TPtr BuildOptions() const final; + public: - TCreateObject(TPosition pos, const TString& objectId, - const TString& typeId, bool existingOk, bool replaceIfExists, std::map<TString, TDeferredAtom>&& features, std::set<TString>&& featuresToReset, const TObjectOperatorContext& context) - : TBase(pos, objectId, typeId, context) - , Features_(std::move(features)) - , FeaturesToReset_(std::move(featuresToReset)) - , ExistingOk_(existingOk) - , ReplaceIfExists_(replaceIfExists) { - } + TCreateObject(TPosition pos, const TString& objectId, const TString& typeId, const TObjectOperatorContext& context, + TFeatureMap&& features, bool existingOk, bool replaceIfExists); }; -class TUpsertObject final: public TCreateObject { -private: - using TBase = TCreateObject; +class TUpsertObject final: public TObjectProcessorWithFeatures { + using TBase = TObjectProcessorWithFeatures; + protected: - virtual INode::TPtr BuildOptions() const override { - return Y(Q(Y(Q("mode"), Q("upsertObject")))); - } + INode::TPtr BuildOptions() const final; + public: using TBase::TBase; }; -class TAlterObject final: public TCreateObject { -private: - using TBase = TCreateObject; +class TAlterObject final: public TObjectProcessorWithFeatures { + using TBase = TObjectProcessorWithFeatures; + + std::set<TString> FeaturesToReset_; + bool MissingOk_ = false; + protected: - virtual INode::TPtr BuildOptions() const override { - return Y(Q(Y(Q("mode"), Q("alterObject")))); - } + INode::TPtr BuildOptions() const final; + + INode::TPtr FillFeatures(INode::TPtr options) const final; + public: - using TBase::TBase; + TAlterObject(TPosition pos, const TString& objectId, const TString& typeId, const TObjectOperatorContext& context, + TFeatureMap&& features, std::set<TString>&& featuresToReset, bool missingOk); }; -class TDropObject final: public TCreateObject { -private: - using TBase = TCreateObject; - bool MissingOk() const { - return ExistingOk_; // Because we were derived from TCreateObject - } +class TDropObject final: public TObjectProcessorWithFeatures { + using TBase = TObjectProcessorWithFeatures; + + bool MissingOk_ = false; + protected: - virtual INode::TPtr BuildOptions() const override { - return Y(Q(Y(Q("mode"), Q(MissingOk() ? "dropObjectIfExists" : "dropObject")))); - } + INode::TPtr BuildOptions() const final; + public: - using TBase::TBase; + TDropObject(TPosition pos, const TString& objectId, const TString& typeId, const TObjectOperatorContext& context, + TFeatureMap&& features, bool missingOk); }; -} +} // NSQLTranslationV1 diff --git a/yql/essentials/sql/v1/query.cpp b/yql/essentials/sql/v1/query.cpp index 5937ab680e0..ab863cc32ff 100644 --- a/yql/essentials/sql/v1/query.cpp +++ b/yql/essentials/sql/v1/query.cpp @@ -2475,21 +2475,22 @@ private: TNodePtr BuildUpsertObjectOperation(TPosition pos, const TString& objectId, const TString& typeId, std::map<TString, TDeferredAtom>&& features, const TObjectOperatorContext& context) { - return new TUpsertObject(pos, objectId, typeId, false, false, std::move(features), std::set<TString>(), context); + return new TUpsertObject(pos, objectId, typeId, context, std::move(features)); } + TNodePtr BuildCreateObjectOperation(TPosition pos, const TString& objectId, const TString& typeId, bool existingOk, bool replaceIfExists, std::map<TString, TDeferredAtom>&& features, const TObjectOperatorContext& context) { - return new TCreateObject(pos, objectId, typeId, existingOk, replaceIfExists, std::move(features), std::set<TString>(), context); + return new TCreateObject(pos, objectId, typeId, context, std::move(features), existingOk, replaceIfExists); } + TNodePtr BuildAlterObjectOperation(TPosition pos, const TString& secretId, const TString& typeId, - std::map<TString, TDeferredAtom>&& features, std::set<TString>&& featuresToReset, const TObjectOperatorContext& context) -{ - return new TAlterObject(pos, secretId, typeId, false, false, std::move(features), std::move(featuresToReset), context); + bool missingOk, std::map<TString, TDeferredAtom>&& features, std::set<TString>&& featuresToReset, const TObjectOperatorContext& context) { + return new TAlterObject(pos, secretId, typeId, context, std::move(features), std::move(featuresToReset), missingOk); } + TNodePtr BuildDropObjectOperation(TPosition pos, const TString& secretId, const TString& typeId, - bool missingOk, std::map<TString, TDeferredAtom>&& options, const TObjectOperatorContext& context) -{ - return new TDropObject(pos, secretId, typeId, missingOk, false, std::move(options), std::set<TString>(), context); + bool missingOk, std::map<TString, TDeferredAtom>&& options, const TObjectOperatorContext& context) { + return new TDropObject(pos, secretId, typeId, context, std::move(options), missingOk); } TNodePtr BuildDropRoles(TPosition pos, const TString& service, const TDeferredAtom& cluster, const TVector<TDeferredAtom>& toDrop, bool isUser, bool missingOk, TScopedStatePtr scoped) { diff --git a/yql/essentials/sql/v1/sql.cpp b/yql/essentials/sql/v1/sql.cpp index 3d8951cb83e..7f60d87d6a1 100644 --- a/yql/essentials/sql/v1/sql.cpp +++ b/yql/essentials/sql/v1/sql.cpp @@ -184,6 +184,9 @@ bool NeedUseForAllStatements(const TRule_sql_stmt_core::AltCase& subquery) { case TRule_sql_stmt_core::kAltSqlStmtCore60: // drop transfer case TRule_sql_stmt_core::kAltSqlStmtCore61: // alter database case TRule_sql_stmt_core::kAltSqlStmtCore62: // show create table + case TRule_sql_stmt_core::kAltSqlStmtCore63: // create streaming query + case TRule_sql_stmt_core::kAltSqlStmtCore64: // alter streaming query + case TRule_sql_stmt_core::kAltSqlStmtCore65: // drop streaming query return false; } } diff --git a/yql/essentials/sql/v1/sql_query.cpp b/yql/essentials/sql/v1/sql_query.cpp index d48fb6ff023..b84e2df0add 100644 --- a/yql/essentials/sql/v1/sql_query.cpp +++ b/yql/essentials/sql/v1/sql_query.cpp @@ -954,7 +954,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& return false; } - AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, typeId, std::move(kv), std::set<TString>(), context)); + AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, typeId, false, std::move(kv), std::set<TString>(), context)); break; } case TRule_sql_stmt_core::kAltSqlStmtCore29: { @@ -1053,7 +1053,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& } } - AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, "EXTERNAL_DATA_SOURCE", std::move(kv), std::move(toReset), context)); + AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, "EXTERNAL_DATA_SOURCE", false, std::move(kv), std::move(toReset), context)); break; } case TRule_sql_stmt_core::kAltSqlStmtCore32: { @@ -1323,7 +1323,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& return false; } - AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, typeId, std::move(kv), std::set<TString>(), context)); + AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, typeId, false, std::move(kv), std::set<TString>(), context)); break; } case TRule_sql_stmt_core::kAltSqlStmtCore41: @@ -1486,7 +1486,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& } } - AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, "RESOURCE_POOL", std::move(kv), std::move(toReset), context)); + AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, "RESOURCE_POOL", false, std::move(kv), std::move(toReset), context)); break; } case TRule_sql_stmt_core::kAltSqlStmtCore47: { @@ -1722,7 +1722,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& } } - AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, "RESOURCE_POOL_CLASSIFIER", std::move(kv), std::move(toReset), context)); + AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, "RESOURCE_POOL_CLASSIFIER", false, std::move(kv), std::move(toReset), context)); break; } case TRule_sql_stmt_core::kAltSqlStmtCore54: { @@ -2001,6 +2001,114 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& AddStatementToBlocks(blocks, BuildShowCreate(Ctx_.Pos(), tr, type, Ctx_.Scoped)); break; } + case TRule_sql_stmt_core::kAltSqlStmtCore63: { + // create_streaming_query_stmt: CREATE (OR REPLACE)? STREAMING QUERY (IF NOT EXISTS)? object_ref + // (WITH streaming_query_settings)? + // streaming_query_definition; + + const auto& node = core.GetAlt_sql_stmt_core63().GetRule_create_streaming_query_stmt1(); + Ctx_.Token(node.GetToken1()); + + // object_ref + TObjectOperatorContext context(Ctx_.Scoped); + const auto& objectPath = ParseObjectPath(node.GetRule_object_ref6(), context); + if (!objectPath) { + return false; + } + + // OR REPLACE + const bool replaceIfExists = node.HasBlock2(); + if (replaceIfExists) { + Y_DEBUG_ABORT_UNLESS( + IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock2().GetToken1().GetId(), OR) && + IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock2().GetToken2().GetId(), REPLACE) + ); + } + + // IF NOT EXISTS + const bool existingOk = node.HasBlock5(); + if (existingOk) { + Y_DEBUG_ABORT_UNLESS( + IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock5().GetToken1().GetId(), IF) && + IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock5().GetToken2().GetId(), NOT) && + IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock5().GetToken3().GetId(), EXISTS) + ); + } + + // WITH streaming_query_settings + TStreamingQuerySettings settings; + if (node.HasBlock7()) { + const auto& features = node.GetBlock7().GetRule_create_streaming_query_features1(); + Ctx_.Token(features.GetToken1()); + + if (!ParseStreamingQuerySettings(features.GetRule_streaming_query_settings2(), settings)) { + return false; + } + } + + // streaming_query_definition + if (!ParseStreamingQueryDefinition(node.GetRule_streaming_query_definition8(), settings)) { + return false; + } + + AddStatementToBlocks(blocks, BuildCreateObjectOperation(Ctx_.Pos(), *objectPath, "STREAMING_QUERY", existingOk, replaceIfExists, std::move(settings.Features), context)); + break; + } + case TRule_sql_stmt_core::kAltSqlStmtCore64: { + // alter_streaming_query_stmt: ALTER STREAMING QUERY (IF EXISTS)? object_ref + // alter_streaming_query_action; + + const auto& node = core.GetAlt_sql_stmt_core64().GetRule_alter_streaming_query_stmt1(); + + // object_ref + TObjectOperatorContext context(Ctx_.Scoped); + const auto& objectPath = ParseObjectPath(node.GetRule_object_ref5(), context); + if (!objectPath) { + return false; + } + + // IF EXISTS + const bool missingOk = node.HasBlock4(); + if (missingOk) { + Y_DEBUG_ABORT_UNLESS( + IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock4().GetToken1().GetId(), IF) && + IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock4().GetToken2().GetId(), EXISTS) + ); + } + + // alter_streaming_query_action + TStreamingQuerySettings settings; + if (!ParseAlterStreamingQueryAction(node.GetRule_alter_streaming_query_action6(), settings)) { + return false; + } + + AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), *objectPath, "STREAMING_QUERY", missingOk, std::move(settings.Features), {}, context)); + break; + } + case TRule_sql_stmt_core::kAltSqlStmtCore65: { + // drop_streaming_query_stmt: DROP STREAMING QUERY (IF EXISTS)? object_ref; + + const auto& node = core.GetAlt_sql_stmt_core65().GetRule_drop_streaming_query_stmt1(); + + // object_ref + TObjectOperatorContext context(Ctx_.Scoped); + const auto& objectPath = ParseObjectPath(node.GetRule_object_ref5(), context); + if (!objectPath) { + return false; + } + + // IF EXISTS + const bool missingOk = node.HasBlock4(); + if (missingOk) { + Y_DEBUG_ABORT_UNLESS( + IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock4().GetToken1().GetId(), IF) && + IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock4().GetToken2().GetId(), EXISTS) + ); + } + + AddStatementToBlocks(blocks, BuildDropObjectOperation(Ctx_.Pos(), *objectPath, "STREAMING_QUERY", missingOk, {}, context)); + break; + } case TRule_sql_stmt_core::ALT_NOT_SET: Ctx_.IncrementMonCounter("sql_errors", "UnknownStatement" + internalStatementName); AltNotImplemented("sql_stmt_core", core); diff --git a/yql/essentials/sql/v1/sql_translation.cpp b/yql/essentials/sql/v1/sql_translation.cpp index d6a420949c3..7d1dc51c20f 100644 --- a/yql/essentials/sql/v1/sql_translation.cpp +++ b/yql/essentials/sql/v1/sql_translation.cpp @@ -4,11 +4,13 @@ #include "sql_query.h" #include "sql_values.h" #include "sql_select.h" +#include "object_processing.h" #include "source.h" #include "antlr_token.h" #include <yql/essentials/sql/settings/partitioning.h> #include <yql/essentials/sql/v1/proto_parser/proto_parser.h> +#include <yql/essentials/utils/yql_paths.h> #include <util/generic/scope.h> #include <util/string/join.h> @@ -5541,4 +5543,184 @@ bool TSqlTranslation::ParseResourcePoolClassifierSettings(std::map<TString, TDef } } +TMaybe<TString> TSqlTranslation::ParseObjectPath(const TRule_object_ref& node, TObjectOperatorContext& context) { + // object_ref: (cluster_expr .)? id_or_at + + if (node.HasBlock1()) { + if (!ClusterExpr(node.GetBlock1().GetRule_cluster_expr1(), false, context.ServiceId, context.Cluster)) { + return Nothing(); + } + } + + const auto& [hasAt, objectId] = Id(node.GetRule_id_or_at2(), *this); + if (hasAt) { + Error() << "'@' is not allowed prefix for object name"; + return Nothing(); + } + + return BuildTablePath(Ctx_.GetPrefixPath(context.ServiceId, context.Cluster), objectId); +} + +bool TSqlTranslation::ParseStreamingQuerySetting(const TRule_streaming_query_setting& node, TStreamingQuerySettings& settings) { + // streaming_query_setting: an_id_or_type = (id_or_type | STRING_VALUE | bool_value) + + const auto& id = to_lower(Id(node.GetRule_an_id_or_type1(), *this)); + if (id.StartsWith(TStreamingQuerySettings::RESERVED_FEATURE_PREFIX)) { + Error() << "Streaming query parameter name should not start with prefix '" << TStreamingQuerySettings::RESERVED_FEATURE_PREFIX << "': " << to_upper(id); + return false; + } + + const auto [it, inserted] = settings.Features.emplace(id, TDeferredAtom{}); + if (!inserted) { + Error() << "Found duplicated parameter: " << to_upper(id); + return false; + } + + const auto& valueNode = node.GetRule_streaming_query_setting_value3(); + switch (valueNode.GetAltCase()) { + case TRule_streaming_query_setting_value::kAltStreamingQuerySettingValue1: { + it->second = TDeferredAtom(Ctx_.Pos(), Id(valueNode.GetAlt_streaming_query_setting_value1().GetRule_id_or_type1(), *this)); + break; + } + case TRule_streaming_query_setting_value::kAltStreamingQuerySettingValue2: { + const auto& strToken = Ctx_.Token(valueNode.GetAlt_streaming_query_setting_value2().GetToken1()); + const auto& strValue = StringContent(Ctx_, Ctx_.Pos(), strToken); + if (!strValue) { + Error() << "Cannot parse string correctly: " << strToken; + return false; + } + + it->second = TDeferredAtom(Ctx_.Pos(), strValue->Content); + break; + } + case TRule_streaming_query_setting_value::kAltStreamingQuerySettingValue3: { + it->second = TDeferredAtom(BuildLiteralBool( + Ctx_.Pos(), + FromString<bool>(Ctx_.Token(valueNode.GetAlt_streaming_query_setting_value3().GetRule_bool_value1().GetToken1())) + ), Ctx_); + break; + } + case TRule_streaming_query_setting_value::ALT_NOT_SET: { + Y_ABORT("You should change implementation according to grammar changes"); + return false; + } + } + + return true; +} + +bool TSqlTranslation::ParseStreamingQuerySettings(const TRule_streaming_query_settings& node, TStreamingQuerySettings& settings) { + // streaming_query_settings: ( + // streaming_query_setting + // (, streaming_query_setting)* ,? + // ) + + if (!ParseStreamingQuerySetting(node.GetRule_streaming_query_setting2(), settings)) { + return false; + } + + for (const auto& setting : node.GetBlock3()) { + if (!ParseStreamingQuerySetting(setting.GetRule_streaming_query_setting2(), settings)) { + return false; + } + } + + return true; +} + +bool TSqlTranslation::ParseStreamingQueryDefinition(const TRule_streaming_query_definition& node, TStreamingQuerySettings& settings) { + // streaming_query_definition: AS DO (BEGIN define_action_or_subquery_body END DO); + + Ctx_.Token(node.GetToken1()); + + // Save query ast to perform type check and validation of allowed expressions + + const auto saveScoped = Ctx_.Scoped; + Ctx_.Scoped = Ctx_.CreateScopedState(); // Reset scoped context to interrupt inheritance of global settings and named nodes + Ctx_.AllScopes.push_back(Ctx_.Scoped); + Ctx_.Scoped->Local = TScopedState::TLocal{}; + Ctx_.ScopeLevel++; + TSqlQuery query(Ctx_, Ctx_.Settings.Mode, false); + TBlocks innerBlocks; + + const auto& inlineAction = node.GetRule_inline_action3(); + const bool hasValidBody = DefineActionOrSubqueryBody(query, innerBlocks, inlineAction.GetRule_define_action_or_subquery_body2()); + auto queryNode = hasValidBody ? BuildQuery(Ctx_.Pos(), innerBlocks, false, Ctx_.Scoped, Ctx_.SeqMode) : nullptr; + WarnUnusedNodes(); + Ctx_.ScopeLevel--; + Ctx_.Scoped = saveScoped; + + if (!queryNode) { + return false; + } + + TNodePtr blockNode = new TAstListNodeImpl(Ctx_.Pos()); + blockNode->Add("block"); + blockNode->Add(blockNode->Q(queryNode)); + settings.Features[TStreamingQuerySettings::QUERY_AST_FEATURE] = TDeferredAtom(blockNode, Ctx_); + + // Extract whole query text between BEGIN and END tokens + + const auto& queryBegin = inlineAction.GetToken1(); + Y_DEBUG_ABORT_UNLESS(IS_TOKEN(Ctx_.Settings.Antlr4Parser, queryBegin.GetId(), BEGIN)); + + const auto& queryEnd = inlineAction.GetToken3(); + Y_DEBUG_ABORT_UNLESS(IS_TOKEN(Ctx_.Settings.Antlr4Parser, queryEnd.GetId(), END)); + + auto beginPos = GetQueryPosition(Ctx_.Query, queryBegin, Ctx_.Settings.Antlr4Parser); + const auto endPos = GetQueryPosition(Ctx_.Query, queryEnd, Ctx_.Settings.Antlr4Parser); + if (beginPos == std::string::npos || endPos == std::string::npos) { + Error() << "Failed to parse streaming query definition"; + return false; + } + + beginPos += queryBegin.value().size(); + settings.Features[TStreamingQuerySettings::QUERY_TEXT_FEATURE] = TDeferredAtom(Ctx_.Pos(), Ctx_.Query.substr(beginPos, endPos - beginPos)); + + return true; +} + +bool TSqlTranslation::ParseAlterStreamingQueryAction(const TRule_alter_streaming_query_action& node, TStreamingQuerySettings& settings) { + // alter_streaming_query_action: + // (SET streaming_query_settings) + // | (SET streaming_query_settings)? streaming_query_definition + + switch (node.GetAltCase()) { + case TRule_alter_streaming_query_action::kAltAlterStreamingQueryAction1: { + const auto& alterSettingsNode = node.GetAlt_alter_streaming_query_action1().GetRule_alter_streaming_query_set_settings1(); + Ctx_.Token(alterSettingsNode.GetToken1()); + + if (!ParseStreamingQuerySettings(alterSettingsNode.GetRule_streaming_query_settings2(), settings)) { + return false; + } + + break; + } + case TRule_alter_streaming_query_action::kAltAlterStreamingQueryAction2: { + const auto& action = node.GetAlt_alter_streaming_query_action2(); + + if (action.HasBlock1()) { + const auto& alterSettingsNode = action.GetBlock1().GetRule_alter_streaming_query_set_settings1(); + Ctx_.Token(alterSettingsNode.GetToken1()); + + if (!ParseStreamingQuerySettings(alterSettingsNode.GetRule_streaming_query_settings2(), settings)) { + return false; + } + } + + if (!ParseStreamingQueryDefinition(action.GetRule_streaming_query_definition2(), settings)) { + return false; + } + + break; + } + case TRule_alter_streaming_query_action::ALT_NOT_SET: { + Y_ABORT("You should change implementation according to grammar changes"); + return false; + } + } + + return true; +} + } // namespace NSQLTranslationV1 diff --git a/yql/essentials/sql/v1/sql_translation.h b/yql/essentials/sql/v1/sql_translation.h index 619f4276be8..c17a2d43a48 100644 --- a/yql/essentials/sql/v1/sql_translation.h +++ b/yql/essentials/sql/v1/sql_translation.h @@ -270,6 +270,12 @@ protected: bool ParseDatabaseSettings(const TRule_database_settings& in, THashMap<TString, TNodePtr>& out); bool ParseDatabaseSetting(const TRule_database_setting& in, THashMap<TString, TNodePtr>& out); + TMaybe<TString> ParseObjectPath(const TRule_object_ref& node, TObjectOperatorContext& context); + bool ParseStreamingQuerySetting(const TRule_streaming_query_setting& node, TStreamingQuerySettings& settings); + bool ParseStreamingQuerySettings(const TRule_streaming_query_settings& node, TStreamingQuerySettings& settings); + bool ParseStreamingQueryDefinition(const TRule_streaming_query_definition& node, TStreamingQuerySettings& settings); + bool ParseAlterStreamingQueryAction(const TRule_alter_streaming_query_action& node, TStreamingQuerySettings& settings); + bool ValidateAuthMethod(const std::map<TString, TDeferredAtom>& result); bool ValidateExternalTable(const TCreateTableParameters& params); diff --git a/yql/essentials/sql/v1/sql_ut_common.h b/yql/essentials/sql/v1/sql_ut_common.h index 518331289f5..5b0c5a28f26 100644 --- a/yql/essentials/sql/v1/sql_ut_common.h +++ b/yql/essentials/sql/v1/sql_ut_common.h @@ -9304,3 +9304,399 @@ Y_UNIT_TEST_SUITE(HoppingWindow) { ); } } + +Y_UNIT_TEST_SUITE(StreamingQuery) { + Y_UNIT_TEST(CreateStreamingQueryBasic) { + NYql::TAstParseResult res = SqlToYql(R"sql( +USE plato; +-- Some comment +CREATE STREAMING QUERY MyQuery AS DO BEGIN +USE plato; +$source = SELECT * FROM Input; +INSERT INTO Output1 SELECT * FROM $source; +INSERT INTO Output2 SELECT * FROM $source;END DO; +USE hahn; +-- Other comment + )sql"); + UNIT_ASSERT_C(res.Root, res.Issues.ToOneLineString()); + + TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { + if (word == "createObject") { + UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_ast" (block '()#"); + } + + if (word == "__query_text") { + UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_text" '"\nUSE plato;\n$source = SELECT * FROM Input;\nINSERT INTO Output1 SELECT * FROM $source;\nINSERT INTO Output2 SELECT * FROM $source;")))#"); + } + }; + + TWordCountHive elementStat = { {TString("createObject"), 0}, {TString("__query_text"), 0} }; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["createObject"]); + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["__query_text"]); + } + + Y_UNIT_TEST(CreateStreamingQueryWithSettings) { + NYql::TAstParseResult res = SqlToYql(R"sql( +USE plato; +-- Some comment +CREATE STREAMING QUERY MyQuery WITH ( + RUN = TRUE, + RESOURCE_POOL = my_pool +) AS DO BEGIN +USE plato; +$source = SELECT * FROM Input; +INSERT INTO Output1 SELECT * FROM $source; +INSERT INTO Output2 SELECT * FROM $source;END DO; +USE hahn; +-- Other comment + )sql"); + UNIT_ASSERT_C(res.Root, res.Issues.ToOneLineString()); + + TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { + if (word == "createObject") { + UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_ast" (block '()#"); + } + + if (word == "__query_text") { + UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_text" '"\nUSE plato;\n$source = SELECT * FROM Input;\nINSERT INTO Output1 SELECT * FROM $source;\nINSERT INTO Output2 SELECT * FROM $source;") '('"resource_pool" '"my_pool") '('"run" (Bool '"true")))#"); + } + }; + + TWordCountHive elementStat = { {TString("createObject"), 0}, {TString("__query_text"), 0} }; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["createObject"]); + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["__query_text"]); + } + + Y_UNIT_TEST(CreateOrReplaceStreamingQuery) { + NYql::TAstParseResult res = SqlToYql(R"sql( + USE plato; + CREATE OR REPLACE STREAMING QUERY MyQuery AS DO BEGIN /* create or replace */ SELECT 42; END DO; + )sql"); + UNIT_ASSERT_C(res.Root, res.Issues.ToOneLineString()); + + TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { + if (word == "createObjectOrReplace") { + UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_ast" (block '()#"); + } + + if (word == "__query_text") { + UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_text" '" /* create or replace */ SELECT 42; ")))#"); + } + }; + + TWordCountHive elementStat = { {TString("createObjectOrReplace"), 0}, {TString("__query_text"), 0} }; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["createObjectOrReplace"]); + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["__query_text"]); + } + + Y_UNIT_TEST(CreateStreamingQueryIfNotExists) { + NYql::TAstParseResult res = SqlToYql(R"sql( + USE plato; + CREATE STREAMING QUERY IF NOT EXISTS MyQuery AS DO BEGIN /* create if not exists */ SELECT 42; END DO; + )sql"); + UNIT_ASSERT_C(res.Root, res.Issues.ToOneLineString()); + + TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { + if (word == "createObjectIfNotExists") { + UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_ast" (block '()#"); + } + + if (word == "__query_text") { + UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_text" '" /* create if not exists */ SELECT 42; ")))#"); + } + }; + + TWordCountHive elementStat = { {TString("createObjectIfNotExists"), 0}, {TString("__query_text"), 0} }; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["createObjectIfNotExists"]); + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["__query_text"]); + } + + Y_UNIT_TEST(CreateStreamingQueryWithTablePrefix) { + NYql::TAstParseResult res = SqlToYql(R"sql( + USE plato; + PRAGMA TablePathPrefix='/aba'; + CREATE STREAMING QUERY MyQuery AS DO BEGIN SELECT * FROM hahn.Input; END DO; + )sql"); + UNIT_ASSERT_C(res.Root, res.Issues.ToOneLineString()); + + TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { + if (word == "createObject") { + UNIT_ASSERT_STRING_CONTAINS(line, "/aba/MyQuery"); + UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_ast" (block '()#"); + } + + if (word == "__query_text") { + UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_text" '" SELECT * FROM hahn.Input; ")))#"); + } + }; + + TWordCountHive elementStat = { {TString("createObject"), 0}, {TString("__query_text"), 0} }; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["createObject"]); + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["__query_text"]); + } + + Y_UNIT_TEST(CreateStreamingQueryWithBadArguments) { +#if ANTLR_VER == 3 + ExpectFailWithError(R"sql( + USE plato; + CREATE STREAMING QUERY MyQuery WITH (OPTION = "VALUE"); + )sql" , "<main>:3:66: Error: Unexpected token ';' : syntax error...\n\n"); +#else + ExpectFailWithError(R"sql( + USE plato; + CREATE STREAMING QUERY MyQuery WITH (OPTION = "VALUE"); + )sql" , "<main>:3:66: Error: mismatched input ';' expecting AS\n"); +#endif + + ExpectFailWithError(R"sql( + USE plato; + CREATE STREAMING QUERY MyQuery WITH ( + DUPLICATE_SETTING = "first_value", + DUPLICATE_SETTING = "second_value" + ) AS + DO BEGIN + USE plato; + INSERT INTO Output SELECT * FROM Input; + END DO; + )sql" , "<main>:5:17: Error: Found duplicated parameter: DUPLICATE_SETTING\n"); + + ExpectFailWithError(R"sql( + USE plato; + CREATE STREAMING QUERY MyQuery WITH ( + `__QUERY_TEXT` = "SELECT 42" + ) AS + DO BEGIN + USE plato; + INSERT INTO Output SELECT * FROM Input; + END DO; + )sql" , "<main>:4:17: Error: Streaming query parameter name should not start with prefix '__': __QUERY_TEXT\n"); + + ExpectFailWithError(R"sql( + USE plato; + $named_node = 42; + CREATE STREAMING QUERY MyQuery AS + DO BEGIN + SELECT $named_node; + END DO; + )sql" , "<main>:6:24: Error: Unknown name: $named_node\n"); + } + + Y_UNIT_TEST(AlterStreamingQuerySetQuery) { + NYql::TAstParseResult res = SqlToYql(R"sql( +USE plato; +-- Some comment +ALTER STREAMING QUERY MyQuery AS DO BEGIN +USE plato; +$source = SELECT * FROM Input; +INSERT INTO Output1 SELECT * FROM $source; +INSERT INTO Output2 SELECT * FROM $source;END DO; +USE hahn; +-- Other comment + )sql"); + UNIT_ASSERT_C(res.Root, res.Issues.ToOneLineString()); + + TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { + if (word == "alterObject") { + UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_ast" (block '()#"); + } + + if (word == "__query_text") { + UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_text" '"\nUSE plato;\n$source = SELECT * FROM Input;\nINSERT INTO Output1 SELECT * FROM $source;\nINSERT INTO Output2 SELECT * FROM $source;")))#"); + } + }; + + TWordCountHive elementStat = { {TString("alterObject"), 0}, {TString("__query_text"), 0} }; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["alterObject"]); + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["__query_text"]); + } + + Y_UNIT_TEST(AlterStreamingQuerySetOptions) { + NYql::TAstParseResult res = SqlToYql(R"sql( + USE plato; + ALTER STREAMING QUERY MyQuery SET ( + WAIT_CHECKPOINT = TRUE, + RESOURCE_POOL = other_pool + ); + )sql"); + UNIT_ASSERT_C(res.Root, res.Issues.ToOneLineString()); + + TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { + if (word == "Write") { + UNIT_ASSERT_STRING_CONTAINS(line, R"#('('('"resource_pool" '"other_pool") '('"wait_checkpoint" (Bool '"true"))))#"); + UNIT_ASSERT_STRING_CONTAINS(line, "alterObject"); + } + }; + + TWordCountHive elementStat = { {TString("Write"), 0} }; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); + } + + Y_UNIT_TEST(AlterStreamingQuerySetBothOptionsAndQuery) { + NYql::TAstParseResult res = SqlToYql(R"sql( + USE plato; + ALTER STREAMING QUERY MyQuery SET ( + WAIT_CHECKPOINT = TRUE, + RESOURCE_POOL = other_pool + ) AS DO BEGIN /* alter */ SELECT 42; END DO; + )sql"); + UNIT_ASSERT_C(res.Root, res.Issues.ToOneLineString()); + + TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { + if (word == "alterObject") { + UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_ast" (block '()#"); + } + + if (word == "__query_text") { + UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_text" '" /* alter */ SELECT 42; ") '('"resource_pool" '"other_pool") '('"wait_checkpoint" (Bool '"true"))))#"); + } + }; + + TWordCountHive elementStat = { {TString("alterObject"), 0}, {TString("__query_text"), 0} }; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["alterObject"]); + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["__query_text"]); + } + + Y_UNIT_TEST(AlterStreamingQueryIfExists) { + NYql::TAstParseResult res = SqlToYql(R"sql( + USE plato; + ALTER STREAMING QUERY IF EXISTS MyQuery AS DO BEGIN /* alter if exists */ SELECT 42; END DO; + )sql"); + UNIT_ASSERT_C(res.Root, res.Issues.ToOneLineString()); + + TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { + if (word == "alterObjectIfExists") { + UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_ast" (block '()#"); + } + + if (word == "__query_text") { + UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_text" '" /* alter if exists */ SELECT 42; ")))#"); + } + }; + + TWordCountHive elementStat = { {TString("alterObjectIfExists"), 0}, {TString("__query_text"), 0} }; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["alterObjectIfExists"]); + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["__query_text"]); + } + + Y_UNIT_TEST(AlterStreamingQueryWithTablePrefix) { + NYql::TAstParseResult res = SqlToYql(R"sql( + USE plato; + PRAGMA TablePathPrefix='/aba'; + ALTER STREAMING QUERY MyQuery AS DO BEGIN SELECT * FROM hahn.Input; END DO; + )sql"); + UNIT_ASSERT_C(res.Root, res.Issues.ToOneLineString()); + + TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { + if (word == "alterObject") { + UNIT_ASSERT_STRING_CONTAINS(line, "/aba/MyQuery"); + UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_ast" (block '()#"); + } + + if (word == "__query_text") { + UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_text" '" SELECT * FROM hahn.Input; ")))#"); + } + }; + + TWordCountHive elementStat = { {TString("alterObject"), 0}, {TString("__query_text"), 0} }; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["alterObject"]); + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["__query_text"]); + } + + Y_UNIT_TEST(AlterStreamingQueryWithBadArguments) { +#if ANTLR_VER == 3 + ExpectFailWithError(R"sql( + USE plato; + ALTER STREAMING QUERY MyQuery; + )sql" , "<main>:3:41: Error: Unexpected token ';' : cannot match to any predicted input...\n\n"); +#else + ExpectFailWithError(R"sql( + USE plato; + ALTER STREAMING QUERY MyQuery; + )sql" , "<main>:3:41: Error: mismatched input ';' expecting {AS, SET}\n"); +#endif + + ExpectFailWithError(R"sql( + USE plato; + ALTER STREAMING QUERY MyQuery SET ( + DUPLICATE_SETTING = "first_value", + DUPLICATE_SETTING = "second_value" + ); + )sql" , "<main>:5:17: Error: Found duplicated parameter: DUPLICATE_SETTING\n"); + + ExpectFailWithError(R"sql( + USE plato; + ALTER STREAMING QUERY MyQuery SET ( + `__QUERY_TEXT` = "SELECT 42" + ); + )sql" , "<main>:4:17: Error: Streaming query parameter name should not start with prefix '__': __QUERY_TEXT\n"); + + ExpectFailWithError(R"sql( + USE plato; + $named_node = 42; + ALTER STREAMING QUERY MyQuery AS + DO BEGIN + SELECT $named_node; + END DO; + )sql" , "<main>:6:24: Error: Unknown name: $named_node\n"); + } + + Y_UNIT_TEST(DropStreamingQueryBasic) { + NYql::TAstParseResult res = SqlToYql(R"sql( + USE plato; + DROP STREAMING QUERY MyQuery; + )sql"); + UNIT_ASSERT_C(res.Root, res.Issues.ToOneLineString()); + + TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { + if (word == "Write") { + UNIT_ASSERT_VALUES_EQUAL(TString::npos, line.find("'features")); + UNIT_ASSERT_STRING_CONTAINS(line, "dropObject"); + } + }; + + TWordCountHive elementStat = { {TString("Write"), 0} }; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); + } + + Y_UNIT_TEST(DropStreamingQueryIfExists) { + NYql::TAstParseResult res = SqlToYql(R"sql( + USE plato; + DROP STREAMING QUERY IF EXISTS MyQuery; + )sql"); + UNIT_ASSERT_C(res.Root, res.Issues.ToOneLineString()); + + TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { + if (word == "Write") { + UNIT_ASSERT_VALUES_EQUAL(TString::npos, line.find("'features")); + UNIT_ASSERT_STRING_CONTAINS(line, "dropObjectIfExists"); + } + }; + + TWordCountHive elementStat = { {TString("Write"), 0} }; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); + } +} |
