summaryrefslogtreecommitdiffstats
path: root/yql/essentials/sql/v1/sql_query.cpp
diff options
context:
space:
mode:
authorgrigoriypisar <[email protected]>2025-08-28 16:56:17 +0300
committergrigoriypisar <[email protected]>2025-08-28 17:21:58 +0300
commit45c5e8ac55972ded21c846f9e05a3d61d197e3a4 (patch)
tree058b5d184adcc94e57a8262c9dcde768d5589eb7 /yql/essentials/sql/v1/sql_query.cpp
parent9be8744bb96ab00cbf25afb60ab54db5cbad95dc (diff)
added sql syntax for CREATE / ALTER DROP STREAMING QUERY
Добавлен синтаксис (переиспользован код для inline action): ``` CREATE [OR REPLACE] STREAMING QUERY [IF NOT EXISTS] query_name [WITH ( key = value ... )] AS DO BEGIN ... END DO; ALTER STREAMING QUERY [IF EXISTS] query_name [SET( key = value ... )] [AS DO BEGIN ... END DO]; DROP STREAMING QUERY [IF EXISTS] query_name; ``` commit_hash:29fa6aa7e61ecf45112480fe3c1df8fab542354e
Diffstat (limited to 'yql/essentials/sql/v1/sql_query.cpp')
-rw-r--r--yql/essentials/sql/v1/sql_query.cpp118
1 files changed, 113 insertions, 5 deletions
diff --git a/yql/essentials/sql/v1/sql_query.cpp b/yql/essentials/sql/v1/sql_query.cpp
index d48fb6ff023..b84e2df0add 100644
--- a/yql/essentials/sql/v1/sql_query.cpp
+++ b/yql/essentials/sql/v1/sql_query.cpp
@@ -954,7 +954,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
return false;
}
- AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, typeId, std::move(kv), std::set<TString>(), context));
+ AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, typeId, false, std::move(kv), std::set<TString>(), context));
break;
}
case TRule_sql_stmt_core::kAltSqlStmtCore29: {
@@ -1053,7 +1053,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
}
}
- AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, "EXTERNAL_DATA_SOURCE", std::move(kv), std::move(toReset), context));
+ AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, "EXTERNAL_DATA_SOURCE", false, std::move(kv), std::move(toReset), context));
break;
}
case TRule_sql_stmt_core::kAltSqlStmtCore32: {
@@ -1323,7 +1323,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
return false;
}
- AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, typeId, std::move(kv), std::set<TString>(), context));
+ AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, typeId, false, std::move(kv), std::set<TString>(), context));
break;
}
case TRule_sql_stmt_core::kAltSqlStmtCore41:
@@ -1486,7 +1486,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
}
}
- AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, "RESOURCE_POOL", std::move(kv), std::move(toReset), context));
+ AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, "RESOURCE_POOL", false, std::move(kv), std::move(toReset), context));
break;
}
case TRule_sql_stmt_core::kAltSqlStmtCore47: {
@@ -1722,7 +1722,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
}
}
- AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, "RESOURCE_POOL_CLASSIFIER", std::move(kv), std::move(toReset), context));
+ AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, "RESOURCE_POOL_CLASSIFIER", false, std::move(kv), std::move(toReset), context));
break;
}
case TRule_sql_stmt_core::kAltSqlStmtCore54: {
@@ -2001,6 +2001,114 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
AddStatementToBlocks(blocks, BuildShowCreate(Ctx_.Pos(), tr, type, Ctx_.Scoped));
break;
}
+ case TRule_sql_stmt_core::kAltSqlStmtCore63: {
+ // create_streaming_query_stmt: CREATE (OR REPLACE)? STREAMING QUERY (IF NOT EXISTS)? object_ref
+ // (WITH streaming_query_settings)?
+ // streaming_query_definition;
+
+ const auto& node = core.GetAlt_sql_stmt_core63().GetRule_create_streaming_query_stmt1();
+ Ctx_.Token(node.GetToken1());
+
+ // object_ref
+ TObjectOperatorContext context(Ctx_.Scoped);
+ const auto& objectPath = ParseObjectPath(node.GetRule_object_ref6(), context);
+ if (!objectPath) {
+ return false;
+ }
+
+ // OR REPLACE
+ const bool replaceIfExists = node.HasBlock2();
+ if (replaceIfExists) {
+ Y_DEBUG_ABORT_UNLESS(
+ IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock2().GetToken1().GetId(), OR) &&
+ IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock2().GetToken2().GetId(), REPLACE)
+ );
+ }
+
+ // IF NOT EXISTS
+ const bool existingOk = node.HasBlock5();
+ if (existingOk) {
+ Y_DEBUG_ABORT_UNLESS(
+ IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock5().GetToken1().GetId(), IF) &&
+ IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock5().GetToken2().GetId(), NOT) &&
+ IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock5().GetToken3().GetId(), EXISTS)
+ );
+ }
+
+ // WITH streaming_query_settings
+ TStreamingQuerySettings settings;
+ if (node.HasBlock7()) {
+ const auto& features = node.GetBlock7().GetRule_create_streaming_query_features1();
+ Ctx_.Token(features.GetToken1());
+
+ if (!ParseStreamingQuerySettings(features.GetRule_streaming_query_settings2(), settings)) {
+ return false;
+ }
+ }
+
+ // streaming_query_definition
+ if (!ParseStreamingQueryDefinition(node.GetRule_streaming_query_definition8(), settings)) {
+ return false;
+ }
+
+ AddStatementToBlocks(blocks, BuildCreateObjectOperation(Ctx_.Pos(), *objectPath, "STREAMING_QUERY", existingOk, replaceIfExists, std::move(settings.Features), context));
+ break;
+ }
+ case TRule_sql_stmt_core::kAltSqlStmtCore64: {
+ // alter_streaming_query_stmt: ALTER STREAMING QUERY (IF EXISTS)? object_ref
+ // alter_streaming_query_action;
+
+ const auto& node = core.GetAlt_sql_stmt_core64().GetRule_alter_streaming_query_stmt1();
+
+ // object_ref
+ TObjectOperatorContext context(Ctx_.Scoped);
+ const auto& objectPath = ParseObjectPath(node.GetRule_object_ref5(), context);
+ if (!objectPath) {
+ return false;
+ }
+
+ // IF EXISTS
+ const bool missingOk = node.HasBlock4();
+ if (missingOk) {
+ Y_DEBUG_ABORT_UNLESS(
+ IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock4().GetToken1().GetId(), IF) &&
+ IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock4().GetToken2().GetId(), EXISTS)
+ );
+ }
+
+ // alter_streaming_query_action
+ TStreamingQuerySettings settings;
+ if (!ParseAlterStreamingQueryAction(node.GetRule_alter_streaming_query_action6(), settings)) {
+ return false;
+ }
+
+ AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), *objectPath, "STREAMING_QUERY", missingOk, std::move(settings.Features), {}, context));
+ break;
+ }
+ case TRule_sql_stmt_core::kAltSqlStmtCore65: {
+ // drop_streaming_query_stmt: DROP STREAMING QUERY (IF EXISTS)? object_ref;
+
+ const auto& node = core.GetAlt_sql_stmt_core65().GetRule_drop_streaming_query_stmt1();
+
+ // object_ref
+ TObjectOperatorContext context(Ctx_.Scoped);
+ const auto& objectPath = ParseObjectPath(node.GetRule_object_ref5(), context);
+ if (!objectPath) {
+ return false;
+ }
+
+ // IF EXISTS
+ const bool missingOk = node.HasBlock4();
+ if (missingOk) {
+ Y_DEBUG_ABORT_UNLESS(
+ IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock4().GetToken1().GetId(), IF) &&
+ IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock4().GetToken2().GetId(), EXISTS)
+ );
+ }
+
+ AddStatementToBlocks(blocks, BuildDropObjectOperation(Ctx_.Pos(), *objectPath, "STREAMING_QUERY", missingOk, {}, context));
+ break;
+ }
case TRule_sql_stmt_core::ALT_NOT_SET:
Ctx_.IncrementMonCounter("sql_errors", "UnknownStatement" + internalStatementName);
AltNotImplemented("sql_stmt_core", core);