diff options
author | tesseract <[email protected]> | 2025-01-21 12:50:29 +0300 |
---|---|---|
committer | tesseract <[email protected]> | 2025-01-21 14:32:19 +0300 |
commit | e677409ecb6106695a976307290b2f6bad3d72c0 (patch) | |
tree | 7c4fe8c7334a8f814506c857a08322ea800a8b79 /yql/essentials/sql/v1/sql_query.cpp | |
parent | e2324a4c7934ecbc80eb47f70d2586c4995499b5 (diff) |
YQL for create, alter and drop transfer from topic to table
commit_hash:09502f46a7ee665609d2c4ba8d9e0aa421720cdb
Diffstat (limited to 'yql/essentials/sql/v1/sql_query.cpp')
-rw-r--r-- | yql/essentials/sql/v1/sql_query.cpp | 173 |
1 files changed, 169 insertions, 4 deletions
diff --git a/yql/essentials/sql/v1/sql_query.cpp b/yql/essentials/sql/v1/sql_query.cpp index eb1a174b30c..a04ce659861 100644 --- a/yql/essentials/sql/v1/sql_query.cpp +++ b/yql/essentials/sql/v1/sql_query.cpp @@ -143,7 +143,68 @@ static bool AsyncReplicationAlterAction(std::map<TString, TNodePtr>& settings, return AsyncReplicationSettings(settings, in.GetRule_alter_replication_set_setting1().GetRule_replication_settings3(), ctx, false); } -bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& core) { +static bool TransferSettingsEntry(std::map<TString, TNodePtr>& out, + const TRule_transfer_settings_entry& in, TSqlExpression& ctx, bool create) +{ + auto key = IdEx(in.GetRule_an_id1(), ctx); + auto value = ctx.Build(in.GetRule_expr3()); + + if (!value) { + ctx.Context().Error() << "Invalid transfer setting: " << key.Name; + return false; + } + + TSet<TString> configSettings = { + "connection_string", + "endpoint", + "database", + "token", + "token_secret_name", + "user", + "password", + "password_secret_name", + }; + + TSet<TString> stateSettings = { + "state", + "failover_mode", + }; + + const auto keyName = to_lower(key.Name); + if (!configSettings.count(keyName) && !stateSettings.contains(keyName)) { + ctx.Context().Error() << "Unknown transfer setting: " << key.Name; + return false; + } + + if (create && stateSettings.count(keyName)) { + ctx.Context().Error() << key.Name << " is not supported in CREATE"; + return false; + } + + if (!out.emplace(keyName, value).second) { + ctx.Context().Error() << "Duplicate transfer setting: " << key.Name; + } + + return true; +} + +static bool TransferSettings(std::map<TString, TNodePtr>& out, + const TRule_transfer_settings& in, TSqlExpression& ctx, bool create) +{ + if (!TransferSettingsEntry(out, in.GetRule_transfer_settings_entry1(), ctx, create)) { + return false; + } + + for (auto& block : in.GetBlock2()) { + if (!TransferSettingsEntry(out, block.GetRule_transfer_settings_entry2(), ctx, create)) { + return false; + } + } + + return true; +} + +bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& core, size_t statementNumber) { TString internalStatementName; TString humanStatementName; ParseStatementName(core, internalStatementName, humanStatementName); @@ -161,6 +222,10 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& return false; } + if (NeedUseForAllStatements(altCase)) { + Ctx.ForAllStatementsParts.push_back(statementNumber); + } + switch (altCase) { case TRule_sql_stmt_core::kAltSqlStmtCore1: { bool success = false; @@ -1763,6 +1828,103 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& AddStatementToBlocks(blocks, BuildAlterSequence(pos, service, cluster, id, params, Ctx.Scoped)); break; } + case TRule_sql_stmt_core::kAltSqlStmtCore58: { + // create_transfer_stmt: CREATE TRANSFER + + auto& node = core.GetAlt_sql_stmt_core58().GetRule_create_transfer_stmt1(); + TObjectOperatorContext context(Ctx.Scoped); + if (node.GetRule_object_ref3().HasBlock1()) { + const auto& cluster = node.GetRule_object_ref3().GetBlock1().GetRule_cluster_expr1(); + if (!ClusterExpr(cluster, false, context.ServiceId, context.Cluster)) { + return false; + } + } + + auto prefixPath = Ctx.GetPrefixPath(context.ServiceId, context.Cluster); + + std::map<TString, TNodePtr> settings; + TSqlExpression expr(Ctx, Mode); + if (!TransferSettings(settings, node.GetRule_transfer_settings11(), expr, true)) { + return false; + } + + const TString id = Id(node.GetRule_object_ref3().GetRule_id_or_at2(), *this).second; + const TString source = Id(node.GetRule_object_ref5().GetRule_id_or_at2(), *this).second; + const TString target = Id(node.GetRule_object_ref7().GetRule_id_or_at2(), *this).second; + TString transformLambda; + if (node.GetBlock8().HasRule_lambda_or_parameter2()) { + if (!ParseTransferLambda(transformLambda, node.GetBlock8().GetRule_lambda_or_parameter2())) { + return false; + } + } + + AddStatementToBlocks(blocks, BuildCreateTransfer(Ctx.Pos(), BuildTablePath(prefixPath, id), + std::move(source), std::move(target), std::move(transformLambda), std::move(settings), context)); + break; + } + case TRule_sql_stmt_core::kAltSqlStmtCore59: { + // alter_transfer_stmt: ALTER TRANSFER + auto& node = core.GetAlt_sql_stmt_core59().GetRule_alter_transfer_stmt1(); + TObjectOperatorContext context(Ctx.Scoped); + if (node.GetRule_object_ref3().HasBlock1()) { + const auto& cluster = node.GetRule_object_ref3().GetBlock1().GetRule_cluster_expr1(); + if (!ClusterExpr(cluster, false, context.ServiceId, context.Cluster)) { + return false; + } + } + + std::map<TString, TNodePtr> settings; + std::optional<TString> transformLambda; + TSqlExpression expr(Ctx, Mode); + + auto transferAlterAction = [&](std::optional<TString>& transformLambda, const TRule_alter_transfer_action& in) + { + if (in.HasAlt_alter_transfer_action1()) { + return TransferSettings(settings, in.GetAlt_alter_transfer_action1().GetRule_alter_transfer_set_setting1().GetRule_transfer_settings3(), expr, false); + } else if (in.HasAlt_alter_transfer_action2()) { + TString lb; + if (!ParseTransferLambda(lb, in.GetAlt_alter_transfer_action2().GetRule_alter_transfer_set_using1().GetRule_lambda_or_parameter3())) { + return false; + } + transformLambda = lb; + return true; + } + + return false; + }; + + if (!transferAlterAction(transformLambda, node.GetRule_alter_transfer_action4())) { + return false; + } + for (auto& block : node.GetBlock5()) { + if (!transferAlterAction(transformLambda, block.GetRule_alter_transfer_action2())) { + return false; + } + } + + const TString id = Id(node.GetRule_object_ref3().GetRule_id_or_at2(), *this).second; + AddStatementToBlocks(blocks, BuildAlterTransfer(Ctx.Pos(), + BuildTablePath(Ctx.GetPrefixPath(context.ServiceId, context.Cluster), id), + std::move(transformLambda), std::move(settings), context)); + break; + } + case TRule_sql_stmt_core::kAltSqlStmtCore60: { + // drop_transfer_stmt: DROP TRANSFER + auto& node = core.GetAlt_sql_stmt_core60().GetRule_drop_transfer_stmt1(); + TObjectOperatorContext context(Ctx.Scoped); + if (node.GetRule_object_ref3().HasBlock1()) { + const auto& cluster = node.GetRule_object_ref3().GetBlock1().GetRule_cluster_expr1(); + if (!ClusterExpr(cluster, false, context.ServiceId, context.Cluster)) { + return false; + } + } + + const TString id = Id(node.GetRule_object_ref3().GetRule_id_or_at2(), *this).second; + AddStatementToBlocks(blocks, BuildDropTransfer(Ctx.Pos(), + BuildTablePath(Ctx.GetPrefixPath(context.ServiceId, context.Cluster), id), + node.HasBlock4(), context)); + break; + } case TRule_sql_stmt_core::ALT_NOT_SET: Ctx.IncrementMonCounter("sql_errors", "UnknownStatement" + internalStatementName); AltNotImplemented("sql_stmt_core", core); @@ -3471,12 +3633,13 @@ TNodePtr TSqlQuery::Build(const TSQLv1ParserAST& ast) { Ctx.PopCurrentBlocks(); }; if (query.Alt_case() == TRule_sql_query::kAltSqlQuery1) { + size_t statementNumber = 0; const auto& statements = query.GetAlt_sql_query1().GetRule_sql_stmt_list1(); - if (!Statement(blocks, statements.GetRule_sql_stmt2().GetRule_sql_stmt_core2())) { + if (!Statement(blocks, statements.GetRule_sql_stmt2().GetRule_sql_stmt_core2(), statementNumber++)) { return nullptr; } for (auto block: statements.GetBlock3()) { - if (!Statement(blocks, block.GetRule_sql_stmt2().GetRule_sql_stmt_core2())) { + if (!Statement(blocks, block.GetRule_sql_stmt2().GetRule_sql_stmt_core2(), statementNumber++)) { return nullptr; } } @@ -3546,8 +3709,10 @@ TNodePtr TSqlQuery::Build(const std::vector<::NSQLv1Generated::TRule_sql_stmt_co Y_DEFER { Ctx.PopCurrentBlocks(); }; + + size_t statementNumber = 0; for (const auto& statement : statements) { - if (!Statement(blocks, statement)) { + if (!Statement(blocks, statement, statementNumber++)) { return nullptr; } } |