diff options
| author | tesseract <[email protected]> | 2025-01-21 12:50:29 +0300 |
|---|---|---|
| committer | tesseract <[email protected]> | 2025-01-21 14:32:19 +0300 |
| commit | e677409ecb6106695a976307290b2f6bad3d72c0 (patch) | |
| tree | 7c4fe8c7334a8f814506c857a08322ea800a8b79 /yql/essentials/sql/v1/query.cpp | |
| parent | e2324a4c7934ecbc80eb47f70d2586c4995499b5 (diff) | |
YQL for create, alter and drop transfer from topic to table
commit_hash:09502f46a7ee665609d2c4ba8d9e0aa421720cdb
Diffstat (limited to 'yql/essentials/sql/v1/query.cpp')
| -rw-r--r-- | yql/essentials/sql/v1/query.cpp | 153 |
1 files changed, 153 insertions, 0 deletions
diff --git a/yql/essentials/sql/v1/query.cpp b/yql/essentials/sql/v1/query.cpp index c74f96a27ca..a1323972c44 100644 --- a/yql/essentials/sql/v1/query.cpp +++ b/yql/essentials/sql/v1/query.cpp @@ -2628,6 +2628,159 @@ TNodePtr BuildAlterAsyncReplication(TPosition pos, const TString& id, return new TAlterAsyncReplication(pos, id, std::move(settings), context); } +class TTransfer + : public TAstListNode + , protected TObjectOperatorContext +{ +protected: + virtual INode::TPtr FillOptions(INode::TPtr options) const = 0; + +public: + explicit TTransfer(TPosition pos, const TString& id, const TString& mode, const TObjectOperatorContext& context) + : TAstListNode(pos) + , TObjectOperatorContext(context) + , Id(id) + , Mode(mode) + { + } + + bool DoInit(TContext& ctx, ISource* src) override { + Scoped->UseCluster(ServiceId, Cluster); + + auto keys = Y("Key", Q(Y(Q("transfer"), Y("String", BuildQuotedAtom(Pos, Id))))); + auto options = FillOptions(Y(Q(Y(Q("mode"), Q(Mode))))); + + Add("block", Q(Y( + Y("let", "sink", Y("DataSink", BuildQuotedAtom(Pos, ServiceId), Scoped->WrapCluster(Cluster, ctx))), + Y("let", "world", Y(TString(WriteName), "world", "sink", keys, Y("Void"), Q(options))), + Y("return", ctx.PragmaAutoCommit ? Y(TString(CommitName), "world", "sink") : AstNode("world")) + ))); + + return TAstListNode::DoInit(ctx, src); + } + + TPtr DoClone() const final { + return {}; + } + +private: + const TString Id; + const TString Mode; + +}; // TTransfer + +class TCreateTransfer final: public TTransfer { +public: + explicit TCreateTransfer(TPosition pos, const TString& id, const TString&& source, const TString&& target, + const TString&& transformLambda, + std::map<TString, TNodePtr>&& settings, + const TObjectOperatorContext& context) + : TTransfer(pos, id, "create", context) + , Source(std::move(source)) + , Target(std::move(target)) + , TransformLambda(std::move(transformLambda)) + , Settings(std::move(settings)) + { + } + +protected: + INode::TPtr FillOptions(INode::TPtr options) const override { + options = L(options, Q(Y(Q("source"), Q(Source)))); + options = L(options, Q(Y(Q("target"), Q(Target)))); + options = L(options, Q(Y(Q("transformLambda"), Q(TransformLambda)))); + + if (!Settings.empty()) { + auto settings = Y(); + for (auto&& [k, v] : Settings) { + if (v) { + settings = L(settings, Q(Y(BuildQuotedAtom(Pos, k), v))); + } else { + settings = L(settings, Q(Y(BuildQuotedAtom(Pos, k)))); + } + } + options = L(options, Q(Y(Q("settings"), Q(settings)))); + } + + return options; + } + +private: + const TString Source; + const TString Target; + const TString TransformLambda; + std::map<TString, TNodePtr> Settings; + +}; // TCreateTransfer + +TNodePtr BuildCreateTransfer(TPosition pos, const TString& id, const TString&& source, const TString&& target, + const TString&& transformLambda, + std::map<TString, TNodePtr>&& settings, + const TObjectOperatorContext& context) +{ + return new TCreateTransfer(pos, id, std::move(source), std::move(target), std::move(transformLambda), std::move(settings), context); +} + +class TDropTransfer final: public TTransfer { +public: + explicit TDropTransfer(TPosition pos, const TString& id, bool cascade, const TObjectOperatorContext& context) + : TTransfer(pos, id, cascade ? "dropCascade" : "drop", context) + { + } + +protected: + INode::TPtr FillOptions(INode::TPtr options) const override { + return options; + } + +}; // TDropTransfer + +TNodePtr BuildDropTransfer(TPosition pos, const TString& id, bool cascade, const TObjectOperatorContext& context) { + return new TDropTransfer(pos, id, cascade, context); +} + +class TAlterTransfer final: public TTransfer { +public: + explicit TAlterTransfer(TPosition pos, const TString& id, std::optional<TString>&& transformLambda, + std::map<TString, TNodePtr>&& settings, + const TObjectOperatorContext& context) + : TTransfer(pos, id, "alter", context) + , TransformLambda(std::move(transformLambda)) + , Settings(std::move(settings)) + { + } + +protected: + INode::TPtr FillOptions(INode::TPtr options) const override { + options = L(options, Q(Y(Q("transformLambda"), Q(TransformLambda ? TransformLambda.value() : "")))); + + if (!Settings.empty()) { + auto settings = Y(); + for (auto&& [k, v] : Settings) { + if (v) { + settings = L(settings, Q(Y(BuildQuotedAtom(Pos, k), v))); + } else { + settings = L(settings, Q(Y(BuildQuotedAtom(Pos, k)))); + } + } + options = L(options, Q(Y(Q("settings"), Q(settings)))); + } + + return options; + } + +private: + const std::optional<TString> TransformLambda; + std::map<TString, TNodePtr> Settings; + +}; // TAlterTransfer + +TNodePtr BuildAlterTransfer(TPosition pos, const TString& id, std::optional<TString>&& transformLambda, + std::map<TString, TNodePtr>&& settings, + const TObjectOperatorContext& context) +{ + return new TAlterTransfer(pos, id, std::move(transformLambda), std::move(settings), context); +} + static const TMap<EWriteColumnMode, TString> columnModeToStrMapMR { {EWriteColumnMode::Default, ""}, {EWriteColumnMode::Insert, "append"}, |
