diff options
author | tesseract <[email protected]> | 2025-01-21 12:50:29 +0300 |
---|---|---|
committer | tesseract <[email protected]> | 2025-01-21 14:32:19 +0300 |
commit | e677409ecb6106695a976307290b2f6bad3d72c0 (patch) | |
tree | 7c4fe8c7334a8f814506c857a08322ea800a8b79 | |
parent | e2324a4c7934ecbc80eb47f70d2586c4995499b5 (diff) |
YQL for create, alter and drop transfer from topic to table
commit_hash:09502f46a7ee665609d2c4ba8d9e0aa421720cdb
25 files changed, 1018 insertions, 191 deletions
diff --git a/yql/essentials/providers/common/provider/yql_provider.cpp b/yql/essentials/providers/common/provider/yql_provider.cpp index 79156cd8d37..0b7a1ef6609 100644 --- a/yql/essentials/providers/common/provider/yql_provider.cpp +++ b/yql/essentials/providers/common/provider/yql_provider.cpp @@ -604,6 +604,60 @@ TWriteReplicationSettings ParseWriteReplicationSettings(TExprList node, TExprCon return ret; } +TWriteTransferSettings ParseWriteTransferSettings(TExprList node, TExprContext& ctx) { + TMaybeNode<TCoAtom> mode; + TMaybeNode<TCoAtom> source; + TMaybeNode<TCoAtom> target; + TMaybeNode<TCoAtom> transformLambda; + TVector<TCoNameValueTuple> settings; + TVector<TCoNameValueTuple> other; + + for (auto child : node) { + if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) { + auto tuple = maybeTuple.Cast(); + auto name = tuple.Name().Value(); + + if (name == "mode") { + YQL_ENSURE(tuple.Value().Maybe<TCoAtom>()); + mode = tuple.Value().Cast<TCoAtom>(); + } else if (name == "source") { + YQL_ENSURE(tuple.Value().Maybe<TCoAtom>()); + source = tuple.Value().Cast<TCoAtom>(); + } else if (name == "target") { + YQL_ENSURE(tuple.Value().Maybe<TCoAtom>()); + target = tuple.Value().Cast<TCoAtom>(); + } else if (name == "transformLambda") { + YQL_ENSURE(tuple.Value().Maybe<TCoAtom>()); + transformLambda = tuple.Value().Cast<TCoAtom>(); + } else if (name == "settings") { + YQL_ENSURE(tuple.Value().Maybe<TCoNameValueTupleList>()); + for (const auto& item : tuple.Value().Cast<TCoNameValueTupleList>()) { + settings.push_back(item); + } + } else { + other.push_back(tuple); + } + } + } + + const auto& builtSettings = Build<TCoNameValueTupleList>(ctx, node.Pos()) + .Add(settings) + .Done(); + + const auto& builtOther = Build<TCoNameValueTupleList>(ctx, node.Pos()) + .Add(other) + .Done(); + + TWriteTransferSettings ret(builtOther); + ret.Mode = mode; + ret.Source = source; + ret.Target = target; + ret.TransformLambda = transformLambda; + ret.TransferSettings = builtSettings; + + return ret; +} + TWriteRoleSettings ParseWriteRoleSettings(TExprList node, TExprContext& ctx) { TMaybeNode<TCoAtom> mode; TVector<TCoAtom> roles; @@ -858,7 +912,7 @@ bool FillUsedFilesImpl( if (node.GetTypeAnn()) { usedPgExtensions |= node.GetTypeAnn()->GetUsedPgExtensions(); } - + if (node.IsCallable("PgResolvedCall")) { auto procId = FromString<ui32>(node.Child(1)->Content()); const auto& proc = NPg::LookupProc(procId); @@ -1072,7 +1126,7 @@ void FillSecureParams( } } -bool AddPgFile(bool isPath, const TString& pathOrContent, const TString& md5, const TString& alias, TUserDataTable& files, +bool AddPgFile(bool isPath, const TString& pathOrContent, const TString& md5, const TString& alias, TUserDataTable& files, const TTypeAnnotationContext& types, TPositionHandle pos, TExprContext& ctx) { TUserDataBlock block; @@ -1140,7 +1194,7 @@ bool FillUsedFiles( return false; } } - + Y_ENSURE(remainingPgExtensions == 0); if (!needFullPgCatalog) { return true; diff --git a/yql/essentials/providers/common/provider/yql_provider.h b/yql/essentials/providers/common/provider/yql_provider.h index 978621d4b16..ea9992dd5e2 100644 --- a/yql/essentials/providers/common/provider/yql_provider.h +++ b/yql/essentials/providers/common/provider/yql_provider.h @@ -97,6 +97,19 @@ struct TWriteReplicationSettings { {} }; +struct TWriteTransferSettings { + NNodes::TMaybeNode<NNodes::TCoAtom> Mode; + NNodes::TMaybeNode<NNodes::TCoAtom> Source; + NNodes::TMaybeNode<NNodes::TCoAtom> Target; + NNodes::TMaybeNode<NNodes::TCoAtom> TransformLambda; + NNodes::TMaybeNode<NNodes::TCoNameValueTupleList> TransferSettings; + NNodes::TCoNameValueTupleList Other; + + TWriteTransferSettings(const NNodes::TCoNameValueTupleList& other) + : Other(other) + {} +}; + struct TWriteRoleSettings { NNodes::TMaybeNode<NNodes::TCoAtom> Mode; NNodes::TMaybeNode<NNodes::TCoAtomList> Roles; @@ -168,6 +181,7 @@ TVector<TString> GetResOrPullColumnHints(const TExprNode& node); TWriteTableSettings ParseWriteTableSettings(NNodes::TExprList node, TExprContext& ctx); TWriteTopicSettings ParseWriteTopicSettings(NNodes::TExprList node, TExprContext& ctx); TWriteReplicationSettings ParseWriteReplicationSettings(NNodes::TExprList node, TExprContext& ctx); +TWriteTransferSettings ParseWriteTransferSettings(NNodes::TExprList node, TExprContext& ctx); TWriteRoleSettings ParseWriteRoleSettings(NNodes::TExprList node, TExprContext& ctx); TWriteObjectSettings ParseWriteObjectSettings(NNodes::TExprList node, TExprContext& ctx); diff --git a/yql/essentials/sql/sql.cpp b/yql/essentials/sql/sql.cpp index 55409d5ba3f..6ecd365a906 100644 --- a/yql/essentials/sql/sql.cpp +++ b/yql/essentials/sql/sql.cpp @@ -144,6 +144,10 @@ namespace NSQLTranslation { } NYql::TAstParseResult SqlASTToYql(const google::protobuf::Message& protoAst, const TSQLHints& hints, const TTranslationSettings& settings) { + return SqlASTToYql("", protoAst, hints, settings); + } + + NYql::TAstParseResult SqlASTToYql(const TString& query, const google::protobuf::Message& protoAst, const TSQLHints& hints, const TTranslationSettings& settings) { NYql::TAstParseResult result; switch (settings.SyntaxVersion) { case 0: @@ -161,7 +165,7 @@ namespace NSQLTranslation { return NSQLTranslationV0::SqlASTToYql(protoAst, settings); case 1: - return NSQLTranslationV1::SqlASTToYql(protoAst, hints, settings); + return NSQLTranslationV1::SqlASTToYql(query, protoAst, hints, settings); default: result.Issues.AddIssue(NYql::YqlIssue(NYql::TPosition(), NYql::TIssuesIds::DEFAULT_ERROR, TStringBuilder() << "Unknown SQL syntax version: " << settings.SyntaxVersion)); diff --git a/yql/essentials/sql/sql.h b/yql/essentials/sql/sql.h index 26a61b22313..f0f844c88a1 100644 --- a/yql/essentials/sql/sql.h +++ b/yql/essentials/sql/sql.h @@ -21,7 +21,11 @@ namespace NSQLTranslation { google::protobuf::Message* SqlAST(const TString& query, const TString& queryName, NYql::TIssues& issues, size_t maxErrors, const TTranslationSettings& settings = {}, ui16* actualSyntaxVersion = nullptr); ILexer::TPtr SqlLexer(const TString& query, NYql::TIssues& issues, const TTranslationSettings& settings = {}, ui16* actualSyntaxVersion = nullptr); + + /*[[deprecated]] Use SqlASTToYql(query, protoAst, hints, settings)*/ NYql::TAstParseResult SqlASTToYql(const google::protobuf::Message& protoAst, const TSQLHints& hints, const TTranslationSettings& settings); + NYql::TAstParseResult SqlASTToYql(const TString& query, const google::protobuf::Message& protoAst, const TSQLHints& hints, const TTranslationSettings& settings); + TVector<NYql::TAstParseResult> SqlToAstStatements(const TString& query, const TTranslationSettings& settings, NYql::TWarningRules* warningRules = nullptr, ui16* actualSyntaxVersion = nullptr, TVector<NYql::TStmtParseInfo>* stmtParseInfo = nullptr); diff --git a/yql/essentials/sql/v1/SQLv1.g.in b/yql/essentials/sql/v1/SQLv1.g.in index 192a1a6026f..510afaa501e 100644 --- a/yql/essentials/sql/v1/SQLv1.g.in +++ b/yql/essentials/sql/v1/SQLv1.g.in @@ -76,6 +76,9 @@ sql_stmt_core: | backup_stmt | restore_stmt | alter_sequence_stmt + | create_transfer_stmt + | alter_transfer_stmt + | drop_transfer_stmt ; expr: @@ -906,6 +909,29 @@ alter_replication_set_setting: SET LPAREN replication_settings RPAREN; drop_replication_stmt: DROP ASYNC REPLICATION object_ref CASCADE?; +create_transfer_stmt: CREATE TRANSFER object_ref + FROM object_ref TO object_ref (USING lambda_or_parameter)? + WITH LPAREN transfer_settings RPAREN +; + +lambda_or_parameter: + lambda + | bind_parameter +; +transfer_settings: transfer_settings_entry (COMMA transfer_settings_entry)*; +transfer_settings_entry: an_id EQUALS expr; + +alter_transfer_stmt: ALTER TRANSFER object_ref alter_transfer_action (COMMA alter_transfer_action)*; +alter_transfer_action: + alter_transfer_set_setting + | alter_transfer_set_using +; + +alter_transfer_set_setting: SET LPAREN transfer_settings RPAREN; +alter_transfer_set_using: SET USING lambda_or_parameter; + +drop_transfer_stmt: DROP TRANSFER object_ref CASCADE?; + action_or_subquery_args: opt_bind_parameter (COMMA opt_bind_parameter)*; define_action_or_subquery_stmt: DEFINE (ACTION|SUBQUERY) bind_parameter LPAREN action_or_subquery_args? RPAREN AS define_action_or_subquery_body END DEFINE; @@ -1466,6 +1492,7 @@ keyword_as_compat: | TO | TOPIC | TRANSACTION + | TRANSFER | TRIGGER | TYPE | UNCONDITIONAL @@ -1693,6 +1720,7 @@ keyword_compat: ( | TO | TOPIC | TRANSACTION + | TRANSFER | TRIGGER | TYPE | UNCONDITIONAL @@ -2072,6 +2100,7 @@ TIES: T I E S; TO: T O; TOPIC: T O P I C; TRANSACTION: T R A N S A C T I O N; +TRANSFER: T R A N S F E R; TRIGGER: T R I G G E R; TRUE: T R U E; TUPLE: T U P L E; diff --git a/yql/essentials/sql/v1/SQLv1Antlr4.g.in b/yql/essentials/sql/v1/SQLv1Antlr4.g.in index 3ae4ad6492d..2ddfd0c748c 100644 --- a/yql/essentials/sql/v1/SQLv1Antlr4.g.in +++ b/yql/essentials/sql/v1/SQLv1Antlr4.g.in @@ -75,6 +75,9 @@ sql_stmt_core: | backup_stmt | restore_stmt | alter_sequence_stmt + | create_transfer_stmt + | alter_transfer_stmt + | drop_transfer_stmt ; expr: @@ -905,6 +908,30 @@ alter_replication_set_setting: SET LPAREN replication_settings RPAREN; drop_replication_stmt: DROP ASYNC REPLICATION object_ref CASCADE?; +lambda_or_parameter: + lambda + | bind_parameter +; + +create_transfer_stmt: CREATE TRANSFER object_ref + FROM object_ref TO object_ref (USING lambda_or_parameter)? + WITH LPAREN transfer_settings RPAREN +; + +transfer_settings: transfer_settings_entry (COMMA transfer_settings_entry)*; +transfer_settings_entry: an_id EQUALS expr; + +alter_transfer_stmt: ALTER TRANSFER object_ref alter_transfer_action (COMMA alter_transfer_action)*; +alter_transfer_action: + alter_transfer_set_setting + | alter_transfer_set_using +; + +alter_transfer_set_setting: SET LPAREN transfer_settings RPAREN; +alter_transfer_set_using: SET USING lambda_or_parameter; + +drop_transfer_stmt: DROP TRANSFER object_ref CASCADE?; + action_or_subquery_args: opt_bind_parameter (COMMA opt_bind_parameter)*; define_action_or_subquery_stmt: DEFINE (ACTION|SUBQUERY) bind_parameter LPAREN action_or_subquery_args? RPAREN AS define_action_or_subquery_body END DEFINE; @@ -1465,6 +1492,7 @@ keyword_as_compat: | TO | TOPIC | TRANSACTION + | TRANSFER | TRIGGER | TYPE | UNCONDITIONAL @@ -1692,6 +1720,7 @@ keyword_compat: ( | TO | TOPIC | TRANSACTION + | TRANSFER | TRIGGER | TYPE | UNCONDITIONAL @@ -2071,6 +2100,7 @@ TIES: T I E S; TO: T O; TOPIC: T O P I C; TRANSACTION: T R A N S A C T I O N; +TRANSFER: T R A N S F E R; TRIGGER: T R I G G E R; TRUE: T R U E; TUPLE: T U P L E; diff --git a/yql/essentials/sql/v1/context.cpp b/yql/essentials/sql/v1/context.cpp index 43afc53c38b..d28c31469b4 100644 --- a/yql/essentials/sql/v1/context.cpp +++ b/yql/essentials/sql/v1/context.cpp @@ -83,12 +83,14 @@ THashMap<TStringBuf, TPragmaMaybeField> CTX_PRAGMA_MAYBE_FIELDS = { TContext::TContext(const NSQLTranslation::TTranslationSettings& settings, const NSQLTranslation::TSQLHints& hints, - TIssues& issues) + TIssues& issues, + const TString& query) : ClusterMapping(settings.ClusterMapping) , PathPrefix(settings.PathPrefix) , ClusterPathPrefixes(settings.ClusterPathPrefixes) , SQLHints(hints) , Settings(settings) + , Query(query) , Pool(new TMemoryPool(4096)) , Issues(issues) , IncrementMonCounterFunction(settings.IncrementCounter) diff --git a/yql/essentials/sql/v1/context.h b/yql/essentials/sql/v1/context.h index a5001fd157c..85a4739eaf5 100644 --- a/yql/essentials/sql/v1/context.h +++ b/yql/essentials/sql/v1/context.h @@ -92,7 +92,8 @@ namespace NSQLTranslationV1 { public: TContext(const NSQLTranslation::TTranslationSettings& settings, const NSQLTranslation::TSQLHints& hints, - NYql::TIssues& issues); + NYql::TIssues& issues, + const TString& query = {}); virtual ~TContext(); @@ -237,6 +238,7 @@ namespace NSQLTranslationV1 { THashMap<TString, std::pair<TPosition, TNodePtr>> Variables; THashSet<TString> WeakVariables; NSQLTranslation::TTranslationSettings Settings; + const TString Query; std::unique_ptr<TMemoryPool> Pool; NYql::TIssues& Issues; TMap<TString, TNodePtr> UniversalAliases; @@ -328,6 +330,7 @@ namespace NSQLTranslationV1 { bool DistinctOverWindow = false; bool SeqMode = false; bool EmitUnionMerge = false; + TVector<size_t> ForAllStatementsParts; }; class TColumnRefScope { diff --git a/yql/essentials/sql/v1/format/sql_format.cpp b/yql/essentials/sql/v1/format/sql_format.cpp index b6986310aa1..26d7eb9287b 100644 --- a/yql/essentials/sql/v1/format/sql_format.cpp +++ b/yql/essentials/sql/v1/format/sql_format.cpp @@ -165,144 +165,6 @@ bool Validate(const TParsedTokenList& query, const TParsedTokenList& formattedQu return in == inEnd && out == outEnd && parenthesesBalance == 0; } -enum EParenType { - Open, - Close, - None -}; - -using TAdvanceCallback = std::function<EParenType(TTokenIterator& curr, TTokenIterator end)>; - -TTokenIterator SkipToNextBalanced(TTokenIterator begin, TTokenIterator end, const TAdvanceCallback& advance) { - i64 level = 0; - TTokenIterator curr = begin; - while (curr != end) { - switch (advance(curr, end)) { - case EParenType::Open: { - ++level; - break; - } - case EParenType::Close: { - --level; - if (level < 0) { - return end; - } else if (level == 0) { - return curr; - } - break; - } - case EParenType::None: - break; - } - } - return curr; -} - -TTokenIterator GetNextStatementBegin(TTokenIterator begin, TTokenIterator end) { - TAdvanceCallback advanceLambdaBody = [](TTokenIterator& curr, TTokenIterator end) -> EParenType { - Y_UNUSED(end); - if (curr->Name == "LBRACE_CURLY") { - ++curr; - return EParenType::Open; - } else if (curr->Name == "RBRACE_CURLY") { - ++curr; - return EParenType::Close; - } else { - ++curr; - return EParenType::None; - } - }; - - TAdvanceCallback advanceAction = [](TTokenIterator& curr, TTokenIterator end) -> EParenType { - auto tmp = curr; - if (curr->Name == "DEFINE") { - ++curr; - curr = SkipWSOrComment(curr, end); - if (curr != end && (curr->Name == "ACTION" || curr->Name == "SUBQUERY")) { - ++curr; - return EParenType::Open; - } - } else if (curr->Name == "END") { - ++curr; - curr = SkipWSOrComment(curr, end); - if (curr != end && curr->Name == "DEFINE") { - ++curr; - return EParenType::Close; - } - } - - curr = tmp; - ++curr; - return EParenType::None; - }; - - TAdvanceCallback advanceInlineAction = [](TTokenIterator& curr, TTokenIterator end) -> EParenType { - auto tmp = curr; - if (curr->Name == "DO") { - ++curr; - curr = SkipWSOrComment(curr, end); - if (curr != end && curr->Name == "BEGIN") { - ++curr; - return EParenType::Open; - } - } else if (curr->Name == "END") { - ++curr; - curr = SkipWSOrComment(curr, end); - if (curr != end && curr->Name == "DO") { - ++curr; - return EParenType::Close; - } - } - - curr = tmp; - ++curr; - return EParenType::None; - }; - - TTokenIterator curr = begin; - while (curr != end) { - bool matched = false; - for (auto cb : {advanceLambdaBody, advanceAction, advanceInlineAction}) { - TTokenIterator tmp = curr; - if (cb(tmp, end) == EParenType::Open) { - curr = SkipToNextBalanced(curr, end, cb); - matched = true; - if (curr == end) { - return curr; - } - } - } - if (matched) { - continue; - } - if (curr->Name == "SEMICOLON") { - auto next = SkipWS(curr + 1, end); - while (next != end && next->Name == "COMMENT" && curr->Line == next->Line) { - curr = next; - next = SkipWS(next + 1, end); - } - ++curr; - break; - } - ++curr; - } - - return curr; -} - -void SplitByStatements(TTokenIterator begin, TTokenIterator end, TVector<TTokenIterator>& output) { - output.clear(); - if (begin == end) { - return; - } - output.push_back(begin); - auto curr = begin; - while (curr != end) { - curr = GetNextStatementBegin(curr, end); - output.push_back(curr); - } -} - enum class EScope { Default, TypeName, @@ -833,6 +695,7 @@ private: case TRule_sql_stmt_core::kAltSqlStmtCore14: // export case TRule_sql_stmt_core::kAltSqlStmtCore32: // drop external data source case TRule_sql_stmt_core::kAltSqlStmtCore34: // drop replication + case TRule_sql_stmt_core::kAltSqlStmtCore60: // drop transfer return true; case TRule_sql_stmt_core::kAltSqlStmtCore3: { // named nodes const auto& stmt = msg.GetAlt_sql_stmt_core3().GetRule_named_nodes_stmt1(); @@ -1541,6 +1404,21 @@ private: VisitAllFields(TRule_drop_replication_stmt::GetDescriptor(), msg); } + void VisitCreateTransfer(const TRule_create_transfer_stmt& msg) { + NewLine(); + VisitAllFields(TRule_create_transfer_stmt::GetDescriptor(), msg); + } + + void VisitAlterTransfer(const TRule_alter_transfer_stmt& msg) { + NewLine(); + VisitAllFields(TRule_alter_transfer_stmt::GetDescriptor(), msg); + } + + void VisitDropTransfer(const TRule_drop_transfer_stmt& msg) { + NewLine(); + VisitAllFields(TRule_drop_transfer_stmt::GetDescriptor(), msg); + } + void VisitCreateResourcePool(const TRule_create_resource_pool_stmt& msg) { NewLine(); VisitAllFields(TRule_create_resource_pool_stmt::GetDescriptor(), msg); @@ -3107,6 +2985,9 @@ TStaticData::TStaticData() {TRule_create_replication_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitCreateAsyncReplication)}, {TRule_alter_replication_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitAlterAsyncReplication)}, {TRule_drop_replication_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitDropAsyncReplication)}, + {TRule_create_transfer_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitCreateTransfer)}, + {TRule_alter_transfer_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitAlterTransfer)}, + {TRule_drop_transfer_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitDropTransfer)}, {TRule_create_topic_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitCreateTopic)}, {TRule_alter_topic_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitAlterTopic)}, {TRule_drop_topic_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitDropTopic)}, @@ -3188,42 +3069,15 @@ public: } auto lexer = NSQLTranslationV1::MakeLexer(parsedSettings.AnsiLexer, parsedSettings.Antlr4Parser); - TParsedTokenList allTokens; - auto onNextToken = [&](NSQLTranslation::TParsedToken&& token) { - if (token.Name != "EOF") { - allTokens.push_back(token); - } - }; - - if (!lexer->Tokenize(query, "Query", onNextToken, issues, NSQLTranslation::SQL_MAX_PARSER_ERRORS)) { + TVector<TString> statements; + if (!NSQLTranslationV1::SplitQueryToStatements(query, lexer, statements, issues)) { return false; } - TVector<TTokenIterator> statements; - SplitByStatements(allTokens.begin(), allTokens.end(), statements); TStringBuilder finalFormattedQuery; bool prevAddLine = false; TMaybe<ui32> prevStmtCoreAltCase; - for (size_t i = 1; i < statements.size(); ++i) { - TStringBuilder currentQueryBuilder; - for (auto it = statements[i - 1]; it != statements[i]; ++it) { - currentQueryBuilder << it->Content; - } - - TString currentQuery = currentQueryBuilder; - currentQuery = StripStringLeft(currentQuery); - bool isBlank = true; - for (auto c : currentQuery) { - if (c != ';') { - isBlank = false; - break; - } - }; - - if (isBlank) { - continue; - } - + for (const TString& currentQuery : statements) { TVector<NSQLTranslation::TParsedToken> comments; TParsedTokenList parsedTokens, stmtTokens; auto onNextRawToken = [&](NSQLTranslation::TParsedToken&& token) { diff --git a/yql/essentials/sql/v1/format/sql_format_ut.h b/yql/essentials/sql/v1/format/sql_format_ut.h index bd62ddf3687..a8fd6b7b3ff 100644 --- a/yql/essentials/sql/v1/format/sql_format_ut.h +++ b/yql/essentials/sql/v1/format/sql_format_ut.h @@ -383,6 +383,26 @@ Y_UNIT_TEST(AsyncReplication) { setup.Run(cases); } +Y_UNIT_TEST(Transfer) { + TCases cases = { + {"create transfer user from topic1 to table1 with (user='foo')", + "CREATE TRANSFER user FROM topic1 TO table1 WITH (user = 'foo');\n"}, + {"alter transfer user set (user='foo')", + "ALTER TRANSFER user SET (user = 'foo');\n"}, + {"drop transfer user", + "DROP TRANSFER user;\n"}, + {"drop transfer user cascade", + "DROP TRANSFER user CASCADE;\n"}, + {"create transfer user from topic1 to table1 using ($x) -> { $y = cast($x as String); return $y ; } with (user='foo')", + "CREATE TRANSFER user FROM topic1 TO table1 USING ($x) -> {\n $y = CAST($x AS String);\n RETURN $y;\n} WITH (user = 'foo');\n"}, + {"create transfer user from topic1 to table1 using $xxx with (user='foo')", + "CREATE TRANSFER user FROM topic1 TO table1 USING $xxx WITH (user = 'foo');\n"}, + }; + + TSetup setup; + setup.Run(cases); +} + Y_UNIT_TEST(ExternalTableOperations) { TCases cases = { {"creAte exTernAl TabLe usEr (a int) With (a = \"b\")", diff --git a/yql/essentials/sql/v1/lexer/lexer.cpp b/yql/essentials/sql/v1/lexer/lexer.cpp index 1d38ec3d8b0..2609e0f7f6f 100644 --- a/yql/essentials/sql/v1/lexer/lexer.cpp +++ b/yql/essentials/sql/v1/lexer/lexer.cpp @@ -8,8 +8,11 @@ #include <yql/essentials/parser/proto_ast/gen/v1_ansi/SQLv1Lexer.h> #include <yql/essentials/parser/proto_ast/gen/v1_antlr4/SQLv1Antlr4Lexer.h> #include <yql/essentials/parser/proto_ast/gen/v1_ansi_antlr4/SQLv1Antlr4Lexer.h> +#include <yql/essentials/sql/v1/sql.h> #include <util/string/ascii.h> +#include <util/string/builder.h> +#include <util/string/strip.h> #if defined(_tsan_enabled_) #include <util/system/mutex.h> @@ -80,4 +83,204 @@ bool IsProbablyKeyword(const NSQLTranslation::TParsedToken& token) { return AsciiEqualsIgnoreCase(token.Name, token.Content); } +using NSQLTranslation::TParsedTokenList; +using TTokenIterator = TParsedTokenList::const_iterator; + +namespace { + +enum EParenType { + Open, + Close, + None +}; + +using TAdvanceCallback = std::function<EParenType(TTokenIterator& curr, TTokenIterator end)>; + +TTokenIterator SkipWS(TTokenIterator curr, TTokenIterator end) { + while (curr != end && curr->Name == "WS") { + ++curr; + } + return curr; +} + +TTokenIterator SkipWSOrComment(TTokenIterator curr, TTokenIterator end) { + while (curr != end && (curr->Name == "WS" || curr->Name == "COMMENT")) { + ++curr; + } + return curr; +} + +TTokenIterator SkipToNextBalanced(TTokenIterator begin, TTokenIterator end, const TAdvanceCallback& advance) { + i64 level = 0; + TTokenIterator curr = begin; + while (curr != end) { + switch (advance(curr, end)) { + case EParenType::Open: { + ++level; + break; + } + case EParenType::Close: { + --level; + if (level < 0) { + return end; + } else if (level == 0) { + return curr; + } + break; + } + case EParenType::None: + break; + } + } + return curr; +} + +TTokenIterator GetNextStatementBegin(TTokenIterator begin, TTokenIterator end) { + TAdvanceCallback advanceLambdaBody = [](TTokenIterator& curr, TTokenIterator end) -> EParenType { + Y_UNUSED(end); + if (curr->Name == "LBRACE_CURLY") { + ++curr; + return EParenType::Open; + } else if (curr->Name == "RBRACE_CURLY") { + ++curr; + return EParenType::Close; + } else { + ++curr; + return EParenType::None; + } + }; + + TAdvanceCallback advanceAction = [](TTokenIterator& curr, TTokenIterator end) -> EParenType { + auto tmp = curr; + if (curr->Name == "DEFINE") { + ++curr; + curr = SkipWSOrComment(curr, end); + if (curr != end && (curr->Name == "ACTION" || curr->Name == "SUBQUERY")) { + ++curr; + return EParenType::Open; + } + } else if (curr->Name == "END") { + ++curr; + curr = SkipWSOrComment(curr, end); + if (curr != end && curr->Name == "DEFINE") { + ++curr; + return EParenType::Close; + } + } + + curr = tmp; + ++curr; + return EParenType::None; + }; + + TAdvanceCallback advanceInlineAction = [](TTokenIterator& curr, TTokenIterator end) -> EParenType { + auto tmp = curr; + if (curr->Name == "DO") { + ++curr; + curr = SkipWSOrComment(curr, end); + if (curr != end && curr->Name == "BEGIN") { + ++curr; + return EParenType::Open; + } + } else if (curr->Name == "END") { + ++curr; + curr = SkipWSOrComment(curr, end); + if (curr != end && curr->Name == "DO") { + ++curr; + return EParenType::Close; + } + } + + curr = tmp; + ++curr; + return EParenType::None; + }; + + TTokenIterator curr = begin; + while (curr != end) { + bool matched = false; + for (auto cb : {advanceLambdaBody, advanceAction, advanceInlineAction}) { + TTokenIterator tmp = curr; + if (cb(tmp, end) == EParenType::Open) { + curr = SkipToNextBalanced(curr, end, cb); + matched = true; + if (curr == end) { + return curr; + } + } + } + if (matched) { + continue; + } + if (curr->Name == "SEMICOLON") { + auto next = SkipWS(curr + 1, end); + while (next != end && next->Name == "COMMENT" && curr->Line == next->Line) { + curr = next; + next = SkipWS(next + 1, end); + } + ++curr; + break; + } + ++curr; + } + + return curr; +} + +void SplitByStatements(TTokenIterator begin, TTokenIterator end, TVector<TTokenIterator>& output) { + output.clear(); + if (begin == end) { + return; + } + output.push_back(begin); + auto curr = begin; + while (curr != end) { + curr = GetNextStatementBegin(curr, end); + output.push_back(curr); + } +} + +} + +bool SplitQueryToStatements(const TString& query, NSQLTranslation::ILexer::TPtr& lexer, TVector<TString>& statements, NYql::TIssues& issues) { + TParsedTokenList allTokens; + auto onNextToken = [&](NSQLTranslation::TParsedToken&& token) { + if (token.Name != "EOF") { + allTokens.push_back(token); + } + }; + + if (!lexer->Tokenize(query, "Query", onNextToken, issues, NSQLTranslation::SQL_MAX_PARSER_ERRORS)) { + return false; + } + + TVector<TTokenIterator> statementsTokens; + SplitByStatements(allTokens.begin(), allTokens.end(), statementsTokens); + + for (size_t i = 1; i < statementsTokens.size(); ++i) { + TStringBuilder currentQueryBuilder; + for (auto it = statementsTokens[i - 1]; it != statementsTokens[i]; ++it) { + currentQueryBuilder << it->Content; + } + TString statement = currentQueryBuilder; + statement = StripStringLeft(statement); + + bool isBlank = true; + for (auto c : statement) { + if (c != ';') { + isBlank = false; + break; + } + }; + + if (isBlank) { + continue; + } + + statements.push_back(statement); + } + + return true; +} + } // namespace NSQLTranslationV1 diff --git a/yql/essentials/sql/v1/lexer/lexer.h b/yql/essentials/sql/v1/lexer/lexer.h index 25bfe28f81f..d43efc6387f 100644 --- a/yql/essentials/sql/v1/lexer/lexer.h +++ b/yql/essentials/sql/v1/lexer/lexer.h @@ -12,4 +12,6 @@ NSQLTranslation::ILexer::TPtr MakeLexer(bool ansi, bool antlr4); // in SELECT * FROM ... GROUP BY ... - group is a keyword. bool IsProbablyKeyword(const NSQLTranslation::TParsedToken& token); +bool SplitQueryToStatements(const TString& query, NSQLTranslation::ILexer::TPtr& lexer, + TVector<TString>& statements, NYql::TIssues& issues); } diff --git a/yql/essentials/sql/v1/node.h b/yql/essentials/sql/v1/node.h index 609cd82dd48..e19245f695b 100644 --- a/yql/essentials/sql/v1/node.h +++ b/yql/essentials/sql/v1/node.h @@ -1548,6 +1548,14 @@ namespace NSQLTranslationV1 { std::map<TString, TNodePtr>&& settings, const TObjectOperatorContext& context); TNodePtr BuildDropAsyncReplication(TPosition pos, const TString& id, bool cascade, const TObjectOperatorContext& context); + TNodePtr BuildCreateTransfer(TPosition pos, const TString& id, const TString&& source, const TString&& target, + const TString&& transformLambda, + std::map<TString, TNodePtr>&& settings, + const TObjectOperatorContext& context); + TNodePtr BuildAlterTransfer(TPosition pos, const TString& id, std::optional<TString>&& transformLambda, + std::map<TString, TNodePtr>&& settings, + const TObjectOperatorContext& context); + TNodePtr BuildDropTransfer(TPosition pos, const TString& id, bool cascade, const TObjectOperatorContext& context); TNodePtr BuildWriteResult(TPosition pos, const TString& label, TNodePtr settings); TNodePtr BuildCommitClusters(TPosition pos); TNodePtr BuildRollbackClusters(TPosition pos); diff --git a/yql/essentials/sql/v1/query.cpp b/yql/essentials/sql/v1/query.cpp index c74f96a27ca..a1323972c44 100644 --- a/yql/essentials/sql/v1/query.cpp +++ b/yql/essentials/sql/v1/query.cpp @@ -2628,6 +2628,159 @@ TNodePtr BuildAlterAsyncReplication(TPosition pos, const TString& id, return new TAlterAsyncReplication(pos, id, std::move(settings), context); } +class TTransfer + : public TAstListNode + , protected TObjectOperatorContext +{ +protected: + virtual INode::TPtr FillOptions(INode::TPtr options) const = 0; + +public: + explicit TTransfer(TPosition pos, const TString& id, const TString& mode, const TObjectOperatorContext& context) + : TAstListNode(pos) + , TObjectOperatorContext(context) + , Id(id) + , Mode(mode) + { + } + + bool DoInit(TContext& ctx, ISource* src) override { + Scoped->UseCluster(ServiceId, Cluster); + + auto keys = Y("Key", Q(Y(Q("transfer"), Y("String", BuildQuotedAtom(Pos, Id))))); + auto options = FillOptions(Y(Q(Y(Q("mode"), Q(Mode))))); + + Add("block", Q(Y( + Y("let", "sink", Y("DataSink", BuildQuotedAtom(Pos, ServiceId), Scoped->WrapCluster(Cluster, ctx))), + Y("let", "world", Y(TString(WriteName), "world", "sink", keys, Y("Void"), Q(options))), + Y("return", ctx.PragmaAutoCommit ? Y(TString(CommitName), "world", "sink") : AstNode("world")) + ))); + + return TAstListNode::DoInit(ctx, src); + } + + TPtr DoClone() const final { + return {}; + } + +private: + const TString Id; + const TString Mode; + +}; // TTransfer + +class TCreateTransfer final: public TTransfer { +public: + explicit TCreateTransfer(TPosition pos, const TString& id, const TString&& source, const TString&& target, + const TString&& transformLambda, + std::map<TString, TNodePtr>&& settings, + const TObjectOperatorContext& context) + : TTransfer(pos, id, "create", context) + , Source(std::move(source)) + , Target(std::move(target)) + , TransformLambda(std::move(transformLambda)) + , Settings(std::move(settings)) + { + } + +protected: + INode::TPtr FillOptions(INode::TPtr options) const override { + options = L(options, Q(Y(Q("source"), Q(Source)))); + options = L(options, Q(Y(Q("target"), Q(Target)))); + options = L(options, Q(Y(Q("transformLambda"), Q(TransformLambda)))); + + if (!Settings.empty()) { + auto settings = Y(); + for (auto&& [k, v] : Settings) { + if (v) { + settings = L(settings, Q(Y(BuildQuotedAtom(Pos, k), v))); + } else { + settings = L(settings, Q(Y(BuildQuotedAtom(Pos, k)))); + } + } + options = L(options, Q(Y(Q("settings"), Q(settings)))); + } + + return options; + } + +private: + const TString Source; + const TString Target; + const TString TransformLambda; + std::map<TString, TNodePtr> Settings; + +}; // TCreateTransfer + +TNodePtr BuildCreateTransfer(TPosition pos, const TString& id, const TString&& source, const TString&& target, + const TString&& transformLambda, + std::map<TString, TNodePtr>&& settings, + const TObjectOperatorContext& context) +{ + return new TCreateTransfer(pos, id, std::move(source), std::move(target), std::move(transformLambda), std::move(settings), context); +} + +class TDropTransfer final: public TTransfer { +public: + explicit TDropTransfer(TPosition pos, const TString& id, bool cascade, const TObjectOperatorContext& context) + : TTransfer(pos, id, cascade ? "dropCascade" : "drop", context) + { + } + +protected: + INode::TPtr FillOptions(INode::TPtr options) const override { + return options; + } + +}; // TDropTransfer + +TNodePtr BuildDropTransfer(TPosition pos, const TString& id, bool cascade, const TObjectOperatorContext& context) { + return new TDropTransfer(pos, id, cascade, context); +} + +class TAlterTransfer final: public TTransfer { +public: + explicit TAlterTransfer(TPosition pos, const TString& id, std::optional<TString>&& transformLambda, + std::map<TString, TNodePtr>&& settings, + const TObjectOperatorContext& context) + : TTransfer(pos, id, "alter", context) + , TransformLambda(std::move(transformLambda)) + , Settings(std::move(settings)) + { + } + +protected: + INode::TPtr FillOptions(INode::TPtr options) const override { + options = L(options, Q(Y(Q("transformLambda"), Q(TransformLambda ? TransformLambda.value() : "")))); + + if (!Settings.empty()) { + auto settings = Y(); + for (auto&& [k, v] : Settings) { + if (v) { + settings = L(settings, Q(Y(BuildQuotedAtom(Pos, k), v))); + } else { + settings = L(settings, Q(Y(BuildQuotedAtom(Pos, k)))); + } + } + options = L(options, Q(Y(Q("settings"), Q(settings)))); + } + + return options; + } + +private: + const std::optional<TString> TransformLambda; + std::map<TString, TNodePtr> Settings; + +}; // TAlterTransfer + +TNodePtr BuildAlterTransfer(TPosition pos, const TString& id, std::optional<TString>&& transformLambda, + std::map<TString, TNodePtr>&& settings, + const TObjectOperatorContext& context) +{ + return new TAlterTransfer(pos, id, std::move(transformLambda), std::move(settings), context); +} + static const TMap<EWriteColumnMode, TString> columnModeToStrMapMR { {EWriteColumnMode::Default, ""}, {EWriteColumnMode::Insert, "append"}, diff --git a/yql/essentials/sql/v1/sql.cpp b/yql/essentials/sql/v1/sql.cpp index 3e5dba78f3c..503f3e45da9 100644 --- a/yql/essentials/sql/v1/sql.cpp +++ b/yql/essentials/sql/v1/sql.cpp @@ -75,13 +75,21 @@ void SqlASTsToYqlsImpl(NYql::TAstParseResult& res, const std::vector<::NSQLv1Gen } } -NYql::TAstParseResult SqlASTToYql(const google::protobuf::Message& protoAst, +NYql::TAstParseResult SqlASTToYql( + const google::protobuf::Message& protoAst, + const NSQLTranslation::TSQLHints& hints, + const NSQLTranslation::TTranslationSettings& settings) { + return SqlASTToYql("", protoAst, hints, settings); +} + +NYql::TAstParseResult SqlASTToYql(const TString& query, + const google::protobuf::Message& protoAst, const NSQLTranslation::TSQLHints& hints, const NSQLTranslation::TTranslationSettings& settings) { YQL_ENSURE(IsQueryMode(settings.Mode)); TAstParseResult res; - TContext ctx(settings, hints, res.Issues); + TContext ctx(settings, hints, res.Issues, query); SqlASTToYqlImpl(res, protoAst, ctx); res.ActualSyntaxType = NYql::ESyntaxType::YQLv1; return res; @@ -99,7 +107,7 @@ NYql::TAstParseResult SqlToYql(const TString& query, const NSQLTranslation::TTra return res; } - TContext ctx(settings, hints, res.Issues); + TContext ctx(settings, hints, res.Issues, query); NSQLTranslation::TErrorCollectorOverIssues collector(res.Issues, settings.MaxErrors, settings.File); google::protobuf::Message* ast(SqlAST(query, queryName, collector, settings.AnsiLexer, settings.Antlr4Parser, settings.TestAntlr4, settings.Arena)); @@ -177,11 +185,14 @@ bool NeedUseForAllStatements(const TRule_sql_stmt_core::AltCase& subquery) { case TRule_sql_stmt_core::kAltSqlStmtCore55: // backup case TRule_sql_stmt_core::kAltSqlStmtCore56: // restore case TRule_sql_stmt_core::kAltSqlStmtCore57: // alter sequence + case TRule_sql_stmt_core::kAltSqlStmtCore58: // create transfer + case TRule_sql_stmt_core::kAltSqlStmtCore59: // alter transfer + case TRule_sql_stmt_core::kAltSqlStmtCore60: // drop transfer return false; } } -TVector<NYql::TAstParseResult> SqlToAstStatements(const TString& query, const NSQLTranslation::TTranslationSettings& settings, NYql::TWarningRules* warningRules, +TVector<NYql::TAstParseResult> SqlToAstStatements(const TString& queryText, const NSQLTranslation::TTranslationSettings& settings, NYql::TWarningRules* warningRules, TVector<NYql::TStmtParseInfo>* stmtParseInfo) { TVector<TAstParseResult> result; @@ -191,14 +202,14 @@ TVector<NYql::TAstParseResult> SqlToAstStatements(const TString& query, const NS NSQLTranslation::TSQLHints hints; auto lexer = MakeLexer(settings.AnsiLexer, settings.Antlr4Parser); YQL_ENSURE(lexer); - if (!CollectSqlHints(*lexer, query, queryName, settings.File, hints, issues, settings.MaxErrors, settings.Antlr4Parser)) { + if (!CollectSqlHints(*lexer, queryText, queryName, settings.File, hints, issues, settings.MaxErrors, settings.Antlr4Parser)) { return result; } TContext ctx(settings, hints, issues); NSQLTranslation::TErrorCollectorOverIssues collector(issues, settings.MaxErrors, settings.File); - google::protobuf::Message* astProto(SqlAST(query, queryName, collector, settings.AnsiLexer, settings.Antlr4Parser, settings.TestAntlr4, settings.Arena)); + google::protobuf::Message* astProto(SqlAST(queryText, queryName, collector, settings.AnsiLexer, settings.Antlr4Parser, settings.TestAntlr4, settings.Arena)); if (astProto) { auto ast = static_cast<const TSQLv1ParserAST&>(*astProto); const auto& query = ast.GetRule_sql_query(); @@ -209,7 +220,7 @@ TVector<NYql::TAstParseResult> SqlToAstStatements(const TString& query, const NS if (NeedUseForAllStatements(statements.GetRule_sql_stmt2().GetRule_sql_stmt_core2().Alt_case())) { commonStates.push_back(statements.GetRule_sql_stmt2().GetRule_sql_stmt_core2()); } else { - TContext ctx(settings, hints, issues); + TContext ctx(settings, hints, issues, queryText); result.emplace_back(); if (stmtParseInfo) { stmtParseInfo->push_back({}); @@ -223,7 +234,7 @@ TVector<NYql::TAstParseResult> SqlToAstStatements(const TString& query, const NS commonStates.push_back(block.GetRule_sql_stmt2().GetRule_sql_stmt_core2()); continue; } - TContext ctx(settings, hints, issues); + TContext ctx(settings, hints, issues, queryText); result.emplace_back(); if (stmtParseInfo) { stmtParseInfo->push_back({}); @@ -245,4 +256,28 @@ TVector<NYql::TAstParseResult> SqlToAstStatements(const TString& query, const NS return result; } +bool SplitQueryToStatements(const TString& query, TVector<TString>& statements, NYql::TIssues& issues, + const NSQLTranslation::TTranslationSettings& settings) { + auto lexer = NSQLTranslationV1::MakeLexer(settings.AnsiLexer, settings.Antlr4Parser); + + TVector<TString> parts; + if (!SplitQueryToStatements(query, lexer, parts, issues)) { + return false; + } + + for (auto& currentQuery : parts) { + NYql::TIssues parserIssues; + auto message = NSQLTranslationV1::SqlAST(currentQuery, "Query", parserIssues, NSQLTranslation::SQL_MAX_PARSER_ERRORS, + settings.AnsiLexer, settings.Antlr4Parser, settings.TestAntlr4, settings.Arena); + if (!message) { + // Skip empty statements + continue; + } + + statements.push_back(std::move(currentQuery)); + } + + return true; +} + } // namespace NSQLTranslationV1 diff --git a/yql/essentials/sql/v1/sql.h b/yql/essentials/sql/v1/sql.h index 0a12c45d308..6e59aaf6914 100644 --- a/yql/essentials/sql/v1/sql.h +++ b/yql/essentials/sql/v1/sql.h @@ -3,6 +3,7 @@ #include <yql/essentials/ast/yql_ast.h> #include <yql/essentials/parser/lexer_common/hints.h> #include <yql/essentials/parser/proto_ast/common.h> +#include <yql/essentials/parser/proto_ast/gen/v1_proto_split/SQLv1Parser.pb.main.h> #include <yql/essentials/public/issue/yql_warning.h> #include <yql/essentials/public/issue/yql_issue_manager.h> #include <yql/essentials/sql/settings/translation_settings.h> @@ -16,7 +17,16 @@ namespace NSQLTranslation { namespace NSQLTranslationV1 { NYql::TAstParseResult SqlToYql(const TString& query, const NSQLTranslation::TTranslationSettings& settings, NYql::TWarningRules* warningRules = nullptr); + + /*[[deprecated]] Use SqlASTToYql(query, protoAst, hints, settings)*/ NYql::TAstParseResult SqlASTToYql(const google::protobuf::Message& protoAst, const NSQLTranslation::TSQLHints& hints, const NSQLTranslation::TTranslationSettings& settings); + NYql::TAstParseResult SqlASTToYql(const TString& query, const google::protobuf::Message& protoAst, const NSQLTranslation::TSQLHints& hints, const NSQLTranslation::TTranslationSettings& settings); + TVector<NYql::TAstParseResult> SqlToAstStatements(const TString& query, const NSQLTranslation::TTranslationSettings& settings, NYql::TWarningRules* warningRules, TVector<NYql::TStmtParseInfo>* stmtParseInfo = nullptr); + bool NeedUseForAllStatements(const NSQLv1Generated::TRule_sql_stmt_core::AltCase& subquery); + + bool SplitQueryToStatements(const TString& query, TVector<TString>& statements, NYql::TIssues& issues, + const NSQLTranslation::TTranslationSettings& settings); + } // namespace NSQLTranslationV1 diff --git a/yql/essentials/sql/v1/sql_expression.cpp b/yql/essentials/sql/v1/sql_expression.cpp index a7a9877ccbd..199f15f2939 100644 --- a/yql/essentials/sql/v1/sql_expression.cpp +++ b/yql/essentials/sql/v1/sql_expression.cpp @@ -34,6 +34,31 @@ TNodePtr TSqlExpression::Build(const TRule_expr& node) { } } +TNodePtr TSqlExpression::Build(const TRule_lambda_or_parameter& node) { + // lambda_or_parameter: + // lambda + // | bind_parameter + switch (node.Alt_case()) { + case TRule_lambda_or_parameter::kAltLambdaOrParameter1: { + return LambdaRule(node.alt_lambda_or_parameter1().GetRule_lambda1()); + } + case TRule_lambda_or_parameter::kAltLambdaOrParameter2: { + TString named; + if (!NamedNodeImpl(node.GetAlt_lambda_or_parameter2().GetRule_bind_parameter1(), named, *this)) { + return nullptr; + } + auto namedNode = GetNamedNode(named); + if (!namedNode) { + return nullptr; + } + + return namedNode; + } + case TRule_lambda_or_parameter::ALT_NOT_SET: + Y_ABORT("You should change implementation according to grammar changes"); + } + } + TNodePtr TSqlExpression::SubExpr(const TRule_mul_subexpr& node, const TTrailingQuestions& tail) { // mul_subexpr: con_subexpr (DOUBLE_PIPE con_subexpr)*; auto getNode = [](const TRule_mul_subexpr::TBlock2& b) -> const TRule_con_subexpr& { return b.GetRule_con_subexpr2(); }; diff --git a/yql/essentials/sql/v1/sql_expression.h b/yql/essentials/sql/v1/sql_expression.h index 64b9dd8a690..4f9100722ca 100644 --- a/yql/essentials/sql/v1/sql_expression.h +++ b/yql/essentials/sql/v1/sql_expression.h @@ -22,6 +22,7 @@ public: } TNodePtr Build(const TRule_expr& node); + TNodePtr Build(const TRule_lambda_or_parameter& node); void SetSmartParenthesisMode(ESmartParenthesis mode) { SmartParenthesisMode = mode; diff --git a/yql/essentials/sql/v1/sql_query.cpp b/yql/essentials/sql/v1/sql_query.cpp index eb1a174b30c..a04ce659861 100644 --- a/yql/essentials/sql/v1/sql_query.cpp +++ b/yql/essentials/sql/v1/sql_query.cpp @@ -143,7 +143,68 @@ static bool AsyncReplicationAlterAction(std::map<TString, TNodePtr>& settings, return AsyncReplicationSettings(settings, in.GetRule_alter_replication_set_setting1().GetRule_replication_settings3(), ctx, false); } -bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& core) { +static bool TransferSettingsEntry(std::map<TString, TNodePtr>& out, + const TRule_transfer_settings_entry& in, TSqlExpression& ctx, bool create) +{ + auto key = IdEx(in.GetRule_an_id1(), ctx); + auto value = ctx.Build(in.GetRule_expr3()); + + if (!value) { + ctx.Context().Error() << "Invalid transfer setting: " << key.Name; + return false; + } + + TSet<TString> configSettings = { + "connection_string", + "endpoint", + "database", + "token", + "token_secret_name", + "user", + "password", + "password_secret_name", + }; + + TSet<TString> stateSettings = { + "state", + "failover_mode", + }; + + const auto keyName = to_lower(key.Name); + if (!configSettings.count(keyName) && !stateSettings.contains(keyName)) { + ctx.Context().Error() << "Unknown transfer setting: " << key.Name; + return false; + } + + if (create && stateSettings.count(keyName)) { + ctx.Context().Error() << key.Name << " is not supported in CREATE"; + return false; + } + + if (!out.emplace(keyName, value).second) { + ctx.Context().Error() << "Duplicate transfer setting: " << key.Name; + } + + return true; +} + +static bool TransferSettings(std::map<TString, TNodePtr>& out, + const TRule_transfer_settings& in, TSqlExpression& ctx, bool create) +{ + if (!TransferSettingsEntry(out, in.GetRule_transfer_settings_entry1(), ctx, create)) { + return false; + } + + for (auto& block : in.GetBlock2()) { + if (!TransferSettingsEntry(out, block.GetRule_transfer_settings_entry2(), ctx, create)) { + return false; + } + } + + return true; +} + +bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& core, size_t statementNumber) { TString internalStatementName; TString humanStatementName; ParseStatementName(core, internalStatementName, humanStatementName); @@ -161,6 +222,10 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& return false; } + if (NeedUseForAllStatements(altCase)) { + Ctx.ForAllStatementsParts.push_back(statementNumber); + } + switch (altCase) { case TRule_sql_stmt_core::kAltSqlStmtCore1: { bool success = false; @@ -1763,6 +1828,103 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& AddStatementToBlocks(blocks, BuildAlterSequence(pos, service, cluster, id, params, Ctx.Scoped)); break; } + case TRule_sql_stmt_core::kAltSqlStmtCore58: { + // create_transfer_stmt: CREATE TRANSFER + + auto& node = core.GetAlt_sql_stmt_core58().GetRule_create_transfer_stmt1(); + TObjectOperatorContext context(Ctx.Scoped); + if (node.GetRule_object_ref3().HasBlock1()) { + const auto& cluster = node.GetRule_object_ref3().GetBlock1().GetRule_cluster_expr1(); + if (!ClusterExpr(cluster, false, context.ServiceId, context.Cluster)) { + return false; + } + } + + auto prefixPath = Ctx.GetPrefixPath(context.ServiceId, context.Cluster); + + std::map<TString, TNodePtr> settings; + TSqlExpression expr(Ctx, Mode); + if (!TransferSettings(settings, node.GetRule_transfer_settings11(), expr, true)) { + return false; + } + + const TString id = Id(node.GetRule_object_ref3().GetRule_id_or_at2(), *this).second; + const TString source = Id(node.GetRule_object_ref5().GetRule_id_or_at2(), *this).second; + const TString target = Id(node.GetRule_object_ref7().GetRule_id_or_at2(), *this).second; + TString transformLambda; + if (node.GetBlock8().HasRule_lambda_or_parameter2()) { + if (!ParseTransferLambda(transformLambda, node.GetBlock8().GetRule_lambda_or_parameter2())) { + return false; + } + } + + AddStatementToBlocks(blocks, BuildCreateTransfer(Ctx.Pos(), BuildTablePath(prefixPath, id), + std::move(source), std::move(target), std::move(transformLambda), std::move(settings), context)); + break; + } + case TRule_sql_stmt_core::kAltSqlStmtCore59: { + // alter_transfer_stmt: ALTER TRANSFER + auto& node = core.GetAlt_sql_stmt_core59().GetRule_alter_transfer_stmt1(); + TObjectOperatorContext context(Ctx.Scoped); + if (node.GetRule_object_ref3().HasBlock1()) { + const auto& cluster = node.GetRule_object_ref3().GetBlock1().GetRule_cluster_expr1(); + if (!ClusterExpr(cluster, false, context.ServiceId, context.Cluster)) { + return false; + } + } + + std::map<TString, TNodePtr> settings; + std::optional<TString> transformLambda; + TSqlExpression expr(Ctx, Mode); + + auto transferAlterAction = [&](std::optional<TString>& transformLambda, const TRule_alter_transfer_action& in) + { + if (in.HasAlt_alter_transfer_action1()) { + return TransferSettings(settings, in.GetAlt_alter_transfer_action1().GetRule_alter_transfer_set_setting1().GetRule_transfer_settings3(), expr, false); + } else if (in.HasAlt_alter_transfer_action2()) { + TString lb; + if (!ParseTransferLambda(lb, in.GetAlt_alter_transfer_action2().GetRule_alter_transfer_set_using1().GetRule_lambda_or_parameter3())) { + return false; + } + transformLambda = lb; + return true; + } + + return false; + }; + + if (!transferAlterAction(transformLambda, node.GetRule_alter_transfer_action4())) { + return false; + } + for (auto& block : node.GetBlock5()) { + if (!transferAlterAction(transformLambda, block.GetRule_alter_transfer_action2())) { + return false; + } + } + + const TString id = Id(node.GetRule_object_ref3().GetRule_id_or_at2(), *this).second; + AddStatementToBlocks(blocks, BuildAlterTransfer(Ctx.Pos(), + BuildTablePath(Ctx.GetPrefixPath(context.ServiceId, context.Cluster), id), + std::move(transformLambda), std::move(settings), context)); + break; + } + case TRule_sql_stmt_core::kAltSqlStmtCore60: { + // drop_transfer_stmt: DROP TRANSFER + auto& node = core.GetAlt_sql_stmt_core60().GetRule_drop_transfer_stmt1(); + TObjectOperatorContext context(Ctx.Scoped); + if (node.GetRule_object_ref3().HasBlock1()) { + const auto& cluster = node.GetRule_object_ref3().GetBlock1().GetRule_cluster_expr1(); + if (!ClusterExpr(cluster, false, context.ServiceId, context.Cluster)) { + return false; + } + } + + const TString id = Id(node.GetRule_object_ref3().GetRule_id_or_at2(), *this).second; + AddStatementToBlocks(blocks, BuildDropTransfer(Ctx.Pos(), + BuildTablePath(Ctx.GetPrefixPath(context.ServiceId, context.Cluster), id), + node.HasBlock4(), context)); + break; + } case TRule_sql_stmt_core::ALT_NOT_SET: Ctx.IncrementMonCounter("sql_errors", "UnknownStatement" + internalStatementName); AltNotImplemented("sql_stmt_core", core); @@ -3471,12 +3633,13 @@ TNodePtr TSqlQuery::Build(const TSQLv1ParserAST& ast) { Ctx.PopCurrentBlocks(); }; if (query.Alt_case() == TRule_sql_query::kAltSqlQuery1) { + size_t statementNumber = 0; const auto& statements = query.GetAlt_sql_query1().GetRule_sql_stmt_list1(); - if (!Statement(blocks, statements.GetRule_sql_stmt2().GetRule_sql_stmt_core2())) { + if (!Statement(blocks, statements.GetRule_sql_stmt2().GetRule_sql_stmt_core2(), statementNumber++)) { return nullptr; } for (auto block: statements.GetBlock3()) { - if (!Statement(blocks, block.GetRule_sql_stmt2().GetRule_sql_stmt_core2())) { + if (!Statement(blocks, block.GetRule_sql_stmt2().GetRule_sql_stmt_core2(), statementNumber++)) { return nullptr; } } @@ -3546,8 +3709,10 @@ TNodePtr TSqlQuery::Build(const std::vector<::NSQLv1Generated::TRule_sql_stmt_co Y_DEFER { Ctx.PopCurrentBlocks(); }; + + size_t statementNumber = 0; for (const auto& statement : statements) { - if (!Statement(blocks, statement)) { + if (!Statement(blocks, statement, statementNumber++)) { return nullptr; } } diff --git a/yql/essentials/sql/v1/sql_query.h b/yql/essentials/sql/v1/sql_query.h index 03fd85df6b3..ea5b917f8f2 100644 --- a/yql/essentials/sql/v1/sql_query.h +++ b/yql/essentials/sql/v1/sql_query.h @@ -20,7 +20,7 @@ public: TNodePtr Build(const TSQLv1ParserAST& ast); TNodePtr Build(const std::vector<::NSQLv1Generated::TRule_sql_stmt_core>& ast); - bool Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& core); + bool Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& core, size_t statementNumber); private: bool DeclareStatement(const TRule_declare_stmt& stmt); bool ExportStatement(const TRule_export_stmt& stmt); diff --git a/yql/essentials/sql/v1/sql_translation.cpp b/yql/essentials/sql/v1/sql_translation.cpp index 9989295291a..a4c55cad71d 100644 --- a/yql/essentials/sql/v1/sql_translation.cpp +++ b/yql/essentials/sql/v1/sql_translation.cpp @@ -83,7 +83,7 @@ TNodePtr BuildViewSelect( const TString& contextRecreationQuery ) { TIssues issues; - TContext context(parentContext.Settings, {}, issues); + TContext context(parentContext.Settings, {}, issues, parentContext.Query); if (!RecreateContext(context, context.Settings, contextRecreationQuery)) { parentContext.Issues.AddIssues(issues); return nullptr; @@ -4651,13 +4651,15 @@ bool TSqlTranslation::DefineActionOrSubqueryBody(TSqlQuery& query, TBlocks& bloc Y_DEFER { Ctx.PopCurrentBlocks(); }; - if (!query.Statement(blocks, body.GetBlock2().GetRule_sql_stmt_core1())) { + + size_t statementNumber = 0; + if (!query.Statement(blocks, body.GetBlock2().GetRule_sql_stmt_core1(), statementNumber++)) { return false; } for (const auto& nestedStmtItem : body.GetBlock2().GetBlock2()) { const auto& nestedStmt = nestedStmtItem.GetRule_sql_stmt_core2(); - if (!query.Statement(blocks, nestedStmt)) { + if (!query.Statement(blocks, nestedStmt, statementNumber++)) { return false; } } @@ -5096,6 +5098,101 @@ bool TSqlTranslation::ParseViewQuery( return true; } +namespace { + +static std::string::size_type GetQueryPosition(const TString& query, const NSQLv1Generated::TToken& token, bool antlr4) { + if (1 == token.GetLine() && 0 == token.GetColumn()) { + return 0; + } + + TPosition pos = {0, 1}; + TTextWalker walker(pos, antlr4); + + std::string::size_type position = 0; + for (char c : query) { + walker.Advance(c); + ++position; + + if (pos.Row == token.GetLine() && pos.Column == token.GetColumn()) { + return position; + } + } + + return std::string::npos; +} + +static TString GetLambdaText(TTranslation& ctx, TContext& Ctx, const TRule_lambda_or_parameter& lambdaOrParameter) { + static const TString statementSeparator = ";\n"; + + TVector<TString> statements; + NYql::TIssues issues; + if (!SplitQueryToStatements(Ctx.Query, statements, issues, Ctx.Settings)) { + return {}; + } + + TStringBuilder result; + for (const auto id : Ctx.ForAllStatementsParts) { + result << statements[id] << "\n"; + } + + switch (lambdaOrParameter.Alt_case()) { + case NSQLv1Generated::TRule_lambda_or_parameter::kAltLambdaOrParameter1: { + const auto& lambda = lambdaOrParameter.GetAlt_lambda_or_parameter1().GetRule_lambda1(); + + auto& beginToken = lambda.GetRule_smart_parenthesis1().GetToken1(); + const NSQLv1Generated::TToken* endToken = nullptr; + switch (lambda.GetBlock2().GetBlock2().GetAltCase()) { + case TRule_lambda_TBlock2_TBlock2::AltCase::kAlt1: + endToken = &lambda.GetBlock2().GetBlock2().GetAlt1().GetToken3(); + break; + case TRule_lambda_TBlock2_TBlock2::AltCase::kAlt2: + endToken = &lambda.GetBlock2().GetBlock2().GetAlt2().GetToken3(); + break; + case TRule_lambda_TBlock2_TBlock2::AltCase::ALT_NOT_SET: + Y_ABORT("You should change implementation according to grammar changes"); + } + + auto begin = GetQueryPosition(Ctx.Query, beginToken, Ctx.Settings.Antlr4Parser); + auto end = GetQueryPosition(Ctx.Query, *endToken, Ctx.Settings.Antlr4Parser); + if (begin == std::string::npos || end == std::string::npos) { + return {}; + } + + result << "$__ydb_transfer_lambda = " << Ctx.Query.substr(begin, end - begin + endToken->value().size()) << statementSeparator; + + return result; + } + case NSQLv1Generated::TRule_lambda_or_parameter::kAltLambdaOrParameter2: { + const auto& valueBlock = lambdaOrParameter.GetAlt_lambda_or_parameter2().GetRule_bind_parameter1().GetBlock2(); + const auto id = Id(valueBlock.GetAlt1().GetRule_an_id_or_type1(), ctx); + result << "$__ydb_transfer_lambda = $" << id << statementSeparator; + return result; + } + case NSQLv1Generated::TRule_lambda_or_parameter::ALT_NOT_SET: + Y_ABORT("You should change implementation according to grammar changes"); + } +} + +} + +bool TSqlTranslation::ParseTransferLambda( + TString& lambdaText, + const TRule_lambda_or_parameter& lambdaOrParameter) { + + TSqlExpression expr(Ctx, Ctx.Settings.Mode); + auto result = expr.Build(lambdaOrParameter); + if (!result) { + return false; + } + + lambdaText = GetLambdaText(*this, Ctx, lambdaOrParameter); + if (lambdaText.empty()) { + Ctx.Error() << "Cannot parse lambda correctly"; + } + + return !lambdaText.empty(); +} + class TReturningListColumns : public INode { public: TReturningListColumns(TPosition pos) diff --git a/yql/essentials/sql/v1/sql_translation.h b/yql/essentials/sql/v1/sql_translation.h index 75e9c2f7d48..74b78ae3306 100644 --- a/yql/essentials/sql/v1/sql_translation.h +++ b/yql/essentials/sql/v1/sql_translation.h @@ -263,6 +263,7 @@ protected: TVector<TDeferredAtom>& addTables, TVector<TDeferredAtom>& removeTables, const TRule_alter_backup_collection_entries& entries); + bool ParseTransferLambda(TString& lambdaText, const TRule_lambda_or_parameter& lambdaOrParameter); bool ValidateAuthMethod(const std::map<TString, TDeferredAtom>& result); bool ValidateExternalTable(const TCreateTableParameters& params); diff --git a/yql/essentials/sql/v1/sql_ut.cpp b/yql/essentials/sql/v1/sql_ut.cpp index e7b30928c32..9baf35f8f4e 100644 --- a/yql/essentials/sql/v1/sql_ut.cpp +++ b/yql/essentials/sql/v1/sql_ut.cpp @@ -4,6 +4,7 @@ #include <yql/essentials/providers/common/provider/yql_provider_names.h> #include <yql/essentials/sql/sql.h> +#include <yql/essentials/sql/v1/sql.h> #include <util/generic/map.h> #include <library/cpp/testing/unittest/registar.h> @@ -287,6 +288,11 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { UNIT_ASSERT(SqlToYql("USE plato; SELECT REPLICATION FROM REPLICATION").IsOk()); } + Y_UNIT_TEST(TransferKeywordNotReservedForNames) { + UNIT_ASSERT(SqlToYql("USE plato; CREATE TABLE TRANSFER (TRANSFER Uint32, PRIMARY KEY (TRANSFER));").IsOk()); + UNIT_ASSERT(SqlToYql("USE plato; SELECT TRANSFER FROM TRANSFER").IsOk()); + } + Y_UNIT_TEST(SecondsKeywordNotReservedForNames) { UNIT_ASSERT(SqlToYql("USE plato; CREATE TABLE SECONDS (SECONDS Uint32, PRIMARY KEY (SECONDS));").IsOk()); UNIT_ASSERT(SqlToYql("USE plato; SELECT SECONDS FROM SECONDS").IsOk()); @@ -8056,3 +8062,60 @@ Y_UNIT_TEST_SUITE(ColumnFamily) { UNIT_ASSERT_STRING_CONTAINS(res.Issues.ToString(), "COMPRESSION_LEVEL value should be an integer"); } } + +Y_UNIT_TEST_SUITE(QuerySplit) { + Y_UNIT_TEST(Simple) { + TString query = R"( + ; + -- Comment 1 + SELECT * From Input; -- Comment 2 + -- Comment 3 + $a = "a"; + + -- Comment 9 + ; + + -- Comment 10 + + -- Comment 8 + + $b = ($x) -> { + -- comment 4 + return /* Comment 5 */ $x; + -- Comment 6 + }; + + // Comment 7 + + + + )"; + + google::protobuf::Arena Arena; + + NSQLTranslation::TTranslationSettings settings; + settings.AnsiLexer = false; + settings.Antlr4Parser = true; + settings.Arena = &Arena; + + TVector<TString> statements; + NYql::TIssues issues; + + UNIT_ASSERT(NSQLTranslationV1::SplitQueryToStatements(query, statements, issues, settings)); + + UNIT_ASSERT_VALUES_EQUAL(statements.size(), 3); + + UNIT_ASSERT_VALUES_EQUAL(statements[0], "-- Comment 1\n SELECT * From Input; -- Comment 2\n"); + UNIT_ASSERT_VALUES_EQUAL(statements[1], R"(-- Comment 3 + $a = "a";)"); + UNIT_ASSERT_VALUES_EQUAL(statements[2], R"(-- Comment 10 + + -- Comment 8 + + $b = ($x) -> { + -- comment 4 + return /* Comment 5 */ $x; + -- Comment 6 + };)"); + } +} diff --git a/yql/essentials/sql/v1/sql_ut_antlr4.cpp b/yql/essentials/sql/v1/sql_ut_antlr4.cpp index 5337f573510..05e85605ef4 100644 --- a/yql/essentials/sql/v1/sql_ut_antlr4.cpp +++ b/yql/essentials/sql/v1/sql_ut_antlr4.cpp @@ -287,6 +287,11 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { UNIT_ASSERT(SqlToYql("USE plato; SELECT REPLICATION FROM REPLICATION").IsOk()); } + Y_UNIT_TEST(TransferKeywordNotReservedForNames) { + UNIT_ASSERT(SqlToYql("USE plato; CREATE TABLE TRANSFER (TRANSFER Uint32, PRIMARY KEY (TRANSFER));").IsOk()); + UNIT_ASSERT(SqlToYql("USE plato; SELECT TRANSFER FROM TRANSFER").IsOk()); + } + Y_UNIT_TEST(SecondsKeywordNotReservedForNames) { UNIT_ASSERT(SqlToYql("USE plato; CREATE TABLE SECONDS (SECONDS Uint32, PRIMARY KEY (SECONDS));").IsOk()); UNIT_ASSERT(SqlToYql("USE plato; SELECT SECONDS FROM SECONDS").IsOk()); @@ -8028,3 +8033,48 @@ Y_UNIT_TEST_SUITE(ColumnFamily) { UNIT_ASSERT_STRING_CONTAINS(res.Issues.ToString(), "COMPRESSION_LEVEL value should be an integer"); } } + +Y_UNIT_TEST_SUITE(Transfer) { + Y_UNIT_TEST(Lambda) { + NYql::TAstParseResult res = SqlToYql(R"( use plato; + -- Русский коммент, empty statement + ; + + -- befor comment + $a = "А"; + + SELECT * FROM Input; + + $b = ($x) -> { return $a || $x; }; + + CREATE TRANSFER `TransferName` + FROM `TopicName` TO `TableName` + USING ($x) -> { + -- internal comment + return $b($x); + } + WITH ( + CONNECTION_STRING = "grpc://localhost:2135/?database=/Root" + ); + )"); + + UNIT_ASSERT_C(res.IsOk(), res.Issues.ToString()); + UNIT_ASSERT_VALUES_EQUAL_C(res.Issues.Size(), 0, res.Issues.ToString()); + + const auto programm = GetPrettyPrint(res); + + Cerr << ">>>>> Root " << programm << Endl; + auto expected = R"('transformLambda 'use plato; +-- befor comment + $a = "А"; +$b = ($x) -> { return $a || $x; }; +$__ydb_transfer_lambda = ($x) -> { + -- internal comment + return $b($x); + }; +))"; + + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, programm.find(expected)); + + } +} diff --git a/yql/essentials/tools/sql2yql/sql2yql.cpp b/yql/essentials/tools/sql2yql/sql2yql.cpp index dd313b451cb..ba30d95e4d2 100644 --- a/yql/essentials/tools/sql2yql/sql2yql.cpp +++ b/yql/essentials/tools/sql2yql/sql2yql.cpp @@ -317,7 +317,7 @@ int BuildAST(int argc, char* argv[]) { auto lexer = SqlLexer(query, parseRes.Issues, settings); if (lexer && CollectSqlHints(*lexer, query, queryFile, settings.File, hints, parseRes.Issues, settings.MaxErrors, settings.Antlr4Parser)) { - parseRes = NSQLTranslation::SqlASTToYql(*ast, hints, settings); + parseRes = NSQLTranslation::SqlASTToYql(query, *ast, hints, settings); } } } else { |