summaryrefslogtreecommitdiffstats
path: root/yql/essentials/sql/v1/sql_query.cpp
diff options
context:
space:
mode:
authortesseract <[email protected]>2025-01-21 12:50:29 +0300
committertesseract <[email protected]>2025-01-21 14:32:19 +0300
commite677409ecb6106695a976307290b2f6bad3d72c0 (patch)
tree7c4fe8c7334a8f814506c857a08322ea800a8b79 /yql/essentials/sql/v1/sql_query.cpp
parente2324a4c7934ecbc80eb47f70d2586c4995499b5 (diff)
YQL for create, alter and drop transfer from topic to table
commit_hash:09502f46a7ee665609d2c4ba8d9e0aa421720cdb
Diffstat (limited to 'yql/essentials/sql/v1/sql_query.cpp')
-rw-r--r--yql/essentials/sql/v1/sql_query.cpp173
1 files changed, 169 insertions, 4 deletions
diff --git a/yql/essentials/sql/v1/sql_query.cpp b/yql/essentials/sql/v1/sql_query.cpp
index eb1a174b30c..a04ce659861 100644
--- a/yql/essentials/sql/v1/sql_query.cpp
+++ b/yql/essentials/sql/v1/sql_query.cpp
@@ -143,7 +143,68 @@ static bool AsyncReplicationAlterAction(std::map<TString, TNodePtr>& settings,
return AsyncReplicationSettings(settings, in.GetRule_alter_replication_set_setting1().GetRule_replication_settings3(), ctx, false);
}
-bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& core) {
+static bool TransferSettingsEntry(std::map<TString, TNodePtr>& out,
+ const TRule_transfer_settings_entry& in, TSqlExpression& ctx, bool create)
+{
+ auto key = IdEx(in.GetRule_an_id1(), ctx);
+ auto value = ctx.Build(in.GetRule_expr3());
+
+ if (!value) {
+ ctx.Context().Error() << "Invalid transfer setting: " << key.Name;
+ return false;
+ }
+
+ TSet<TString> configSettings = {
+ "connection_string",
+ "endpoint",
+ "database",
+ "token",
+ "token_secret_name",
+ "user",
+ "password",
+ "password_secret_name",
+ };
+
+ TSet<TString> stateSettings = {
+ "state",
+ "failover_mode",
+ };
+
+ const auto keyName = to_lower(key.Name);
+ if (!configSettings.count(keyName) && !stateSettings.contains(keyName)) {
+ ctx.Context().Error() << "Unknown transfer setting: " << key.Name;
+ return false;
+ }
+
+ if (create && stateSettings.count(keyName)) {
+ ctx.Context().Error() << key.Name << " is not supported in CREATE";
+ return false;
+ }
+
+ if (!out.emplace(keyName, value).second) {
+ ctx.Context().Error() << "Duplicate transfer setting: " << key.Name;
+ }
+
+ return true;
+}
+
+static bool TransferSettings(std::map<TString, TNodePtr>& out,
+ const TRule_transfer_settings& in, TSqlExpression& ctx, bool create)
+{
+ if (!TransferSettingsEntry(out, in.GetRule_transfer_settings_entry1(), ctx, create)) {
+ return false;
+ }
+
+ for (auto& block : in.GetBlock2()) {
+ if (!TransferSettingsEntry(out, block.GetRule_transfer_settings_entry2(), ctx, create)) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& core, size_t statementNumber) {
TString internalStatementName;
TString humanStatementName;
ParseStatementName(core, internalStatementName, humanStatementName);
@@ -161,6 +222,10 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
return false;
}
+ if (NeedUseForAllStatements(altCase)) {
+ Ctx.ForAllStatementsParts.push_back(statementNumber);
+ }
+
switch (altCase) {
case TRule_sql_stmt_core::kAltSqlStmtCore1: {
bool success = false;
@@ -1763,6 +1828,103 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
AddStatementToBlocks(blocks, BuildAlterSequence(pos, service, cluster, id, params, Ctx.Scoped));
break;
}
+ case TRule_sql_stmt_core::kAltSqlStmtCore58: {
+ // create_transfer_stmt: CREATE TRANSFER
+
+ auto& node = core.GetAlt_sql_stmt_core58().GetRule_create_transfer_stmt1();
+ TObjectOperatorContext context(Ctx.Scoped);
+ if (node.GetRule_object_ref3().HasBlock1()) {
+ const auto& cluster = node.GetRule_object_ref3().GetBlock1().GetRule_cluster_expr1();
+ if (!ClusterExpr(cluster, false, context.ServiceId, context.Cluster)) {
+ return false;
+ }
+ }
+
+ auto prefixPath = Ctx.GetPrefixPath(context.ServiceId, context.Cluster);
+
+ std::map<TString, TNodePtr> settings;
+ TSqlExpression expr(Ctx, Mode);
+ if (!TransferSettings(settings, node.GetRule_transfer_settings11(), expr, true)) {
+ return false;
+ }
+
+ const TString id = Id(node.GetRule_object_ref3().GetRule_id_or_at2(), *this).second;
+ const TString source = Id(node.GetRule_object_ref5().GetRule_id_or_at2(), *this).second;
+ const TString target = Id(node.GetRule_object_ref7().GetRule_id_or_at2(), *this).second;
+ TString transformLambda;
+ if (node.GetBlock8().HasRule_lambda_or_parameter2()) {
+ if (!ParseTransferLambda(transformLambda, node.GetBlock8().GetRule_lambda_or_parameter2())) {
+ return false;
+ }
+ }
+
+ AddStatementToBlocks(blocks, BuildCreateTransfer(Ctx.Pos(), BuildTablePath(prefixPath, id),
+ std::move(source), std::move(target), std::move(transformLambda), std::move(settings), context));
+ break;
+ }
+ case TRule_sql_stmt_core::kAltSqlStmtCore59: {
+ // alter_transfer_stmt: ALTER TRANSFER
+ auto& node = core.GetAlt_sql_stmt_core59().GetRule_alter_transfer_stmt1();
+ TObjectOperatorContext context(Ctx.Scoped);
+ if (node.GetRule_object_ref3().HasBlock1()) {
+ const auto& cluster = node.GetRule_object_ref3().GetBlock1().GetRule_cluster_expr1();
+ if (!ClusterExpr(cluster, false, context.ServiceId, context.Cluster)) {
+ return false;
+ }
+ }
+
+ std::map<TString, TNodePtr> settings;
+ std::optional<TString> transformLambda;
+ TSqlExpression expr(Ctx, Mode);
+
+ auto transferAlterAction = [&](std::optional<TString>& transformLambda, const TRule_alter_transfer_action& in)
+ {
+ if (in.HasAlt_alter_transfer_action1()) {
+ return TransferSettings(settings, in.GetAlt_alter_transfer_action1().GetRule_alter_transfer_set_setting1().GetRule_transfer_settings3(), expr, false);
+ } else if (in.HasAlt_alter_transfer_action2()) {
+ TString lb;
+ if (!ParseTransferLambda(lb, in.GetAlt_alter_transfer_action2().GetRule_alter_transfer_set_using1().GetRule_lambda_or_parameter3())) {
+ return false;
+ }
+ transformLambda = lb;
+ return true;
+ }
+
+ return false;
+ };
+
+ if (!transferAlterAction(transformLambda, node.GetRule_alter_transfer_action4())) {
+ return false;
+ }
+ for (auto& block : node.GetBlock5()) {
+ if (!transferAlterAction(transformLambda, block.GetRule_alter_transfer_action2())) {
+ return false;
+ }
+ }
+
+ const TString id = Id(node.GetRule_object_ref3().GetRule_id_or_at2(), *this).second;
+ AddStatementToBlocks(blocks, BuildAlterTransfer(Ctx.Pos(),
+ BuildTablePath(Ctx.GetPrefixPath(context.ServiceId, context.Cluster), id),
+ std::move(transformLambda), std::move(settings), context));
+ break;
+ }
+ case TRule_sql_stmt_core::kAltSqlStmtCore60: {
+ // drop_transfer_stmt: DROP TRANSFER
+ auto& node = core.GetAlt_sql_stmt_core60().GetRule_drop_transfer_stmt1();
+ TObjectOperatorContext context(Ctx.Scoped);
+ if (node.GetRule_object_ref3().HasBlock1()) {
+ const auto& cluster = node.GetRule_object_ref3().GetBlock1().GetRule_cluster_expr1();
+ if (!ClusterExpr(cluster, false, context.ServiceId, context.Cluster)) {
+ return false;
+ }
+ }
+
+ const TString id = Id(node.GetRule_object_ref3().GetRule_id_or_at2(), *this).second;
+ AddStatementToBlocks(blocks, BuildDropTransfer(Ctx.Pos(),
+ BuildTablePath(Ctx.GetPrefixPath(context.ServiceId, context.Cluster), id),
+ node.HasBlock4(), context));
+ break;
+ }
case TRule_sql_stmt_core::ALT_NOT_SET:
Ctx.IncrementMonCounter("sql_errors", "UnknownStatement" + internalStatementName);
AltNotImplemented("sql_stmt_core", core);
@@ -3471,12 +3633,13 @@ TNodePtr TSqlQuery::Build(const TSQLv1ParserAST& ast) {
Ctx.PopCurrentBlocks();
};
if (query.Alt_case() == TRule_sql_query::kAltSqlQuery1) {
+ size_t statementNumber = 0;
const auto& statements = query.GetAlt_sql_query1().GetRule_sql_stmt_list1();
- if (!Statement(blocks, statements.GetRule_sql_stmt2().GetRule_sql_stmt_core2())) {
+ if (!Statement(blocks, statements.GetRule_sql_stmt2().GetRule_sql_stmt_core2(), statementNumber++)) {
return nullptr;
}
for (auto block: statements.GetBlock3()) {
- if (!Statement(blocks, block.GetRule_sql_stmt2().GetRule_sql_stmt_core2())) {
+ if (!Statement(blocks, block.GetRule_sql_stmt2().GetRule_sql_stmt_core2(), statementNumber++)) {
return nullptr;
}
}
@@ -3546,8 +3709,10 @@ TNodePtr TSqlQuery::Build(const std::vector<::NSQLv1Generated::TRule_sql_stmt_co
Y_DEFER {
Ctx.PopCurrentBlocks();
};
+
+ size_t statementNumber = 0;
for (const auto& statement : statements) {
- if (!Statement(blocks, statement)) {
+ if (!Statement(blocks, statement, statementNumber++)) {
return nullptr;
}
}