summaryrefslogtreecommitdiffstats
path: root/yql/essentials/sql/v1/query.cpp
diff options
context:
space:
mode:
authortesseract <[email protected]>2025-01-21 12:50:29 +0300
committertesseract <[email protected]>2025-01-21 14:32:19 +0300
commite677409ecb6106695a976307290b2f6bad3d72c0 (patch)
tree7c4fe8c7334a8f814506c857a08322ea800a8b79 /yql/essentials/sql/v1/query.cpp
parente2324a4c7934ecbc80eb47f70d2586c4995499b5 (diff)
YQL for create, alter and drop transfer from topic to table
commit_hash:09502f46a7ee665609d2c4ba8d9e0aa421720cdb
Diffstat (limited to 'yql/essentials/sql/v1/query.cpp')
-rw-r--r--yql/essentials/sql/v1/query.cpp153
1 files changed, 153 insertions, 0 deletions
diff --git a/yql/essentials/sql/v1/query.cpp b/yql/essentials/sql/v1/query.cpp
index c74f96a27ca..a1323972c44 100644
--- a/yql/essentials/sql/v1/query.cpp
+++ b/yql/essentials/sql/v1/query.cpp
@@ -2628,6 +2628,159 @@ TNodePtr BuildAlterAsyncReplication(TPosition pos, const TString& id,
return new TAlterAsyncReplication(pos, id, std::move(settings), context);
}
+class TTransfer
+ : public TAstListNode
+ , protected TObjectOperatorContext
+{
+protected:
+ virtual INode::TPtr FillOptions(INode::TPtr options) const = 0;
+
+public:
+ explicit TTransfer(TPosition pos, const TString& id, const TString& mode, const TObjectOperatorContext& context)
+ : TAstListNode(pos)
+ , TObjectOperatorContext(context)
+ , Id(id)
+ , Mode(mode)
+ {
+ }
+
+ bool DoInit(TContext& ctx, ISource* src) override {
+ Scoped->UseCluster(ServiceId, Cluster);
+
+ auto keys = Y("Key", Q(Y(Q("transfer"), Y("String", BuildQuotedAtom(Pos, Id)))));
+ auto options = FillOptions(Y(Q(Y(Q("mode"), Q(Mode)))));
+
+ Add("block", Q(Y(
+ Y("let", "sink", Y("DataSink", BuildQuotedAtom(Pos, ServiceId), Scoped->WrapCluster(Cluster, ctx))),
+ Y("let", "world", Y(TString(WriteName), "world", "sink", keys, Y("Void"), Q(options))),
+ Y("return", ctx.PragmaAutoCommit ? Y(TString(CommitName), "world", "sink") : AstNode("world"))
+ )));
+
+ return TAstListNode::DoInit(ctx, src);
+ }
+
+ TPtr DoClone() const final {
+ return {};
+ }
+
+private:
+ const TString Id;
+ const TString Mode;
+
+}; // TTransfer
+
+class TCreateTransfer final: public TTransfer {
+public:
+ explicit TCreateTransfer(TPosition pos, const TString& id, const TString&& source, const TString&& target,
+ const TString&& transformLambda,
+ std::map<TString, TNodePtr>&& settings,
+ const TObjectOperatorContext& context)
+ : TTransfer(pos, id, "create", context)
+ , Source(std::move(source))
+ , Target(std::move(target))
+ , TransformLambda(std::move(transformLambda))
+ , Settings(std::move(settings))
+ {
+ }
+
+protected:
+ INode::TPtr FillOptions(INode::TPtr options) const override {
+ options = L(options, Q(Y(Q("source"), Q(Source))));
+ options = L(options, Q(Y(Q("target"), Q(Target))));
+ options = L(options, Q(Y(Q("transformLambda"), Q(TransformLambda))));
+
+ if (!Settings.empty()) {
+ auto settings = Y();
+ for (auto&& [k, v] : Settings) {
+ if (v) {
+ settings = L(settings, Q(Y(BuildQuotedAtom(Pos, k), v)));
+ } else {
+ settings = L(settings, Q(Y(BuildQuotedAtom(Pos, k))));
+ }
+ }
+ options = L(options, Q(Y(Q("settings"), Q(settings))));
+ }
+
+ return options;
+ }
+
+private:
+ const TString Source;
+ const TString Target;
+ const TString TransformLambda;
+ std::map<TString, TNodePtr> Settings;
+
+}; // TCreateTransfer
+
+TNodePtr BuildCreateTransfer(TPosition pos, const TString& id, const TString&& source, const TString&& target,
+ const TString&& transformLambda,
+ std::map<TString, TNodePtr>&& settings,
+ const TObjectOperatorContext& context)
+{
+ return new TCreateTransfer(pos, id, std::move(source), std::move(target), std::move(transformLambda), std::move(settings), context);
+}
+
+class TDropTransfer final: public TTransfer {
+public:
+ explicit TDropTransfer(TPosition pos, const TString& id, bool cascade, const TObjectOperatorContext& context)
+ : TTransfer(pos, id, cascade ? "dropCascade" : "drop", context)
+ {
+ }
+
+protected:
+ INode::TPtr FillOptions(INode::TPtr options) const override {
+ return options;
+ }
+
+}; // TDropTransfer
+
+TNodePtr BuildDropTransfer(TPosition pos, const TString& id, bool cascade, const TObjectOperatorContext& context) {
+ return new TDropTransfer(pos, id, cascade, context);
+}
+
+class TAlterTransfer final: public TTransfer {
+public:
+ explicit TAlterTransfer(TPosition pos, const TString& id, std::optional<TString>&& transformLambda,
+ std::map<TString, TNodePtr>&& settings,
+ const TObjectOperatorContext& context)
+ : TTransfer(pos, id, "alter", context)
+ , TransformLambda(std::move(transformLambda))
+ , Settings(std::move(settings))
+ {
+ }
+
+protected:
+ INode::TPtr FillOptions(INode::TPtr options) const override {
+ options = L(options, Q(Y(Q("transformLambda"), Q(TransformLambda ? TransformLambda.value() : ""))));
+
+ if (!Settings.empty()) {
+ auto settings = Y();
+ for (auto&& [k, v] : Settings) {
+ if (v) {
+ settings = L(settings, Q(Y(BuildQuotedAtom(Pos, k), v)));
+ } else {
+ settings = L(settings, Q(Y(BuildQuotedAtom(Pos, k))));
+ }
+ }
+ options = L(options, Q(Y(Q("settings"), Q(settings))));
+ }
+
+ return options;
+ }
+
+private:
+ const std::optional<TString> TransformLambda;
+ std::map<TString, TNodePtr> Settings;
+
+}; // TAlterTransfer
+
+TNodePtr BuildAlterTransfer(TPosition pos, const TString& id, std::optional<TString>&& transformLambda,
+ std::map<TString, TNodePtr>&& settings,
+ const TObjectOperatorContext& context)
+{
+ return new TAlterTransfer(pos, id, std::move(transformLambda), std::move(settings), context);
+}
+
static const TMap<EWriteColumnMode, TString> columnModeToStrMapMR {
{EWriteColumnMode::Default, ""},
{EWriteColumnMode::Insert, "append"},