summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <[email protected]>2025-01-21 12:50:29 +0300
committertesseract <[email protected]>2025-01-21 14:32:19 +0300
commite677409ecb6106695a976307290b2f6bad3d72c0 (patch)
tree7c4fe8c7334a8f814506c857a08322ea800a8b79
parente2324a4c7934ecbc80eb47f70d2586c4995499b5 (diff)
YQL for create, alter and drop transfer from topic to table
commit_hash:09502f46a7ee665609d2c4ba8d9e0aa421720cdb
-rw-r--r--yql/essentials/providers/common/provider/yql_provider.cpp60
-rw-r--r--yql/essentials/providers/common/provider/yql_provider.h14
-rw-r--r--yql/essentials/sql/sql.cpp6
-rw-r--r--yql/essentials/sql/sql.h4
-rw-r--r--yql/essentials/sql/v1/SQLv1.g.in29
-rw-r--r--yql/essentials/sql/v1/SQLv1Antlr4.g.in30
-rw-r--r--yql/essentials/sql/v1/context.cpp4
-rw-r--r--yql/essentials/sql/v1/context.h5
-rw-r--r--yql/essentials/sql/v1/format/sql_format.cpp190
-rw-r--r--yql/essentials/sql/v1/format/sql_format_ut.h20
-rw-r--r--yql/essentials/sql/v1/lexer/lexer.cpp203
-rw-r--r--yql/essentials/sql/v1/lexer/lexer.h2
-rw-r--r--yql/essentials/sql/v1/node.h8
-rw-r--r--yql/essentials/sql/v1/query.cpp153
-rw-r--r--yql/essentials/sql/v1/sql.cpp51
-rw-r--r--yql/essentials/sql/v1/sql.h10
-rw-r--r--yql/essentials/sql/v1/sql_expression.cpp25
-rw-r--r--yql/essentials/sql/v1/sql_expression.h1
-rw-r--r--yql/essentials/sql/v1/sql_query.cpp173
-rw-r--r--yql/essentials/sql/v1/sql_query.h2
-rw-r--r--yql/essentials/sql/v1/sql_translation.cpp103
-rw-r--r--yql/essentials/sql/v1/sql_translation.h1
-rw-r--r--yql/essentials/sql/v1/sql_ut.cpp63
-rw-r--r--yql/essentials/sql/v1/sql_ut_antlr4.cpp50
-rw-r--r--yql/essentials/tools/sql2yql/sql2yql.cpp2
25 files changed, 1018 insertions, 191 deletions
diff --git a/yql/essentials/providers/common/provider/yql_provider.cpp b/yql/essentials/providers/common/provider/yql_provider.cpp
index 79156cd8d37..0b7a1ef6609 100644
--- a/yql/essentials/providers/common/provider/yql_provider.cpp
+++ b/yql/essentials/providers/common/provider/yql_provider.cpp
@@ -604,6 +604,60 @@ TWriteReplicationSettings ParseWriteReplicationSettings(TExprList node, TExprCon
return ret;
}
+TWriteTransferSettings ParseWriteTransferSettings(TExprList node, TExprContext& ctx) {
+ TMaybeNode<TCoAtom> mode;
+ TMaybeNode<TCoAtom> source;
+ TMaybeNode<TCoAtom> target;
+ TMaybeNode<TCoAtom> transformLambda;
+ TVector<TCoNameValueTuple> settings;
+ TVector<TCoNameValueTuple> other;
+
+ for (auto child : node) {
+ if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) {
+ auto tuple = maybeTuple.Cast();
+ auto name = tuple.Name().Value();
+
+ if (name == "mode") {
+ YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
+ mode = tuple.Value().Cast<TCoAtom>();
+ } else if (name == "source") {
+ YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
+ source = tuple.Value().Cast<TCoAtom>();
+ } else if (name == "target") {
+ YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
+ target = tuple.Value().Cast<TCoAtom>();
+ } else if (name == "transformLambda") {
+ YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
+ transformLambda = tuple.Value().Cast<TCoAtom>();
+ } else if (name == "settings") {
+ YQL_ENSURE(tuple.Value().Maybe<TCoNameValueTupleList>());
+ for (const auto& item : tuple.Value().Cast<TCoNameValueTupleList>()) {
+ settings.push_back(item);
+ }
+ } else {
+ other.push_back(tuple);
+ }
+ }
+ }
+
+ const auto& builtSettings = Build<TCoNameValueTupleList>(ctx, node.Pos())
+ .Add(settings)
+ .Done();
+
+ const auto& builtOther = Build<TCoNameValueTupleList>(ctx, node.Pos())
+ .Add(other)
+ .Done();
+
+ TWriteTransferSettings ret(builtOther);
+ ret.Mode = mode;
+ ret.Source = source;
+ ret.Target = target;
+ ret.TransformLambda = transformLambda;
+ ret.TransferSettings = builtSettings;
+
+ return ret;
+}
+
TWriteRoleSettings ParseWriteRoleSettings(TExprList node, TExprContext& ctx) {
TMaybeNode<TCoAtom> mode;
TVector<TCoAtom> roles;
@@ -858,7 +912,7 @@ bool FillUsedFilesImpl(
if (node.GetTypeAnn()) {
usedPgExtensions |= node.GetTypeAnn()->GetUsedPgExtensions();
}
-
+
if (node.IsCallable("PgResolvedCall")) {
auto procId = FromString<ui32>(node.Child(1)->Content());
const auto& proc = NPg::LookupProc(procId);
@@ -1072,7 +1126,7 @@ void FillSecureParams(
}
}
-bool AddPgFile(bool isPath, const TString& pathOrContent, const TString& md5, const TString& alias, TUserDataTable& files,
+bool AddPgFile(bool isPath, const TString& pathOrContent, const TString& md5, const TString& alias, TUserDataTable& files,
const TTypeAnnotationContext& types, TPositionHandle pos, TExprContext& ctx) {
TUserDataBlock block;
@@ -1140,7 +1194,7 @@ bool FillUsedFiles(
return false;
}
}
-
+
Y_ENSURE(remainingPgExtensions == 0);
if (!needFullPgCatalog) {
return true;
diff --git a/yql/essentials/providers/common/provider/yql_provider.h b/yql/essentials/providers/common/provider/yql_provider.h
index 978621d4b16..ea9992dd5e2 100644
--- a/yql/essentials/providers/common/provider/yql_provider.h
+++ b/yql/essentials/providers/common/provider/yql_provider.h
@@ -97,6 +97,19 @@ struct TWriteReplicationSettings {
{}
};
+struct TWriteTransferSettings {
+ NNodes::TMaybeNode<NNodes::TCoAtom> Mode;
+ NNodes::TMaybeNode<NNodes::TCoAtom> Source;
+ NNodes::TMaybeNode<NNodes::TCoAtom> Target;
+ NNodes::TMaybeNode<NNodes::TCoAtom> TransformLambda;
+ NNodes::TMaybeNode<NNodes::TCoNameValueTupleList> TransferSettings;
+ NNodes::TCoNameValueTupleList Other;
+
+ TWriteTransferSettings(const NNodes::TCoNameValueTupleList& other)
+ : Other(other)
+ {}
+};
+
struct TWriteRoleSettings {
NNodes::TMaybeNode<NNodes::TCoAtom> Mode;
NNodes::TMaybeNode<NNodes::TCoAtomList> Roles;
@@ -168,6 +181,7 @@ TVector<TString> GetResOrPullColumnHints(const TExprNode& node);
TWriteTableSettings ParseWriteTableSettings(NNodes::TExprList node, TExprContext& ctx);
TWriteTopicSettings ParseWriteTopicSettings(NNodes::TExprList node, TExprContext& ctx);
TWriteReplicationSettings ParseWriteReplicationSettings(NNodes::TExprList node, TExprContext& ctx);
+TWriteTransferSettings ParseWriteTransferSettings(NNodes::TExprList node, TExprContext& ctx);
TWriteRoleSettings ParseWriteRoleSettings(NNodes::TExprList node, TExprContext& ctx);
TWriteObjectSettings ParseWriteObjectSettings(NNodes::TExprList node, TExprContext& ctx);
diff --git a/yql/essentials/sql/sql.cpp b/yql/essentials/sql/sql.cpp
index 55409d5ba3f..6ecd365a906 100644
--- a/yql/essentials/sql/sql.cpp
+++ b/yql/essentials/sql/sql.cpp
@@ -144,6 +144,10 @@ namespace NSQLTranslation {
}
NYql::TAstParseResult SqlASTToYql(const google::protobuf::Message& protoAst, const TSQLHints& hints, const TTranslationSettings& settings) {
+ return SqlASTToYql("", protoAst, hints, settings);
+ }
+
+ NYql::TAstParseResult SqlASTToYql(const TString& query, const google::protobuf::Message& protoAst, const TSQLHints& hints, const TTranslationSettings& settings) {
NYql::TAstParseResult result;
switch (settings.SyntaxVersion) {
case 0:
@@ -161,7 +165,7 @@ namespace NSQLTranslation {
return NSQLTranslationV0::SqlASTToYql(protoAst, settings);
case 1:
- return NSQLTranslationV1::SqlASTToYql(protoAst, hints, settings);
+ return NSQLTranslationV1::SqlASTToYql(query, protoAst, hints, settings);
default:
result.Issues.AddIssue(NYql::YqlIssue(NYql::TPosition(), NYql::TIssuesIds::DEFAULT_ERROR,
TStringBuilder() << "Unknown SQL syntax version: " << settings.SyntaxVersion));
diff --git a/yql/essentials/sql/sql.h b/yql/essentials/sql/sql.h
index 26a61b22313..f0f844c88a1 100644
--- a/yql/essentials/sql/sql.h
+++ b/yql/essentials/sql/sql.h
@@ -21,7 +21,11 @@ namespace NSQLTranslation {
google::protobuf::Message* SqlAST(const TString& query, const TString& queryName, NYql::TIssues& issues, size_t maxErrors,
const TTranslationSettings& settings = {}, ui16* actualSyntaxVersion = nullptr);
ILexer::TPtr SqlLexer(const TString& query, NYql::TIssues& issues, const TTranslationSettings& settings = {}, ui16* actualSyntaxVersion = nullptr);
+
+ /*[[deprecated]] Use SqlASTToYql(query, protoAst, hints, settings)*/
NYql::TAstParseResult SqlASTToYql(const google::protobuf::Message& protoAst, const TSQLHints& hints, const TTranslationSettings& settings);
+ NYql::TAstParseResult SqlASTToYql(const TString& query, const google::protobuf::Message& protoAst, const TSQLHints& hints, const TTranslationSettings& settings);
+
TVector<NYql::TAstParseResult> SqlToAstStatements(const TString& query, const TTranslationSettings& settings,
NYql::TWarningRules* warningRules = nullptr, ui16* actualSyntaxVersion = nullptr, TVector<NYql::TStmtParseInfo>* stmtParseInfo = nullptr);
diff --git a/yql/essentials/sql/v1/SQLv1.g.in b/yql/essentials/sql/v1/SQLv1.g.in
index 192a1a6026f..510afaa501e 100644
--- a/yql/essentials/sql/v1/SQLv1.g.in
+++ b/yql/essentials/sql/v1/SQLv1.g.in
@@ -76,6 +76,9 @@ sql_stmt_core:
| backup_stmt
| restore_stmt
| alter_sequence_stmt
+ | create_transfer_stmt
+ | alter_transfer_stmt
+ | drop_transfer_stmt
;
expr:
@@ -906,6 +909,29 @@ alter_replication_set_setting: SET LPAREN replication_settings RPAREN;
drop_replication_stmt: DROP ASYNC REPLICATION object_ref CASCADE?;
+create_transfer_stmt: CREATE TRANSFER object_ref
+ FROM object_ref TO object_ref (USING lambda_or_parameter)?
+ WITH LPAREN transfer_settings RPAREN
+;
+
+lambda_or_parameter:
+ lambda
+ | bind_parameter
+;
+transfer_settings: transfer_settings_entry (COMMA transfer_settings_entry)*;
+transfer_settings_entry: an_id EQUALS expr;
+
+alter_transfer_stmt: ALTER TRANSFER object_ref alter_transfer_action (COMMA alter_transfer_action)*;
+alter_transfer_action:
+ alter_transfer_set_setting
+ | alter_transfer_set_using
+;
+
+alter_transfer_set_setting: SET LPAREN transfer_settings RPAREN;
+alter_transfer_set_using: SET USING lambda_or_parameter;
+
+drop_transfer_stmt: DROP TRANSFER object_ref CASCADE?;
+
action_or_subquery_args: opt_bind_parameter (COMMA opt_bind_parameter)*;
define_action_or_subquery_stmt: DEFINE (ACTION|SUBQUERY) bind_parameter LPAREN action_or_subquery_args? RPAREN AS define_action_or_subquery_body END DEFINE;
@@ -1466,6 +1492,7 @@ keyword_as_compat:
| TO
| TOPIC
| TRANSACTION
+ | TRANSFER
| TRIGGER
| TYPE
| UNCONDITIONAL
@@ -1693,6 +1720,7 @@ keyword_compat: (
| TO
| TOPIC
| TRANSACTION
+ | TRANSFER
| TRIGGER
| TYPE
| UNCONDITIONAL
@@ -2072,6 +2100,7 @@ TIES: T I E S;
TO: T O;
TOPIC: T O P I C;
TRANSACTION: T R A N S A C T I O N;
+TRANSFER: T R A N S F E R;
TRIGGER: T R I G G E R;
TRUE: T R U E;
TUPLE: T U P L E;
diff --git a/yql/essentials/sql/v1/SQLv1Antlr4.g.in b/yql/essentials/sql/v1/SQLv1Antlr4.g.in
index 3ae4ad6492d..2ddfd0c748c 100644
--- a/yql/essentials/sql/v1/SQLv1Antlr4.g.in
+++ b/yql/essentials/sql/v1/SQLv1Antlr4.g.in
@@ -75,6 +75,9 @@ sql_stmt_core:
| backup_stmt
| restore_stmt
| alter_sequence_stmt
+ | create_transfer_stmt
+ | alter_transfer_stmt
+ | drop_transfer_stmt
;
expr:
@@ -905,6 +908,30 @@ alter_replication_set_setting: SET LPAREN replication_settings RPAREN;
drop_replication_stmt: DROP ASYNC REPLICATION object_ref CASCADE?;
+lambda_or_parameter:
+ lambda
+ | bind_parameter
+;
+
+create_transfer_stmt: CREATE TRANSFER object_ref
+ FROM object_ref TO object_ref (USING lambda_or_parameter)?
+ WITH LPAREN transfer_settings RPAREN
+;
+
+transfer_settings: transfer_settings_entry (COMMA transfer_settings_entry)*;
+transfer_settings_entry: an_id EQUALS expr;
+
+alter_transfer_stmt: ALTER TRANSFER object_ref alter_transfer_action (COMMA alter_transfer_action)*;
+alter_transfer_action:
+ alter_transfer_set_setting
+ | alter_transfer_set_using
+;
+
+alter_transfer_set_setting: SET LPAREN transfer_settings RPAREN;
+alter_transfer_set_using: SET USING lambda_or_parameter;
+
+drop_transfer_stmt: DROP TRANSFER object_ref CASCADE?;
+
action_or_subquery_args: opt_bind_parameter (COMMA opt_bind_parameter)*;
define_action_or_subquery_stmt: DEFINE (ACTION|SUBQUERY) bind_parameter LPAREN action_or_subquery_args? RPAREN AS define_action_or_subquery_body END DEFINE;
@@ -1465,6 +1492,7 @@ keyword_as_compat:
| TO
| TOPIC
| TRANSACTION
+ | TRANSFER
| TRIGGER
| TYPE
| UNCONDITIONAL
@@ -1692,6 +1720,7 @@ keyword_compat: (
| TO
| TOPIC
| TRANSACTION
+ | TRANSFER
| TRIGGER
| TYPE
| UNCONDITIONAL
@@ -2071,6 +2100,7 @@ TIES: T I E S;
TO: T O;
TOPIC: T O P I C;
TRANSACTION: T R A N S A C T I O N;
+TRANSFER: T R A N S F E R;
TRIGGER: T R I G G E R;
TRUE: T R U E;
TUPLE: T U P L E;
diff --git a/yql/essentials/sql/v1/context.cpp b/yql/essentials/sql/v1/context.cpp
index 43afc53c38b..d28c31469b4 100644
--- a/yql/essentials/sql/v1/context.cpp
+++ b/yql/essentials/sql/v1/context.cpp
@@ -83,12 +83,14 @@ THashMap<TStringBuf, TPragmaMaybeField> CTX_PRAGMA_MAYBE_FIELDS = {
TContext::TContext(const NSQLTranslation::TTranslationSettings& settings,
const NSQLTranslation::TSQLHints& hints,
- TIssues& issues)
+ TIssues& issues,
+ const TString& query)
: ClusterMapping(settings.ClusterMapping)
, PathPrefix(settings.PathPrefix)
, ClusterPathPrefixes(settings.ClusterPathPrefixes)
, SQLHints(hints)
, Settings(settings)
+ , Query(query)
, Pool(new TMemoryPool(4096))
, Issues(issues)
, IncrementMonCounterFunction(settings.IncrementCounter)
diff --git a/yql/essentials/sql/v1/context.h b/yql/essentials/sql/v1/context.h
index a5001fd157c..85a4739eaf5 100644
--- a/yql/essentials/sql/v1/context.h
+++ b/yql/essentials/sql/v1/context.h
@@ -92,7 +92,8 @@ namespace NSQLTranslationV1 {
public:
TContext(const NSQLTranslation::TTranslationSettings& settings,
const NSQLTranslation::TSQLHints& hints,
- NYql::TIssues& issues);
+ NYql::TIssues& issues,
+ const TString& query = {});
virtual ~TContext();
@@ -237,6 +238,7 @@ namespace NSQLTranslationV1 {
THashMap<TString, std::pair<TPosition, TNodePtr>> Variables;
THashSet<TString> WeakVariables;
NSQLTranslation::TTranslationSettings Settings;
+ const TString Query;
std::unique_ptr<TMemoryPool> Pool;
NYql::TIssues& Issues;
TMap<TString, TNodePtr> UniversalAliases;
@@ -328,6 +330,7 @@ namespace NSQLTranslationV1 {
bool DistinctOverWindow = false;
bool SeqMode = false;
bool EmitUnionMerge = false;
+ TVector<size_t> ForAllStatementsParts;
};
class TColumnRefScope {
diff --git a/yql/essentials/sql/v1/format/sql_format.cpp b/yql/essentials/sql/v1/format/sql_format.cpp
index b6986310aa1..26d7eb9287b 100644
--- a/yql/essentials/sql/v1/format/sql_format.cpp
+++ b/yql/essentials/sql/v1/format/sql_format.cpp
@@ -165,144 +165,6 @@ bool Validate(const TParsedTokenList& query, const TParsedTokenList& formattedQu
return in == inEnd && out == outEnd && parenthesesBalance == 0;
}
-enum EParenType {
- Open,
- Close,
- None
-};
-
-using TAdvanceCallback = std::function<EParenType(TTokenIterator& curr, TTokenIterator end)>;
-
-TTokenIterator SkipToNextBalanced(TTokenIterator begin, TTokenIterator end, const TAdvanceCallback& advance) {
- i64 level = 0;
- TTokenIterator curr = begin;
- while (curr != end) {
- switch (advance(curr, end)) {
- case EParenType::Open: {
- ++level;
- break;
- }
- case EParenType::Close: {
- --level;
- if (level < 0) {
- return end;
- } else if (level == 0) {
- return curr;
- }
- break;
- }
- case EParenType::None:
- break;
- }
- }
- return curr;
-}
-
-TTokenIterator GetNextStatementBegin(TTokenIterator begin, TTokenIterator end) {
- TAdvanceCallback advanceLambdaBody = [](TTokenIterator& curr, TTokenIterator end) -> EParenType {
- Y_UNUSED(end);
- if (curr->Name == "LBRACE_CURLY") {
- ++curr;
- return EParenType::Open;
- } else if (curr->Name == "RBRACE_CURLY") {
- ++curr;
- return EParenType::Close;
- } else {
- ++curr;
- return EParenType::None;
- }
- };
-
- TAdvanceCallback advanceAction = [](TTokenIterator& curr, TTokenIterator end) -> EParenType {
- auto tmp = curr;
- if (curr->Name == "DEFINE") {
- ++curr;
- curr = SkipWSOrComment(curr, end);
- if (curr != end && (curr->Name == "ACTION" || curr->Name == "SUBQUERY")) {
- ++curr;
- return EParenType::Open;
- }
- } else if (curr->Name == "END") {
- ++curr;
- curr = SkipWSOrComment(curr, end);
- if (curr != end && curr->Name == "DEFINE") {
- ++curr;
- return EParenType::Close;
- }
- }
-
- curr = tmp;
- ++curr;
- return EParenType::None;
- };
-
- TAdvanceCallback advanceInlineAction = [](TTokenIterator& curr, TTokenIterator end) -> EParenType {
- auto tmp = curr;
- if (curr->Name == "DO") {
- ++curr;
- curr = SkipWSOrComment(curr, end);
- if (curr != end && curr->Name == "BEGIN") {
- ++curr;
- return EParenType::Open;
- }
- } else if (curr->Name == "END") {
- ++curr;
- curr = SkipWSOrComment(curr, end);
- if (curr != end && curr->Name == "DO") {
- ++curr;
- return EParenType::Close;
- }
- }
-
- curr = tmp;
- ++curr;
- return EParenType::None;
- };
-
- TTokenIterator curr = begin;
- while (curr != end) {
- bool matched = false;
- for (auto cb : {advanceLambdaBody, advanceAction, advanceInlineAction}) {
- TTokenIterator tmp = curr;
- if (cb(tmp, end) == EParenType::Open) {
- curr = SkipToNextBalanced(curr, end, cb);
- matched = true;
- if (curr == end) {
- return curr;
- }
- }
- }
- if (matched) {
- continue;
- }
- if (curr->Name == "SEMICOLON") {
- auto next = SkipWS(curr + 1, end);
- while (next != end && next->Name == "COMMENT" && curr->Line == next->Line) {
- curr = next;
- next = SkipWS(next + 1, end);
- }
- ++curr;
- break;
- }
- ++curr;
- }
-
- return curr;
-}
-
-void SplitByStatements(TTokenIterator begin, TTokenIterator end, TVector<TTokenIterator>& output) {
- output.clear();
- if (begin == end) {
- return;
- }
- output.push_back(begin);
- auto curr = begin;
- while (curr != end) {
- curr = GetNextStatementBegin(curr, end);
- output.push_back(curr);
- }
-}
-
enum class EScope {
Default,
TypeName,
@@ -833,6 +695,7 @@ private:
case TRule_sql_stmt_core::kAltSqlStmtCore14: // export
case TRule_sql_stmt_core::kAltSqlStmtCore32: // drop external data source
case TRule_sql_stmt_core::kAltSqlStmtCore34: // drop replication
+ case TRule_sql_stmt_core::kAltSqlStmtCore60: // drop transfer
return true;
case TRule_sql_stmt_core::kAltSqlStmtCore3: { // named nodes
const auto& stmt = msg.GetAlt_sql_stmt_core3().GetRule_named_nodes_stmt1();
@@ -1541,6 +1404,21 @@ private:
VisitAllFields(TRule_drop_replication_stmt::GetDescriptor(), msg);
}
+ void VisitCreateTransfer(const TRule_create_transfer_stmt& msg) {
+ NewLine();
+ VisitAllFields(TRule_create_transfer_stmt::GetDescriptor(), msg);
+ }
+
+ void VisitAlterTransfer(const TRule_alter_transfer_stmt& msg) {
+ NewLine();
+ VisitAllFields(TRule_alter_transfer_stmt::GetDescriptor(), msg);
+ }
+
+ void VisitDropTransfer(const TRule_drop_transfer_stmt& msg) {
+ NewLine();
+ VisitAllFields(TRule_drop_transfer_stmt::GetDescriptor(), msg);
+ }
+
void VisitCreateResourcePool(const TRule_create_resource_pool_stmt& msg) {
NewLine();
VisitAllFields(TRule_create_resource_pool_stmt::GetDescriptor(), msg);
@@ -3107,6 +2985,9 @@ TStaticData::TStaticData()
{TRule_create_replication_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitCreateAsyncReplication)},
{TRule_alter_replication_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitAlterAsyncReplication)},
{TRule_drop_replication_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitDropAsyncReplication)},
+ {TRule_create_transfer_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitCreateTransfer)},
+ {TRule_alter_transfer_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitAlterTransfer)},
+ {TRule_drop_transfer_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitDropTransfer)},
{TRule_create_topic_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitCreateTopic)},
{TRule_alter_topic_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitAlterTopic)},
{TRule_drop_topic_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitDropTopic)},
@@ -3188,42 +3069,15 @@ public:
}
auto lexer = NSQLTranslationV1::MakeLexer(parsedSettings.AnsiLexer, parsedSettings.Antlr4Parser);
- TParsedTokenList allTokens;
- auto onNextToken = [&](NSQLTranslation::TParsedToken&& token) {
- if (token.Name != "EOF") {
- allTokens.push_back(token);
- }
- };
-
- if (!lexer->Tokenize(query, "Query", onNextToken, issues, NSQLTranslation::SQL_MAX_PARSER_ERRORS)) {
+ TVector<TString> statements;
+ if (!NSQLTranslationV1::SplitQueryToStatements(query, lexer, statements, issues)) {
return false;
}
- TVector<TTokenIterator> statements;
- SplitByStatements(allTokens.begin(), allTokens.end(), statements);
TStringBuilder finalFormattedQuery;
bool prevAddLine = false;
TMaybe<ui32> prevStmtCoreAltCase;
- for (size_t i = 1; i < statements.size(); ++i) {
- TStringBuilder currentQueryBuilder;
- for (auto it = statements[i - 1]; it != statements[i]; ++it) {
- currentQueryBuilder << it->Content;
- }
-
- TString currentQuery = currentQueryBuilder;
- currentQuery = StripStringLeft(currentQuery);
- bool isBlank = true;
- for (auto c : currentQuery) {
- if (c != ';') {
- isBlank = false;
- break;
- }
- };
-
- if (isBlank) {
- continue;
- }
-
+ for (const TString& currentQuery : statements) {
TVector<NSQLTranslation::TParsedToken> comments;
TParsedTokenList parsedTokens, stmtTokens;
auto onNextRawToken = [&](NSQLTranslation::TParsedToken&& token) {
diff --git a/yql/essentials/sql/v1/format/sql_format_ut.h b/yql/essentials/sql/v1/format/sql_format_ut.h
index bd62ddf3687..a8fd6b7b3ff 100644
--- a/yql/essentials/sql/v1/format/sql_format_ut.h
+++ b/yql/essentials/sql/v1/format/sql_format_ut.h
@@ -383,6 +383,26 @@ Y_UNIT_TEST(AsyncReplication) {
setup.Run(cases);
}
+Y_UNIT_TEST(Transfer) {
+ TCases cases = {
+ {"create transfer user from topic1 to table1 with (user='foo')",
+ "CREATE TRANSFER user FROM topic1 TO table1 WITH (user = 'foo');\n"},
+ {"alter transfer user set (user='foo')",
+ "ALTER TRANSFER user SET (user = 'foo');\n"},
+ {"drop transfer user",
+ "DROP TRANSFER user;\n"},
+ {"drop transfer user cascade",
+ "DROP TRANSFER user CASCADE;\n"},
+ {"create transfer user from topic1 to table1 using ($x) -> { $y = cast($x as String); return $y ; } with (user='foo')",
+ "CREATE TRANSFER user FROM topic1 TO table1 USING ($x) -> {\n $y = CAST($x AS String);\n RETURN $y;\n} WITH (user = 'foo');\n"},
+ {"create transfer user from topic1 to table1 using $xxx with (user='foo')",
+ "CREATE TRANSFER user FROM topic1 TO table1 USING $xxx WITH (user = 'foo');\n"},
+ };
+
+ TSetup setup;
+ setup.Run(cases);
+}
+
Y_UNIT_TEST(ExternalTableOperations) {
TCases cases = {
{"creAte exTernAl TabLe usEr (a int) With (a = \"b\")",
diff --git a/yql/essentials/sql/v1/lexer/lexer.cpp b/yql/essentials/sql/v1/lexer/lexer.cpp
index 1d38ec3d8b0..2609e0f7f6f 100644
--- a/yql/essentials/sql/v1/lexer/lexer.cpp
+++ b/yql/essentials/sql/v1/lexer/lexer.cpp
@@ -8,8 +8,11 @@
#include <yql/essentials/parser/proto_ast/gen/v1_ansi/SQLv1Lexer.h>
#include <yql/essentials/parser/proto_ast/gen/v1_antlr4/SQLv1Antlr4Lexer.h>
#include <yql/essentials/parser/proto_ast/gen/v1_ansi_antlr4/SQLv1Antlr4Lexer.h>
+#include <yql/essentials/sql/v1/sql.h>
#include <util/string/ascii.h>
+#include <util/string/builder.h>
+#include <util/string/strip.h>
#if defined(_tsan_enabled_)
#include <util/system/mutex.h>
@@ -80,4 +83,204 @@ bool IsProbablyKeyword(const NSQLTranslation::TParsedToken& token) {
return AsciiEqualsIgnoreCase(token.Name, token.Content);
}
+using NSQLTranslation::TParsedTokenList;
+using TTokenIterator = TParsedTokenList::const_iterator;
+
+namespace {
+
+enum EParenType {
+ Open,
+ Close,
+ None
+};
+
+using TAdvanceCallback = std::function<EParenType(TTokenIterator& curr, TTokenIterator end)>;
+
+TTokenIterator SkipWS(TTokenIterator curr, TTokenIterator end) {
+ while (curr != end && curr->Name == "WS") {
+ ++curr;
+ }
+ return curr;
+}
+
+TTokenIterator SkipWSOrComment(TTokenIterator curr, TTokenIterator end) {
+ while (curr != end && (curr->Name == "WS" || curr->Name == "COMMENT")) {
+ ++curr;
+ }
+ return curr;
+}
+
+TTokenIterator SkipToNextBalanced(TTokenIterator begin, TTokenIterator end, const TAdvanceCallback& advance) {
+ i64 level = 0;
+ TTokenIterator curr = begin;
+ while (curr != end) {
+ switch (advance(curr, end)) {
+ case EParenType::Open: {
+ ++level;
+ break;
+ }
+ case EParenType::Close: {
+ --level;
+ if (level < 0) {
+ return end;
+ } else if (level == 0) {
+ return curr;
+ }
+ break;
+ }
+ case EParenType::None:
+ break;
+ }
+ }
+ return curr;
+}
+
+TTokenIterator GetNextStatementBegin(TTokenIterator begin, TTokenIterator end) {
+ TAdvanceCallback advanceLambdaBody = [](TTokenIterator& curr, TTokenIterator end) -> EParenType {
+ Y_UNUSED(end);
+ if (curr->Name == "LBRACE_CURLY") {
+ ++curr;
+ return EParenType::Open;
+ } else if (curr->Name == "RBRACE_CURLY") {
+ ++curr;
+ return EParenType::Close;
+ } else {
+ ++curr;
+ return EParenType::None;
+ }
+ };
+
+ TAdvanceCallback advanceAction = [](TTokenIterator& curr, TTokenIterator end) -> EParenType {
+ auto tmp = curr;
+ if (curr->Name == "DEFINE") {
+ ++curr;
+ curr = SkipWSOrComment(curr, end);
+ if (curr != end && (curr->Name == "ACTION" || curr->Name == "SUBQUERY")) {
+ ++curr;
+ return EParenType::Open;
+ }
+ } else if (curr->Name == "END") {
+ ++curr;
+ curr = SkipWSOrComment(curr, end);
+ if (curr != end && curr->Name == "DEFINE") {
+ ++curr;
+ return EParenType::Close;
+ }
+ }
+
+ curr = tmp;
+ ++curr;
+ return EParenType::None;
+ };
+
+ TAdvanceCallback advanceInlineAction = [](TTokenIterator& curr, TTokenIterator end) -> EParenType {
+ auto tmp = curr;
+ if (curr->Name == "DO") {
+ ++curr;
+ curr = SkipWSOrComment(curr, end);
+ if (curr != end && curr->Name == "BEGIN") {
+ ++curr;
+ return EParenType::Open;
+ }
+ } else if (curr->Name == "END") {
+ ++curr;
+ curr = SkipWSOrComment(curr, end);
+ if (curr != end && curr->Name == "DO") {
+ ++curr;
+ return EParenType::Close;
+ }
+ }
+
+ curr = tmp;
+ ++curr;
+ return EParenType::None;
+ };
+
+ TTokenIterator curr = begin;
+ while (curr != end) {
+ bool matched = false;
+ for (auto cb : {advanceLambdaBody, advanceAction, advanceInlineAction}) {
+ TTokenIterator tmp = curr;
+ if (cb(tmp, end) == EParenType::Open) {
+ curr = SkipToNextBalanced(curr, end, cb);
+ matched = true;
+ if (curr == end) {
+ return curr;
+ }
+ }
+ }
+ if (matched) {
+ continue;
+ }
+ if (curr->Name == "SEMICOLON") {
+ auto next = SkipWS(curr + 1, end);
+ while (next != end && next->Name == "COMMENT" && curr->Line == next->Line) {
+ curr = next;
+ next = SkipWS(next + 1, end);
+ }
+ ++curr;
+ break;
+ }
+ ++curr;
+ }
+
+ return curr;
+}
+
+void SplitByStatements(TTokenIterator begin, TTokenIterator end, TVector<TTokenIterator>& output) {
+ output.clear();
+ if (begin == end) {
+ return;
+ }
+ output.push_back(begin);
+ auto curr = begin;
+ while (curr != end) {
+ curr = GetNextStatementBegin(curr, end);
+ output.push_back(curr);
+ }
+}
+
+}
+
+bool SplitQueryToStatements(const TString& query, NSQLTranslation::ILexer::TPtr& lexer, TVector<TString>& statements, NYql::TIssues& issues) {
+ TParsedTokenList allTokens;
+ auto onNextToken = [&](NSQLTranslation::TParsedToken&& token) {
+ if (token.Name != "EOF") {
+ allTokens.push_back(token);
+ }
+ };
+
+ if (!lexer->Tokenize(query, "Query", onNextToken, issues, NSQLTranslation::SQL_MAX_PARSER_ERRORS)) {
+ return false;
+ }
+
+ TVector<TTokenIterator> statementsTokens;
+ SplitByStatements(allTokens.begin(), allTokens.end(), statementsTokens);
+
+ for (size_t i = 1; i < statementsTokens.size(); ++i) {
+ TStringBuilder currentQueryBuilder;
+ for (auto it = statementsTokens[i - 1]; it != statementsTokens[i]; ++it) {
+ currentQueryBuilder << it->Content;
+ }
+ TString statement = currentQueryBuilder;
+ statement = StripStringLeft(statement);
+
+ bool isBlank = true;
+ for (auto c : statement) {
+ if (c != ';') {
+ isBlank = false;
+ break;
+ }
+ };
+
+ if (isBlank) {
+ continue;
+ }
+
+ statements.push_back(statement);
+ }
+
+ return true;
+}
+
} // namespace NSQLTranslationV1
diff --git a/yql/essentials/sql/v1/lexer/lexer.h b/yql/essentials/sql/v1/lexer/lexer.h
index 25bfe28f81f..d43efc6387f 100644
--- a/yql/essentials/sql/v1/lexer/lexer.h
+++ b/yql/essentials/sql/v1/lexer/lexer.h
@@ -12,4 +12,6 @@ NSQLTranslation::ILexer::TPtr MakeLexer(bool ansi, bool antlr4);
// in SELECT * FROM ... GROUP BY ... - group is a keyword.
bool IsProbablyKeyword(const NSQLTranslation::TParsedToken& token);
+bool SplitQueryToStatements(const TString& query, NSQLTranslation::ILexer::TPtr& lexer,
+ TVector<TString>& statements, NYql::TIssues& issues);
}
diff --git a/yql/essentials/sql/v1/node.h b/yql/essentials/sql/v1/node.h
index 609cd82dd48..e19245f695b 100644
--- a/yql/essentials/sql/v1/node.h
+++ b/yql/essentials/sql/v1/node.h
@@ -1548,6 +1548,14 @@ namespace NSQLTranslationV1 {
std::map<TString, TNodePtr>&& settings,
const TObjectOperatorContext& context);
TNodePtr BuildDropAsyncReplication(TPosition pos, const TString& id, bool cascade, const TObjectOperatorContext& context);
+ TNodePtr BuildCreateTransfer(TPosition pos, const TString& id, const TString&& source, const TString&& target,
+ const TString&& transformLambda,
+ std::map<TString, TNodePtr>&& settings,
+ const TObjectOperatorContext& context);
+ TNodePtr BuildAlterTransfer(TPosition pos, const TString& id, std::optional<TString>&& transformLambda,
+ std::map<TString, TNodePtr>&& settings,
+ const TObjectOperatorContext& context);
+ TNodePtr BuildDropTransfer(TPosition pos, const TString& id, bool cascade, const TObjectOperatorContext& context);
TNodePtr BuildWriteResult(TPosition pos, const TString& label, TNodePtr settings);
TNodePtr BuildCommitClusters(TPosition pos);
TNodePtr BuildRollbackClusters(TPosition pos);
diff --git a/yql/essentials/sql/v1/query.cpp b/yql/essentials/sql/v1/query.cpp
index c74f96a27ca..a1323972c44 100644
--- a/yql/essentials/sql/v1/query.cpp
+++ b/yql/essentials/sql/v1/query.cpp
@@ -2628,6 +2628,159 @@ TNodePtr BuildAlterAsyncReplication(TPosition pos, const TString& id,
return new TAlterAsyncReplication(pos, id, std::move(settings), context);
}
+class TTransfer
+ : public TAstListNode
+ , protected TObjectOperatorContext
+{
+protected:
+ virtual INode::TPtr FillOptions(INode::TPtr options) const = 0;
+
+public:
+ explicit TTransfer(TPosition pos, const TString& id, const TString& mode, const TObjectOperatorContext& context)
+ : TAstListNode(pos)
+ , TObjectOperatorContext(context)
+ , Id(id)
+ , Mode(mode)
+ {
+ }
+
+ bool DoInit(TContext& ctx, ISource* src) override {
+ Scoped->UseCluster(ServiceId, Cluster);
+
+ auto keys = Y("Key", Q(Y(Q("transfer"), Y("String", BuildQuotedAtom(Pos, Id)))));
+ auto options = FillOptions(Y(Q(Y(Q("mode"), Q(Mode)))));
+
+ Add("block", Q(Y(
+ Y("let", "sink", Y("DataSink", BuildQuotedAtom(Pos, ServiceId), Scoped->WrapCluster(Cluster, ctx))),
+ Y("let", "world", Y(TString(WriteName), "world", "sink", keys, Y("Void"), Q(options))),
+ Y("return", ctx.PragmaAutoCommit ? Y(TString(CommitName), "world", "sink") : AstNode("world"))
+ )));
+
+ return TAstListNode::DoInit(ctx, src);
+ }
+
+ TPtr DoClone() const final {
+ return {};
+ }
+
+private:
+ const TString Id;
+ const TString Mode;
+
+}; // TTransfer
+
+class TCreateTransfer final: public TTransfer {
+public:
+ explicit TCreateTransfer(TPosition pos, const TString& id, const TString&& source, const TString&& target,
+ const TString&& transformLambda,
+ std::map<TString, TNodePtr>&& settings,
+ const TObjectOperatorContext& context)
+ : TTransfer(pos, id, "create", context)
+ , Source(std::move(source))
+ , Target(std::move(target))
+ , TransformLambda(std::move(transformLambda))
+ , Settings(std::move(settings))
+ {
+ }
+
+protected:
+ INode::TPtr FillOptions(INode::TPtr options) const override {
+ options = L(options, Q(Y(Q("source"), Q(Source))));
+ options = L(options, Q(Y(Q("target"), Q(Target))));
+ options = L(options, Q(Y(Q("transformLambda"), Q(TransformLambda))));
+
+ if (!Settings.empty()) {
+ auto settings = Y();
+ for (auto&& [k, v] : Settings) {
+ if (v) {
+ settings = L(settings, Q(Y(BuildQuotedAtom(Pos, k), v)));
+ } else {
+ settings = L(settings, Q(Y(BuildQuotedAtom(Pos, k))));
+ }
+ }
+ options = L(options, Q(Y(Q("settings"), Q(settings))));
+ }
+
+ return options;
+ }
+
+private:
+ const TString Source;
+ const TString Target;
+ const TString TransformLambda;
+ std::map<TString, TNodePtr> Settings;
+
+}; // TCreateTransfer
+
+TNodePtr BuildCreateTransfer(TPosition pos, const TString& id, const TString&& source, const TString&& target,
+ const TString&& transformLambda,
+ std::map<TString, TNodePtr>&& settings,
+ const TObjectOperatorContext& context)
+{
+ return new TCreateTransfer(pos, id, std::move(source), std::move(target), std::move(transformLambda), std::move(settings), context);
+}
+
+class TDropTransfer final: public TTransfer {
+public:
+ explicit TDropTransfer(TPosition pos, const TString& id, bool cascade, const TObjectOperatorContext& context)
+ : TTransfer(pos, id, cascade ? "dropCascade" : "drop", context)
+ {
+ }
+
+protected:
+ INode::TPtr FillOptions(INode::TPtr options) const override {
+ return options;
+ }
+
+}; // TDropTransfer
+
+TNodePtr BuildDropTransfer(TPosition pos, const TString& id, bool cascade, const TObjectOperatorContext& context) {
+ return new TDropTransfer(pos, id, cascade, context);
+}
+
+class TAlterTransfer final: public TTransfer {
+public:
+ explicit TAlterTransfer(TPosition pos, const TString& id, std::optional<TString>&& transformLambda,
+ std::map<TString, TNodePtr>&& settings,
+ const TObjectOperatorContext& context)
+ : TTransfer(pos, id, "alter", context)
+ , TransformLambda(std::move(transformLambda))
+ , Settings(std::move(settings))
+ {
+ }
+
+protected:
+ INode::TPtr FillOptions(INode::TPtr options) const override {
+ options = L(options, Q(Y(Q("transformLambda"), Q(TransformLambda ? TransformLambda.value() : ""))));
+
+ if (!Settings.empty()) {
+ auto settings = Y();
+ for (auto&& [k, v] : Settings) {
+ if (v) {
+ settings = L(settings, Q(Y(BuildQuotedAtom(Pos, k), v)));
+ } else {
+ settings = L(settings, Q(Y(BuildQuotedAtom(Pos, k))));
+ }
+ }
+ options = L(options, Q(Y(Q("settings"), Q(settings))));
+ }
+
+ return options;
+ }
+
+private:
+ const std::optional<TString> TransformLambda;
+ std::map<TString, TNodePtr> Settings;
+
+}; // TAlterTransfer
+
+TNodePtr BuildAlterTransfer(TPosition pos, const TString& id, std::optional<TString>&& transformLambda,
+ std::map<TString, TNodePtr>&& settings,
+ const TObjectOperatorContext& context)
+{
+ return new TAlterTransfer(pos, id, std::move(transformLambda), std::move(settings), context);
+}
+
static const TMap<EWriteColumnMode, TString> columnModeToStrMapMR {
{EWriteColumnMode::Default, ""},
{EWriteColumnMode::Insert, "append"},
diff --git a/yql/essentials/sql/v1/sql.cpp b/yql/essentials/sql/v1/sql.cpp
index 3e5dba78f3c..503f3e45da9 100644
--- a/yql/essentials/sql/v1/sql.cpp
+++ b/yql/essentials/sql/v1/sql.cpp
@@ -75,13 +75,21 @@ void SqlASTsToYqlsImpl(NYql::TAstParseResult& res, const std::vector<::NSQLv1Gen
}
}
-NYql::TAstParseResult SqlASTToYql(const google::protobuf::Message& protoAst,
+NYql::TAstParseResult SqlASTToYql(
+ const google::protobuf::Message& protoAst,
+ const NSQLTranslation::TSQLHints& hints,
+ const NSQLTranslation::TTranslationSettings& settings) {
+ return SqlASTToYql("", protoAst, hints, settings);
+}
+
+NYql::TAstParseResult SqlASTToYql(const TString& query,
+ const google::protobuf::Message& protoAst,
const NSQLTranslation::TSQLHints& hints,
const NSQLTranslation::TTranslationSettings& settings)
{
YQL_ENSURE(IsQueryMode(settings.Mode));
TAstParseResult res;
- TContext ctx(settings, hints, res.Issues);
+ TContext ctx(settings, hints, res.Issues, query);
SqlASTToYqlImpl(res, protoAst, ctx);
res.ActualSyntaxType = NYql::ESyntaxType::YQLv1;
return res;
@@ -99,7 +107,7 @@ NYql::TAstParseResult SqlToYql(const TString& query, const NSQLTranslation::TTra
return res;
}
- TContext ctx(settings, hints, res.Issues);
+ TContext ctx(settings, hints, res.Issues, query);
NSQLTranslation::TErrorCollectorOverIssues collector(res.Issues, settings.MaxErrors, settings.File);
google::protobuf::Message* ast(SqlAST(query, queryName, collector, settings.AnsiLexer, settings.Antlr4Parser, settings.TestAntlr4, settings.Arena));
@@ -177,11 +185,14 @@ bool NeedUseForAllStatements(const TRule_sql_stmt_core::AltCase& subquery) {
case TRule_sql_stmt_core::kAltSqlStmtCore55: // backup
case TRule_sql_stmt_core::kAltSqlStmtCore56: // restore
case TRule_sql_stmt_core::kAltSqlStmtCore57: // alter sequence
+ case TRule_sql_stmt_core::kAltSqlStmtCore58: // create transfer
+ case TRule_sql_stmt_core::kAltSqlStmtCore59: // alter transfer
+ case TRule_sql_stmt_core::kAltSqlStmtCore60: // drop transfer
return false;
}
}
-TVector<NYql::TAstParseResult> SqlToAstStatements(const TString& query, const NSQLTranslation::TTranslationSettings& settings, NYql::TWarningRules* warningRules,
+TVector<NYql::TAstParseResult> SqlToAstStatements(const TString& queryText, const NSQLTranslation::TTranslationSettings& settings, NYql::TWarningRules* warningRules,
TVector<NYql::TStmtParseInfo>* stmtParseInfo)
{
TVector<TAstParseResult> result;
@@ -191,14 +202,14 @@ TVector<NYql::TAstParseResult> SqlToAstStatements(const TString& query, const NS
NSQLTranslation::TSQLHints hints;
auto lexer = MakeLexer(settings.AnsiLexer, settings.Antlr4Parser);
YQL_ENSURE(lexer);
- if (!CollectSqlHints(*lexer, query, queryName, settings.File, hints, issues, settings.MaxErrors, settings.Antlr4Parser)) {
+ if (!CollectSqlHints(*lexer, queryText, queryName, settings.File, hints, issues, settings.MaxErrors, settings.Antlr4Parser)) {
return result;
}
TContext ctx(settings, hints, issues);
NSQLTranslation::TErrorCollectorOverIssues collector(issues, settings.MaxErrors, settings.File);
- google::protobuf::Message* astProto(SqlAST(query, queryName, collector, settings.AnsiLexer, settings.Antlr4Parser, settings.TestAntlr4, settings.Arena));
+ google::protobuf::Message* astProto(SqlAST(queryText, queryName, collector, settings.AnsiLexer, settings.Antlr4Parser, settings.TestAntlr4, settings.Arena));
if (astProto) {
auto ast = static_cast<const TSQLv1ParserAST&>(*astProto);
const auto& query = ast.GetRule_sql_query();
@@ -209,7 +220,7 @@ TVector<NYql::TAstParseResult> SqlToAstStatements(const TString& query, const NS
if (NeedUseForAllStatements(statements.GetRule_sql_stmt2().GetRule_sql_stmt_core2().Alt_case())) {
commonStates.push_back(statements.GetRule_sql_stmt2().GetRule_sql_stmt_core2());
} else {
- TContext ctx(settings, hints, issues);
+ TContext ctx(settings, hints, issues, queryText);
result.emplace_back();
if (stmtParseInfo) {
stmtParseInfo->push_back({});
@@ -223,7 +234,7 @@ TVector<NYql::TAstParseResult> SqlToAstStatements(const TString& query, const NS
commonStates.push_back(block.GetRule_sql_stmt2().GetRule_sql_stmt_core2());
continue;
}
- TContext ctx(settings, hints, issues);
+ TContext ctx(settings, hints, issues, queryText);
result.emplace_back();
if (stmtParseInfo) {
stmtParseInfo->push_back({});
@@ -245,4 +256,28 @@ TVector<NYql::TAstParseResult> SqlToAstStatements(const TString& query, const NS
return result;
}
+bool SplitQueryToStatements(const TString& query, TVector<TString>& statements, NYql::TIssues& issues,
+ const NSQLTranslation::TTranslationSettings& settings) {
+ auto lexer = NSQLTranslationV1::MakeLexer(settings.AnsiLexer, settings.Antlr4Parser);
+
+ TVector<TString> parts;
+ if (!SplitQueryToStatements(query, lexer, parts, issues)) {
+ return false;
+ }
+
+ for (auto& currentQuery : parts) {
+ NYql::TIssues parserIssues;
+ auto message = NSQLTranslationV1::SqlAST(currentQuery, "Query", parserIssues, NSQLTranslation::SQL_MAX_PARSER_ERRORS,
+ settings.AnsiLexer, settings.Antlr4Parser, settings.TestAntlr4, settings.Arena);
+ if (!message) {
+ // Skip empty statements
+ continue;
+ }
+
+ statements.push_back(std::move(currentQuery));
+ }
+
+ return true;
+}
+
} // namespace NSQLTranslationV1
diff --git a/yql/essentials/sql/v1/sql.h b/yql/essentials/sql/v1/sql.h
index 0a12c45d308..6e59aaf6914 100644
--- a/yql/essentials/sql/v1/sql.h
+++ b/yql/essentials/sql/v1/sql.h
@@ -3,6 +3,7 @@
#include <yql/essentials/ast/yql_ast.h>
#include <yql/essentials/parser/lexer_common/hints.h>
#include <yql/essentials/parser/proto_ast/common.h>
+#include <yql/essentials/parser/proto_ast/gen/v1_proto_split/SQLv1Parser.pb.main.h>
#include <yql/essentials/public/issue/yql_warning.h>
#include <yql/essentials/public/issue/yql_issue_manager.h>
#include <yql/essentials/sql/settings/translation_settings.h>
@@ -16,7 +17,16 @@ namespace NSQLTranslation {
namespace NSQLTranslationV1 {
NYql::TAstParseResult SqlToYql(const TString& query, const NSQLTranslation::TTranslationSettings& settings, NYql::TWarningRules* warningRules = nullptr);
+
+ /*[[deprecated]] Use SqlASTToYql(query, protoAst, hints, settings)*/
NYql::TAstParseResult SqlASTToYql(const google::protobuf::Message& protoAst, const NSQLTranslation::TSQLHints& hints, const NSQLTranslation::TTranslationSettings& settings);
+ NYql::TAstParseResult SqlASTToYql(const TString& query, const google::protobuf::Message& protoAst, const NSQLTranslation::TSQLHints& hints, const NSQLTranslation::TTranslationSettings& settings);
+
TVector<NYql::TAstParseResult> SqlToAstStatements(const TString& query, const NSQLTranslation::TTranslationSettings& settings, NYql::TWarningRules* warningRules, TVector<NYql::TStmtParseInfo>* stmtParseInfo = nullptr);
+ bool NeedUseForAllStatements(const NSQLv1Generated::TRule_sql_stmt_core::AltCase& subquery);
+
+ bool SplitQueryToStatements(const TString& query, TVector<TString>& statements, NYql::TIssues& issues,
+ const NSQLTranslation::TTranslationSettings& settings);
+
} // namespace NSQLTranslationV1
diff --git a/yql/essentials/sql/v1/sql_expression.cpp b/yql/essentials/sql/v1/sql_expression.cpp
index a7a9877ccbd..199f15f2939 100644
--- a/yql/essentials/sql/v1/sql_expression.cpp
+++ b/yql/essentials/sql/v1/sql_expression.cpp
@@ -34,6 +34,31 @@ TNodePtr TSqlExpression::Build(const TRule_expr& node) {
}
}
+TNodePtr TSqlExpression::Build(const TRule_lambda_or_parameter& node) {
+ // lambda_or_parameter:
+ // lambda
+ // | bind_parameter
+ switch (node.Alt_case()) {
+ case TRule_lambda_or_parameter::kAltLambdaOrParameter1: {
+ return LambdaRule(node.alt_lambda_or_parameter1().GetRule_lambda1());
+ }
+ case TRule_lambda_or_parameter::kAltLambdaOrParameter2: {
+ TString named;
+ if (!NamedNodeImpl(node.GetAlt_lambda_or_parameter2().GetRule_bind_parameter1(), named, *this)) {
+ return nullptr;
+ }
+ auto namedNode = GetNamedNode(named);
+ if (!namedNode) {
+ return nullptr;
+ }
+
+ return namedNode;
+ }
+ case TRule_lambda_or_parameter::ALT_NOT_SET:
+ Y_ABORT("You should change implementation according to grammar changes");
+ }
+ }
+
TNodePtr TSqlExpression::SubExpr(const TRule_mul_subexpr& node, const TTrailingQuestions& tail) {
// mul_subexpr: con_subexpr (DOUBLE_PIPE con_subexpr)*;
auto getNode = [](const TRule_mul_subexpr::TBlock2& b) -> const TRule_con_subexpr& { return b.GetRule_con_subexpr2(); };
diff --git a/yql/essentials/sql/v1/sql_expression.h b/yql/essentials/sql/v1/sql_expression.h
index 64b9dd8a690..4f9100722ca 100644
--- a/yql/essentials/sql/v1/sql_expression.h
+++ b/yql/essentials/sql/v1/sql_expression.h
@@ -22,6 +22,7 @@ public:
}
TNodePtr Build(const TRule_expr& node);
+ TNodePtr Build(const TRule_lambda_or_parameter& node);
void SetSmartParenthesisMode(ESmartParenthesis mode) {
SmartParenthesisMode = mode;
diff --git a/yql/essentials/sql/v1/sql_query.cpp b/yql/essentials/sql/v1/sql_query.cpp
index eb1a174b30c..a04ce659861 100644
--- a/yql/essentials/sql/v1/sql_query.cpp
+++ b/yql/essentials/sql/v1/sql_query.cpp
@@ -143,7 +143,68 @@ static bool AsyncReplicationAlterAction(std::map<TString, TNodePtr>& settings,
return AsyncReplicationSettings(settings, in.GetRule_alter_replication_set_setting1().GetRule_replication_settings3(), ctx, false);
}
-bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& core) {
+static bool TransferSettingsEntry(std::map<TString, TNodePtr>& out,
+ const TRule_transfer_settings_entry& in, TSqlExpression& ctx, bool create)
+{
+ auto key = IdEx(in.GetRule_an_id1(), ctx);
+ auto value = ctx.Build(in.GetRule_expr3());
+
+ if (!value) {
+ ctx.Context().Error() << "Invalid transfer setting: " << key.Name;
+ return false;
+ }
+
+ TSet<TString> configSettings = {
+ "connection_string",
+ "endpoint",
+ "database",
+ "token",
+ "token_secret_name",
+ "user",
+ "password",
+ "password_secret_name",
+ };
+
+ TSet<TString> stateSettings = {
+ "state",
+ "failover_mode",
+ };
+
+ const auto keyName = to_lower(key.Name);
+ if (!configSettings.count(keyName) && !stateSettings.contains(keyName)) {
+ ctx.Context().Error() << "Unknown transfer setting: " << key.Name;
+ return false;
+ }
+
+ if (create && stateSettings.count(keyName)) {
+ ctx.Context().Error() << key.Name << " is not supported in CREATE";
+ return false;
+ }
+
+ if (!out.emplace(keyName, value).second) {
+ ctx.Context().Error() << "Duplicate transfer setting: " << key.Name;
+ }
+
+ return true;
+}
+
+static bool TransferSettings(std::map<TString, TNodePtr>& out,
+ const TRule_transfer_settings& in, TSqlExpression& ctx, bool create)
+{
+ if (!TransferSettingsEntry(out, in.GetRule_transfer_settings_entry1(), ctx, create)) {
+ return false;
+ }
+
+ for (auto& block : in.GetBlock2()) {
+ if (!TransferSettingsEntry(out, block.GetRule_transfer_settings_entry2(), ctx, create)) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& core, size_t statementNumber) {
TString internalStatementName;
TString humanStatementName;
ParseStatementName(core, internalStatementName, humanStatementName);
@@ -161,6 +222,10 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
return false;
}
+ if (NeedUseForAllStatements(altCase)) {
+ Ctx.ForAllStatementsParts.push_back(statementNumber);
+ }
+
switch (altCase) {
case TRule_sql_stmt_core::kAltSqlStmtCore1: {
bool success = false;
@@ -1763,6 +1828,103 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
AddStatementToBlocks(blocks, BuildAlterSequence(pos, service, cluster, id, params, Ctx.Scoped));
break;
}
+ case TRule_sql_stmt_core::kAltSqlStmtCore58: {
+ // create_transfer_stmt: CREATE TRANSFER
+
+ auto& node = core.GetAlt_sql_stmt_core58().GetRule_create_transfer_stmt1();
+ TObjectOperatorContext context(Ctx.Scoped);
+ if (node.GetRule_object_ref3().HasBlock1()) {
+ const auto& cluster = node.GetRule_object_ref3().GetBlock1().GetRule_cluster_expr1();
+ if (!ClusterExpr(cluster, false, context.ServiceId, context.Cluster)) {
+ return false;
+ }
+ }
+
+ auto prefixPath = Ctx.GetPrefixPath(context.ServiceId, context.Cluster);
+
+ std::map<TString, TNodePtr> settings;
+ TSqlExpression expr(Ctx, Mode);
+ if (!TransferSettings(settings, node.GetRule_transfer_settings11(), expr, true)) {
+ return false;
+ }
+
+ const TString id = Id(node.GetRule_object_ref3().GetRule_id_or_at2(), *this).second;
+ const TString source = Id(node.GetRule_object_ref5().GetRule_id_or_at2(), *this).second;
+ const TString target = Id(node.GetRule_object_ref7().GetRule_id_or_at2(), *this).second;
+ TString transformLambda;
+ if (node.GetBlock8().HasRule_lambda_or_parameter2()) {
+ if (!ParseTransferLambda(transformLambda, node.GetBlock8().GetRule_lambda_or_parameter2())) {
+ return false;
+ }
+ }
+
+ AddStatementToBlocks(blocks, BuildCreateTransfer(Ctx.Pos(), BuildTablePath(prefixPath, id),
+ std::move(source), std::move(target), std::move(transformLambda), std::move(settings), context));
+ break;
+ }
+ case TRule_sql_stmt_core::kAltSqlStmtCore59: {
+ // alter_transfer_stmt: ALTER TRANSFER
+ auto& node = core.GetAlt_sql_stmt_core59().GetRule_alter_transfer_stmt1();
+ TObjectOperatorContext context(Ctx.Scoped);
+ if (node.GetRule_object_ref3().HasBlock1()) {
+ const auto& cluster = node.GetRule_object_ref3().GetBlock1().GetRule_cluster_expr1();
+ if (!ClusterExpr(cluster, false, context.ServiceId, context.Cluster)) {
+ return false;
+ }
+ }
+
+ std::map<TString, TNodePtr> settings;
+ std::optional<TString> transformLambda;
+ TSqlExpression expr(Ctx, Mode);
+
+ auto transferAlterAction = [&](std::optional<TString>& transformLambda, const TRule_alter_transfer_action& in)
+ {
+ if (in.HasAlt_alter_transfer_action1()) {
+ return TransferSettings(settings, in.GetAlt_alter_transfer_action1().GetRule_alter_transfer_set_setting1().GetRule_transfer_settings3(), expr, false);
+ } else if (in.HasAlt_alter_transfer_action2()) {
+ TString lb;
+ if (!ParseTransferLambda(lb, in.GetAlt_alter_transfer_action2().GetRule_alter_transfer_set_using1().GetRule_lambda_or_parameter3())) {
+ return false;
+ }
+ transformLambda = lb;
+ return true;
+ }
+
+ return false;
+ };
+
+ if (!transferAlterAction(transformLambda, node.GetRule_alter_transfer_action4())) {
+ return false;
+ }
+ for (auto& block : node.GetBlock5()) {
+ if (!transferAlterAction(transformLambda, block.GetRule_alter_transfer_action2())) {
+ return false;
+ }
+ }
+
+ const TString id = Id(node.GetRule_object_ref3().GetRule_id_or_at2(), *this).second;
+ AddStatementToBlocks(blocks, BuildAlterTransfer(Ctx.Pos(),
+ BuildTablePath(Ctx.GetPrefixPath(context.ServiceId, context.Cluster), id),
+ std::move(transformLambda), std::move(settings), context));
+ break;
+ }
+ case TRule_sql_stmt_core::kAltSqlStmtCore60: {
+ // drop_transfer_stmt: DROP TRANSFER
+ auto& node = core.GetAlt_sql_stmt_core60().GetRule_drop_transfer_stmt1();
+ TObjectOperatorContext context(Ctx.Scoped);
+ if (node.GetRule_object_ref3().HasBlock1()) {
+ const auto& cluster = node.GetRule_object_ref3().GetBlock1().GetRule_cluster_expr1();
+ if (!ClusterExpr(cluster, false, context.ServiceId, context.Cluster)) {
+ return false;
+ }
+ }
+
+ const TString id = Id(node.GetRule_object_ref3().GetRule_id_or_at2(), *this).second;
+ AddStatementToBlocks(blocks, BuildDropTransfer(Ctx.Pos(),
+ BuildTablePath(Ctx.GetPrefixPath(context.ServiceId, context.Cluster), id),
+ node.HasBlock4(), context));
+ break;
+ }
case TRule_sql_stmt_core::ALT_NOT_SET:
Ctx.IncrementMonCounter("sql_errors", "UnknownStatement" + internalStatementName);
AltNotImplemented("sql_stmt_core", core);
@@ -3471,12 +3633,13 @@ TNodePtr TSqlQuery::Build(const TSQLv1ParserAST& ast) {
Ctx.PopCurrentBlocks();
};
if (query.Alt_case() == TRule_sql_query::kAltSqlQuery1) {
+ size_t statementNumber = 0;
const auto& statements = query.GetAlt_sql_query1().GetRule_sql_stmt_list1();
- if (!Statement(blocks, statements.GetRule_sql_stmt2().GetRule_sql_stmt_core2())) {
+ if (!Statement(blocks, statements.GetRule_sql_stmt2().GetRule_sql_stmt_core2(), statementNumber++)) {
return nullptr;
}
for (auto block: statements.GetBlock3()) {
- if (!Statement(blocks, block.GetRule_sql_stmt2().GetRule_sql_stmt_core2())) {
+ if (!Statement(blocks, block.GetRule_sql_stmt2().GetRule_sql_stmt_core2(), statementNumber++)) {
return nullptr;
}
}
@@ -3546,8 +3709,10 @@ TNodePtr TSqlQuery::Build(const std::vector<::NSQLv1Generated::TRule_sql_stmt_co
Y_DEFER {
Ctx.PopCurrentBlocks();
};
+
+ size_t statementNumber = 0;
for (const auto& statement : statements) {
- if (!Statement(blocks, statement)) {
+ if (!Statement(blocks, statement, statementNumber++)) {
return nullptr;
}
}
diff --git a/yql/essentials/sql/v1/sql_query.h b/yql/essentials/sql/v1/sql_query.h
index 03fd85df6b3..ea5b917f8f2 100644
--- a/yql/essentials/sql/v1/sql_query.h
+++ b/yql/essentials/sql/v1/sql_query.h
@@ -20,7 +20,7 @@ public:
TNodePtr Build(const TSQLv1ParserAST& ast);
TNodePtr Build(const std::vector<::NSQLv1Generated::TRule_sql_stmt_core>& ast);
- bool Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& core);
+ bool Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& core, size_t statementNumber);
private:
bool DeclareStatement(const TRule_declare_stmt& stmt);
bool ExportStatement(const TRule_export_stmt& stmt);
diff --git a/yql/essentials/sql/v1/sql_translation.cpp b/yql/essentials/sql/v1/sql_translation.cpp
index 9989295291a..a4c55cad71d 100644
--- a/yql/essentials/sql/v1/sql_translation.cpp
+++ b/yql/essentials/sql/v1/sql_translation.cpp
@@ -83,7 +83,7 @@ TNodePtr BuildViewSelect(
const TString& contextRecreationQuery
) {
TIssues issues;
- TContext context(parentContext.Settings, {}, issues);
+ TContext context(parentContext.Settings, {}, issues, parentContext.Query);
if (!RecreateContext(context, context.Settings, contextRecreationQuery)) {
parentContext.Issues.AddIssues(issues);
return nullptr;
@@ -4651,13 +4651,15 @@ bool TSqlTranslation::DefineActionOrSubqueryBody(TSqlQuery& query, TBlocks& bloc
Y_DEFER {
Ctx.PopCurrentBlocks();
};
- if (!query.Statement(blocks, body.GetBlock2().GetRule_sql_stmt_core1())) {
+
+ size_t statementNumber = 0;
+ if (!query.Statement(blocks, body.GetBlock2().GetRule_sql_stmt_core1(), statementNumber++)) {
return false;
}
for (const auto& nestedStmtItem : body.GetBlock2().GetBlock2()) {
const auto& nestedStmt = nestedStmtItem.GetRule_sql_stmt_core2();
- if (!query.Statement(blocks, nestedStmt)) {
+ if (!query.Statement(blocks, nestedStmt, statementNumber++)) {
return false;
}
}
@@ -5096,6 +5098,101 @@ bool TSqlTranslation::ParseViewQuery(
return true;
}
+namespace {
+
+static std::string::size_type GetQueryPosition(const TString& query, const NSQLv1Generated::TToken& token, bool antlr4) {
+ if (1 == token.GetLine() && 0 == token.GetColumn()) {
+ return 0;
+ }
+
+ TPosition pos = {0, 1};
+ TTextWalker walker(pos, antlr4);
+
+ std::string::size_type position = 0;
+ for (char c : query) {
+ walker.Advance(c);
+ ++position;
+
+ if (pos.Row == token.GetLine() && pos.Column == token.GetColumn()) {
+ return position;
+ }
+ }
+
+ return std::string::npos;
+}
+
+static TString GetLambdaText(TTranslation& ctx, TContext& Ctx, const TRule_lambda_or_parameter& lambdaOrParameter) {
+ static const TString statementSeparator = ";\n";
+
+ TVector<TString> statements;
+ NYql::TIssues issues;
+ if (!SplitQueryToStatements(Ctx.Query, statements, issues, Ctx.Settings)) {
+ return {};
+ }
+
+ TStringBuilder result;
+ for (const auto id : Ctx.ForAllStatementsParts) {
+ result << statements[id] << "\n";
+ }
+
+ switch (lambdaOrParameter.Alt_case()) {
+ case NSQLv1Generated::TRule_lambda_or_parameter::kAltLambdaOrParameter1: {
+ const auto& lambda = lambdaOrParameter.GetAlt_lambda_or_parameter1().GetRule_lambda1();
+
+ auto& beginToken = lambda.GetRule_smart_parenthesis1().GetToken1();
+ const NSQLv1Generated::TToken* endToken = nullptr;
+ switch (lambda.GetBlock2().GetBlock2().GetAltCase()) {
+ case TRule_lambda_TBlock2_TBlock2::AltCase::kAlt1:
+ endToken = &lambda.GetBlock2().GetBlock2().GetAlt1().GetToken3();
+ break;
+ case TRule_lambda_TBlock2_TBlock2::AltCase::kAlt2:
+ endToken = &lambda.GetBlock2().GetBlock2().GetAlt2().GetToken3();
+ break;
+ case TRule_lambda_TBlock2_TBlock2::AltCase::ALT_NOT_SET:
+ Y_ABORT("You should change implementation according to grammar changes");
+ }
+
+ auto begin = GetQueryPosition(Ctx.Query, beginToken, Ctx.Settings.Antlr4Parser);
+ auto end = GetQueryPosition(Ctx.Query, *endToken, Ctx.Settings.Antlr4Parser);
+ if (begin == std::string::npos || end == std::string::npos) {
+ return {};
+ }
+
+ result << "$__ydb_transfer_lambda = " << Ctx.Query.substr(begin, end - begin + endToken->value().size()) << statementSeparator;
+
+ return result;
+ }
+ case NSQLv1Generated::TRule_lambda_or_parameter::kAltLambdaOrParameter2: {
+ const auto& valueBlock = lambdaOrParameter.GetAlt_lambda_or_parameter2().GetRule_bind_parameter1().GetBlock2();
+ const auto id = Id(valueBlock.GetAlt1().GetRule_an_id_or_type1(), ctx);
+ result << "$__ydb_transfer_lambda = $" << id << statementSeparator;
+ return result;
+ }
+ case NSQLv1Generated::TRule_lambda_or_parameter::ALT_NOT_SET:
+ Y_ABORT("You should change implementation according to grammar changes");
+ }
+}
+
+}
+
+bool TSqlTranslation::ParseTransferLambda(
+ TString& lambdaText,
+ const TRule_lambda_or_parameter& lambdaOrParameter) {
+
+ TSqlExpression expr(Ctx, Ctx.Settings.Mode);
+ auto result = expr.Build(lambdaOrParameter);
+ if (!result) {
+ return false;
+ }
+
+ lambdaText = GetLambdaText(*this, Ctx, lambdaOrParameter);
+ if (lambdaText.empty()) {
+ Ctx.Error() << "Cannot parse lambda correctly";
+ }
+
+ return !lambdaText.empty();
+}
+
class TReturningListColumns : public INode {
public:
TReturningListColumns(TPosition pos)
diff --git a/yql/essentials/sql/v1/sql_translation.h b/yql/essentials/sql/v1/sql_translation.h
index 75e9c2f7d48..74b78ae3306 100644
--- a/yql/essentials/sql/v1/sql_translation.h
+++ b/yql/essentials/sql/v1/sql_translation.h
@@ -263,6 +263,7 @@ protected:
TVector<TDeferredAtom>& addTables,
TVector<TDeferredAtom>& removeTables,
const TRule_alter_backup_collection_entries& entries);
+ bool ParseTransferLambda(TString& lambdaText, const TRule_lambda_or_parameter& lambdaOrParameter);
bool ValidateAuthMethod(const std::map<TString, TDeferredAtom>& result);
bool ValidateExternalTable(const TCreateTableParameters& params);
diff --git a/yql/essentials/sql/v1/sql_ut.cpp b/yql/essentials/sql/v1/sql_ut.cpp
index e7b30928c32..9baf35f8f4e 100644
--- a/yql/essentials/sql/v1/sql_ut.cpp
+++ b/yql/essentials/sql/v1/sql_ut.cpp
@@ -4,6 +4,7 @@
#include <yql/essentials/providers/common/provider/yql_provider_names.h>
#include <yql/essentials/sql/sql.h>
+#include <yql/essentials/sql/v1/sql.h>
#include <util/generic/map.h>
#include <library/cpp/testing/unittest/registar.h>
@@ -287,6 +288,11 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
UNIT_ASSERT(SqlToYql("USE plato; SELECT REPLICATION FROM REPLICATION").IsOk());
}
+ Y_UNIT_TEST(TransferKeywordNotReservedForNames) {
+ UNIT_ASSERT(SqlToYql("USE plato; CREATE TABLE TRANSFER (TRANSFER Uint32, PRIMARY KEY (TRANSFER));").IsOk());
+ UNIT_ASSERT(SqlToYql("USE plato; SELECT TRANSFER FROM TRANSFER").IsOk());
+ }
+
Y_UNIT_TEST(SecondsKeywordNotReservedForNames) {
UNIT_ASSERT(SqlToYql("USE plato; CREATE TABLE SECONDS (SECONDS Uint32, PRIMARY KEY (SECONDS));").IsOk());
UNIT_ASSERT(SqlToYql("USE plato; SELECT SECONDS FROM SECONDS").IsOk());
@@ -8056,3 +8062,60 @@ Y_UNIT_TEST_SUITE(ColumnFamily) {
UNIT_ASSERT_STRING_CONTAINS(res.Issues.ToString(), "COMPRESSION_LEVEL value should be an integer");
}
}
+
+Y_UNIT_TEST_SUITE(QuerySplit) {
+ Y_UNIT_TEST(Simple) {
+ TString query = R"(
+ ;
+ -- Comment 1
+ SELECT * From Input; -- Comment 2
+ -- Comment 3
+ $a = "a";
+
+ -- Comment 9
+ ;
+
+ -- Comment 10
+
+ -- Comment 8
+
+ $b = ($x) -> {
+ -- comment 4
+ return /* Comment 5 */ $x;
+ -- Comment 6
+ };
+
+ // Comment 7
+
+
+
+ )";
+
+ google::protobuf::Arena Arena;
+
+ NSQLTranslation::TTranslationSettings settings;
+ settings.AnsiLexer = false;
+ settings.Antlr4Parser = true;
+ settings.Arena = &Arena;
+
+ TVector<TString> statements;
+ NYql::TIssues issues;
+
+ UNIT_ASSERT(NSQLTranslationV1::SplitQueryToStatements(query, statements, issues, settings));
+
+ UNIT_ASSERT_VALUES_EQUAL(statements.size(), 3);
+
+ UNIT_ASSERT_VALUES_EQUAL(statements[0], "-- Comment 1\n SELECT * From Input; -- Comment 2\n");
+ UNIT_ASSERT_VALUES_EQUAL(statements[1], R"(-- Comment 3
+ $a = "a";)");
+ UNIT_ASSERT_VALUES_EQUAL(statements[2], R"(-- Comment 10
+
+ -- Comment 8
+
+ $b = ($x) -> {
+ -- comment 4
+ return /* Comment 5 */ $x;
+ -- Comment 6
+ };)");
+ }
+}
diff --git a/yql/essentials/sql/v1/sql_ut_antlr4.cpp b/yql/essentials/sql/v1/sql_ut_antlr4.cpp
index 5337f573510..05e85605ef4 100644
--- a/yql/essentials/sql/v1/sql_ut_antlr4.cpp
+++ b/yql/essentials/sql/v1/sql_ut_antlr4.cpp
@@ -287,6 +287,11 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
UNIT_ASSERT(SqlToYql("USE plato; SELECT REPLICATION FROM REPLICATION").IsOk());
}
+ Y_UNIT_TEST(TransferKeywordNotReservedForNames) {
+ UNIT_ASSERT(SqlToYql("USE plato; CREATE TABLE TRANSFER (TRANSFER Uint32, PRIMARY KEY (TRANSFER));").IsOk());
+ UNIT_ASSERT(SqlToYql("USE plato; SELECT TRANSFER FROM TRANSFER").IsOk());
+ }
+
Y_UNIT_TEST(SecondsKeywordNotReservedForNames) {
UNIT_ASSERT(SqlToYql("USE plato; CREATE TABLE SECONDS (SECONDS Uint32, PRIMARY KEY (SECONDS));").IsOk());
UNIT_ASSERT(SqlToYql("USE plato; SELECT SECONDS FROM SECONDS").IsOk());
@@ -8028,3 +8033,48 @@ Y_UNIT_TEST_SUITE(ColumnFamily) {
UNIT_ASSERT_STRING_CONTAINS(res.Issues.ToString(), "COMPRESSION_LEVEL value should be an integer");
}
}
+
+Y_UNIT_TEST_SUITE(Transfer) {
+ Y_UNIT_TEST(Lambda) {
+ NYql::TAstParseResult res = SqlToYql(R"( use plato;
+ -- Русский коммент, empty statement
+ ;
+
+ -- befor comment
+ $a = "А";
+
+ SELECT * FROM Input;
+
+ $b = ($x) -> { return $a || $x; };
+
+ CREATE TRANSFER `TransferName`
+ FROM `TopicName` TO `TableName`
+ USING ($x) -> {
+ -- internal comment
+ return $b($x);
+ }
+ WITH (
+ CONNECTION_STRING = "grpc://localhost:2135/?database=/Root"
+ );
+ )");
+
+ UNIT_ASSERT_C(res.IsOk(), res.Issues.ToString());
+ UNIT_ASSERT_VALUES_EQUAL_C(res.Issues.Size(), 0, res.Issues.ToString());
+
+ const auto programm = GetPrettyPrint(res);
+
+ Cerr << ">>>>> Root " << programm << Endl;
+ auto expected = R"('transformLambda 'use plato;
+-- befor comment
+ $a = "А";
+$b = ($x) -> { return $a || $x; };
+$__ydb_transfer_lambda = ($x) -> {
+ -- internal comment
+ return $b($x);
+ };
+))";
+
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, programm.find(expected));
+
+ }
+}
diff --git a/yql/essentials/tools/sql2yql/sql2yql.cpp b/yql/essentials/tools/sql2yql/sql2yql.cpp
index dd313b451cb..ba30d95e4d2 100644
--- a/yql/essentials/tools/sql2yql/sql2yql.cpp
+++ b/yql/essentials/tools/sql2yql/sql2yql.cpp
@@ -317,7 +317,7 @@ int BuildAST(int argc, char* argv[]) {
auto lexer = SqlLexer(query, parseRes.Issues, settings);
if (lexer && CollectSqlHints(*lexer, query, queryFile, settings.File, hints, parseRes.Issues,
settings.MaxErrors, settings.Antlr4Parser)) {
- parseRes = NSQLTranslation::SqlASTToYql(*ast, hints, settings);
+ parseRes = NSQLTranslation::SqlASTToYql(query, *ast, hints, settings);
}
}
} else {