diff options
| -rw-r--r-- | ydb/core/kqp/gateway/kqp_ic_gateway.cpp | 16 | ||||
| -rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_exec.cpp | 85 | ||||
| -rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp | 8 | ||||
| -rw-r--r-- | ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp | 141 | ||||
| -rw-r--r-- | ydb/core/protos/config.proto | 1 | ||||
| -rw-r--r-- | ydb/library/yql/sql/v1/SQLv1.g.in | 12 | ||||
| -rw-r--r-- | ydb/library/yql/sql/v1/format/sql_format.cpp | 14 | ||||
| -rw-r--r-- | ydb/library/yql/sql/v1/format/sql_format_ut.cpp | 12 | ||||
| -rw-r--r-- | ydb/library/yql/sql/v1/sql.cpp | 112 | ||||
| -rw-r--r-- | ydb/library/yql/sql/v1/sql_ut.cpp | 134 |
10 files changed, 517 insertions, 18 deletions
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index 7bd1a951b62..53afb8e8a7f 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -547,14 +547,18 @@ class TSchemeOpRequestHandler: public TRequestHandlerBase< TEvTxUserProxy::TEvProposeTransactionStatus, IKqpGateway::TGenericResult> { + bool FailedOnAlreadyExists = false; public: using TBase = typename TSchemeOpRequestHandler::TBase; using TRequest = TEvTxUserProxy::TEvProposeTransaction; using TResponse = TEvTxUserProxy::TEvProposeTransactionStatus; using TResult = IKqpGateway::TGenericResult; - TSchemeOpRequestHandler(TRequest* request, TPromise<TResult> promise) - : TBase(request, promise, {}) {} + TSchemeOpRequestHandler(TRequest* request, TPromise<TResult> promise, bool failedOnAlreadyExists) + : TBase(request, promise, {}) + , FailedOnAlreadyExists(failedOnAlreadyExists) + {} + void Bootstrap(const TActorContext& ctx) { TActorId txproxy = MakeTxProxyID(); @@ -604,7 +608,7 @@ public: case TEvTxUserProxy::TResultStatus::ExecComplete: { if (response.GetSchemeShardStatus() == NKikimrScheme::EStatus::StatusSuccess || - response.GetSchemeShardStatus() == NKikimrScheme::EStatus::StatusAlreadyExists) + (!FailedOnAlreadyExists && response.GetSchemeShardStatus() == NKikimrScheme::EStatus::StatusAlreadyExists)) { LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, "Successful completion of scheme request" << ", TxId: " << response.GetTxId()); @@ -1392,7 +1396,7 @@ public: NKikimrSchemeOp::TExternalDataSourceDescription& dataSourceDesc = *schemeTx.MutableCreateExternalDataSource(); FillCreateExternalDataSourceDesc(dataSourceDesc, pathPair.second, settings); - return SendSchemeRequest(ev.Release()); + return SendSchemeRequest(ev.Release(), true); } catch (yexception& e) { return MakeFuture(ResultFromException<TGenericResult>(e)); @@ -2128,10 +2132,10 @@ private: return promise.GetFuture(); } - TFuture<TGenericResult> SendSchemeRequest(TEvTxUserProxy::TEvProposeTransaction* request) + TFuture<TGenericResult> SendSchemeRequest(TEvTxUserProxy::TEvProposeTransaction* request, bool failedOnAlreadyExists = false) { auto promise = NewPromise<TGenericResult>(); - IActor* requestHandler = new TSchemeOpRequestHandler(request, promise); + IActor* requestHandler = new TSchemeOpRequestHandler(request, promise, failedOnAlreadyExists); RegisterActor(requestHandler); return promise.GetFuture(); diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index 60e158cbcb1..47ebe3c8ba8 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -128,6 +128,33 @@ namespace { return dropGroupSettings; } + TString GetOrDefault(const std::map<TString, TString>& container, const TString& key, const TString& defaultValue = TString{}) { + auto it = container.find(key); + return it == container.end() ? defaultValue : it->second; + } + + TCreateExternalDataSourceSettings ParseCreateExternalDataSourceSettings(const TCreateObjectSettings& settings) { + TCreateExternalDataSourceSettings out; + out.ExternalDataSource = settings.GetObjectId(); + out.SourceType = GetOrDefault(settings.GetFeatures(), "source_type"); + out.AuthMethod = GetOrDefault(settings.GetFeatures(), "auth_method"); + out.Installation = GetOrDefault(settings.GetFeatures(), "installation"); + out.Location = GetOrDefault(settings.GetFeatures(), "location"); + return out; + } + + TAlterExternalDataSourceSettings ParseAlterExternalDataSourceSettings(const TAlterObjectSettings& settings) { + TAlterExternalDataSourceSettings out; + out.ExternalDataSource = settings.GetObjectId(); + return out; + } + + TDropExternalDataSourceSettings ParseDropExternalDataSourceSettings(const TDropObjectSettings& settings) { + TDropExternalDataSourceSettings out; + out.ExternalDataSource = settings.GetObjectId(); + return out; + } + TCreateTableStoreSettings ParseCreateTableStoreSettings(TKiCreateTable create, const TTableSettings& settings) { TCreateTableStoreSettings out; out.TableStore = TString(create.Table()); @@ -395,9 +422,9 @@ template <class TKiObject, class TSettings> class TObjectModifierTransformer { private: TIntrusivePtr<IKikimrGateway> Gateway; - TIntrusivePtr<TKikimrSessionContext> SessionCtx; TString ActionInfo; protected: + TIntrusivePtr<TKikimrSessionContext> SessionCtx; virtual TFuture<IKikimrGateway::TGenericResult> DoExecute(const TString& cluster, const TSettings& settings) = 0; TIntrusivePtr<IKikimrGateway> GetGateway() const { return Gateway; @@ -405,8 +432,8 @@ protected: public: TObjectModifierTransformer(const TString& actionInfo, TIntrusivePtr<IKikimrGateway> gateway, TIntrusivePtr<TKikimrSessionContext> sessionCtx) : Gateway(gateway) - , SessionCtx(sessionCtx) , ActionInfo(actionInfo) + , SessionCtx(sessionCtx) { } @@ -471,6 +498,48 @@ public: using TBase::TBase; }; +class TCreateExternalDataSourceTransformer: public TObjectModifierTransformer<TKiCreateObject, TCreateObjectSettings> { +private: + using TBase = TObjectModifierTransformer<TKiCreateObject, TCreateObjectSettings>; +protected: + virtual TFuture<IKikimrGateway::TGenericResult> DoExecute(const TString& cluster, const TCreateObjectSettings& settings) override { + if (!SessionCtx->Config().FeatureFlags.GetEnableExternalDataSources()) { + return MakeErrorFuture<IKikimrGateway::TGenericResult>(std::make_exception_ptr(yexception() << "External data sources are disabled. Please contact your system administrator to enable it")); + } + return GetGateway()->CreateExternalDataSource(cluster, ParseCreateExternalDataSourceSettings(settings), true); + } +public: + using TBase::TBase; +}; + +class TAlterExternalDataSourceTransformer: public TObjectModifierTransformer<TKiAlterObject, TAlterObjectSettings> { +private: + using TBase = TObjectModifierTransformer<TKiAlterObject, TAlterObjectSettings>; +protected: + virtual TFuture<IKikimrGateway::TGenericResult> DoExecute(const TString& cluster, const TAlterObjectSettings& settings) override { + if (!SessionCtx->Config().FeatureFlags.GetEnableExternalDataSources()) { + return MakeErrorFuture<IKikimrGateway::TGenericResult>(std::make_exception_ptr(yexception() << "External data sources are disabled. Please contact your system administrator to enable it")); + } + return GetGateway()->AlterExternalDataSource(cluster, ParseAlterExternalDataSourceSettings(settings)); + } +public: + using TBase::TBase; +}; + +class TDropExternalDataSourceTransformer: public TObjectModifierTransformer<TKiDropObject, TDropObjectSettings> { +private: + using TBase = TObjectModifierTransformer<TKiDropObject, TDropObjectSettings>; +protected: + virtual TFuture<IKikimrGateway::TGenericResult> DoExecute(const TString& cluster, const TDropObjectSettings& settings) override { + if (!SessionCtx->Config().FeatureFlags.GetEnableExternalDataSources()) { + return MakeErrorFuture<IKikimrGateway::TGenericResult>(std::make_exception_ptr(yexception() << "External data sources are disabled. Please contact your system administrator to enable it")); + } + return GetGateway()->DropExternalDataSource(cluster, ParseDropExternalDataSourceSettings(settings)); + } +public: + using TBase::TBase; +}; + class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKiSinkCallableExecutionTransformer> { public: TKiSinkCallableExecutionTransformer( @@ -1149,15 +1218,21 @@ public: } if (auto kiObject = TMaybeNode<TKiCreateObject>(input)) { - return TCreateObjectTransformer("CREATE OBJECT", Gateway, SessionCtx).Execute(kiObject.Cast(), input, ctx); + return kiObject.Cast().TypeId() == "EXTERNAL_DATA_SOURCE" + ? TCreateExternalDataSourceTransformer("CREATE EXTERNAL DATA SOURCE", Gateway, SessionCtx).Execute(kiObject.Cast(), input, ctx) + : TCreateObjectTransformer("CREATE OBJECT", Gateway, SessionCtx).Execute(kiObject.Cast(), input, ctx); } if (auto kiObject = TMaybeNode<TKiAlterObject>(input)) { - return TAlterObjectTransformer("ALTER OBJECT", Gateway, SessionCtx).Execute(kiObject.Cast(), input, ctx); + return kiObject.Cast().TypeId() == "EXTERNAL_DATA_SOURCE" + ? TAlterExternalDataSourceTransformer("ALTER EXTERNAL DATA SOURCE", Gateway, SessionCtx).Execute(kiObject.Cast(), input, ctx) + : TAlterObjectTransformer("ALTER OBJECT", Gateway, SessionCtx).Execute(kiObject.Cast(), input, ctx); } if (auto kiObject = TMaybeNode<TKiDropObject>(input)) { - return TDropObjectTransformer("DROP OBJECT", Gateway, SessionCtx).Execute(kiObject.Cast(), input, ctx); + return kiObject.Cast().TypeId() == "EXTERNAL_DATA_SOURCE" + ? TDropExternalDataSourceTransformer("DROP EXTERNAL DATA SOURCE", Gateway, SessionCtx).Execute(kiObject.Cast(), input, ctx) + : TDropObjectTransformer("DROP OBJECT", Gateway, SessionCtx).Execute(kiObject.Cast(), input, ctx); } if (auto maybeCreateGroup = TMaybeNode<TKiCreateGroup>(input)) { diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp index 62e117c6a44..bb7aa9ce1de 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp @@ -335,10 +335,10 @@ void TestDropExternalDataSource(TTestActorRuntime& runtime, TIntrusivePtr<IKikim response.Issues().PrintTo(Cerr); UNIT_ASSERT(response.Success()); - auto externalTableDesc = Navigate(runtime, runtime.AllocateEdgeActor(), path, NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown); - const auto& externalTable = externalTableDesc->ResultSet.at(0); - UNIT_ASSERT_EQUAL(externalTableDesc->ErrorCount, 1); - UNIT_ASSERT_EQUAL(externalTable.Kind, NSchemeCache::TSchemeCacheNavigate::EKind::KindUnknown); + auto externalDataSourceDesc = Navigate(runtime, runtime.AllocateEdgeActor(), path, NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown); + const auto& externalDataSource = externalDataSourceDesc->ResultSet.at(0); + UNIT_ASSERT_EQUAL(externalDataSourceDesc->ErrorCount, 1); + UNIT_ASSERT_EQUAL(externalDataSource.Kind, NSchemeCache::TSchemeCacheNavigate::EKind::KindUnknown); } } // namespace diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index 64b117034f1..003d2d903b5 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -3595,6 +3595,147 @@ Y_UNIT_TEST_SUITE(KqpScheme) { session.Close().GetValueSync(); } + + Y_UNIT_TEST(CreateExternalDataSource) { + TKikimrRunner kikimr; + kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + TString externalDataSourceName = "/Root/ExternalDataSource"; + auto query = TStringBuilder() << R"( + CREATE EXTERNAL DATA SOURCE `)" << externalDataSourceName << R"(` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="my-bucket", + AUTH_METHOD="NONE" + );)"; + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto& runtime = *kikimr.GetTestServer().GetRuntime(); + auto externalDataSourceDesc = Navigate(runtime, runtime.AllocateEdgeActor(), externalDataSourceName, NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown); + const auto& externalDataSource = externalDataSourceDesc->ResultSet.at(0); + UNIT_ASSERT_EQUAL(externalDataSource.Kind, NSchemeCache::TSchemeCacheNavigate::EKind::KindExternalDataSource); + UNIT_ASSERT(externalDataSource.ExternalDataSourceInfo); + UNIT_ASSERT_VALUES_EQUAL(externalDataSource.ExternalDataSourceInfo->Description.GetSourceType(), "ObjectStorage"); + UNIT_ASSERT_VALUES_EQUAL(externalDataSource.ExternalDataSourceInfo->Description.GetInstallation(), ""); + UNIT_ASSERT_VALUES_EQUAL(externalDataSource.ExternalDataSourceInfo->Description.GetLocation(), "my-bucket"); + UNIT_ASSERT_VALUES_EQUAL(externalDataSource.ExternalDataSourceInfo->Description.GetName(), SplitPath(externalDataSourceName).back()); + UNIT_ASSERT(externalDataSource.ExternalDataSourceInfo->Description.GetAuth().HasNone()); + } + + Y_UNIT_TEST(DisableCreateExternalDataSource) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + TString externalDataSourceName = "/Root/ExternalDataSource"; + auto query = TStringBuilder() << R"( + CREATE EXTERNAL DATA SOURCE `)" << externalDataSourceName << R"(` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="my-bucket", + AUTH_METHOD="NONE" + );)"; + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::INTERNAL_ERROR); + UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "External data sources are disabled. Please contact your system administrator to enable it"); + } + + Y_UNIT_TEST(CreateExternalDataSourceValidation) { + TKikimrRunner kikimr; + kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + TString externalDataSourceName = "/Root/ExternalDataSource"; + auto query = TStringBuilder() << R"( + CREATE EXTERNAL DATA SOURCE `)" << externalDataSourceName << R"(` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="my-bucket", + AUTH_METHOD="UNKNOWN" + );)"; + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR); + UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Authorization method not specified"); + } + + Y_UNIT_TEST(DropExternalDataSource) { + TKikimrRunner kikimr; + kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + TString externalDataSourceName = "/Root/ExternalDataSource"; + { + auto query = TStringBuilder() << R"( + CREATE EXTERNAL DATA SOURCE `)" << externalDataSourceName << R"(` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="my-bucket", + AUTH_METHOD="NONE" + );)"; + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto query = TStringBuilder() << R"( DROP EXTERNAL DATA SOURCE `)" << externalDataSourceName << "`"; + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + auto& runtime = *kikimr.GetTestServer().GetRuntime(); + auto externalDataSourceDesc = Navigate(runtime, runtime.AllocateEdgeActor(), externalDataSourceName, NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown); + const auto& externalDataSource = externalDataSourceDesc->ResultSet.at(0); + UNIT_ASSERT_EQUAL(externalDataSourceDesc->ErrorCount, 1); + UNIT_ASSERT_EQUAL(externalDataSource.Kind, NSchemeCache::TSchemeCacheNavigate::EKind::KindUnknown); + } + + Y_UNIT_TEST(DisableDropExternalDataSource) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + TString externalDataSourceName = "/Root/ExternalDataSource"; + auto query = TStringBuilder() << R"( DROP EXTERNAL DATA SOURCE `)" << externalDataSourceName << "`"; + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "External data sources are disabled. Please contact your system administrator to enable it"); + } + + Y_UNIT_TEST(DoubleCreateExternalDataSource) { + TKikimrRunner kikimr; + kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + TString externalDataSourceName = "/Root/ExternalDataSource"; + { + auto query = TStringBuilder() << R"( + CREATE EXTERNAL DATA SOURCE `)" << externalDataSourceName << R"(` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="my-bucket", + AUTH_METHOD="NONE" + );)"; + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto& runtime = *kikimr.GetTestServer().GetRuntime(); + auto externalDataSourceDesc = Navigate(runtime, runtime.AllocateEdgeActor(), externalDataSourceName, NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown); + const auto& externalDataSource = externalDataSourceDesc->ResultSet.at(0); + UNIT_ASSERT_EQUAL(externalDataSource.Kind, NSchemeCache::TSchemeCacheNavigate::EKind::KindExternalDataSource); + UNIT_ASSERT(externalDataSource.ExternalDataSourceInfo); + UNIT_ASSERT_VALUES_EQUAL(externalDataSource.ExternalDataSourceInfo->Description.GetSourceType(), "ObjectStorage"); + UNIT_ASSERT_VALUES_EQUAL(externalDataSource.ExternalDataSourceInfo->Description.GetInstallation(), ""); + UNIT_ASSERT_VALUES_EQUAL(externalDataSource.ExternalDataSourceInfo->Description.GetLocation(), "my-bucket"); + UNIT_ASSERT_VALUES_EQUAL(externalDataSource.ExternalDataSourceInfo->Description.GetName(), SplitPath(externalDataSourceName).back()); + UNIT_ASSERT(externalDataSource.ExternalDataSourceInfo->Description.GetAuth().HasNone()); + } + + { + auto query = TStringBuilder() << R"( + CREATE EXTERNAL DATA SOURCE `)" << externalDataSourceName << R"(` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="my-bucket", + AUTH_METHOD="NONE" + );)"; + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR); + UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Check failed: path: '/Root/ExternalDataSource', error: path exist"); + } + } } } // namespace NKqp diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 05bd98b896b..5fb6fe42db8 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -769,6 +769,7 @@ message TFeatureFlags { optional bool EnableDataShardVolatileTransactions = 85 [default = false]; optional bool EnableTopicServiceTx = 86 [default = false]; optional bool EnableLLVMCache = 87 [default = false]; + optional bool EnableExternalDataSources = 88 [default = false]; } diff --git a/ydb/library/yql/sql/v1/SQLv1.g.in b/ydb/library/yql/sql/v1/SQLv1.g.in index 21ffa80530b..124c48827f9 100644 --- a/ydb/library/yql/sql/v1/SQLv1.g.in +++ b/ydb/library/yql/sql/v1/SQLv1.g.in @@ -47,6 +47,8 @@ sql_stmt_core: | create_object_stmt | alter_object_stmt | drop_object_stmt + | create_external_data_source_stmt + | drop_external_data_source_stmt ; expr: @@ -453,6 +455,12 @@ values_source_row: LPAREN expr_list RPAREN; simple_values_source: expr_list | select_stmt; +create_external_data_source_stmt: CREATE EXTERNAL DATA SOURCE object_ref + with_table_settings +; + +drop_external_data_source_stmt: DROP EXTERNAL DATA SOURCE object_ref; + create_object_stmt: CREATE OBJECT object_ref LPAREN TYPE object_type_ref RPAREN create_object_features? @@ -917,6 +925,7 @@ keyword_compat: ( | CREATE | CROSS | CURRENT + | DATA | DATABASE | DECIMAL | DECLARE @@ -1022,6 +1031,7 @@ keyword_compat: ( | SAVEPOINT | SEMI | SETS + | SOURCE | SUBQUERY | SYMBOLS | SYNC @@ -1195,6 +1205,7 @@ CURRENT: C U R R E N T; CURRENT_TIME: C U R R E N T '_' T I M E; CURRENT_DATE: C U R R E N T '_' D A T E; CURRENT_TIMESTAMP: C U R R E N T '_' T I M E S T A M P; +DATA: D A T A; DATABASE: D A T A B A S E; DECIMAL: D E C I M A L; DECLARE: D E C L A R E; @@ -1336,6 +1347,7 @@ SELECT: S E L E C T; SEMI: S E M I; SET: S E T; SETS: S E T S; +SOURCE: S O U R C E; STREAM: S T R E A M; STRUCT: S T R U C T; SUBQUERY: S U B Q U E R Y; diff --git a/ydb/library/yql/sql/v1/format/sql_format.cpp b/ydb/library/yql/sql/v1/format/sql_format.cpp index fbe07af20d7..eabca3cec70 100644 --- a/ydb/library/yql/sql/v1/format/sql_format.cpp +++ b/ydb/library/yql/sql/v1/format/sql_format.cpp @@ -988,6 +988,18 @@ private: VisitAllFields(TRule_drop_object_stmt::GetDescriptor(), msg); } + void VisitCreateExternalDataSource(const TRule_create_external_data_source_stmt& msg) { + PosFromToken(msg.GetToken1()); + NewLine(); + VisitAllFields(TRule_create_external_data_source_stmt::GetDescriptor(), msg); + } + + void VisitDropExternalDataSource(const TRule_drop_external_data_source_stmt& msg) { + PosFromToken(msg.GetToken1()); + NewLine(); + VisitAllFields(TRule_drop_external_data_source_stmt::GetDescriptor(), msg); + } + void VisitAllFields(const NProtoBuf::Descriptor* descr, const NProtoBuf::Message& msg) { for (int i = 0; i < descr->field_count(); ++i) { const NProtoBuf::FieldDescriptor* fd = descr->field(i); @@ -1845,6 +1857,8 @@ TStaticData::TStaticData() {TRule_create_object_stmt::GetDescriptor(), MakeFunctor(&TVisitor::VisitCreateObject)}, {TRule_alter_object_stmt::GetDescriptor(), MakeFunctor(&TVisitor::VisitAlterObject)}, {TRule_drop_object_stmt::GetDescriptor(), MakeFunctor(&TVisitor::VisitDropObject)}, + {TRule_create_external_data_source_stmt::GetDescriptor(), MakeFunctor(&TVisitor::VisitCreateExternalDataSource)}, + {TRule_drop_external_data_source_stmt::GetDescriptor(), MakeFunctor(&TVisitor::VisitDropExternalDataSource)}, }) { // ensure that all statements has a visitor diff --git a/ydb/library/yql/sql/v1/format/sql_format_ut.cpp b/ydb/library/yql/sql/v1/format/sql_format_ut.cpp index 53e82aa2ea5..2866b0d4528 100644 --- a/ydb/library/yql/sql/v1/format/sql_format_ut.cpp +++ b/ydb/library/yql/sql/v1/format/sql_format_ut.cpp @@ -273,6 +273,18 @@ Y_UNIT_TEST_SUITE(CheckSqlFormatter) { setup.Run(cases); } + Y_UNIT_TEST(ExternalDataSourceOperations) { + TCases cases = { + {"creAte exTernAl daTa SouRce usEr With (a = \"b\")", + "CREATE EXTERNAL DATA SOURCE usEr WITH (a = \"b\");\n"}, + {"dRop exTerNal Data SouRce usEr", + "DROP EXTERNAL DATA SOURCE usEr;\n"}, + }; + + TSetup setup; + setup.Run(cases); + } + Y_UNIT_TEST(TypeSelection) { TCases cases = { {"Select tYpe.* frOm Table tYpe", diff --git a/ydb/library/yql/sql/v1/sql.cpp b/ydb/library/yql/sql/v1/sql.cpp index d1fba615c2c..8fd084af800 100644 --- a/ydb/library/yql/sql/v1/sql.cpp +++ b/ydb/library/yql/sql/v1/sql.cpp @@ -28,6 +28,7 @@ #include <library/cpp/json/json_reader.h> #include <ydb/library/yql/utils/utf8.h> +#include <ydb/library/yql/utils/yql_paths.h> #include <google/protobuf/repeated_field.h> @@ -818,6 +819,7 @@ protected: bool alter, bool reset); bool StoreTableSettingsEntry(const TIdentifier& id, const TRule_table_setting_value& value, TTableSettings& settings, bool alter = false); + bool StoreDataSourceSettingsEntry(const TIdentifier& id, const TRule_table_setting_value* value, std::map<TString, TDeferredAtom>& result); bool ResetTableSettingsEntry(const TIdentifier& id, TTableSettings& settings); TNodePtr TypeSimple(const TRule_type_name_simple& node, bool onlyDataAllowed); @@ -849,6 +851,7 @@ protected: bool BindParameterClause(const TRule_bind_parameter& node, TDeferredAtom& result); bool ObjectFeatureValueClause(const TRule_object_feature_value & node, TDeferredAtom & result); bool ParseObjectFeatures(std::map<TString, TDeferredAtom> & result, const TRule_object_features & features); + bool ParseExternalDataSourceSettings(std::map<TString, TDeferredAtom> & result, const TRule_with_table_settings & settings); bool RoleNameClause(const TRule_role_name& node, TDeferredAtom& result, bool allowSystemRoles); bool RoleParameters(const TRule_create_user_option& node, TRoleParameters& result) ; private: @@ -2048,6 +2051,26 @@ namespace { return true; } + bool StoreString(const TRule_table_setting_value& from, TDeferredAtom& to, TContext& ctx, const TString& errorPrefix = {}) { + switch (from.Alt_case()) { + case TRule_table_setting_value::kAltTableSettingValue2: { + // STRING_VALUE + const TString stringValue(ctx.Token(from.GetAlt_table_setting_value2().GetToken1())); + auto unescaped = StringContent(ctx, ctx.Pos(), stringValue); + if (!unescaped) { + ctx.Error() << errorPrefix << " value cannot be unescaped"; + return false; + } + to = TDeferredAtom(ctx.Pos(), unescaped->Content); + break; + } + default: + ctx.Error() << errorPrefix << " value should be a string literal"; + return false; + } + return true; + } + bool StoreInt(const TRule_table_setting_value& from, TNodePtr& to, TContext& ctx) { switch (from.Alt_case()) { case TRule_table_setting_value::kAltTableSettingValue3: { @@ -9191,7 +9214,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& } case TRule_sql_stmt_core::kAltSqlStmtCore27: { - // create_object_stmt: ALTER OBJECT name (TYPE type [SET k=v,...]); + // alter_object_stmt: ALTER OBJECT name (TYPE type [SET k=v,...]); auto& node = core.GetAlt_sql_stmt_core27().GetRule_alter_object_stmt1(); TObjectOperatorContext context(Ctx.Scoped); if (node.GetRule_object_ref3().HasBlock1()) { @@ -9213,7 +9236,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& } case TRule_sql_stmt_core::kAltSqlStmtCore28: { - // create_object_stmt: DROP OBJECT name (TYPE type [WITH k=v,...]); + // drop_object_stmt: DROP OBJECT name (TYPE type [WITH k=v,...]); auto& node = core.GetAlt_sql_stmt_core28().GetRule_drop_object_stmt1(); TObjectOperatorContext context(Ctx.Scoped); if (node.GetRule_object_ref3().HasBlock1()) { @@ -9235,6 +9258,43 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& AddStatementToBlocks(blocks, BuildDropObjectOperation(Ctx.Pos(), objectId, typeId, std::move(kv), context)); break; } + case TRule_sql_stmt_core::kAltSqlStmtCore29: + { + // create_external_data_source_stmt: CREATE EXTERNAL DATA SOURCE name WITH (k=v,...); + auto& node = core.GetAlt_sql_stmt_core29().GetRule_create_external_data_source_stmt1(); + TObjectOperatorContext context(Ctx.Scoped); + if (node.GetRule_object_ref5().HasBlock1()) { + if (!ClusterExpr(node.GetRule_object_ref5().GetBlock1().GetRule_cluster_expr1(), + false, context.ServiceId, context.Cluster)) { + return false; + } + } + + const TString& objectId = Id(node.GetRule_object_ref5().GetRule_id_or_at2(), *this).second; + std::map<TString, TDeferredAtom> kv; + if (!ParseExternalDataSourceSettings(kv, node.GetRule_with_table_settings6())) { + return false; + } + + AddStatementToBlocks(blocks, BuildCreateObjectOperation(Ctx.Pos(), BuildTablePath(Ctx.GetPrefixPath(context.ServiceId, context.Cluster), objectId), "EXTERNAL_DATA_SOURCE", std::move(kv), context)); + break; + } + case TRule_sql_stmt_core::kAltSqlStmtCore30: + { + // drop_external_data_source_stmt: DROP EXTERNAL DATA SOURCE name; + auto& node = core.GetAlt_sql_stmt_core30().GetRule_drop_external_data_source_stmt1(); + TObjectOperatorContext context(Ctx.Scoped); + if (node.GetRule_object_ref5().HasBlock1()) { + if (!ClusterExpr(node.GetRule_object_ref5().GetBlock1().GetRule_cluster_expr1(), + false, context.ServiceId, context.Cluster)) { + return false; + } + } + + const TString& objectId = Id(node.GetRule_object_ref5().GetRule_id_or_at2(), *this).second; + AddStatementToBlocks(blocks, BuildDropObjectOperation(Ctx.Pos(), BuildTablePath(Ctx.GetPrefixPath(context.ServiceId, context.Cluster), objectId), "EXTERNAL_DATA_SOURCE", {}, context)); + break; + } default: Ctx.IncrementMonCounter("sql_errors", "UnknownStatement" + internalStatementName); AltNotImplemented("sql_stmt_core", core); @@ -10799,4 +10859,52 @@ bool TSqlTranslation::ParseObjectFeatures(std::map<TString, TDeferredAtom>& resu return true; } +bool TSqlTranslation::StoreDataSourceSettingsEntry(const TIdentifier& id, const TRule_table_setting_value* value, std::map<TString, TDeferredAtom>& result) { + YQL_ENSURE(value); + + const TString key = to_lower(id.Name); + if (result.find(key) != result.end()) { + Ctx.Error() << to_upper(key) << " duplicate keys"; + return false; + } + + if (IsIn({"source_type", "installation", "location", "auth_method"}, key)) { + if (!StoreString(*value, result[key], Ctx, to_upper(key))) { + return false; + } + } else { + Ctx.Error() << "Unknown external data source setting: " << id.Name; + return false; + } + return true; +} + +bool TSqlTranslation::ParseExternalDataSourceSettings(std::map<TString, TDeferredAtom>& result, const TRule_with_table_settings& settingsNode) { + const auto& firstEntry = settingsNode.GetRule_table_settings_entry3(); + if (!StoreDataSourceSettingsEntry(IdEx(firstEntry.GetRule_an_id1(), *this), &firstEntry.GetRule_table_setting_value3(), + result)) { + return false; + } + for (auto& block : settingsNode.GetBlock4()) { + const auto& entry = block.GetRule_table_settings_entry2(); + if (!StoreDataSourceSettingsEntry(IdEx(entry.GetRule_an_id1(), *this), &entry.GetRule_table_setting_value3(), result)) { + return false; + } + } + + if (result.find("source_type") == result.end()) { + Ctx.Error() << "SOURCE_TYPE requires key"; + return false; + } + if (result.find("auth_method") == result.end()) { + Ctx.Error() << "AUTH_METHOD requires key"; + return false; + } + if (result.find("installation") == result.end() && result.find("location") == result.end()) { + Ctx.Error() << "INSTALLATION or LOCATION must be specified"; + return false; + } + return true; +} + } // namespace NSQLTranslationV1 diff --git a/ydb/library/yql/sql/v1/sql_ut.cpp b/ydb/library/yql/sql/v1/sql_ut.cpp index ce96a14d2d5..e6fe98f3d9c 100644 --- a/ydb/library/yql/sql/v1/sql_ut.cpp +++ b/ydb/library/yql/sql/v1/sql_ut.cpp @@ -751,7 +751,6 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { if (word == "Write") { - Cerr << line << Endl; UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("'features")); UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("'\"OVERRIDE\"")); UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("dropObject")); @@ -4703,4 +4702,137 @@ Y_UNIT_TEST_SUITE(ExternalDeclares) { UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:1:15: Error: Selecting data from monitoring source is not supported\n"); } + + Y_UNIT_TEST(CreateExternalDataSource) { + NYql::TAstParseResult res = SqlToYql(R"( + USE plato; + CREATE EXTERNAL DATA SOURCE MyDataSource WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="my-bucket", + AUTH_METHOD="NONE" + ); + )"); + UNIT_ASSERT(res.Root); + + TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { + if (word == "Write") { + UNIT_ASSERT_STRING_CONTAINS(line, R"#('('('"auth_method" '"NONE") '('"location" '"my-bucket") '('"source_type" '"ObjectStorage"))#"); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("createObject")); + } + }; + + TWordCountHive elementStat = { {TString("Write"), 0} }; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); + } + + Y_UNIT_TEST(CreateExternalDataSourceWithTablePrefix) { + NYql::TAstParseResult res = SqlToYql(R"( + USE plato; + pragma TablePathPrefix='/aba'; + CREATE EXTERNAL DATA SOURCE MyDataSource WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="my-bucket", + AUTH_METHOD="NONE" + ); + )"); + UNIT_ASSERT(res.Root); + + TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { + if (word == "Write") { + UNIT_ASSERT_STRING_CONTAINS(line, "/aba/MyDataSource"); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("createObject")); + } + }; + + TWordCountHive elementStat = { {TString("Write"), 0} }; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); + } + + Y_UNIT_TEST(CreateExternalDataSourceWithBadArguments) { + ExpectFailWithError(R"( + USE plato; + CREATE EXTERNAL DATA SOURCE MyDataSource; + )" , "<main>:3:56: Error: Unexpected token ';' : syntax error...\n\n"); + + ExpectFailWithError(R"( + USE plato; + CREATE EXTERNAL DATA SOURCE MyDataSource WITH ( + LOCATION="my-bucket", + AUTH_METHOD="NONE" + ); + )" , "<main>:5:33: Error: SOURCE_TYPE requires key\n"); + + ExpectFailWithError(R"( + USE plato; + CREATE EXTERNAL DATA SOURCE MyDataSource WITH ( + SOURCE_TYPE="ObjectStorage", + AUTH_METHOD="NONE" + ); + )" , "<main>:5:33: Error: INSTALLATION or LOCATION must be specified\n"); + + + ExpectFailWithError(R"( + USE plato; + CREATE EXTERNAL DATA SOURCE MyDataSource WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="my-bucket" + ); + )" , "<main>:5:30: Error: AUTH_METHOD requires key\n"); + + ExpectFailWithError(R"( + USE plato; + CREATE EXTERNAL DATA SOURCE MyDataSource WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="my-bucket", + AUTH_METHOD="NONE", + OTHER="VALUE" + ); + )" , "<main>:7:21: Error: Unknown external data source setting: OTHER\n"); + } + + Y_UNIT_TEST(DropExternalDataSourceWithTablePrefix) { + NYql::TAstParseResult res = SqlToYql(R"( + USE plato; + DROP EXTERNAL DATA SOURCE MyDataSource; + )"); + UNIT_ASSERT(res.Root); + + TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { + if (word == "Write") { + UNIT_ASSERT_VALUES_EQUAL(TString::npos, line.find("'features")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("dropObject")); + } + }; + + TWordCountHive elementStat = { {TString("Write"), 0}}; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); + } + + Y_UNIT_TEST(DropExternalDataSource) { + NYql::TAstParseResult res = SqlToYql(R"( + USE plato; + pragma TablePathPrefix='/aba'; + DROP EXTERNAL DATA SOURCE MyDataSource; + )"); + UNIT_ASSERT(res.Root); + + TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { + if (word == "Write") { + UNIT_ASSERT_STRING_CONTAINS(line, "/aba/MyDataSource"); + UNIT_ASSERT_VALUES_EQUAL(TString::npos, line.find("'features")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("dropObject")); + } + }; + + TWordCountHive elementStat = { {TString("Write"), 0}}; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); + } } |
