diff options
| author | grigoriypisar <[email protected]> | 2025-08-28 16:56:17 +0300 |
|---|---|---|
| committer | grigoriypisar <[email protected]> | 2025-08-28 17:21:58 +0300 |
| commit | 45c5e8ac55972ded21c846f9e05a3d61d197e3a4 (patch) | |
| tree | 058b5d184adcc94e57a8262c9dcde768d5589eb7 /yql/essentials/sql/v1/object_processing.cpp | |
| parent | 9be8744bb96ab00cbf25afb60ab54db5cbad95dc (diff) | |
added sql syntax for CREATE / ALTER DROP STREAMING QUERY
Добавлен синтаксис (переиспользован код для inline action):
```
CREATE [OR REPLACE] STREAMING QUERY [IF NOT EXISTS] query_name [WITH (
key = value
...
)] AS DO BEGIN
...
END DO;
ALTER STREAMING QUERY [IF EXISTS] query_name [SET(
key = value
...
)] [AS DO BEGIN
...
END DO];
DROP STREAMING QUERY [IF EXISTS] query_name;
```
commit_hash:29fa6aa7e61ecf45112480fe3c1df8fab542354e
Diffstat (limited to 'yql/essentials/sql/v1/object_processing.cpp')
| -rw-r--r-- | yql/essentials/sql/v1/object_processing.cpp | 121 |
1 files changed, 91 insertions, 30 deletions
diff --git a/yql/essentials/sql/v1/object_processing.cpp b/yql/essentials/sql/v1/object_processing.cpp index 64b46d612d4..f9028b01b84 100644 --- a/yql/essentials/sql/v1/object_processing.cpp +++ b/yql/essentials/sql/v1/object_processing.cpp @@ -2,9 +2,30 @@ #include <yql/essentials/core/sql_types/yql_callable_names.h> -namespace NSQLTranslationV1 { using namespace NYql; +namespace NSQLTranslationV1 { + +namespace { + +bool InitFeatures(TContext& ctx, ISource* src, const std::map<TString, TDeferredAtom>& features) { + for (const auto& [key, value] : features) { + if (value.HasNode() && !value.Build()->Init(ctx, src)) { + return false; + } + } + + return true; +} + +} // anonymous namespace + +TObjectOperatorContext::TObjectOperatorContext(TScopedStatePtr scoped) + : Scoped_(scoped) + , ServiceId(Scoped_->CurrService) + , Cluster(Scoped_->CurrCluster) +{} + INode::TPtr TObjectProcessorImpl::BuildKeys() const { auto keys = Y("Key"); keys = L(keys, Q(Y(Q("objectId"), Y("String", BuildQuotedAtom(Pos_, ObjectId_))))); @@ -17,12 +38,9 @@ TObjectProcessorImpl::TObjectProcessorImpl(TPosition pos, const TString& objectI , TObjectOperatorContext(context) , ObjectId_(objectId) , TypeId_(typeId) -{ - -} +{} bool TObjectProcessorImpl::DoInit(TContext& ctx, ISource* src) { - Y_UNUSED(src); Scoped_->UseCluster(ServiceId, Cluster); auto options = FillFeatures(BuildOptions()); auto keys = BuildKeys(); @@ -35,7 +53,17 @@ bool TObjectProcessorImpl::DoInit(TContext& ctx, ISource* src) { return TAstListNode::DoInit(ctx, src); } -INode::TPtr TCreateObject::FillFeatures(INode::TPtr options) const { +TObjectProcessorImpl::TPtr TObjectProcessorImpl::DoClone() const { + return {}; +} + +TObjectProcessorWithFeatures::TObjectProcessorWithFeatures(TPosition pos, const TString& objectId, const TString& typeId, const TObjectOperatorContext& context, + TFeatureMap&& features) + : TBase(pos, objectId, typeId, context) + , Features_(std::move(features)) +{} + +INode::TPtr TObjectProcessorWithFeatures::FillFeatures(INode::TPtr options) const { if (!Features_.empty()) { auto features = Y(); for (auto&& i : Features_) { @@ -47,42 +75,75 @@ INode::TPtr TCreateObject::FillFeatures(INode::TPtr options) const { } options->Add(Q(Y(Q("features"), Q(features)))); } - if (!FeaturesToReset_.empty()) { - auto reset = Y(); - for (const auto& featureName : FeaturesToReset_) { - reset->Add(BuildQuotedAtom(Pos_, featureName)); - } - options->Add(Q(Y(Q("resetFeatures"), Q(reset)))); - } + return options; } -namespace { +bool TObjectProcessorWithFeatures::DoInit(TContext& ctx, ISource* src) { + if (!InitFeatures(ctx, src, Features_)) { + return false; + } -bool InitFeatures(TContext& ctx, ISource* src, const std::map<TString, TDeferredAtom>& features) { - for (auto& [key, value] : features) { - if (value.HasNode() && !value.Build()->Init(ctx, src)) { - return false; - } + return TObjectProcessorImpl::DoInit(ctx, src); +} + +INode::TPtr TCreateObject::BuildOptions() const { + TString mode; + + if (ExistingOk_) { + mode = "createObjectIfNotExists"; + } else if (ReplaceIfExists_) { + mode = "createObjectOrReplace"; + } else { + mode = "createObject"; } - return true; + + return Y(Q(Y(Q("mode"), Q(mode)))); } +TCreateObject::TCreateObject(TPosition pos, const TString& objectId, const TString& typeId, const TObjectOperatorContext& context, + TFeatureMap&& features, bool existingOk, bool replaceIfExists) + : TBase(pos, objectId, typeId, context, std::move(features)) + , ExistingOk_(existingOk) + , ReplaceIfExists_(replaceIfExists) +{} + +INode::TPtr TUpsertObject::BuildOptions() const { + return Y(Q(Y(Q("mode"), Q("upsertObject")))); } -bool TCreateObject::DoInit(TContext& ctx, ISource* src) { - if (!InitFeatures(ctx, src, Features_)) { - return false; +TAlterObject::TAlterObject(TPosition pos, const TString& objectId, const TString& typeId, const TObjectOperatorContext& context, + TFeatureMap&& features, std::set<TString>&& featuresToReset, bool missingOk) + : TBase(pos, objectId, typeId, context, std::move(features)) + , FeaturesToReset_(std::move(featuresToReset)) + , MissingOk_(missingOk) +{} + +INode::TPtr TAlterObject::FillFeatures(INode::TPtr options) const { + options = TBase::FillFeatures(options); + + if (!FeaturesToReset_.empty()) { + auto reset = Y(); + for (const auto& featureName : FeaturesToReset_) { + reset->Add(BuildQuotedAtom(Pos_, featureName)); + } + options->Add(Q(Y(Q("resetFeatures"), Q(reset)))); } - return TObjectProcessorImpl::DoInit(ctx, src); + return options; } -TObjectOperatorContext::TObjectOperatorContext(TScopedStatePtr scoped) - : Scoped_(scoped) - , ServiceId(Scoped_->CurrService) - , Cluster(Scoped_->CurrCluster) -{ - +INode::TPtr TAlterObject::BuildOptions() const { + return Y(Q(Y(Q("mode"), Q(MissingOk_ ? "alterObjectIfExists" : "alterObject")))); } +TDropObject::TDropObject(TPosition pos, const TString& objectId, const TString& typeId, const TObjectOperatorContext& context, + TFeatureMap&& features, bool missingOk) + : TBase(pos, objectId, typeId, context, std::move(features)) + , MissingOk_(missingOk) +{} + +INode::TPtr TDropObject::BuildOptions() const { + return Y(Q(Y(Q("mode"), Q(MissingOk_ ? "dropObjectIfExists" : "dropObject")))); } + +} // NSQLTranslationV1 |
