diff options
| author | grigoriypisar <[email protected]> | 2025-08-28 16:56:17 +0300 |
|---|---|---|
| committer | grigoriypisar <[email protected]> | 2025-08-28 17:21:58 +0300 |
| commit | 45c5e8ac55972ded21c846f9e05a3d61d197e3a4 (patch) | |
| tree | 058b5d184adcc94e57a8262c9dcde768d5589eb7 /yql/essentials/sql/v1/sql_query.cpp | |
| parent | 9be8744bb96ab00cbf25afb60ab54db5cbad95dc (diff) | |
added sql syntax for CREATE / ALTER DROP STREAMING QUERY
Добавлен синтаксис (переиспользован код для inline action):
```
CREATE [OR REPLACE] STREAMING QUERY [IF NOT EXISTS] query_name [WITH (
key = value
...
)] AS DO BEGIN
...
END DO;
ALTER STREAMING QUERY [IF EXISTS] query_name [SET(
key = value
...
)] [AS DO BEGIN
...
END DO];
DROP STREAMING QUERY [IF EXISTS] query_name;
```
commit_hash:29fa6aa7e61ecf45112480fe3c1df8fab542354e
Diffstat (limited to 'yql/essentials/sql/v1/sql_query.cpp')
| -rw-r--r-- | yql/essentials/sql/v1/sql_query.cpp | 118 |
1 files changed, 113 insertions, 5 deletions
diff --git a/yql/essentials/sql/v1/sql_query.cpp b/yql/essentials/sql/v1/sql_query.cpp index d48fb6ff023..b84e2df0add 100644 --- a/yql/essentials/sql/v1/sql_query.cpp +++ b/yql/essentials/sql/v1/sql_query.cpp @@ -954,7 +954,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& return false; } - AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, typeId, std::move(kv), std::set<TString>(), context)); + AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, typeId, false, std::move(kv), std::set<TString>(), context)); break; } case TRule_sql_stmt_core::kAltSqlStmtCore29: { @@ -1053,7 +1053,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& } } - AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, "EXTERNAL_DATA_SOURCE", std::move(kv), std::move(toReset), context)); + AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, "EXTERNAL_DATA_SOURCE", false, std::move(kv), std::move(toReset), context)); break; } case TRule_sql_stmt_core::kAltSqlStmtCore32: { @@ -1323,7 +1323,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& return false; } - AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, typeId, std::move(kv), std::set<TString>(), context)); + AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, typeId, false, std::move(kv), std::set<TString>(), context)); break; } case TRule_sql_stmt_core::kAltSqlStmtCore41: @@ -1486,7 +1486,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& } } - AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, "RESOURCE_POOL", std::move(kv), std::move(toReset), context)); + AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, "RESOURCE_POOL", false, std::move(kv), std::move(toReset), context)); break; } case TRule_sql_stmt_core::kAltSqlStmtCore47: { @@ -1722,7 +1722,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& } } - AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, "RESOURCE_POOL_CLASSIFIER", std::move(kv), std::move(toReset), context)); + AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), objectId, "RESOURCE_POOL_CLASSIFIER", false, std::move(kv), std::move(toReset), context)); break; } case TRule_sql_stmt_core::kAltSqlStmtCore54: { @@ -2001,6 +2001,114 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& AddStatementToBlocks(blocks, BuildShowCreate(Ctx_.Pos(), tr, type, Ctx_.Scoped)); break; } + case TRule_sql_stmt_core::kAltSqlStmtCore63: { + // create_streaming_query_stmt: CREATE (OR REPLACE)? STREAMING QUERY (IF NOT EXISTS)? object_ref + // (WITH streaming_query_settings)? + // streaming_query_definition; + + const auto& node = core.GetAlt_sql_stmt_core63().GetRule_create_streaming_query_stmt1(); + Ctx_.Token(node.GetToken1()); + + // object_ref + TObjectOperatorContext context(Ctx_.Scoped); + const auto& objectPath = ParseObjectPath(node.GetRule_object_ref6(), context); + if (!objectPath) { + return false; + } + + // OR REPLACE + const bool replaceIfExists = node.HasBlock2(); + if (replaceIfExists) { + Y_DEBUG_ABORT_UNLESS( + IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock2().GetToken1().GetId(), OR) && + IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock2().GetToken2().GetId(), REPLACE) + ); + } + + // IF NOT EXISTS + const bool existingOk = node.HasBlock5(); + if (existingOk) { + Y_DEBUG_ABORT_UNLESS( + IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock5().GetToken1().GetId(), IF) && + IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock5().GetToken2().GetId(), NOT) && + IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock5().GetToken3().GetId(), EXISTS) + ); + } + + // WITH streaming_query_settings + TStreamingQuerySettings settings; + if (node.HasBlock7()) { + const auto& features = node.GetBlock7().GetRule_create_streaming_query_features1(); + Ctx_.Token(features.GetToken1()); + + if (!ParseStreamingQuerySettings(features.GetRule_streaming_query_settings2(), settings)) { + return false; + } + } + + // streaming_query_definition + if (!ParseStreamingQueryDefinition(node.GetRule_streaming_query_definition8(), settings)) { + return false; + } + + AddStatementToBlocks(blocks, BuildCreateObjectOperation(Ctx_.Pos(), *objectPath, "STREAMING_QUERY", existingOk, replaceIfExists, std::move(settings.Features), context)); + break; + } + case TRule_sql_stmt_core::kAltSqlStmtCore64: { + // alter_streaming_query_stmt: ALTER STREAMING QUERY (IF EXISTS)? object_ref + // alter_streaming_query_action; + + const auto& node = core.GetAlt_sql_stmt_core64().GetRule_alter_streaming_query_stmt1(); + + // object_ref + TObjectOperatorContext context(Ctx_.Scoped); + const auto& objectPath = ParseObjectPath(node.GetRule_object_ref5(), context); + if (!objectPath) { + return false; + } + + // IF EXISTS + const bool missingOk = node.HasBlock4(); + if (missingOk) { + Y_DEBUG_ABORT_UNLESS( + IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock4().GetToken1().GetId(), IF) && + IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock4().GetToken2().GetId(), EXISTS) + ); + } + + // alter_streaming_query_action + TStreamingQuerySettings settings; + if (!ParseAlterStreamingQueryAction(node.GetRule_alter_streaming_query_action6(), settings)) { + return false; + } + + AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx_.Pos(), *objectPath, "STREAMING_QUERY", missingOk, std::move(settings.Features), {}, context)); + break; + } + case TRule_sql_stmt_core::kAltSqlStmtCore65: { + // drop_streaming_query_stmt: DROP STREAMING QUERY (IF EXISTS)? object_ref; + + const auto& node = core.GetAlt_sql_stmt_core65().GetRule_drop_streaming_query_stmt1(); + + // object_ref + TObjectOperatorContext context(Ctx_.Scoped); + const auto& objectPath = ParseObjectPath(node.GetRule_object_ref5(), context); + if (!objectPath) { + return false; + } + + // IF EXISTS + const bool missingOk = node.HasBlock4(); + if (missingOk) { + Y_DEBUG_ABORT_UNLESS( + IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock4().GetToken1().GetId(), IF) && + IS_TOKEN(Ctx_.Settings.Antlr4Parser, node.GetBlock4().GetToken2().GetId(), EXISTS) + ); + } + + AddStatementToBlocks(blocks, BuildDropObjectOperation(Ctx_.Pos(), *objectPath, "STREAMING_QUERY", missingOk, {}, context)); + break; + } case TRule_sql_stmt_core::ALT_NOT_SET: Ctx_.IncrementMonCounter("sql_errors", "UnknownStatement" + internalStatementName); AltNotImplemented("sql_stmt_core", core); |
