summaryrefslogtreecommitdiffstats
path: root/yql/essentials/sql/v1/object_processing.cpp
diff options
context:
space:
mode:
authorgrigoriypisar <[email protected]>2025-08-28 16:56:17 +0300
committergrigoriypisar <[email protected]>2025-08-28 17:21:58 +0300
commit45c5e8ac55972ded21c846f9e05a3d61d197e3a4 (patch)
tree058b5d184adcc94e57a8262c9dcde768d5589eb7 /yql/essentials/sql/v1/object_processing.cpp
parent9be8744bb96ab00cbf25afb60ab54db5cbad95dc (diff)
added sql syntax for CREATE / ALTER DROP STREAMING QUERY
Добавлен синтаксис (переиспользован код для inline action): ``` CREATE [OR REPLACE] STREAMING QUERY [IF NOT EXISTS] query_name [WITH ( key = value ... )] AS DO BEGIN ... END DO; ALTER STREAMING QUERY [IF EXISTS] query_name [SET( key = value ... )] [AS DO BEGIN ... END DO]; DROP STREAMING QUERY [IF EXISTS] query_name; ``` commit_hash:29fa6aa7e61ecf45112480fe3c1df8fab542354e
Diffstat (limited to 'yql/essentials/sql/v1/object_processing.cpp')
-rw-r--r--yql/essentials/sql/v1/object_processing.cpp121
1 files changed, 91 insertions, 30 deletions
diff --git a/yql/essentials/sql/v1/object_processing.cpp b/yql/essentials/sql/v1/object_processing.cpp
index 64b46d612d4..f9028b01b84 100644
--- a/yql/essentials/sql/v1/object_processing.cpp
+++ b/yql/essentials/sql/v1/object_processing.cpp
@@ -2,9 +2,30 @@
#include <yql/essentials/core/sql_types/yql_callable_names.h>
-namespace NSQLTranslationV1 {
using namespace NYql;
+namespace NSQLTranslationV1 {
+
+namespace {
+
+bool InitFeatures(TContext& ctx, ISource* src, const std::map<TString, TDeferredAtom>& features) {
+ for (const auto& [key, value] : features) {
+ if (value.HasNode() && !value.Build()->Init(ctx, src)) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+} // anonymous namespace
+
+TObjectOperatorContext::TObjectOperatorContext(TScopedStatePtr scoped)
+ : Scoped_(scoped)
+ , ServiceId(Scoped_->CurrService)
+ , Cluster(Scoped_->CurrCluster)
+{}
+
INode::TPtr TObjectProcessorImpl::BuildKeys() const {
auto keys = Y("Key");
keys = L(keys, Q(Y(Q("objectId"), Y("String", BuildQuotedAtom(Pos_, ObjectId_)))));
@@ -17,12 +38,9 @@ TObjectProcessorImpl::TObjectProcessorImpl(TPosition pos, const TString& objectI
, TObjectOperatorContext(context)
, ObjectId_(objectId)
, TypeId_(typeId)
-{
-
-}
+{}
bool TObjectProcessorImpl::DoInit(TContext& ctx, ISource* src) {
- Y_UNUSED(src);
Scoped_->UseCluster(ServiceId, Cluster);
auto options = FillFeatures(BuildOptions());
auto keys = BuildKeys();
@@ -35,7 +53,17 @@ bool TObjectProcessorImpl::DoInit(TContext& ctx, ISource* src) {
return TAstListNode::DoInit(ctx, src);
}
-INode::TPtr TCreateObject::FillFeatures(INode::TPtr options) const {
+TObjectProcessorImpl::TPtr TObjectProcessorImpl::DoClone() const {
+ return {};
+}
+
+TObjectProcessorWithFeatures::TObjectProcessorWithFeatures(TPosition pos, const TString& objectId, const TString& typeId, const TObjectOperatorContext& context,
+ TFeatureMap&& features)
+ : TBase(pos, objectId, typeId, context)
+ , Features_(std::move(features))
+{}
+
+INode::TPtr TObjectProcessorWithFeatures::FillFeatures(INode::TPtr options) const {
if (!Features_.empty()) {
auto features = Y();
for (auto&& i : Features_) {
@@ -47,42 +75,75 @@ INode::TPtr TCreateObject::FillFeatures(INode::TPtr options) const {
}
options->Add(Q(Y(Q("features"), Q(features))));
}
- if (!FeaturesToReset_.empty()) {
- auto reset = Y();
- for (const auto& featureName : FeaturesToReset_) {
- reset->Add(BuildQuotedAtom(Pos_, featureName));
- }
- options->Add(Q(Y(Q("resetFeatures"), Q(reset))));
- }
+
return options;
}
-namespace {
+bool TObjectProcessorWithFeatures::DoInit(TContext& ctx, ISource* src) {
+ if (!InitFeatures(ctx, src, Features_)) {
+ return false;
+ }
-bool InitFeatures(TContext& ctx, ISource* src, const std::map<TString, TDeferredAtom>& features) {
- for (auto& [key, value] : features) {
- if (value.HasNode() && !value.Build()->Init(ctx, src)) {
- return false;
- }
+ return TObjectProcessorImpl::DoInit(ctx, src);
+}
+
+INode::TPtr TCreateObject::BuildOptions() const {
+ TString mode;
+
+ if (ExistingOk_) {
+ mode = "createObjectIfNotExists";
+ } else if (ReplaceIfExists_) {
+ mode = "createObjectOrReplace";
+ } else {
+ mode = "createObject";
}
- return true;
+
+ return Y(Q(Y(Q("mode"), Q(mode))));
}
+TCreateObject::TCreateObject(TPosition pos, const TString& objectId, const TString& typeId, const TObjectOperatorContext& context,
+ TFeatureMap&& features, bool existingOk, bool replaceIfExists)
+ : TBase(pos, objectId, typeId, context, std::move(features))
+ , ExistingOk_(existingOk)
+ , ReplaceIfExists_(replaceIfExists)
+{}
+
+INode::TPtr TUpsertObject::BuildOptions() const {
+ return Y(Q(Y(Q("mode"), Q("upsertObject"))));
}
-bool TCreateObject::DoInit(TContext& ctx, ISource* src) {
- if (!InitFeatures(ctx, src, Features_)) {
- return false;
+TAlterObject::TAlterObject(TPosition pos, const TString& objectId, const TString& typeId, const TObjectOperatorContext& context,
+ TFeatureMap&& features, std::set<TString>&& featuresToReset, bool missingOk)
+ : TBase(pos, objectId, typeId, context, std::move(features))
+ , FeaturesToReset_(std::move(featuresToReset))
+ , MissingOk_(missingOk)
+{}
+
+INode::TPtr TAlterObject::FillFeatures(INode::TPtr options) const {
+ options = TBase::FillFeatures(options);
+
+ if (!FeaturesToReset_.empty()) {
+ auto reset = Y();
+ for (const auto& featureName : FeaturesToReset_) {
+ reset->Add(BuildQuotedAtom(Pos_, featureName));
+ }
+ options->Add(Q(Y(Q("resetFeatures"), Q(reset))));
}
- return TObjectProcessorImpl::DoInit(ctx, src);
+ return options;
}
-TObjectOperatorContext::TObjectOperatorContext(TScopedStatePtr scoped)
- : Scoped_(scoped)
- , ServiceId(Scoped_->CurrService)
- , Cluster(Scoped_->CurrCluster)
-{
-
+INode::TPtr TAlterObject::BuildOptions() const {
+ return Y(Q(Y(Q("mode"), Q(MissingOk_ ? "alterObjectIfExists" : "alterObject"))));
}
+TDropObject::TDropObject(TPosition pos, const TString& objectId, const TString& typeId, const TObjectOperatorContext& context,
+ TFeatureMap&& features, bool missingOk)
+ : TBase(pos, objectId, typeId, context, std::move(features))
+ , MissingOk_(missingOk)
+{}
+
+INode::TPtr TDropObject::BuildOptions() const {
+ return Y(Q(Y(Q("mode"), Q(MissingOk_ ? "dropObjectIfExists" : "dropObject"))));
}
+
+} // NSQLTranslationV1