summaryrefslogtreecommitdiffstats
path: root/yql/essentials/sql
diff options
context:
space:
mode:
authorgrigoriypisar <[email protected]>2025-08-28 16:56:17 +0300
committergrigoriypisar <[email protected]>2025-08-28 17:21:58 +0300
commit45c5e8ac55972ded21c846f9e05a3d61d197e3a4 (patch)
tree058b5d184adcc94e57a8262c9dcde768d5589eb7 /yql/essentials/sql
parent9be8744bb96ab00cbf25afb60ab54db5cbad95dc (diff)
added sql syntax for CREATE / ALTER DROP STREAMING QUERY
Добавлен синтаксис (переиспользован код для inline action): ``` CREATE [OR REPLACE] STREAMING QUERY [IF NOT EXISTS] query_name [WITH ( key = value ... )] AS DO BEGIN ... END DO; ALTER STREAMING QUERY [IF EXISTS] query_name [SET( key = value ... )] [AS DO BEGIN ... END DO]; DROP STREAMING QUERY [IF EXISTS] query_name; ``` commit_hash:29fa6aa7e61ecf45112480fe3c1df8fab542354e
Diffstat (limited to 'yql/essentials/sql')
-rw-r--r--yql/essentials/sql/v1/SQLv1.g.in33
-rw-r--r--yql/essentials/sql/v1/SQLv1Antlr4.g.in33
-rw-r--r--yql/essentials/sql/v1/complete/sql_complete_ut.cpp3
-rw-r--r--yql/essentials/sql/v1/context.cpp23
-rw-r--r--yql/essentials/sql/v1/context.h2
-rw-r--r--yql/essentials/sql/v1/format/sql_format.cpp43
-rw-r--r--yql/essentials/sql/v1/format/sql_format_ut.h54
-rw-r--r--yql/essentials/sql/v1/node.h10
-rw-r--r--yql/essentials/sql/v1/object_processing.cpp121
-rw-r--r--yql/essentials/sql/v1/object_processing.h116
-rw-r--r--yql/essentials/sql/v1/query.cpp17
-rw-r--r--yql/essentials/sql/v1/sql.cpp3
-rw-r--r--yql/essentials/sql/v1/sql_query.cpp118
-rw-r--r--yql/essentials/sql/v1/sql_translation.cpp182
-rw-r--r--yql/essentials/sql/v1/sql_translation.h6
-rw-r--r--yql/essentials/sql/v1/sql_ut_common.h396
16 files changed, 1054 insertions, 106 deletions
diff --git a/yql/essentials/sql/v1/SQLv1.g.in b/yql/essentials/sql/v1/SQLv1.g.in
index 55b650d796a..1d86d4eefe4 100644
--- a/yql/essentials/sql/v1/SQLv1.g.in
+++ b/yql/essentials/sql/v1/SQLv1.g.in
@@ -81,6 +81,9 @@ sql_stmt_core:
| drop_transfer_stmt
| alter_database_stmt
| show_create_table_stmt
+ | create_streaming_query_stmt
+ | alter_streaming_query_stmt
+ | drop_streaming_query_stmt
;
expr:
@@ -613,6 +616,33 @@ alter_external_data_source_action:
drop_external_data_source_stmt: DROP EXTERNAL DATA SOURCE (IF EXISTS)? object_ref;
+create_streaming_query_stmt: CREATE (OR REPLACE)? STREAMING QUERY (IF NOT EXISTS)? object_ref
+ create_streaming_query_features?
+ streaming_query_definition
+;
+create_streaming_query_features: WITH streaming_query_settings;
+
+alter_streaming_query_stmt: ALTER STREAMING QUERY (IF EXISTS)? object_ref
+ alter_streaming_query_action
+;
+alter_streaming_query_action:
+ alter_streaming_query_set_settings
+ | alter_streaming_query_set_settings? streaming_query_definition
+;
+alter_streaming_query_set_settings: SET streaming_query_settings;
+
+streaming_query_settings: LPAREN streaming_query_setting (COMMA streaming_query_setting)* COMMA? RPAREN;
+streaming_query_setting: an_id_or_type EQUALS streaming_query_setting_value;
+streaming_query_setting_value:
+ id_or_type
+ | STRING_VALUE
+ | bool_value
+;
+
+streaming_query_definition: AS DO inline_action;
+
+drop_streaming_query_stmt: DROP STREAMING QUERY (IF EXISTS)? object_ref;
+
create_view_stmt: CREATE VIEW (IF NOT EXISTS)? object_ref
create_object_features?
AS select_stmt
@@ -1509,6 +1539,7 @@ keyword_as_compat:
| SEQUENCE
| SOURCE
| START
+ | STREAMING
| SUBQUERY
| SUBSET
| SYMBOLS
@@ -1739,6 +1770,7 @@ keyword_compat: (
| SEQUENCE
| SOURCE
| START
+ | STREAMING
| SUBQUERY
| SUBSET
| SYMBOLS
@@ -2118,6 +2150,7 @@ SEQUENCE: S E Q U E N C E;
SOURCE: S O U R C E;
START: S T A R T;
STREAM: S T R E A M;
+STREAMING: S T R E A M I N G;
STRUCT: S T R U C T;
SUBQUERY: S U B Q U E R Y;
SUBSET: S U B S E T;
diff --git a/yql/essentials/sql/v1/SQLv1Antlr4.g.in b/yql/essentials/sql/v1/SQLv1Antlr4.g.in
index fa26fd74415..e03f6b556f3 100644
--- a/yql/essentials/sql/v1/SQLv1Antlr4.g.in
+++ b/yql/essentials/sql/v1/SQLv1Antlr4.g.in
@@ -80,6 +80,9 @@ sql_stmt_core:
| drop_transfer_stmt
| alter_database_stmt
| show_create_table_stmt
+ | create_streaming_query_stmt
+ | alter_streaming_query_stmt
+ | drop_streaming_query_stmt
;
expr:
@@ -612,6 +615,33 @@ alter_external_data_source_action:
drop_external_data_source_stmt: DROP EXTERNAL DATA SOURCE (IF EXISTS)? object_ref;
+create_streaming_query_stmt: CREATE (OR REPLACE)? STREAMING QUERY (IF NOT EXISTS)? object_ref
+ create_streaming_query_features?
+ streaming_query_definition
+;
+create_streaming_query_features: WITH streaming_query_settings;
+
+alter_streaming_query_stmt: ALTER STREAMING QUERY (IF EXISTS)? object_ref
+ alter_streaming_query_action
+;
+alter_streaming_query_action:
+ alter_streaming_query_set_settings
+ | alter_streaming_query_set_settings? streaming_query_definition
+;
+alter_streaming_query_set_settings: SET streaming_query_settings;
+
+streaming_query_settings: LPAREN streaming_query_setting (COMMA streaming_query_setting)* COMMA? RPAREN;
+streaming_query_setting: an_id_or_type EQUALS streaming_query_setting_value;
+streaming_query_setting_value:
+ id_or_type
+ | STRING_VALUE
+ | bool_value
+;
+
+streaming_query_definition: AS DO inline_action;
+
+drop_streaming_query_stmt: DROP STREAMING QUERY (IF EXISTS)? object_ref;
+
create_view_stmt: CREATE VIEW (IF NOT EXISTS)? object_ref
create_object_features?
AS select_stmt
@@ -1509,6 +1539,7 @@ keyword_as_compat:
| SEQUENCE
| SOURCE
| START
+ | STREAMING
| SUBQUERY
| SUBSET
| SYMBOLS
@@ -1739,6 +1770,7 @@ keyword_compat: (
| SEQUENCE
| SOURCE
| START
+ | STREAMING
| SUBQUERY
| SUBSET
| SYMBOLS
@@ -2120,6 +2152,7 @@ SEQUENCE: S E Q U E N C E;
SOURCE: S O U R C E;
START: S T A R T;
STREAM: S T R E A M;
+STREAMING: S T R E A M I N G;
STRUCT: S T R U C T;
SUBQUERY: S U B Q U E R Y;
SUBSET: S U B S E T;
diff --git a/yql/essentials/sql/v1/complete/sql_complete_ut.cpp b/yql/essentials/sql/v1/complete/sql_complete_ut.cpp
index 52eb2033717..faab7508f85 100644
--- a/yql/essentials/sql/v1/complete/sql_complete_ut.cpp
+++ b/yql/essentials/sql/v1/complete/sql_complete_ut.cpp
@@ -297,6 +297,7 @@ Y_UNIT_TEST_SUITE(SqlCompleteTests) {
{Keyword, "OBJECT"},
{Keyword, "RESOURCE POOL"},
{Keyword, "SEQUENCE"},
+ {Keyword, "STREAMING QUERY"},
{Keyword, "TABLE"},
{Keyword, "TABLESTORE"},
{Keyword, "TOPIC"},
@@ -317,6 +318,7 @@ Y_UNIT_TEST_SUITE(SqlCompleteTests) {
{Keyword, "OBJECT"},
{Keyword, "OR REPLACE"},
{Keyword, "RESOURCE POOL"},
+ {Keyword, "STREAMING QUERY"},
{Keyword, "TABLE"},
{Keyword, "TABLESTORE"},
{Keyword, "TEMP TABLE"},
@@ -371,6 +373,7 @@ Y_UNIT_TEST_SUITE(SqlCompleteTests) {
{Keyword, "GROUP"},
{Keyword, "OBJECT"},
{Keyword, "RESOURCE POOL"},
+ {Keyword, "STREAMING QUERY"},
{Keyword, "TABLE"},
{Keyword, "TABLESTORE"},
{Keyword, "TOPIC"},
diff --git a/yql/essentials/sql/v1/context.cpp b/yql/essentials/sql/v1/context.cpp
index b95a559cac5..8d412ba2477 100644
--- a/yql/essentials/sql/v1/context.cpp
+++ b/yql/essentials/sql/v1/context.cpp
@@ -121,15 +121,8 @@ TContext::TContext(const TLexers& lexers, const TParsers& parsers,
Libraries.emplace(lib, TLibraryStuff());
}
- Scoped = MakeIntrusive<TScopedState>();
+ Scoped = CreateScopedState();
AllScopes.push_back(Scoped);
- Scoped->UnicodeLiterals = settings.UnicodeLiterals;
- if (settings.DefaultCluster) {
- Scoped->CurrCluster = TDeferredAtom({}, settings.DefaultCluster);
- auto provider = GetClusterProvider(settings.DefaultCluster);
- YQL_ENSURE(provider);
- Scoped->CurrService = *provider;
- }
Position_.File = settings.File;
@@ -491,6 +484,20 @@ bool TContext::UseUnordered(const TTableRef& table) const {
return YtProviderName == table.Service;
}
+TScopedStatePtr TContext::CreateScopedState() const {
+ auto state = MakeIntrusive<TScopedState>();
+ state->UnicodeLiterals = Settings.UnicodeLiterals;
+
+ if (Settings.DefaultCluster) {
+ state->CurrCluster = TDeferredAtom({}, Settings.DefaultCluster);
+
+ const auto provider = GetClusterProvider(Settings.DefaultCluster);
+ YQL_ENSURE(provider);
+ state->CurrService = *provider;
+ }
+
+ return state;
+}
TMaybe<EColumnRefState> GetFunctionArgColumnStatus(TContext& ctx, const TString& module, const TString& func, size_t argIndex) {
static const TSet<TStringBuf> denyForAllArgs = {
diff --git a/yql/essentials/sql/v1/context.h b/yql/essentials/sql/v1/context.h
index 59ef9353915..aa30982c35c 100644
--- a/yql/essentials/sql/v1/context.h
+++ b/yql/essentials/sql/v1/context.h
@@ -245,6 +245,8 @@ namespace NSQLTranslationV1 {
TVector<NSQLTranslation::TSQLHint> PullHintForToken(NYql::TPosition tokenPos);
void WarnUnusedHints();
+ TScopedStatePtr CreateScopedState() const;
+
private:
IOutputStream& MakeIssue(NYql::ESeverity severity, NYql::TIssueCode code, NYql::TPosition pos);
diff --git a/yql/essentials/sql/v1/format/sql_format.cpp b/yql/essentials/sql/v1/format/sql_format.cpp
index e45adc280d8..845a5091986 100644
--- a/yql/essentials/sql/v1/format/sql_format.cpp
+++ b/yql/essentials/sql/v1/format/sql_format.cpp
@@ -696,7 +696,10 @@ private:
case TRule_sql_stmt_core::kAltSqlStmtCore14: // export
case TRule_sql_stmt_core::kAltSqlStmtCore32: // drop external data source
case TRule_sql_stmt_core::kAltSqlStmtCore34: // drop replication
+ case TRule_sql_stmt_core::kAltSqlStmtCore47: // drop resource pool
+ case TRule_sql_stmt_core::kAltSqlStmtCore54: // drop resource pool classifier
case TRule_sql_stmt_core::kAltSqlStmtCore60: // drop transfer
+ case TRule_sql_stmt_core::kAltSqlStmtCore65: // drop streaming query
return true;
case TRule_sql_stmt_core::kAltSqlStmtCore3: { // named nodes
const auto& stmt = msg.GetAlt_sql_stmt_core3().GetRule_named_nodes_stmt1();
@@ -1605,6 +1608,42 @@ private:
VisitAllFields(TRule_drop_resource_pool_classifier_stmt::GetDescriptor(), msg);
}
+ void VisitStreamingQuerySettings(const TRule_streaming_query_settings& msg) {
+ VisitKeyword(msg.GetToken1());
+ NewLine();
+ PushCurrentIndent();
+
+ Visit(msg.GetRule_streaming_query_setting2());
+ for (const auto& setting : msg.GetBlock3()) {
+ Visit(setting.GetToken1());
+ NewLine();
+ Visit(setting.GetRule_streaming_query_setting2());
+ }
+
+ if (msg.HasBlock4()) {
+ TokenIndex_++;
+ }
+
+ PopCurrentIndent();
+ NewLine();
+ VisitKeyword(msg.GetToken5());
+ }
+
+ void VisitCreateStreamingQuery(const TRule_create_streaming_query_stmt& msg) {
+ NewLine();
+ VisitAllFields(TRule_create_streaming_query_stmt::GetDescriptor(), msg);
+ }
+
+ void VisitAlterStreamingQuery(const TRule_alter_streaming_query_stmt& msg) {
+ NewLine();
+ VisitAllFields(TRule_alter_streaming_query_stmt::GetDescriptor(), msg);
+ }
+
+ void VisitDropStreamingQuery(const TRule_drop_streaming_query_stmt& msg) {
+ NewLine();
+ VisitAllFields(TRule_drop_streaming_query_stmt::GetDescriptor(), msg);
+ }
+
void VisitAllFields(const NProtoBuf::Descriptor* descr, const NProtoBuf::Message& msg) {
VisitAllFieldsImpl<TPrettyVisitor, &TPrettyVisitor::Visit>(this, descr, msg);
}
@@ -3097,6 +3136,10 @@ TStaticData::TStaticData()
{TRule_alter_sequence_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitAlterSequence)},
{TRule_alter_database_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitAlterDatabase)},
{TRule_show_create_table_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitShowCreateTable)},
+ {TRule_streaming_query_settings::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitStreamingQuerySettings)},
+ {TRule_create_streaming_query_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitCreateStreamingQuery)},
+ {TRule_alter_streaming_query_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitAlterStreamingQuery)},
+ {TRule_drop_streaming_query_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitDropStreamingQuery)},
})
, ObfuscatingVisitDispatch({
{TToken::GetDescriptor(), MakeObfuscatingFunctor(&TObfuscatingVisitor::VisitToken)},
diff --git a/yql/essentials/sql/v1/format/sql_format_ut.h b/yql/essentials/sql/v1/format/sql_format_ut.h
index 0e2924d1272..8c710ed78dd 100644
--- a/yql/essentials/sql/v1/format/sql_format_ut.h
+++ b/yql/essentials/sql/v1/format/sql_format_ut.h
@@ -1920,3 +1920,57 @@ Y_UNIT_TEST(ValueConstructor) {
TSetup setup;
setup.Run(cases);
}
+
+Y_UNIT_TEST(CreateStreamingQuery) {
+ TCases cases = {{
+ "creAte sTReaMing qUErY TheQuery As dO BeGin ;;\n\nInSeRT iNTo TheTable SELect 1;; eNd Do",
+ "CREATE STREAMING QUERY TheQuery AS DO BEGIN\nINSERT INTO TheTable\nSELECT\n\t1\n;\nEND DO;\n"
+ }, {
+ "creAte sTReaMing qUErY If Not ExIsTs TheQuery As dO BeGin ;;\n\nInSeRT iNTo TheTable SELect 1;; eNd Do",
+ "CREATE STREAMING QUERY IF NOT EXISTS TheQuery AS DO BEGIN\nINSERT INTO TheTable\nSELECT\n\t1\n;\nEND DO;\n"
+ }, {
+ "creAte oR ReplAce sTReaMing qUErY TheQuery As dO BeGin ;;\n\nInSeRT iNTo TheTable SELect 1;; eNd Do",
+ "CREATE OR REPLACE STREAMING QUERY TheQuery AS DO BEGIN\nINSERT INTO TheTable\nSELECT\n\t1\n;\nEND DO;\n"
+ }, {
+ "creAte sTReaMing qUErY TheQuery wiTh (option = tRuE) As dO BeGin ;;\n\nInSeRT iNTo TheTable SELect 1;; eNd Do",
+ "CREATE STREAMING QUERY TheQuery WITH (\n\toption = TRUE\n) AS DO BEGIN\nINSERT INTO TheTable\nSELECT\n\t1\n;\nEND DO;\n"
+ }
+ };
+
+ TSetup setup;
+ setup.Run(cases);
+}
+
+Y_UNIT_TEST(AlterStreamingQuery) {
+ TCases cases = {{
+ "aLTer sTReaMing qUErY TheQuery As dO BeGin ;;\n\nInSeRT iNTo TheTable SELect 1;; eNd Do",
+ "ALTER STREAMING QUERY TheQuery AS DO BEGIN\nINSERT INTO TheTable\nSELECT\n\t1\n;\nEND DO;\n"
+ }, {
+ "aLTer sTReaMing qUErY If ExIsTs TheQuery As dO BeGin ;;\n\nInSeRT iNTo TheTable SELect 1;; eNd Do",
+ "ALTER STREAMING QUERY IF EXISTS TheQuery AS DO BEGIN\nINSERT INTO TheTable\nSELECT\n\t1\n;\nEND DO;\n"
+ }, {
+ "aLTer sTReaMing qUErY TheQuery sEt (option = tRuE)",
+ "ALTER STREAMING QUERY TheQuery SET (\n\toption = TRUE\n);\n"
+ }, {
+ "aLTer sTReaMing qUErY TheQuery sEt (option = tRuE) As dO BeGin ;;\n\nInSeRT iNTo TheTable SELect 1;; eNd Do",
+ "ALTER STREAMING QUERY TheQuery SET (\n\toption = TRUE\n) AS DO BEGIN\nINSERT INTO TheTable\nSELECT\n\t1\n;\nEND DO;\n"
+ }
+ };
+
+ TSetup setup;
+ setup.Run(cases);
+}
+
+Y_UNIT_TEST(DropStreamingQuery) {
+ TCases cases = {{
+ "dRop sTReaMing qUErY TheQuery",
+ "DROP STREAMING QUERY TheQuery;\n"
+ } , {
+ "dRop sTReaMing qUErY If ExIsTs TheQuery",
+ "DROP STREAMING QUERY IF EXISTS TheQuery;\n"
+ }
+ };
+
+ TSetup setup;
+ setup.Run(cases);
+}
diff --git a/yql/essentials/sql/v1/node.h b/yql/essentials/sql/v1/node.h
index 10c2afcabe5..3d0b79f18aa 100644
--- a/yql/essentials/sql/v1/node.h
+++ b/yql/essentials/sql/v1/node.h
@@ -1452,6 +1452,14 @@ namespace NSQLTranslationV1 {
TString At;
};
+ struct TStreamingQuerySettings {
+ inline static constexpr char RESERVED_FEATURE_PREFIX[] = "__";
+ inline static constexpr char QUERY_TEXT_FEATURE[] = "__query_text";
+ inline static constexpr char QUERY_AST_FEATURE[] = "__query_ast";
+
+ std::map<TString, TDeferredAtom> Features;
+ };
+
TString IdContent(TContext& ctx, const TString& str);
TString IdContentFromString(TContext& ctx, const TString& str);
TTableHints GetContextHints(TContext& ctx);
@@ -1567,7 +1575,7 @@ namespace NSQLTranslationV1 {
TNodePtr BuildCreateObjectOperation(TPosition pos, const TString& objectId, const TString& typeId,
bool existingOk, bool replaceIfExists, std::map<TString, TDeferredAtom>&& features, const TObjectOperatorContext& context);
TNodePtr BuildAlterObjectOperation(TPosition pos, const TString& secretId, const TString& typeId,
- std::map<TString, TDeferredAtom>&& features, std::set<TString>&& featuresToReset, const TObjectOperatorContext& context);
+ bool missingOk, std::map<TString, TDeferredAtom>&& features, std::set<TString>&& featuresToReset, const TObjectOperatorContext& context);
TNodePtr BuildDropObjectOperation(TPosition pos, const TString& secretId, const TString& typeId,
bool missingOk, std::map<TString, TDeferredAtom>&& options, const TObjectOperatorContext& context);
TNodePtr BuildCreateAsyncReplication(TPosition pos, const TString& id,
diff --git a/yql/essentials/sql/v1/object_processing.cpp b/yql/essentials/sql/v1/object_processing.cpp
index 64b46d612d4..f9028b01b84 100644
--- a/yql/essentials/sql/v1/object_processing.cpp
+++ b/yql/essentials/sql/v1/object_processing.cpp
@@ -2,9 +2,30 @@
#include <yql/essentials/core/sql_types/yql_callable_names.h>
-namespace NSQLTranslationV1 {
using namespace NYql;
+namespace NSQLTranslationV1 {
+
+namespace {
+
+bool InitFeatures(TContext& ctx, ISource* src, const std::map<TString, TDeferredAtom>& features) {
+ for (const auto& [key, value] : features) {
+ if (value.HasNode() && !value.Build()->Init(ctx, src)) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+} // anonymous namespace
+
+TObjectOperatorContext::TObjectOperatorContext(TScopedStatePtr scoped)
+ : Scoped_(scoped)
+ , ServiceId(Scoped_->CurrService)
+ , Cluster(Scoped_->CurrCluster)
+{}
+
INode::TPtr TObjectProcessorImpl::BuildKeys() const {
auto keys = Y("Key");
keys = L(keys, Q(Y(Q("objectId"), Y("String", BuildQuotedAtom(Pos_, ObjectId_)))));
@@ -17,12 +38,9 @@ TObjectProcessorImpl::TObjectProcessorImpl(TPosition pos, const TString& objectI
, TObjectOperatorContext(context)
, ObjectId_(objectId)
, TypeId_(typeId)
-{
-
-}
+{}
bool TObjectProcessorImpl::DoInit(TContext& ctx, ISource* src) {
- Y_UNUSED(src);
Scoped_->UseCluster(ServiceId, Cluster);
auto options = FillFeatures(BuildOptions());
auto keys = BuildKeys();
@@ -35,7 +53,17 @@ bool TObjectProcessorImpl::DoInit(TContext& ctx, ISource* src) {
return TAstListNode::DoInit(ctx, src);
}
-INode::TPtr TCreateObject::FillFeatures(INode::TPtr options) const {
+TObjectProcessorImpl::TPtr TObjectProcessorImpl::DoClone() const {
+ return {};
+}
+
+TObjectProcessorWithFeatures::TObjectProcessorWithFeatures(TPosition pos, const TString& objectId, const TString& typeId, const TObjectOperatorContext& context,
+ TFeatureMap&& features)
+ : TBase(pos, objectId, typeId, context)
+ , Features_(std::move(features))
+{}
+
+INode::TPtr TObjectProcessorWithFeatures::FillFeatures(INode::TPtr options) const {
if (!Features_.empty()) {
auto features = Y();
for (auto&& i : Features_) {
@@ -47,42 +75,75 @@ INode::TPtr TCreateObject::FillFeatures(INode::TPtr options) const {
}
options->Add(Q(Y(Q("features"), Q(features))));
}
- if (!FeaturesToReset_.empty()) {
- auto reset = Y();
- for (const auto& featureName : FeaturesToReset_) {
- reset->Add(BuildQuotedAtom(Pos_, featureName));
- }
- options->Add(Q(Y(Q("resetFeatures"), Q(reset))));
- }
+
return options;
}
-namespace {
+bool TObjectProcessorWithFeatures::DoInit(TContext& ctx, ISource* src) {
+ if (!InitFeatures(ctx, src, Features_)) {
+ return false;
+ }
-bool InitFeatures(TContext& ctx, ISource* src, const std::map<TString, TDeferredAtom>& features) {
- for (auto& [key, value] : features) {
- if (value.HasNode() && !value.Build()->Init(ctx, src)) {
- return false;
- }
+ return TObjectProcessorImpl::DoInit(ctx, src);
+}
+
+INode::TPtr TCreateObject::BuildOptions() const {
+ TString mode;
+
+ if (ExistingOk_) {
+ mode = "createObjectIfNotExists";
+ } else if (ReplaceIfExists_) {
+ mode = "createObjectOrReplace";
+ } else {
+ mode = "createObject";
}
- return true;
+
+ return Y(Q(Y(Q("mode"), Q(mode))));
}
+TCreateObject::TCreateObject(TPosition pos, const TString& objectId, const TString& typeId, const TObjectOperatorContext& context,
+ TFeatureMap&& features, bool existingOk, bool replaceIfExists)
+ : TBase(pos, objectId, typeId, context, std::move(features))
+ , ExistingOk_(existingOk)
+ , ReplaceIfExists_(replaceIfExists)
+{}
+
+INode::TPtr TUpsertObject::BuildOptions() const {
+ return Y(Q(Y(Q("mode"), Q("upsertObject"))));
}
-bool TCreateObject::DoInit(TContext& ctx, ISource* src) {
- if (!InitFeatures(ctx, src, Features_)) {
- return false;
+TAlterObject::TAlterObject(TPosition pos, const TString& objectId, const TString& typeId, const TObjectOperatorContext& context,
+ TFeatureMap&& features, std::set<TString>&& featuresToReset, bool missingOk)
+ : TBase(pos, objectId, typeId, context, std::move(features))
+ , FeaturesToReset_(std::move(featuresToReset))
+ , MissingOk_(missingOk)
+{}
+
+INode::TPtr TAlterObject::FillFeatures(INode::TPtr options) const {
+ options = TBase::FillFeatures(options);
+
+ if (!FeaturesToReset_.empty()) {
+ auto reset = Y();
+ for (const auto& featureName : FeaturesToReset_) {
+ reset->Add(BuildQuotedAtom(Pos_, featureName));
+ }
+ options->Add(Q(Y(Q("resetFeatures"), Q(reset))));
}
- return TObjectProcessorImpl::DoInit(ctx, src);
+ return options;
}
-TObjectOperatorContext::TObjectOperatorContext(TScopedStatePtr scoped)
- : Scoped_(scoped)
- , ServiceId(Scoped_->CurrService)
- , Cluster(Scoped_->CurrCluster)
-{
-
+INode::TPtr TAlterObject::BuildOptions() const {
+ return Y(Q(Y(Q("mode"), Q(MissingOk_ ? "alterObjectIfExists" : "alterObject"))));
}
+TDropObject::TDropObject(TPosition pos, const TString& objectId, const TString& typeId, const TObjectOperatorContext& context,
+ TFeatureMap&& features, bool missingOk)
+ : TBase(pos, objectId, typeId, context, std::move(features))
+ , MissingOk_(missingOk)
+{}
+
+INode::TPtr TDropObject::BuildOptions() const {
+ return Y(Q(Y(Q("mode"), Q(MissingOk_ ? "dropObjectIfExists" : "dropObject"))));
}
+
+} // NSQLTranslationV1
diff --git a/yql/essentials/sql/v1/object_processing.h b/yql/essentials/sql/v1/object_processing.h
index df3f6d13975..e3ab60a4ab9 100644
--- a/yql/essentials/sql/v1/object_processing.h
+++ b/yql/essentials/sql/v1/object_processing.h
@@ -1,4 +1,5 @@
#pragma once
+
#include "node.h"
#include "context.h"
@@ -7,100 +8,107 @@ namespace NSQLTranslationV1 {
class TObjectOperatorContext {
protected:
TScopedStatePtr Scoped_;
+
public:
TString ServiceId;
TDeferredAtom Cluster;
+
TObjectOperatorContext(const TObjectOperatorContext& baseItem) = default;
+
TObjectOperatorContext(TScopedStatePtr scoped);
};
class TObjectProcessorImpl: public TAstListNode, public TObjectOperatorContext {
-protected:
using TBase = TAstListNode;
+
TString ObjectId_;
TString TypeId_;
+ INode::TPtr BuildKeys() const;
+
+protected:
virtual INode::TPtr BuildOptions() const = 0;
+
virtual INode::TPtr FillFeatures(INode::TPtr options) const = 0;
- INode::TPtr BuildKeys() const;
+
public:
TObjectProcessorImpl(TPosition pos, const TString& objectId, const TString& typeId, const TObjectOperatorContext& context);
bool DoInit(TContext& ctx, ISource* src) override;
- TPtr DoClone() const final {
- return {};
- }
+ TPtr DoClone() const final;
};
-class TCreateObject: public TObjectProcessorImpl {
+class TObjectProcessorWithFeatures: public TObjectProcessorImpl {
+protected:
+ using TFeatureMap = std::map<TString, TDeferredAtom>;
+
private:
using TBase = TObjectProcessorImpl;
- std::map<TString, TDeferredAtom> Features_;
- std::set<TString> FeaturesToReset_;
+
+ TFeatureMap Features_;
+
protected:
+ INode::TPtr FillFeatures(INode::TPtr options) const override;
+
+public:
+ bool DoInit(TContext& ctx, ISource* src) override;
+
+ TObjectProcessorWithFeatures(TPosition pos, const TString& objectId, const TString& typeId, const TObjectOperatorContext& context,
+ TFeatureMap&& features);
+};
+
+class TCreateObject final: public TObjectProcessorWithFeatures {
+ using TBase = TObjectProcessorWithFeatures;
+
bool ExistingOk_ = false;
bool ReplaceIfExists_ = false;
+
protected:
- virtual INode::TPtr BuildOptions() const override {
- TString mode;
- if (ExistingOk_) {
- mode = "createObjectIfNotExists";
- } else if (ReplaceIfExists_) {
- mode = "createObjectOrReplace";
- } else {
- mode = "createObject";
- }
-
- return Y(Q(Y(Q("mode"), Q(mode))));
- }
- virtual INode::TPtr FillFeatures(INode::TPtr options) const override;
- bool DoInit(TContext& ctx, ISource* src) override;
+ INode::TPtr BuildOptions() const final;
+
public:
- TCreateObject(TPosition pos, const TString& objectId,
- const TString& typeId, bool existingOk, bool replaceIfExists, std::map<TString, TDeferredAtom>&& features, std::set<TString>&& featuresToReset, const TObjectOperatorContext& context)
- : TBase(pos, objectId, typeId, context)
- , Features_(std::move(features))
- , FeaturesToReset_(std::move(featuresToReset))
- , ExistingOk_(existingOk)
- , ReplaceIfExists_(replaceIfExists) {
- }
+ TCreateObject(TPosition pos, const TString& objectId, const TString& typeId, const TObjectOperatorContext& context,
+ TFeatureMap&& features, bool existingOk, bool replaceIfExists);
};
-class TUpsertObject final: public TCreateObject {
-private:
- using TBase = TCreateObject;
+class TUpsertObject final: public TObjectProcessorWithFeatures {
+ using TBase = TObjectProcessorWithFeatures;
+
protected:
- virtual INode::TPtr BuildOptions() const override {
- return Y(Q(Y(Q("mode"), Q("upsertObject"))));
- }
+ INode::TPtr BuildOptions() const final;
+
public:
using TBase::TBase;
};
-class TAlterObject final: public TCreateObject {
-private:
- using TBase = TCreateObject;
+class TAlterObject final: public TObjectProcessorWithFeatures {
+ using TBase = TObjectProcessorWithFeatures;
+
+ std::set<TString> FeaturesToReset_;
+ bool MissingOk_ = false;
+
protected:
- virtual INode::TPtr BuildOptions() const override {
- return Y(Q(Y(Q("mode"), Q("alterObject"))));
- }
+ INode::TPtr BuildOptions() const final;
+
+ INode::TPtr FillFeatures(INode::TPtr options) const final;
+
public:
- using TBase::TBase;
+ TAlterObject(TPosition pos, const TString& objectId, const TString& typeId, const TObjectOperatorContext& context,
+ TFeatureMap&& features, std::set<TString>&& featuresToReset, bool missingOk);
};
-class TDropObject final: public TCreateObject {
-private:
- using TBase = TCreateObject;
- bool MissingOk() const {
- return ExistingOk_; // Because we were derived from TCreateObject
- }
+class TDropObject final: public TObjectProcessorWithFeatures {
+ using TBase = TObjectProcessorWithFeatures;
+
+ bool MissingOk_ = false;
+
protected:
- virtual INode::TPtr BuildOptions() const override {
- return Y(Q(Y(Q("mode"), Q(MissingOk() ? "dropObjectIfExists" : "dropObject"))));
- }
+ INode::TPtr BuildOptions() const final;
+
public:
- using TBase::TBase;
+ TDropObject(TPosition pos, const TString& objectId, const TString& typeId, const TObjectOperatorContext& context,
+ TFeatureMap&& features, bool missingOk);
};
-}
+} // NSQLTranslationV1
diff --git a/yql/essentials/sql/v1/query.cpp b/yql/essentials/sql/v1/query.cpp
index 5937ab680e0..ab863cc32ff 100644
--- a/yql/essentials/sql/v1/query.cpp
+++ b/yql/essentials/sql/v1/query.cpp
@@ -2475,21 +2475,22 @@ private:
TNodePtr BuildUpsertObjectOperation(TPosition pos, const TString& objectId, const TString& typeId,
std::map<TString, TDeferredAtom>&& features, const TObjectOperatorContext& context) {
- return new TUpsertObject(pos, objectId, typeId, false, false, std::move(features), std::set<TString>(), context);
+ return new TUpsertObject(pos, objectId, typeId, context, std::move(features));
}
+
TNodePtr BuildCreateObjectOperation(TPosition pos, const TString& objectId, const TString& typeId,
bool existingOk, bool replaceIfExists, std::map<TString, TDeferredAtom>&& features, const TObjectOperatorContext& context) {
- return new TCreateObject(pos, objectId, typeId, existingOk, replaceIfExists, std::move(features), std::set<TString>(), context);
+ return new TCreateObject(pos, objectId, typeId, context, std::move(features), existingOk, replaceIfExists);
}
+
TNodePtr BuildAlterObjectOperation(TPosition pos, const TString& secretId, const TString& typeId,
- std::map<TString, TDeferredAtom>&& features, std::set<TString>&& featuresToReset, const TObjectOperatorContext& context)
-{
- return new TAlterObject(pos, secretId, typeId, false, false, std::move(features), std::move(featuresToReset), context);
+ bool missingOk, std::map<TString, TDeferredAtom>&& features, std::set<TString>&& featuresToReset, const TObjectOperatorContext& context) {
+ return new TAlterObject(pos, secretId, typeId, context, std::move(features), std::move(featuresToReset), missingOk);
}
+
TNodePtr BuildDropObjectOperation(TPosition pos, const TString& secretId, const TString& typeId,
- bool missingOk, std::map<TString, TDeferredAtom>&& options, const TObjectOperatorContext& context)
-{
- return new TDropObject(pos, secretId, typeId, missingOk, false, std::move(options), std::set<TString>(), context);
+ bool missingOk, std::map<TString, TDeferredAtom>&& options, const TObjectOperatorContext& context) {
+ return new TDropObject(pos, secretId, typeId, context, std::move(options), missingOk);
}
TNodePtr BuildDropRoles(TPosition pos, const TString& service, const TDeferredAtom& cluster, const TVector<TDeferredAtom>& toDrop, bool isUser, bool missingOk, TScopedStatePtr scoped) {
diff --git a/yql/essentials/sql/v1/sql.cpp b/yql/essentials/sql/v1/sql.cpp
index 3d8951cb83e..7f60d87d6a1 100644
--- a/yql/essentials/sql/v1/sql.cpp
+++ b/yql/essentials/sql/v1/sql.cpp
@@ -184,6 +184,9 @@ bool NeedUseForAllStatements(const TRule_sql_stmt_core::AltCase& subquery) {
case TRule_sql_stmt_core::kAltSqlStmtCore60: // drop transfer
case TRule_sql_stmt_core::kAltSqlStmtCore61: // alter database
case TRule_sql_stmt_core::kAltSqlStmtCore62: // show create table
+ case TRule_sql_stmt_core::kAltSqlStmtCore63: // create streaming query
+ case TRule_sql_stmt_core::kAltSqlStmtCore64: // alter streaming query
+ case TRule_sql_stmt_core::kAltSqlStmtCore65: // drop streaming query
return false;
}
}
diff --git a/yql/essentials/sql/v1/sql_query.cpp b/yql/essentials/sql/v1/sql_query.cpp
index d48fb6ff023..b84e2df0add 100644
--- a/yql/essentials/sql/v1/sql_query.cpp
+++ b/yql/essentials/sql/v1/sql_query.cpp
@@ -954,7 +954,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
return false;
}
- AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, typeId, std::move(kv), std::set<TString>(), context));
+ AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, typeId, false, std::move(kv), std::set<TString>(), context));
break;
}
case TRule_sql_stmt_core::kAltSqlStmtCore29: {
@@ -1053,7 +1053,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
}
}
- AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, "EXTERNAL_DATA_SOURCE", std::move(kv), std::move(toReset), context));
+ AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, "EXTERNAL_DATA_SOURCE", false, std::move(kv), std::move(toReset), context));
break;
}
case TRule_sql_stmt_core::kAltSqlStmtCore32: {
@@ -1323,7 +1323,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
return false;
}
- AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, typeId, std::move(kv), std::set<TString>(), context));
+ AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, typeId, false, std::move(kv), std::set<TString>(), context));
break;
}
case TRule_sql_stmt_core::kAltSqlStmtCore41:
@@ -1486,7 +1486,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
}
}
- AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, "RESOURCE_POOL", std::move(kv), std::move(toReset), context));
+ AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, "RESOURCE_POOL", false, std::move(kv), std::move(toReset), context));
break;
}
case TRule_sql_stmt_core::kAltSqlStmtCore47: {
@@ -1722,7 +1722,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
}
}
- AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, "RESOURCE_POOL_CLASSIFIER", std::move(kv), std::move(toReset), context));
+ AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, "RESOURCE_POOL_CLASSIFIER", false, std::move(kv), std::move(toReset), context));
break;
}
case TRule_sql_stmt_core::kAltSqlStmtCore54: {
@@ -2001,6 +2001,114 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
AddStatementToBlocks(blocks, BuildShowCreate(Ctx_.Pos(), tr, type, Ctx_.Scoped));
break;
}
+ case TRule_sql_stmt_core::kAltSqlStmtCore63: {
+ // create_streaming_query_stmt: CREATE (OR REPLACE)? STREAMING QUERY (IF NOT EXISTS)? object_ref
+ // (WITH streaming_query_settings)?
+ // streaming_query_definition;
+
+ const auto& node = core.GetAlt_sql_stmt_core63().GetRule_create_streaming_query_stmt1();
+ Ctx_.Token(node.GetToken1());
+
+ // object_ref
+ TObjectOperatorContext context(Ctx_.Scoped);
+ const auto& objectPath = ParseObjectPath(node.GetRule_object_ref6(), context);
+ if (!objectPath) {
+ return false;
+ }
+
+ // OR REPLACE
+ const bool replaceIfExists = node.HasBlock2();
+ if (replaceIfExists) {
+ Y_DEBUG_ABORT_UNLESS(
+ IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock2().GetToken1().GetId(), OR) &&
+ IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock2().GetToken2().GetId(), REPLACE)
+ );
+ }
+
+ // IF NOT EXISTS
+ const bool existingOk = node.HasBlock5();
+ if (existingOk) {
+ Y_DEBUG_ABORT_UNLESS(
+ IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock5().GetToken1().GetId(), IF) &&
+ IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock5().GetToken2().GetId(), NOT) &&
+ IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock5().GetToken3().GetId(), EXISTS)
+ );
+ }
+
+ // WITH streaming_query_settings
+ TStreamingQuerySettings settings;
+ if (node.HasBlock7()) {
+ const auto& features = node.GetBlock7().GetRule_create_streaming_query_features1();
+ Ctx_.Token(features.GetToken1());
+
+ if (!ParseStreamingQuerySettings(features.GetRule_streaming_query_settings2(), settings)) {
+ return false;
+ }
+ }
+
+ // streaming_query_definition
+ if (!ParseStreamingQueryDefinition(node.GetRule_streaming_query_definition8(), settings)) {
+ return false;
+ }
+
+ AddStatementToBlocks(blocks, BuildCreateObjectOperation(Ctx_.Pos(), *objectPath, "STREAMING_QUERY", existingOk, replaceIfExists, std::move(settings.Features), context));
+ break;
+ }
+ case TRule_sql_stmt_core::kAltSqlStmtCore64: {
+ // alter_streaming_query_stmt: ALTER STREAMING QUERY (IF EXISTS)? object_ref
+ // alter_streaming_query_action;
+
+ const auto& node = core.GetAlt_sql_stmt_core64().GetRule_alter_streaming_query_stmt1();
+
+ // object_ref
+ TObjectOperatorContext context(Ctx_.Scoped);
+ const auto& objectPath = ParseObjectPath(node.GetRule_object_ref5(), context);
+ if (!objectPath) {
+ return false;
+ }
+
+ // IF EXISTS
+ const bool missingOk = node.HasBlock4();
+ if (missingOk) {
+ Y_DEBUG_ABORT_UNLESS(
+ IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock4().GetToken1().GetId(), IF) &&
+ IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock4().GetToken2().GetId(), EXISTS)
+ );
+ }
+
+ // alter_streaming_query_action
+ TStreamingQuerySettings settings;
+ if (!ParseAlterStreamingQueryAction(node.GetRule_alter_streaming_query_action6(), settings)) {
+ return false;
+ }
+
+ AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), *objectPath, "STREAMING_QUERY", missingOk, std::move(settings.Features), {}, context));
+ break;
+ }
+ case TRule_sql_stmt_core::kAltSqlStmtCore65: {
+ // drop_streaming_query_stmt: DROP STREAMING QUERY (IF EXISTS)? object_ref;
+
+ const auto& node = core.GetAlt_sql_stmt_core65().GetRule_drop_streaming_query_stmt1();
+
+ // object_ref
+ TObjectOperatorContext context(Ctx_.Scoped);
+ const auto& objectPath = ParseObjectPath(node.GetRule_object_ref5(), context);
+ if (!objectPath) {
+ return false;
+ }
+
+ // IF EXISTS
+ const bool missingOk = node.HasBlock4();
+ if (missingOk) {
+ Y_DEBUG_ABORT_UNLESS(
+ IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock4().GetToken1().GetId(), IF) &&
+ IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock4().GetToken2().GetId(), EXISTS)
+ );
+ }
+
+ AddStatementToBlocks(blocks, BuildDropObjectOperation(Ctx_.Pos(), *objectPath, "STREAMING_QUERY", missingOk, {}, context));
+ break;
+ }
case TRule_sql_stmt_core::ALT_NOT_SET:
Ctx_.IncrementMonCounter("sql_errors", "UnknownStatement" + internalStatementName);
AltNotImplemented("sql_stmt_core", core);
diff --git a/yql/essentials/sql/v1/sql_translation.cpp b/yql/essentials/sql/v1/sql_translation.cpp
index d6a420949c3..7d1dc51c20f 100644
--- a/yql/essentials/sql/v1/sql_translation.cpp
+++ b/yql/essentials/sql/v1/sql_translation.cpp
@@ -4,11 +4,13 @@
#include "sql_query.h"
#include "sql_values.h"
#include "sql_select.h"
+#include "object_processing.h"
#include "source.h"
#include "antlr_token.h"
#include <yql/essentials/sql/settings/partitioning.h>
#include <yql/essentials/sql/v1/proto_parser/proto_parser.h>
+#include <yql/essentials/utils/yql_paths.h>
#include <util/generic/scope.h>
#include <util/string/join.h>
@@ -5541,4 +5543,184 @@ bool TSqlTranslation::ParseResourcePoolClassifierSettings(std::map<TString, TDef
}
}
+TMaybe<TString> TSqlTranslation::ParseObjectPath(const TRule_object_ref& node, TObjectOperatorContext& context) {
+ // object_ref: (cluster_expr .)? id_or_at
+
+ if (node.HasBlock1()) {
+ if (!ClusterExpr(node.GetBlock1().GetRule_cluster_expr1(), false, context.ServiceId, context.Cluster)) {
+ return Nothing();
+ }
+ }
+
+ const auto& [hasAt, objectId] = Id(node.GetRule_id_or_at2(), *this);
+ if (hasAt) {
+ Error() << "'@' is not allowed prefix for object name";
+ return Nothing();
+ }
+
+ return BuildTablePath(Ctx_.GetPrefixPath(context.ServiceId, context.Cluster), objectId);
+}
+
+bool TSqlTranslation::ParseStreamingQuerySetting(const TRule_streaming_query_setting& node, TStreamingQuerySettings& settings) {
+ // streaming_query_setting: an_id_or_type = (id_or_type | STRING_VALUE | bool_value)
+
+ const auto& id = to_lower(Id(node.GetRule_an_id_or_type1(), *this));
+ if (id.StartsWith(TStreamingQuerySettings::RESERVED_FEATURE_PREFIX)) {
+ Error() << "Streaming query parameter name should not start with prefix '" << TStreamingQuerySettings::RESERVED_FEATURE_PREFIX << "': " << to_upper(id);
+ return false;
+ }
+
+ const auto [it, inserted] = settings.Features.emplace(id, TDeferredAtom{});
+ if (!inserted) {
+ Error() << "Found duplicated parameter: " << to_upper(id);
+ return false;
+ }
+
+ const auto& valueNode = node.GetRule_streaming_query_setting_value3();
+ switch (valueNode.GetAltCase()) {
+ case TRule_streaming_query_setting_value::kAltStreamingQuerySettingValue1: {
+ it->second = TDeferredAtom(Ctx_.Pos(), Id(valueNode.GetAlt_streaming_query_setting_value1().GetRule_id_or_type1(), *this));
+ break;
+ }
+ case TRule_streaming_query_setting_value::kAltStreamingQuerySettingValue2: {
+ const auto& strToken = Ctx_.Token(valueNode.GetAlt_streaming_query_setting_value2().GetToken1());
+ const auto& strValue = StringContent(Ctx_, Ctx_.Pos(), strToken);
+ if (!strValue) {
+ Error() << "Cannot parse string correctly: " << strToken;
+ return false;
+ }
+
+ it->second = TDeferredAtom(Ctx_.Pos(), strValue->Content);
+ break;
+ }
+ case TRule_streaming_query_setting_value::kAltStreamingQuerySettingValue3: {
+ it->second = TDeferredAtom(BuildLiteralBool(
+ Ctx_.Pos(),
+ FromString<bool>(Ctx_.Token(valueNode.GetAlt_streaming_query_setting_value3().GetRule_bool_value1().GetToken1()))
+ ), Ctx_);
+ break;
+ }
+ case TRule_streaming_query_setting_value::ALT_NOT_SET: {
+ Y_ABORT("You should change implementation according to grammar changes");
+ return false;
+ }
+ }
+
+ return true;
+}
+
+bool TSqlTranslation::ParseStreamingQuerySettings(const TRule_streaming_query_settings& node, TStreamingQuerySettings& settings) {
+ // streaming_query_settings: (
+ // streaming_query_setting
+ // (, streaming_query_setting)* ,?
+ // )
+
+ if (!ParseStreamingQuerySetting(node.GetRule_streaming_query_setting2(), settings)) {
+ return false;
+ }
+
+ for (const auto& setting : node.GetBlock3()) {
+ if (!ParseStreamingQuerySetting(setting.GetRule_streaming_query_setting2(), settings)) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+bool TSqlTranslation::ParseStreamingQueryDefinition(const TRule_streaming_query_definition& node, TStreamingQuerySettings& settings) {
+ // streaming_query_definition: AS DO (BEGIN define_action_or_subquery_body END DO);
+
+ Ctx_.Token(node.GetToken1());
+
+ // Save query ast to perform type check and validation of allowed expressions
+
+ const auto saveScoped = Ctx_.Scoped;
+ Ctx_.Scoped = Ctx_.CreateScopedState(); // Reset scoped context to interrupt inheritance of global settings and named nodes
+ Ctx_.AllScopes.push_back(Ctx_.Scoped);
+ Ctx_.Scoped->Local = TScopedState::TLocal{};
+ Ctx_.ScopeLevel++;
+ TSqlQuery query(Ctx_, Ctx_.Settings.Mode, false);
+ TBlocks innerBlocks;
+
+ const auto& inlineAction = node.GetRule_inline_action3();
+ const bool hasValidBody = DefineActionOrSubqueryBody(query, innerBlocks, inlineAction.GetRule_define_action_or_subquery_body2());
+ auto queryNode = hasValidBody ? BuildQuery(Ctx_.Pos(), innerBlocks, false, Ctx_.Scoped, Ctx_.SeqMode) : nullptr;
+ WarnUnusedNodes();
+ Ctx_.ScopeLevel--;
+ Ctx_.Scoped = saveScoped;
+
+ if (!queryNode) {
+ return false;
+ }
+
+ TNodePtr blockNode = new TAstListNodeImpl(Ctx_.Pos());
+ blockNode->Add("block");
+ blockNode->Add(blockNode->Q(queryNode));
+ settings.Features[TStreamingQuerySettings::QUERY_AST_FEATURE] = TDeferredAtom(blockNode, Ctx_);
+
+ // Extract whole query text between BEGIN and END tokens
+
+ const auto& queryBegin = inlineAction.GetToken1();
+ Y_DEBUG_ABORT_UNLESS(IS_TOKEN(Ctx_.Settings.Antlr4Parser, queryBegin.GetId(), BEGIN));
+
+ const auto& queryEnd = inlineAction.GetToken3();
+ Y_DEBUG_ABORT_UNLESS(IS_TOKEN(Ctx_.Settings.Antlr4Parser, queryEnd.GetId(), END));
+
+ auto beginPos = GetQueryPosition(Ctx_.Query, queryBegin, Ctx_.Settings.Antlr4Parser);
+ const auto endPos = GetQueryPosition(Ctx_.Query, queryEnd, Ctx_.Settings.Antlr4Parser);
+ if (beginPos == std::string::npos || endPos == std::string::npos) {
+ Error() << "Failed to parse streaming query definition";
+ return false;
+ }
+
+ beginPos += queryBegin.value().size();
+ settings.Features[TStreamingQuerySettings::QUERY_TEXT_FEATURE] = TDeferredAtom(Ctx_.Pos(), Ctx_.Query.substr(beginPos, endPos - beginPos));
+
+ return true;
+}
+
+bool TSqlTranslation::ParseAlterStreamingQueryAction(const TRule_alter_streaming_query_action& node, TStreamingQuerySettings& settings) {
+ // alter_streaming_query_action:
+ // (SET streaming_query_settings)
+ // | (SET streaming_query_settings)? streaming_query_definition
+
+ switch (node.GetAltCase()) {
+ case TRule_alter_streaming_query_action::kAltAlterStreamingQueryAction1: {
+ const auto& alterSettingsNode = node.GetAlt_alter_streaming_query_action1().GetRule_alter_streaming_query_set_settings1();
+ Ctx_.Token(alterSettingsNode.GetToken1());
+
+ if (!ParseStreamingQuerySettings(alterSettingsNode.GetRule_streaming_query_settings2(), settings)) {
+ return false;
+ }
+
+ break;
+ }
+ case TRule_alter_streaming_query_action::kAltAlterStreamingQueryAction2: {
+ const auto& action = node.GetAlt_alter_streaming_query_action2();
+
+ if (action.HasBlock1()) {
+ const auto& alterSettingsNode = action.GetBlock1().GetRule_alter_streaming_query_set_settings1();
+ Ctx_.Token(alterSettingsNode.GetToken1());
+
+ if (!ParseStreamingQuerySettings(alterSettingsNode.GetRule_streaming_query_settings2(), settings)) {
+ return false;
+ }
+ }
+
+ if (!ParseStreamingQueryDefinition(action.GetRule_streaming_query_definition2(), settings)) {
+ return false;
+ }
+
+ break;
+ }
+ case TRule_alter_streaming_query_action::ALT_NOT_SET: {
+ Y_ABORT("You should change implementation according to grammar changes");
+ return false;
+ }
+ }
+
+ return true;
+}
+
} // namespace NSQLTranslationV1
diff --git a/yql/essentials/sql/v1/sql_translation.h b/yql/essentials/sql/v1/sql_translation.h
index 619f4276be8..c17a2d43a48 100644
--- a/yql/essentials/sql/v1/sql_translation.h
+++ b/yql/essentials/sql/v1/sql_translation.h
@@ -270,6 +270,12 @@ protected:
bool ParseDatabaseSettings(const TRule_database_settings& in, THashMap<TString, TNodePtr>& out);
bool ParseDatabaseSetting(const TRule_database_setting& in, THashMap<TString, TNodePtr>& out);
+ TMaybe<TString> ParseObjectPath(const TRule_object_ref& node, TObjectOperatorContext& context);
+ bool ParseStreamingQuerySetting(const TRule_streaming_query_setting& node, TStreamingQuerySettings& settings);
+ bool ParseStreamingQuerySettings(const TRule_streaming_query_settings& node, TStreamingQuerySettings& settings);
+ bool ParseStreamingQueryDefinition(const TRule_streaming_query_definition& node, TStreamingQuerySettings& settings);
+ bool ParseAlterStreamingQueryAction(const TRule_alter_streaming_query_action& node, TStreamingQuerySettings& settings);
+
bool ValidateAuthMethod(const std::map<TString, TDeferredAtom>& result);
bool ValidateExternalTable(const TCreateTableParameters& params);
diff --git a/yql/essentials/sql/v1/sql_ut_common.h b/yql/essentials/sql/v1/sql_ut_common.h
index 518331289f5..5b0c5a28f26 100644
--- a/yql/essentials/sql/v1/sql_ut_common.h
+++ b/yql/essentials/sql/v1/sql_ut_common.h
@@ -9304,3 +9304,399 @@ Y_UNIT_TEST_SUITE(HoppingWindow) {
);
}
}
+
+Y_UNIT_TEST_SUITE(StreamingQuery) {
+ Y_UNIT_TEST(CreateStreamingQueryBasic) {
+ NYql::TAstParseResult res = SqlToYql(R"sql(
+USE plato;
+-- Some comment
+CREATE STREAMING QUERY MyQuery AS DO BEGIN
+USE plato;
+$source = SELECT * FROM Input;
+INSERT INTO Output1 SELECT * FROM $source;
+INSERT INTO Output2 SELECT * FROM $source;END DO;
+USE hahn;
+-- Other comment
+ )sql");
+ UNIT_ASSERT_C(res.Root, res.Issues.ToOneLineString());
+
+ TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
+ if (word == "createObject") {
+ UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_ast" (block '()#");
+ }
+
+ if (word == "__query_text") {
+ UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_text" '"\nUSE plato;\n$source = SELECT * FROM Input;\nINSERT INTO Output1 SELECT * FROM $source;\nINSERT INTO Output2 SELECT * FROM $source;")))#");
+ }
+ };
+
+ TWordCountHive elementStat = { {TString("createObject"), 0}, {TString("__query_text"), 0} };
+ VerifyProgram(res, elementStat, verifyLine);
+
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["createObject"]);
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["__query_text"]);
+ }
+
+ Y_UNIT_TEST(CreateStreamingQueryWithSettings) {
+ NYql::TAstParseResult res = SqlToYql(R"sql(
+USE plato;
+-- Some comment
+CREATE STREAMING QUERY MyQuery WITH (
+ RUN = TRUE,
+ RESOURCE_POOL = my_pool
+) AS DO BEGIN
+USE plato;
+$source = SELECT * FROM Input;
+INSERT INTO Output1 SELECT * FROM $source;
+INSERT INTO Output2 SELECT * FROM $source;END DO;
+USE hahn;
+-- Other comment
+ )sql");
+ UNIT_ASSERT_C(res.Root, res.Issues.ToOneLineString());
+
+ TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
+ if (word == "createObject") {
+ UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_ast" (block '()#");
+ }
+
+ if (word == "__query_text") {
+ UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_text" '"\nUSE plato;\n$source = SELECT * FROM Input;\nINSERT INTO Output1 SELECT * FROM $source;\nINSERT INTO Output2 SELECT * FROM $source;") '('"resource_pool" '"my_pool") '('"run" (Bool '"true")))#");
+ }
+ };
+
+ TWordCountHive elementStat = { {TString("createObject"), 0}, {TString("__query_text"), 0} };
+ VerifyProgram(res, elementStat, verifyLine);
+
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["createObject"]);
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["__query_text"]);
+ }
+
+ Y_UNIT_TEST(CreateOrReplaceStreamingQuery) {
+ NYql::TAstParseResult res = SqlToYql(R"sql(
+ USE plato;
+ CREATE OR REPLACE STREAMING QUERY MyQuery AS DO BEGIN /* create or replace */ SELECT 42; END DO;
+ )sql");
+ UNIT_ASSERT_C(res.Root, res.Issues.ToOneLineString());
+
+ TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
+ if (word == "createObjectOrReplace") {
+ UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_ast" (block '()#");
+ }
+
+ if (word == "__query_text") {
+ UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_text" '" /* create or replace */ SELECT 42; ")))#");
+ }
+ };
+
+ TWordCountHive elementStat = { {TString("createObjectOrReplace"), 0}, {TString("__query_text"), 0} };
+ VerifyProgram(res, elementStat, verifyLine);
+
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["createObjectOrReplace"]);
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["__query_text"]);
+ }
+
+ Y_UNIT_TEST(CreateStreamingQueryIfNotExists) {
+ NYql::TAstParseResult res = SqlToYql(R"sql(
+ USE plato;
+ CREATE STREAMING QUERY IF NOT EXISTS MyQuery AS DO BEGIN /* create if not exists */ SELECT 42; END DO;
+ )sql");
+ UNIT_ASSERT_C(res.Root, res.Issues.ToOneLineString());
+
+ TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
+ if (word == "createObjectIfNotExists") {
+ UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_ast" (block '()#");
+ }
+
+ if (word == "__query_text") {
+ UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_text" '" /* create if not exists */ SELECT 42; ")))#");
+ }
+ };
+
+ TWordCountHive elementStat = { {TString("createObjectIfNotExists"), 0}, {TString("__query_text"), 0} };
+ VerifyProgram(res, elementStat, verifyLine);
+
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["createObjectIfNotExists"]);
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["__query_text"]);
+ }
+
+ Y_UNIT_TEST(CreateStreamingQueryWithTablePrefix) {
+ NYql::TAstParseResult res = SqlToYql(R"sql(
+ USE plato;
+ PRAGMA TablePathPrefix='/aba';
+ CREATE STREAMING QUERY MyQuery AS DO BEGIN SELECT * FROM hahn.Input; END DO;
+ )sql");
+ UNIT_ASSERT_C(res.Root, res.Issues.ToOneLineString());
+
+ TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
+ if (word == "createObject") {
+ UNIT_ASSERT_STRING_CONTAINS(line, "/aba/MyQuery");
+ UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_ast" (block '()#");
+ }
+
+ if (word == "__query_text") {
+ UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_text" '" SELECT * FROM hahn.Input; ")))#");
+ }
+ };
+
+ TWordCountHive elementStat = { {TString("createObject"), 0}, {TString("__query_text"), 0} };
+ VerifyProgram(res, elementStat, verifyLine);
+
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["createObject"]);
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["__query_text"]);
+ }
+
+ Y_UNIT_TEST(CreateStreamingQueryWithBadArguments) {
+#if ANTLR_VER == 3
+ ExpectFailWithError(R"sql(
+ USE plato;
+ CREATE STREAMING QUERY MyQuery WITH (OPTION = "VALUE");
+ )sql" , "<main>:3:66: Error: Unexpected token ';' : syntax error...\n\n");
+#else
+ ExpectFailWithError(R"sql(
+ USE plato;
+ CREATE STREAMING QUERY MyQuery WITH (OPTION = "VALUE");
+ )sql" , "<main>:3:66: Error: mismatched input ';' expecting AS\n");
+#endif
+
+ ExpectFailWithError(R"sql(
+ USE plato;
+ CREATE STREAMING QUERY MyQuery WITH (
+ DUPLICATE_SETTING = "first_value",
+ DUPLICATE_SETTING = "second_value"
+ ) AS
+ DO BEGIN
+ USE plato;
+ INSERT INTO Output SELECT * FROM Input;
+ END DO;
+ )sql" , "<main>:5:17: Error: Found duplicated parameter: DUPLICATE_SETTING\n");
+
+ ExpectFailWithError(R"sql(
+ USE plato;
+ CREATE STREAMING QUERY MyQuery WITH (
+ `__QUERY_TEXT` = "SELECT 42"
+ ) AS
+ DO BEGIN
+ USE plato;
+ INSERT INTO Output SELECT * FROM Input;
+ END DO;
+ )sql" , "<main>:4:17: Error: Streaming query parameter name should not start with prefix '__': __QUERY_TEXT\n");
+
+ ExpectFailWithError(R"sql(
+ USE plato;
+ $named_node = 42;
+ CREATE STREAMING QUERY MyQuery AS
+ DO BEGIN
+ SELECT $named_node;
+ END DO;
+ )sql" , "<main>:6:24: Error: Unknown name: $named_node\n");
+ }
+
+ Y_UNIT_TEST(AlterStreamingQuerySetQuery) {
+ NYql::TAstParseResult res = SqlToYql(R"sql(
+USE plato;
+-- Some comment
+ALTER STREAMING QUERY MyQuery AS DO BEGIN
+USE plato;
+$source = SELECT * FROM Input;
+INSERT INTO Output1 SELECT * FROM $source;
+INSERT INTO Output2 SELECT * FROM $source;END DO;
+USE hahn;
+-- Other comment
+ )sql");
+ UNIT_ASSERT_C(res.Root, res.Issues.ToOneLineString());
+
+ TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
+ if (word == "alterObject") {
+ UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_ast" (block '()#");
+ }
+
+ if (word == "__query_text") {
+ UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_text" '"\nUSE plato;\n$source = SELECT * FROM Input;\nINSERT INTO Output1 SELECT * FROM $source;\nINSERT INTO Output2 SELECT * FROM $source;")))#");
+ }
+ };
+
+ TWordCountHive elementStat = { {TString("alterObject"), 0}, {TString("__query_text"), 0} };
+ VerifyProgram(res, elementStat, verifyLine);
+
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["alterObject"]);
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["__query_text"]);
+ }
+
+ Y_UNIT_TEST(AlterStreamingQuerySetOptions) {
+ NYql::TAstParseResult res = SqlToYql(R"sql(
+ USE plato;
+ ALTER STREAMING QUERY MyQuery SET (
+ WAIT_CHECKPOINT = TRUE,
+ RESOURCE_POOL = other_pool
+ );
+ )sql");
+ UNIT_ASSERT_C(res.Root, res.Issues.ToOneLineString());
+
+ TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
+ if (word == "Write") {
+ UNIT_ASSERT_STRING_CONTAINS(line, R"#('('('"resource_pool" '"other_pool") '('"wait_checkpoint" (Bool '"true"))))#");
+ UNIT_ASSERT_STRING_CONTAINS(line, "alterObject");
+ }
+ };
+
+ TWordCountHive elementStat = { {TString("Write"), 0} };
+ VerifyProgram(res, elementStat, verifyLine);
+
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]);
+ }
+
+ Y_UNIT_TEST(AlterStreamingQuerySetBothOptionsAndQuery) {
+ NYql::TAstParseResult res = SqlToYql(R"sql(
+ USE plato;
+ ALTER STREAMING QUERY MyQuery SET (
+ WAIT_CHECKPOINT = TRUE,
+ RESOURCE_POOL = other_pool
+ ) AS DO BEGIN /* alter */ SELECT 42; END DO;
+ )sql");
+ UNIT_ASSERT_C(res.Root, res.Issues.ToOneLineString());
+
+ TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
+ if (word == "alterObject") {
+ UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_ast" (block '()#");
+ }
+
+ if (word == "__query_text") {
+ UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_text" '" /* alter */ SELECT 42; ") '('"resource_pool" '"other_pool") '('"wait_checkpoint" (Bool '"true"))))#");
+ }
+ };
+
+ TWordCountHive elementStat = { {TString("alterObject"), 0}, {TString("__query_text"), 0} };
+ VerifyProgram(res, elementStat, verifyLine);
+
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["alterObject"]);
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["__query_text"]);
+ }
+
+ Y_UNIT_TEST(AlterStreamingQueryIfExists) {
+ NYql::TAstParseResult res = SqlToYql(R"sql(
+ USE plato;
+ ALTER STREAMING QUERY IF EXISTS MyQuery AS DO BEGIN /* alter if exists */ SELECT 42; END DO;
+ )sql");
+ UNIT_ASSERT_C(res.Root, res.Issues.ToOneLineString());
+
+ TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
+ if (word == "alterObjectIfExists") {
+ UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_ast" (block '()#");
+ }
+
+ if (word == "__query_text") {
+ UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_text" '" /* alter if exists */ SELECT 42; ")))#");
+ }
+ };
+
+ TWordCountHive elementStat = { {TString("alterObjectIfExists"), 0}, {TString("__query_text"), 0} };
+ VerifyProgram(res, elementStat, verifyLine);
+
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["alterObjectIfExists"]);
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["__query_text"]);
+ }
+
+ Y_UNIT_TEST(AlterStreamingQueryWithTablePrefix) {
+ NYql::TAstParseResult res = SqlToYql(R"sql(
+ USE plato;
+ PRAGMA TablePathPrefix='/aba';
+ ALTER STREAMING QUERY MyQuery AS DO BEGIN SELECT * FROM hahn.Input; END DO;
+ )sql");
+ UNIT_ASSERT_C(res.Root, res.Issues.ToOneLineString());
+
+ TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
+ if (word == "alterObject") {
+ UNIT_ASSERT_STRING_CONTAINS(line, "/aba/MyQuery");
+ UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_ast" (block '()#");
+ }
+
+ if (word == "__query_text") {
+ UNIT_ASSERT_STRING_CONTAINS(line, R"#('('"__query_text" '" SELECT * FROM hahn.Input; ")))#");
+ }
+ };
+
+ TWordCountHive elementStat = { {TString("alterObject"), 0}, {TString("__query_text"), 0} };
+ VerifyProgram(res, elementStat, verifyLine);
+
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["alterObject"]);
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["__query_text"]);
+ }
+
+ Y_UNIT_TEST(AlterStreamingQueryWithBadArguments) {
+#if ANTLR_VER == 3
+ ExpectFailWithError(R"sql(
+ USE plato;
+ ALTER STREAMING QUERY MyQuery;
+ )sql" , "<main>:3:41: Error: Unexpected token ';' : cannot match to any predicted input...\n\n");
+#else
+ ExpectFailWithError(R"sql(
+ USE plato;
+ ALTER STREAMING QUERY MyQuery;
+ )sql" , "<main>:3:41: Error: mismatched input ';' expecting {AS, SET}\n");
+#endif
+
+ ExpectFailWithError(R"sql(
+ USE plato;
+ ALTER STREAMING QUERY MyQuery SET (
+ DUPLICATE_SETTING = "first_value",
+ DUPLICATE_SETTING = "second_value"
+ );
+ )sql" , "<main>:5:17: Error: Found duplicated parameter: DUPLICATE_SETTING\n");
+
+ ExpectFailWithError(R"sql(
+ USE plato;
+ ALTER STREAMING QUERY MyQuery SET (
+ `__QUERY_TEXT` = "SELECT 42"
+ );
+ )sql" , "<main>:4:17: Error: Streaming query parameter name should not start with prefix '__': __QUERY_TEXT\n");
+
+ ExpectFailWithError(R"sql(
+ USE plato;
+ $named_node = 42;
+ ALTER STREAMING QUERY MyQuery AS
+ DO BEGIN
+ SELECT $named_node;
+ END DO;
+ )sql" , "<main>:6:24: Error: Unknown name: $named_node\n");
+ }
+
+ Y_UNIT_TEST(DropStreamingQueryBasic) {
+ NYql::TAstParseResult res = SqlToYql(R"sql(
+ USE plato;
+ DROP STREAMING QUERY MyQuery;
+ )sql");
+ UNIT_ASSERT_C(res.Root, res.Issues.ToOneLineString());
+
+ TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
+ if (word == "Write") {
+ UNIT_ASSERT_VALUES_EQUAL(TString::npos, line.find("'features"));
+ UNIT_ASSERT_STRING_CONTAINS(line, "dropObject");
+ }
+ };
+
+ TWordCountHive elementStat = { {TString("Write"), 0} };
+ VerifyProgram(res, elementStat, verifyLine);
+
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]);
+ }
+
+ Y_UNIT_TEST(DropStreamingQueryIfExists) {
+ NYql::TAstParseResult res = SqlToYql(R"sql(
+ USE plato;
+ DROP STREAMING QUERY IF EXISTS MyQuery;
+ )sql");
+ UNIT_ASSERT_C(res.Root, res.Issues.ToOneLineString());
+
+ TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
+ if (word == "Write") {
+ UNIT_ASSERT_VALUES_EQUAL(TString::npos, line.find("'features"));
+ UNIT_ASSERT_STRING_CONTAINS(line, "dropObjectIfExists");
+ }
+ };
+
+ TWordCountHive elementStat = { {TString("Write"), 0} };
+ VerifyProgram(res, elementStat, verifyLine);
+
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]);
+ }
+}