diff options
| author | hcpp <[email protected]> | 2023-08-16 14:55:34 +0300 |
|---|---|---|
| committer | hcpp <[email protected]> | 2023-08-16 16:31:15 +0300 |
| commit | 22d756319c13b10fe917926367e9919ac205f05e (patch) | |
| tree | d126e327a136dc26eb0281b30ef3335ab7a86b64 | |
| parent | 397cfdba67f6838930f18cc7e6f029ed10beb2a2 (diff) | |
new sources and auth methods have been added
Source Types: ClickHouse, Postgres
Auth method: Basic, MdbBasic, Aws
RFC: https://wiki.yandex-team.ru/users/hcpp/externaltable-i-datasource-ddl/#auth
27 files changed, 693 insertions, 156 deletions
diff --git a/ydb/core/external_sources/CMakeLists.darwin-x86_64.txt b/ydb/core/external_sources/CMakeLists.darwin-x86_64.txt index be0a88b0693..e5907534ca4 100644 --- a/ydb/core/external_sources/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/external_sources/CMakeLists.darwin-x86_64.txt @@ -20,6 +20,7 @@ target_link_libraries(ydb-core-external_sources PUBLIC cpp-client-ydb_value ) target_sources(ydb-core-external_sources PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/external_data_source.cpp ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/external_source_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/object_storage.cpp ) diff --git a/ydb/core/external_sources/CMakeLists.linux-aarch64.txt b/ydb/core/external_sources/CMakeLists.linux-aarch64.txt index 3aa55187905..0360f78d0aa 100644 --- a/ydb/core/external_sources/CMakeLists.linux-aarch64.txt +++ b/ydb/core/external_sources/CMakeLists.linux-aarch64.txt @@ -21,6 +21,7 @@ target_link_libraries(ydb-core-external_sources PUBLIC cpp-client-ydb_value ) target_sources(ydb-core-external_sources PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/external_data_source.cpp ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/external_source_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/object_storage.cpp ) diff --git a/ydb/core/external_sources/CMakeLists.linux-x86_64.txt b/ydb/core/external_sources/CMakeLists.linux-x86_64.txt index 3aa55187905..0360f78d0aa 100644 --- a/ydb/core/external_sources/CMakeLists.linux-x86_64.txt +++ b/ydb/core/external_sources/CMakeLists.linux-x86_64.txt @@ -21,6 +21,7 @@ target_link_libraries(ydb-core-external_sources PUBLIC cpp-client-ydb_value ) target_sources(ydb-core-external_sources PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/external_data_source.cpp ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/external_source_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/object_storage.cpp ) diff --git a/ydb/core/external_sources/CMakeLists.windows-x86_64.txt b/ydb/core/external_sources/CMakeLists.windows-x86_64.txt index be0a88b0693..e5907534ca4 100644 --- a/ydb/core/external_sources/CMakeLists.windows-x86_64.txt +++ b/ydb/core/external_sources/CMakeLists.windows-x86_64.txt @@ -20,6 +20,7 @@ target_link_libraries(ydb-core-external_sources PUBLIC cpp-client-ydb_value ) target_sources(ydb-core-external_sources PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/external_data_source.cpp ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/external_source_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/object_storage.cpp ) diff --git a/ydb/core/external_sources/external_data_source.cpp b/ydb/core/external_sources/external_data_source.cpp new file mode 100644 index 00000000000..f0e11708535 --- /dev/null +++ b/ydb/core/external_sources/external_data_source.cpp @@ -0,0 +1,44 @@ +#include "external_data_source.h" + +namespace NKikimr::NExternalSource { + +namespace { + +struct TExternalDataSource : public IExternalSource { + TExternalDataSource(const TString& name, const TVector<TString>& authMethods) + : Name(name) + , AuthMethods(authMethods) + {} + + virtual TString Pack(const NKikimrExternalSources::TSchema&, + const NKikimrExternalSources::TGeneral&) const override { + ythrow TExternalSourceException() << "Only external table supports pack operation"; + } + + virtual TString GetName() const override { + return Name; + } + + virtual bool HasExternalTable() const override { + return false; + } + + virtual TVector<TString> GetAuthMethods() const override { + return AuthMethods; + } + + virtual TMap<TString, TVector<TString>> GetParameters(const TString&) const override { + ythrow TExternalSourceException() << "Only external table supports parameters"; + } +private: + const TString Name; + const TVector<TString> AuthMethods; +}; + +} + +IExternalSource::TPtr CreateExternalDataSource(const TString& name, const TVector<TString>& authMethods) { + return MakeIntrusive<TExternalDataSource>(name, authMethods); +} + +} diff --git a/ydb/core/external_sources/external_data_source.h b/ydb/core/external_sources/external_data_source.h new file mode 100644 index 00000000000..32034bfa1ec --- /dev/null +++ b/ydb/core/external_sources/external_data_source.h @@ -0,0 +1,9 @@ +#pragma once + +#include "external_source.h" + +namespace NKikimr::NExternalSource { + +IExternalSource::TPtr CreateExternalDataSource(const TString& name, const TVector<TString>& authMethods); + +} diff --git a/ydb/core/external_sources/external_source.h b/ydb/core/external_sources/external_source.h index 60d1e6a067d..4b8a05e18a8 100644 --- a/ydb/core/external_sources/external_source.h +++ b/ydb/core/external_sources/external_source.h @@ -1,6 +1,8 @@ #pragma once +#include <util/generic/map.h> #include <util/generic/string.h> + #include <ydb/core/protos/external_sources.pb.h> #include <ydb/library/yql/public/issue/yql_issue.h> @@ -23,12 +25,22 @@ struct IExternalSource : public TThrRefBase { const NKikimrExternalSources::TGeneral& general) const = 0; /* + If this source supports external table than this method will return true + */ + virtual bool HasExternalTable() const = 0; + + /* The name of the data source that is used inside the - implementation during the read phase. Must match provider name. + implementation during the read/write phase. Must match provider name. */ virtual TString GetName() const = 0; /* + List of auth methods supported by the source + */ + virtual TVector<TString> GetAuthMethods() const = 0; + + /* At the input, a string with the name of the content is passed, which is obtained from the Pack method and returns a list of parameters that will be put in the AST of the source. Also, diff --git a/ydb/core/external_sources/external_source_factory.cpp b/ydb/core/external_sources/external_source_factory.cpp index 4bc78d4d9b1..78ff46f2778 100644 --- a/ydb/core/external_sources/external_source_factory.cpp +++ b/ydb/core/external_sources/external_source_factory.cpp @@ -1,8 +1,11 @@ #include "external_source_factory.h" #include "object_storage.h" +#include "external_data_source.h" #include <util/generic/map.h> +#include <ydb/library/yql/providers/common/provider/yql_provider_names.h> + namespace NKikimr::NExternalSource { @@ -29,7 +32,9 @@ private: IExternalSourceFactory::TPtr CreateExternalSourceFactory() { return MakeIntrusive<TExternalSourceFactory>(TMap<TString, IExternalSource::TPtr>{ - {"ObjectStorage", CreateObjectStorageExternalSource()} + {"ObjectStorage", CreateObjectStorageExternalSource()}, + {"ClickHouse", CreateExternalDataSource(TString{NYql::GenericProviderName}, {"MDB_BASIC", "BASIC"})}, + {"PostgreSQL", CreateExternalDataSource(TString{NYql::GenericProviderName}, {"MDB_BASIC", "BASIC"})} }); } diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp index 000ec785ec6..4029c4753e5 100644 --- a/ydb/core/external_sources/object_storage.cpp +++ b/ydb/core/external_sources/object_storage.cpp @@ -40,7 +40,7 @@ struct TObjectStorageExternalSource : public IExternalSource { } if (auto issues = Validate(schema, objectStorage)) { - ythrow TExternalSourceException() << issues.ToString() << Endl; + ythrow TExternalSourceException() << issues.ToString(); } return objectStorage.SerializeAsString(); @@ -50,6 +50,14 @@ struct TObjectStorageExternalSource : public IExternalSource { return TString{NYql::S3ProviderName}; } + virtual bool HasExternalTable() const override { + return true; + } + + virtual TVector<TString> GetAuthMethods() const override { + return {"NONE", "SERVICE_ACCOUNT", "AWS"}; + } + virtual TMap<TString, TVector<TString>> GetParameters(const TString& content) const override { NKikimrExternalSources::TObjectStorage objectStorage; objectStorage.ParseFromStringOrThrow(content); diff --git a/ydb/core/external_sources/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/external_sources/ut/CMakeLists.darwin-x86_64.txt index 63e20e82072..dc0d7a903f8 100644 --- a/ydb/core/external_sources/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/external_sources/ut/CMakeLists.darwin-x86_64.txt @@ -29,6 +29,7 @@ target_link_options(ydb-core-external_sources-ut PRIVATE ) target_sources(ydb-core-external_sources-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/object_storage_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/external_data_source.cpp ) set_property( TARGET diff --git a/ydb/core/external_sources/ut/CMakeLists.linux-aarch64.txt b/ydb/core/external_sources/ut/CMakeLists.linux-aarch64.txt index 39dedfcb235..41ec23c6cb2 100644 --- a/ydb/core/external_sources/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/core/external_sources/ut/CMakeLists.linux-aarch64.txt @@ -32,6 +32,7 @@ target_link_options(ydb-core-external_sources-ut PRIVATE ) target_sources(ydb-core-external_sources-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/object_storage_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/external_data_source.cpp ) set_property( TARGET diff --git a/ydb/core/external_sources/ut/CMakeLists.linux-x86_64.txt b/ydb/core/external_sources/ut/CMakeLists.linux-x86_64.txt index 4d47be66e2a..aac17202605 100644 --- a/ydb/core/external_sources/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/core/external_sources/ut/CMakeLists.linux-x86_64.txt @@ -33,6 +33,7 @@ target_link_options(ydb-core-external_sources-ut PRIVATE ) target_sources(ydb-core-external_sources-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/object_storage_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/external_data_source.cpp ) set_property( TARGET diff --git a/ydb/core/external_sources/ut/CMakeLists.windows-x86_64.txt b/ydb/core/external_sources/ut/CMakeLists.windows-x86_64.txt index 2cdeb163665..84ebe035212 100644 --- a/ydb/core/external_sources/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/core/external_sources/ut/CMakeLists.windows-x86_64.txt @@ -22,6 +22,7 @@ target_link_libraries(ydb-core-external_sources-ut PUBLIC ) target_sources(ydb-core-external_sources-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/object_storage_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/external_data_source.cpp ) set_property( TARGET diff --git a/ydb/core/external_sources/ut/ya.make b/ydb/core/external_sources/ut/ya.make index 66515b54f3f..d3dc4ac63a2 100644 --- a/ydb/core/external_sources/ut/ya.make +++ b/ydb/core/external_sources/ut/ya.make @@ -7,6 +7,7 @@ PEERDIR( SRCS( object_storage_ut.cpp + external_data_source.cpp ) END() diff --git a/ydb/core/external_sources/ya.make b/ydb/core/external_sources/ya.make index 27cc4665082..0fb9c842e76 100644 --- a/ydb/core/external_sources/ya.make +++ b/ydb/core/external_sources/ya.make @@ -1,6 +1,7 @@ LIBRARY() SRCS( + external_data_source.cpp external_source_factory.cpp object_storage.cpp ) diff --git a/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h b/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h index 04e8d125679..7df524a453b 100644 --- a/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h +++ b/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h @@ -113,23 +113,23 @@ private: TActorId ActorId; }; -struct TDescribeObjectResponse { - TDescribeObjectResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) +struct TDescribeSecretsResponse { + TDescribeSecretsResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) : Status(status) , Issues(std::move(issues)) {} - TDescribeObjectResponse(const TString& secretValue) - : SecretValue(secretValue) + TDescribeSecretsResponse(const TVector<TString>& secretValues) + : SecretValues(secretValues) , Status(Ydb::StatusIds::SUCCESS) {} - TString SecretValue; + TVector<TString> SecretValues; Ydb::StatusIds::StatusCode Status; NYql::TIssues Issues; }; -class TDescribeObjectActor: public NActors::TActorBootstrapped<TDescribeObjectActor> { +class TDescribeSecretsActor: public NActors::TActorBootstrapped<TDescribeSecretsActor> { STRICT_STFUNC(StateFunc, hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle); ) @@ -138,15 +138,19 @@ class TDescribeObjectActor: public NActors::TActorBootstrapped<TDescribeObjectAc Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvUnsubscribeExternal(GetSecretsSnapshotParser())); auto snapshot = ev->Get()->GetSnapshotAs<NMetadata::NSecret::TSnapshot>(); - TString secretValue; - bool isFound = snapshot->GetSecretValue(NMetadata::NSecret::TSecretIdOrValue::BuildAsId(SecretId), secretValue); - - if (isFound) { - Promise.SetValue(TDescribeObjectResponse(secretValue)); - } else { - Promise.SetValue(TDescribeObjectResponse(Ydb::StatusIds::BAD_REQUEST, { NYql::TIssue("secret with name '" + SecretId.GetSecretId() + "' not found") })); + TVector<TString> secretValues; + secretValues.reserve(SecretIds.size()); + for (const auto& secretId: SecretIds) { + TString secretValue; + const bool isFound = snapshot->GetSecretValue(NMetadata::NSecret::TSecretIdOrValue::BuildAsId(secretId), secretValue); + if (!isFound) { + Promise.SetValue(TDescribeSecretsResponse(Ydb::StatusIds::BAD_REQUEST, { NYql::TIssue("secret with name '" + secretId.GetSecretId() + "' not found") })); + PassAway(); + return; + } + secretValues.push_back(secretValue); } - + Promise.SetValue(TDescribeSecretsResponse(secretValues)); PassAway(); } @@ -155,26 +159,34 @@ class TDescribeObjectActor: public NActors::TActorBootstrapped<TDescribeObjectAc } public: - TDescribeObjectActor(const TString& ownerUserId, const TString& secretId, NThreading::TPromise<TDescribeObjectResponse> promise) - : SecretId(ownerUserId, secretId) + TDescribeSecretsActor(const TString& ownerUserId, const TVector<TString>& secretIds, NThreading::TPromise<TDescribeSecretsResponse> promise) + : SecretIds(CreateSecretIds(ownerUserId, secretIds)) , Promise(promise) {} void Bootstrap() { if (!NMetadata::NProvider::TServiceOperator::IsEnabled()) { - Promise.SetValue(TDescribeObjectResponse(Ydb::StatusIds::INTERNAL_ERROR, { NYql::TIssue("metadata service is not active") })); + Promise.SetValue(TDescribeSecretsResponse(Ydb::StatusIds::INTERNAL_ERROR, { NYql::TIssue("metadata service is not active") })); PassAway(); return; } this->Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvSubscribeExternal(GetSecretsSnapshotParser())); + Become(&TDescribeSecretsActor::StateFunc); + } - Become(&TDescribeObjectActor::StateFunc); +private: + static TVector<NMetadata::NSecret::TSecretId> CreateSecretIds(const TString& ownerUserId, const TVector<TString>& secretIds) { + TVector<NMetadata::NSecret::TSecretId> result; + for (const auto& secretId: secretIds) { + result.emplace_back(ownerUserId, secretId); + } + return result; } private: - const NMetadata::NSecret::TSecretId SecretId; - NThreading::TPromise<TDescribeObjectResponse> Promise; + const TVector<NMetadata::NSecret::TSecretId> SecretIds; + NThreading::TPromise<TDescribeSecretsResponse> Promise; }; } diff --git a/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp b/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp index 2dfc1699147..c852f596f50 100644 --- a/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp +++ b/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp @@ -13,26 +13,42 @@ namespace NKikimr::NKqp { namespace { -TString GetOrDefault(const NYql::TCreateObjectSettings& container, const TString& key, const TString& defaultValue = TString{}) { +TString GetOrEmpty(const NYql::TCreateObjectSettings& container, const TString& key) { auto fValue = container.GetFeaturesExtractor().Extract(key); - return fValue ? *fValue : defaultValue; + return fValue ? *fValue : TString{}; } void FillCreateExternalDataSourceDesc(NKikimrSchemeOp::TExternalDataSourceDescription& externaDataSourceDesc, const TString& name, const NYql::TCreateObjectSettings& settings) { externaDataSourceDesc.SetName(name); - externaDataSourceDesc.SetSourceType(GetOrDefault(settings, "source_type")); - externaDataSourceDesc.SetLocation(GetOrDefault(settings, "location")); - externaDataSourceDesc.SetInstallation(GetOrDefault(settings, "installation")); + externaDataSourceDesc.SetSourceType(GetOrEmpty(settings, "source_type")); + externaDataSourceDesc.SetLocation(GetOrEmpty(settings, "location")); + externaDataSourceDesc.SetInstallation(GetOrEmpty(settings, "installation")); - TString authMethod = GetOrDefault(settings, "auth_method"); + TString authMethod = GetOrEmpty(settings, "auth_method"); if (authMethod == "NONE") { externaDataSourceDesc.MutableAuth()->MutableNone(); } else if (authMethod == "SERVICE_ACCOUNT") { auto& sa = *externaDataSourceDesc.MutableAuth()->MutableServiceAccount(); - sa.SetId(GetOrDefault(settings, "service_account_id")); - sa.SetSecretName(GetOrDefault(settings, "service_account_secret_name")); + sa.SetId(GetOrEmpty(settings, "service_account_id")); + sa.SetSecretName(GetOrEmpty(settings, "service_account_secret_name")); + } else if (authMethod == "BASIC") { + auto& basic = *externaDataSourceDesc.MutableAuth()->MutableBasic(); + basic.SetLogin(GetOrEmpty(settings, "login")); + basic.SetPasswordSecretName(GetOrEmpty(settings, "password_secret_name")); + } else if (authMethod == "MDB_BASIC") { + auto& mdbBasic = *externaDataSourceDesc.MutableAuth()->MutableMdbBasic(); + mdbBasic.SetServiceAccountId(GetOrEmpty(settings, "service_account_id")); + mdbBasic.SetServiceAccountSecretName(GetOrEmpty(settings, "service_account_secret_name")); + mdbBasic.SetLogin(GetOrEmpty(settings, "login")); + mdbBasic.SetPasswordSecretName(GetOrEmpty(settings, "password_secret_name")); + } else if (authMethod == "AWS") { + auto& aws = *externaDataSourceDesc.MutableAuth()->MutableAws(); + aws.SetAwsAccessKeyIdSecretName(GetOrEmpty(settings, "aws_access_key_id_secret_name")); + aws.SetAwsSecretAccessKeySecretName(GetOrEmpty(settings, "aws_secret_access_key_secret_name")); + } else { + ythrow yexception() << "Internal error. Unknown auth method: " << authMethod; } } diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp index 143cdb270b3..72220b0bf8e 100644 --- a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp +++ b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp @@ -290,7 +290,6 @@ TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCache return result; } - TTableMetadataResult EnrichExternalTable(const TTableMetadataResult& externalTable, const TTableMetadataResult& externalDataSource) { TTableMetadataResult result; if (!externalTable.Success()) { @@ -327,30 +326,107 @@ void UpdateMetadataIfSuccess(NYql::TKikimrTableMetadataPtr ptr, size_t idx, cons } -void UpdateExternalDataSourceSecretValue(TTableMetadataResult& externalDataSourceMetadata, const TDescribeObjectResponse& objectDescription) { +void SetError(TTableMetadataResult& externalDataSourceMetadata, const TString& error) { + externalDataSourceMetadata.AddIssues({ NYql::TIssue(error) }); + externalDataSourceMetadata.SetStatus(NYql::YqlStatusFromYdbStatus(Ydb::StatusIds::BAD_REQUEST)); +} + +void UpdateExternalDataSourceSecretsValue(TTableMetadataResult& externalDataSourceMetadata, const TDescribeSecretsResponse& objectDescription) { if (objectDescription.Status != Ydb::StatusIds::SUCCESS) { externalDataSourceMetadata.AddIssues(objectDescription.Issues); externalDataSourceMetadata.SetStatus(NYql::YqlStatusFromYdbStatus(objectDescription.Status)); } else { - externalDataSourceMetadata.Metadata->ExternalSource.ServiceAccountIdSignature = objectDescription.SecretValue; + const auto& authDescription = externalDataSourceMetadata.Metadata->ExternalSource.DataSourceAuth; + switch (authDescription.identity_case()) { + case NKikimrSchemeOp::TAuth::kServiceAccount: { + if (objectDescription.SecretValues.size() != 1) { + SetError(externalDataSourceMetadata, TStringBuilder{} << "Service account auth contains invalid count of secrets: " << objectDescription.SecretValues.size() << " instead of 1"); + return; + } + externalDataSourceMetadata.Metadata->ExternalSource.ServiceAccountIdSignature = objectDescription.SecretValues[0]; + return; + } + + case NKikimrSchemeOp::TAuth::kNone: { + if (objectDescription.SecretValues.size() != 0) { + SetError(externalDataSourceMetadata, TStringBuilder{} << "None auth contains invalid count of secrets: " << objectDescription.SecretValues.size() << " instead of 0"); + return; + } + return; + } + + case NKikimrSchemeOp::TAuth::kBasic: { + if (objectDescription.SecretValues.size() != 1) { + SetError(externalDataSourceMetadata, TStringBuilder{} << "Basic auth contains invalid count of secrets: " << objectDescription.SecretValues.size() << " instead of 1"); + return; + } + externalDataSourceMetadata.Metadata->ExternalSource.Password = objectDescription.SecretValues[0]; + return; + } + case NKikimrSchemeOp::TAuth::kMdbBasic: { + if (objectDescription.SecretValues.size() != 2) { + SetError(externalDataSourceMetadata, TStringBuilder{} << "Mdb basic auth contains invalid count of secrets: " << objectDescription.SecretValues.size() << " instead of 2"); + return; + } + externalDataSourceMetadata.Metadata->ExternalSource.ServiceAccountIdSignature = objectDescription.SecretValues[0]; + externalDataSourceMetadata.Metadata->ExternalSource.Password = objectDescription.SecretValues[1]; + return; + } + case NKikimrSchemeOp::TAuth::kAws: { + if (objectDescription.SecretValues.size() != 2) { + SetError(externalDataSourceMetadata, TStringBuilder{} << "Aws auth contains invalid count of secrets: " << objectDescription.SecretValues.size() << " instead of 2"); + return; + } + externalDataSourceMetadata.Metadata->ExternalSource.AwsAccessKeyId = objectDescription.SecretValues[0]; + externalDataSourceMetadata.Metadata->ExternalSource.AwsSecretAccessKey = objectDescription.SecretValues[1]; + return; + } + case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET: { + SetError(externalDataSourceMetadata, "identity case is not specified in case of update external data source secrets"); + return; + } + } } } -NThreading::TFuture<TDescribeObjectResponse> LoadExternalDataSourceSecretValue(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TActorSystem* actorSystem) { +NThreading::TFuture<TDescribeSecretsResponse> LoadExternalDataSourceSecretValues(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TActorSystem* actorSystem) { const auto& authDescription = entry.ExternalDataSourceInfo->Description.GetAuth(); switch (authDescription.identity_case()) { case NKikimrSchemeOp::TAuth::kServiceAccount: { - const TString& secretId = authDescription.GetServiceAccount().GetSecretName(); - auto promise = NewPromise<TDescribeObjectResponse>(); - actorSystem->Register(new TDescribeObjectActor(userToken ? userToken->GetUserSID() : "", secretId, promise)); + const TString& saSecretId = authDescription.GetServiceAccount().GetSecretName(); + auto promise = NewPromise<TDescribeSecretsResponse>(); + actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {saSecretId}, promise)); return promise.GetFuture(); } case NKikimrSchemeOp::TAuth::kNone: - return MakeFuture(TDescribeObjectResponse("")); + return MakeFuture(TDescribeSecretsResponse({})); + + case NKikimrSchemeOp::TAuth::kBasic: { + const TString& passwordSecretId = authDescription.GetBasic().GetPasswordSecretName(); + auto promise = NewPromise<TDescribeSecretsResponse>(); + actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {passwordSecretId}, promise)); + return promise.GetFuture(); + } + + case NKikimrSchemeOp::TAuth::kMdbBasic: { + const TString& saSecretId = authDescription.GetMdbBasic().GetServiceAccountSecretName(); + const TString& passwordSecreId = authDescription.GetMdbBasic().GetPasswordSecretName(); + auto promise = NewPromise<TDescribeSecretsResponse>(); + actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {saSecretId, passwordSecreId}, promise)); + return promise.GetFuture(); + } + + case NKikimrSchemeOp::TAuth::kAws: { + const TString& awsAccessKeyIdSecretId = authDescription.GetAws().GetAwsAccessKeyIdSecretName(); + const TString& awsAccessKeyKeySecretId = authDescription.GetAws().GetAwsSecretAccessKeySecretName(); + auto promise = NewPromise<TDescribeSecretsResponse>(); + actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {awsAccessKeyIdSecretId, awsAccessKeyKeySecretId}, promise)); + return promise.GetFuture(); + } case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET: - return MakeFuture(TDescribeObjectResponse(Ydb::StatusIds::BAD_REQUEST, { NYql::TIssue("identity case is not specified") })); + return MakeFuture(TDescribeSecretsResponse(Ydb::StatusIds::BAD_REQUEST, { NYql::TIssue("identity case is not specified") })); } } @@ -622,10 +698,10 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta promise.SetValue(externalDataSourceMetadata); return; } - LoadExternalDataSourceSecretValue(entry, userToken, ActorSystem) - .Subscribe([promise, externalDataSourceMetadata](const TFuture<TDescribeObjectResponse>& result) mutable + LoadExternalDataSourceSecretValues(entry, userToken, ActorSystem) + .Subscribe([promise, externalDataSourceMetadata](const TFuture<TDescribeSecretsResponse>& result) mutable { - UpdateExternalDataSourceSecretValue(externalDataSourceMetadata, result.GetValue()); + UpdateExternalDataSourceSecretsValue(externalDataSourceMetadata, result.GetValue()); promise.SetValue(externalDataSourceMetadata); }); } diff --git a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp index 139731f4d32..44e78519257 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp @@ -251,12 +251,40 @@ public: switch (metadata.ExternalSource.DataSourceAuth.identity_case()) { case NKikimrSchemeOp::TAuth::kServiceAccount: + properties["authMethod"] = "SERVICE_ACCOUNT"; properties["serviceAccountId"] = metadata.ExternalSource.DataSourceAuth.GetServiceAccount().GetId(); properties["serviceAccountIdSignature"] = metadata.ExternalSource.ServiceAccountIdSignature; properties["serviceAccountIdSignatureReference"] = metadata.ExternalSource.DataSourceAuth.GetServiceAccount().GetSecretName(); break; case NKikimrSchemeOp::TAuth::kNone: + properties["authMethod"] = "SERVICE_ACCOUNT"; + break; + + case NKikimrSchemeOp::TAuth::kBasic: + properties["authMethod"] = "BASIC"; + properties["login"] = metadata.ExternalSource.DataSourceAuth.GetBasic().GetLogin(); + properties["password"] = metadata.ExternalSource.Password; + properties["passwordReference"] = metadata.ExternalSource.DataSourceAuth.GetBasic().GetPasswordSecretName(); + break; + + case NKikimrSchemeOp::TAuth::kMdbBasic: + properties["authMethod"] = "MDB_BASIC"; + properties["serviceAccountId"] = metadata.ExternalSource.DataSourceAuth.GetMdbBasic().GetServiceAccountId(); + properties["serviceAccountIdSignature"] = metadata.ExternalSource.ServiceAccountIdSignature; + properties["serviceAccountIdSignatureReference"] = metadata.ExternalSource.DataSourceAuth.GetMdbBasic().GetServiceAccountSecretName(); + + properties["login"] = metadata.ExternalSource.DataSourceAuth.GetMdbBasic().GetLogin(); + properties["password"] = metadata.ExternalSource.Password; + properties["passwordReference"] = metadata.ExternalSource.DataSourceAuth.GetMdbBasic().GetPasswordSecretName(); + break; + + case NKikimrSchemeOp::TAuth::kAws: + properties["authMethod"] = "AWS"; + properties["awsAccessKeyId"] = metadata.ExternalSource.AwsAccessKeyId; + properties["awsAccessKeyIdReference"] = metadata.ExternalSource.DataSourceAuth.GetAws().GetAwsAccessKeyIdSecretName(); + properties["awsSecretAccessKey"] = metadata.ExternalSource.AwsSecretAccessKey; + properties["awsSecretAccessKeyReference"] = metadata.ExternalSource.DataSourceAuth.GetAws().GetAwsSecretAccessKeySecretName(); break; case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET: diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h index b316ba0e921..74c096f4766 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway.h +++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h @@ -365,6 +365,9 @@ struct TExternalSource { TString DataSourceLocation; TString DataSourceInstallation; TString ServiceAccountIdSignature; + TString Password; + TString AwsAccessKeyId; + TString AwsSecretAccessKey; NKikimrSchemeOp::TAuth DataSourceAuth; }; diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp index 85ad54cd1dd..3a3ce562bf5 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp @@ -311,9 +311,9 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) { TInstant start = TInstant::Now(); bool created = false; while (!created && TInstant::Now() - start <= maximalWaitTime) { - auto promise = NThreading::NewPromise<TDescribeObjectResponse>(); - runtime->Register(new TDescribeObjectActor("", secretId, promise)); - TDescribeObjectResponse response = promise.GetFuture().GetValueSync(); + auto promise = NThreading::NewPromise<TDescribeSecretsResponse>(); + runtime->Register(new TDescribeSecretsActor("", {secretId}, promise)); + TDescribeSecretsResponse response = promise.GetFuture().GetValueSync(); if (response.Status == Ydb::StatusIds::SUCCESS) { created = true; @@ -326,7 +326,7 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) { UNIT_ASSERT_C(created, "Creating secret object timeout.\n"); } - Y_UNIT_TEST(TestLoadSecretValueFromExternalDataSourceMetadata) { + Y_UNIT_TEST(TestLoadServiceAccountSecretValueFromExternalDataSourceMetadata) { TKikimrRunner kikimr; kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); auto db = kikimr.GetTableClient(); @@ -363,6 +363,108 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) { UNIT_ASSERT_C(response.Success(), response.Issues().ToOneLineString()); UNIT_ASSERT_VALUES_EQUAL(response.Metadata->ExternalSource.ServiceAccountIdSignature, secretValue); } + + Y_UNIT_TEST(TestLoadBasicSecretValueFromExternalDataSourceMetadata) { + TKikimrRunner kikimr; + kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + TString secretId = "myPasswordSecretId"; + TString secretValue = "pswd"; + CreateSecretObject(secretId, secretValue, session, kikimr.GetTestServer().GetRuntime()); + + TString externalDataSourceName = "/Root/ExternalDataSource"; + auto query = TStringBuilder() << R"( + CREATE EXTERNAL DATA SOURCE `)" << externalDataSourceName << R"(` WITH ( + SOURCE_TYPE="PostgreSQL", + LOCATION="my-bucket", + AUTH_METHOD="BASIC", + LOGIN="mylogin", + PASSWORD_SECRET_NAME=")" << secretId << R"(" + );)"; + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + auto responseFuture = GetIcGateway(kikimr.GetTestServer())->LoadTableMetadata(TestCluster, externalDataSourceName, IKikimrGateway::TLoadTableMetadataSettings()); + responseFuture.Wait(); + + auto response = responseFuture.GetValue(); + UNIT_ASSERT_C(response.Success(), response.Issues().ToOneLineString()); + UNIT_ASSERT_VALUES_EQUAL(response.Metadata->ExternalSource.Password, secretValue); + } + + Y_UNIT_TEST(TestLoadMdbBasicSecretValueFromExternalDataSourceMetadata) { + TKikimrRunner kikimr; + kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + TString secretPasswordId = "myPasswordSecretId"; + TString secretPasswordValue = "pswd"; + CreateSecretObject(secretPasswordId, secretPasswordValue, session, kikimr.GetTestServer().GetRuntime()); + + TString secretSaId = "mySa"; + TString secretSaValue = "sign(mySa)"; + CreateSecretObject(secretSaId, secretSaValue, session, kikimr.GetTestServer().GetRuntime()); + + TString externalDataSourceName = "/Root/ExternalDataSource"; + auto query = TStringBuilder() << R"( + CREATE EXTERNAL DATA SOURCE `)" << externalDataSourceName << R"(` WITH ( + SOURCE_TYPE="PostgreSQL", + LOCATION="my-bucket", + AUTH_METHOD="MDB_BASIC", + SERVICE_ACCOUNT_ID="mysa", + SERVICE_ACCOUNT_SECRET_NAME=")" << secretSaId << R"(", + LOGIN="mylogin", + PASSWORD_SECRET_NAME=")" << secretPasswordId << R"(" + );)"; + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + auto responseFuture = GetIcGateway(kikimr.GetTestServer())->LoadTableMetadata(TestCluster, externalDataSourceName, IKikimrGateway::TLoadTableMetadataSettings()); + responseFuture.Wait(); + + auto response = responseFuture.GetValue(); + UNIT_ASSERT_C(response.Success(), response.Issues().ToOneLineString()); + UNIT_ASSERT_VALUES_EQUAL(response.Metadata->ExternalSource.Password, secretPasswordValue); + UNIT_ASSERT_VALUES_EQUAL(response.Metadata->ExternalSource.ServiceAccountIdSignature, secretSaValue); + } + + Y_UNIT_TEST(TestLoadAwsSecretValueFromExternalDataSourceMetadata) { + TKikimrRunner kikimr; + kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + TString awsAccessKeyIdSecretId = "awsAccessKeyIdSecretId"; + TString awsAccessKeyIdSecretValue = "key"; + CreateSecretObject(awsAccessKeyIdSecretId, awsAccessKeyIdSecretValue, session, kikimr.GetTestServer().GetRuntime()); + + TString awsSecretAccessKeySecretId = "awsSecretAccessKeySecretId"; + TString awsSecretAccessKeySecretValue = "value"; + CreateSecretObject(awsSecretAccessKeySecretId, awsSecretAccessKeySecretValue, session, kikimr.GetTestServer().GetRuntime()); + + TString externalDataSourceName = "/Root/ExternalDataSource"; + auto query = TStringBuilder() << R"( + CREATE EXTERNAL DATA SOURCE `)" << externalDataSourceName << R"(` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="my-bucket", + AUTH_METHOD="AWS", + AWS_ACCESS_KEY_ID_SECRET_NAME=")" << awsAccessKeyIdSecretId << R"(", + AWS_SECRET_ACCESS_KEY_SECRET_NAME=")" << awsSecretAccessKeySecretId << R"(" + );)"; + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + auto responseFuture = GetIcGateway(kikimr.GetTestServer())->LoadTableMetadata(TestCluster, externalDataSourceName, IKikimrGateway::TLoadTableMetadataSettings()); + responseFuture.Wait(); + + auto response = responseFuture.GetValue(); + UNIT_ASSERT_C(response.Success(), response.Issues().ToOneLineString()); + UNIT_ASSERT_VALUES_EQUAL(response.Metadata->ExternalSource.AwsAccessKeyId, awsAccessKeyIdSecretValue); + UNIT_ASSERT_VALUES_EQUAL(response.Metadata->ExternalSource.AwsSecretAccessKey, awsSecretAccessKeySecretValue); + } } } // namespace NYql diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 464342e096e..2e1c0dfdadd 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -1730,18 +1730,38 @@ message TExternalTableDescription { } // Access without authorization -message NoneAuth { +message TNoneAuth { } -message ServiceAccountAuth { +message TServiceAccountAuth { optional string Id = 1; optional string SecretName = 2; } +message TMdbBasic { + optional string ServiceAccountId = 1; + optional string ServiceAccountSecretName = 2; + optional string Login = 3; + optional string PasswordSecretName = 4; +} + +message TAws { + optional string AwsAccessKeyIdSecretName = 1; + optional string AwsSecretAccessKeySecretName = 2; +} + +message TBasic { + optional string Login = 1; + optional string PasswordSecretName = 2; +} + message TAuth { oneof identity { - NoneAuth None = 3; - ServiceAccountAuth ServiceAccount = 4; + TNoneAuth None = 3; + TServiceAccountAuth ServiceAccount = 4; + TBasic Basic = 5; + TMdbBasic MdbBasic = 6; + TAws Aws = 7; } } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_external_data_source.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_external_data_source.cpp index 4afcfdacddb..1f13b936c0e 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_external_data_source.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_external_data_source.cpp @@ -15,15 +15,6 @@ using namespace NSchemeShard; constexpr uint32_t MAX_FIELD_SIZE = 1000; constexpr uint32_t MAX_PROTOBUF_SIZE = 2 * 1024 * 1024; // 2 MiB -bool ValidateSourceType(const TString& sourceType, TString& errStr) { - // Only object storage supported today - if (sourceType != "ObjectStorage") { - errStr = "Only ObjectStorage source type supported but got " + sourceType; - return false; - } - return true; -} - bool ValidateLocationAndInstallation(const TString& location, const TString& installation, TString& errStr) { if (!location && !installation) { errStr = "Location or installation must not be empty"; @@ -40,32 +31,53 @@ bool ValidateLocationAndInstallation(const TString& location, const TString& ins return true; } -bool ValidateAuth(const NKikimrSchemeOp::TAuth& auth, TString& errStr) { +bool CheckAuth(const TString& authMethod, const TVector<TString>& availableAuthMethods, TString& errStr) { + if (Find(availableAuthMethods, authMethod) == availableAuthMethods.end()) { + errStr = TStringBuilder{} << authMethod << " isn't supported for this source type"; + return false; + } + + return true; +} + +bool ValidateAuth(const NKikimrSchemeOp::TAuth& auth, const NKikimr::NExternalSource::IExternalSource::TPtr& source, TString& errStr) { if (auth.ByteSizeLong() > MAX_PROTOBUF_SIZE) { errStr = Sprintf("Maximum size of authorization information must be less or equal equal to %u but got %lu", MAX_PROTOBUF_SIZE, auth.ByteSizeLong()); return false; } + const auto availableAuthMethods = source->GetAuthMethods(); switch (auth.identity_case()) { case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET: { - errStr = "Authorization method not specified"; + errStr = "Authorization method isn't specified"; return false; } case NKikimrSchemeOp::TAuth::kServiceAccount: - case NKikimrSchemeOp::TAuth::kNone: { - return true; - } + return CheckAuth("SERVICE_ACCOUNT", availableAuthMethods, errStr); + case NKikimrSchemeOp::TAuth::kMdbBasic: + return CheckAuth("MDB_BASIC", availableAuthMethods, errStr); + case NKikimrSchemeOp::TAuth::kBasic: + return CheckAuth("BASIC", availableAuthMethods, errStr); + case NKikimrSchemeOp::TAuth::kAws: + return CheckAuth("AWS", availableAuthMethods, errStr); + case NKikimrSchemeOp::TAuth::kNone: + return CheckAuth("NONE", availableAuthMethods, errStr); } return false; } -bool Validate(const NKikimrSchemeOp::TExternalDataSourceDescription& desc, TString& errStr) { - return ValidateSourceType(desc.GetSourceType(), errStr) - && ValidateLocationAndInstallation(desc.GetLocation(), desc.GetInstallation(), errStr) - && ValidateAuth(desc.GetAuth(), errStr); +bool Validate(const NKikimrSchemeOp::TExternalDataSourceDescription& desc, const NKikimr::NExternalSource::IExternalSourceFactory::TPtr& factory, TString& errStr) { + try { + auto source = factory->GetOrCreate(desc.GetSourceType()); + return ValidateLocationAndInstallation(desc.GetLocation(), desc.GetInstallation(), errStr) + && ValidateAuth(desc.GetAuth(), source, errStr); + } catch (...) { + errStr = CurrentExceptionMessage(); + return false; + } } -TExternalDataSourceInfo::TPtr CreateExternalDataSource(const NKikimrSchemeOp::TExternalDataSourceDescription& desc, TString& errStr) { - if (!Validate(desc, errStr)) { +TExternalDataSourceInfo::TPtr CreateExternalDataSource(const NKikimrSchemeOp::TExternalDataSourceDescription& desc, const NKikimr::NExternalSource::IExternalSourceFactory::TPtr& factory, TString& errStr) { + if (!Validate(desc, factory, errStr)) { return nullptr; } TExternalDataSourceInfo::TPtr externalDataSoureInfo = new TExternalDataSourceInfo; @@ -241,7 +253,7 @@ public: return result; } - TExternalDataSourceInfo::TPtr externalDataSoureInfo = CreateExternalDataSource(externalDataSoureDescription, errStr); + TExternalDataSourceInfo::TPtr externalDataSoureInfo = CreateExternalDataSource(externalDataSoureDescription, context.SS->ExternalSourceFactory, errStr); if (!externalDataSoureInfo) { result->SetError(NKikimrScheme::StatusSchemeError, errStr); return result; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp index 077cd36ebe9..1135664b15e 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp @@ -159,6 +159,10 @@ TExternalTableInfo::TPtr CreateExternalTable(const TString& sourceType, const NK NKikimrExternalSources::TGeneral general; general.ParseFromStringOrThrow(desc.GetContent()); auto source = factory->GetOrCreate(sourceType); + if (!source->HasExternalTable()) { + errStr = TStringBuilder{} << "External table isn't supported for " << sourceType; + return nullptr; + } externalTableInfo->Content = source->Pack(schema, general); } catch (...) { errStr = CurrentExceptionMessage(); diff --git a/ydb/core/tx/schemeshard/ut_external_data_source.cpp b/ydb/core/tx/schemeshard/ut_external_data_source.cpp index 34d97d94e80..058d02bc62b 100644 --- a/ydb/core/tx/schemeshard/ut_external_data_source.cpp +++ b/ydb/core/tx/schemeshard/ut_external_data_source.cpp @@ -259,7 +259,7 @@ Y_UNIT_TEST_SUITE(TExternalDataSourceTest) { None { } } - )", {{NKikimrScheme::StatusSchemeError, "Only ObjectStorage source type supported but got DataStream"}}); + )", {{NKikimrScheme::StatusSchemeError, "External source with type DataStream was not found"}}); TestCreateExternalDataSource(runtime, ++txId, "/MyRoot/DirA",R"( Name: "MyExternalDataSource" SourceType: "ObjectStorage" diff --git a/ydb/library/yql/sql/v1/sql_translation.cpp b/ydb/library/yql/sql/v1/sql_translation.cpp index 4305458adba..90e32fc1ff7 100644 --- a/ydb/library/yql/sql/v1/sql_translation.cpp +++ b/ydb/library/yql/sql/v1/sql_translation.cpp @@ -4198,7 +4198,8 @@ bool TSqlTranslation::StoreDataSourceSettingsEntry(const TIdentifier& id, const } if (IsIn({"source_type", "installation", "location", - "auth_method", "service_account_id", "service_account_secret_name"}, key)) { + "auth_method", "service_account_id", "service_account_secret_name", + "login", "password_secret_name", "aws_access_key_id_secret_name", "aws_secret_access_key_secret_name"}, key)) { if (!StoreString(*value, result[key], Ctx, to_upper(key))) { return false; } @@ -4238,11 +4239,18 @@ bool TSqlTranslation::ParseExternalDataSourceSettings(std::map<TString, TDeferre bool TSqlTranslation::ValidateAuthMethod(const std::map<TString, TDeferredAtom>& result) { const static TSet<TStringBuf> allAuthFields{ "service_account_id", - "service_account_secret_name" + "service_account_secret_name", + "login", + "password_secret_name", + "aws_access_key_id_secret_name", + "aws_secret_access_key_secret_name" }; const static TMap<TStringBuf, TSet<TStringBuf>> authMethodFields{ {"NONE", {}}, - {"SERVICE_ACCOUNT", {"service_account_id", "service_account_secret_name"}} + {"SERVICE_ACCOUNT", {"service_account_id", "service_account_secret_name"}}, + {"BASIC", {"login", "password_secret_name"}}, + {"AWS", {"aws_access_key_id_secret_name", "aws_secret_access_key_secret_name"}}, + {"MDB_BASIC", {"service_account_id", "service_account_secret_name", "login", "password_secret_name"}} }; auto authMethodIt = result.find("auth_method"); if (authMethodIt == result.end() || authMethodIt->second.GetLiteral() == nullptr) { diff --git a/ydb/library/yql/sql/v1/sql_ut.cpp b/ydb/library/yql/sql/v1/sql_ut.cpp index 4b097d308ff..8f4e7d283f3 100644 --- a/ydb/library/yql/sql/v1/sql_ut.cpp +++ b/ydb/library/yql/sql/v1/sql_ut.cpp @@ -2237,6 +2237,82 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { VerifyProgram(res, elementStat, verifyLine); UNIT_ASSERT(elementStat["Aggregate"] == 1); } + + Y_UNIT_TEST(CreateAsyncReplicationParseCorrect) { + auto req = R"( + USE plato; + CREATE ASYNC REPLICATION MyReplication + FOR table1 AS table2, table3 AS table4 + WITH ( + ENDPOINT = "localhost:2135", + DATABASE = "/MyDatabase" + ); + )"; + auto res = SqlToYql(req); + UNIT_ASSERT(res.Root); + + TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { + if (word == "Write") { + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("MyReplication")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("createAsyncReplication")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("table1")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("table2")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("table3")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("table4")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("ENDPOINT")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("localhost:2135")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("DATABASE")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("/MyDatabase")); + } + }; + + TWordCountHive elementStat = { {TString("Write"), 0}}; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); + } + + Y_UNIT_TEST(DropAsyncReplicationParseCorrect) { + auto req = R"( + USE plato; + DROP ASYNC REPLICATION MyReplication; + )"; + auto res = SqlToYql(req); + UNIT_ASSERT(res.Root); + + TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { + if (word == "Write") { + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("MyReplication")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("dropAsyncReplication")); + UNIT_ASSERT_VALUES_EQUAL(TString::npos, line.find("cascade")); + } + }; + + TWordCountHive elementStat = { {TString("Write"), 0}}; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); + } + + Y_UNIT_TEST(DropAsyncReplicationCascade) { + auto req = R"( + USE plato; + DROP ASYNC REPLICATION MyReplication CASCADE; + )"; + auto res = SqlToYql(req); + UNIT_ASSERT(res.Root); + + TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { + if (word == "Write") { + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("cascade")); + } + }; + + TWordCountHive elementStat = { {TString("Write"), 0}}; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); + } } Y_UNIT_TEST_SUITE(ExternalFunction) { @@ -5043,7 +5119,7 @@ Y_UNIT_TEST_SUITE(ExternalDeclares) { } Y_UNIT_TEST_SUITE(ExternalDataSource) { - Y_UNIT_TEST(CreateExternalDataSource) { + Y_UNIT_TEST(CreateExternalDataSourceWithAuthNone) { NYql::TAstParseResult res = SqlToYql(R"( USE plato; CREATE EXTERNAL DATA SOURCE MyDataSource WITH ( @@ -5067,7 +5143,7 @@ Y_UNIT_TEST_SUITE(ExternalDataSource) { UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); } - Y_UNIT_TEST(CreateExternalDataSourceWithServiceAccount) { + Y_UNIT_TEST(CreateExternalDataSourceWithAuthServiceAccount) { NYql::TAstParseResult res = SqlToYql(R"( USE plato; CREATE EXTERNAL DATA SOURCE MyDataSource WITH ( @@ -5075,14 +5151,94 @@ Y_UNIT_TEST_SUITE(ExternalDataSource) { LOCATION="my-bucket", AUTH_METHOD="SERVICE_ACCOUNT", SERVICE_ACCOUNT_ID="sa", - SERVICE_ACCOUNT_SECRET_NAME="secret_name" + SERVICE_ACCOUNT_SECRET_NAME="sa_secret_name" + ); + )"); + UNIT_ASSERT(res.Root); + + TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { + if (word == "Write") { + UNIT_ASSERT_STRING_CONTAINS(line, R"#('('('"auth_method" '"SERVICE_ACCOUNT") '('"location" '"my-bucket") '('"service_account_id" '"sa") '('"service_account_secret_name" '"sa_secret_name") '('"source_type" '"ObjectStorage"))#"); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("createObject")); + } + }; + + TWordCountHive elementStat = { {TString("Write"), 0} }; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); + } + + Y_UNIT_TEST(CreateExternalDataSourceWithBasic) { + NYql::TAstParseResult res = SqlToYql(R"( + USE plato; + CREATE EXTERNAL DATA SOURCE MyDataSource WITH ( + SOURCE_TYPE="PostgreSQL", + LOCATION="protocol://host:port/", + AUTH_METHOD="BASIC", + LOGIN="admin", + PASSWORD_SECRET_NAME="secret_name" ); )"); UNIT_ASSERT(res.Root); TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { if (word == "Write") { - UNIT_ASSERT_STRING_CONTAINS(line, R"#('('('"auth_method" '"SERVICE_ACCOUNT") '('"location" '"my-bucket") '('"service_account_id" '"sa") '('"service_account_secret_name" '"secret_name") '('"source_type" '"ObjectStorage"))#"); + UNIT_ASSERT_STRING_CONTAINS(line, R"#('('('"auth_method" '"BASIC") '('"location" '"protocol://host:port/") '('"login" '"admin") '('"password_secret_name" '"secret_name") '('"source_type" '"PostgreSQL"))#"); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("createObject")); + } + }; + + TWordCountHive elementStat = { {TString("Write"), 0} }; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); + } + + Y_UNIT_TEST(CreateExternalDataSourceWithMdbBasic) { + NYql::TAstParseResult res = SqlToYql(R"( + USE plato; + CREATE EXTERNAL DATA SOURCE MyDataSource WITH ( + SOURCE_TYPE="PostgreSQL", + LOCATION="protocol://host:port/", + AUTH_METHOD="MDB_BASIC", + SERVICE_ACCOUNT_ID="sa", + SERVICE_ACCOUNT_SECRET_NAME="sa_secret_name", + LOGIN="admin", + PASSWORD_SECRET_NAME="secret_name" + ); + )"); + UNIT_ASSERT(res.Root); + + TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { + if (word == "Write") { + UNIT_ASSERT_STRING_CONTAINS(line, R"#('('('"auth_method" '"MDB_BASIC") '('"location" '"protocol://host:port/") '('"login" '"admin") '('"password_secret_name" '"secret_name") '('"service_account_id" '"sa") '('"service_account_secret_name" '"sa_secret_name") '('"source_type" '"PostgreSQL"))#"); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("createObject")); + } + }; + + TWordCountHive elementStat = { {TString("Write"), 0} }; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); + } + + Y_UNIT_TEST(CreateExternalDataSourceWithAws) { + NYql::TAstParseResult res = SqlToYql(R"( + USE plato; + CREATE EXTERNAL DATA SOURCE MyDataSource WITH ( + SOURCE_TYPE="PostgreSQL", + LOCATION="protocol://host:port/", + AUTH_METHOD="AWS", + AWS_ACCESS_KEY_ID_SECRET_NAME="secred_id_name", + AWS_SECRET_ACCESS_KEY_SECRET_NAME="secret_key_name" + ); + )"); + UNIT_ASSERT(res.Root); + + TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { + if (word == "Write") { + UNIT_ASSERT_STRING_CONTAINS(line, R"#('('('"auth_method" '"AWS") '('"aws_access_key_id_secret_name" '"secred_id_name") '('"aws_secret_access_key_secret_name" '"secret_key_name") '('"location" '"protocol://host:port/") '('"source_type" '"PostgreSQL"))#"); UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("createObject")); } }; @@ -5195,6 +5351,94 @@ Y_UNIT_TEST_SUITE(ExternalDataSource) { SERVICE_ACCOUNT_SECRET_NAME="s1" ); )" , "<main>:7:49: Error: SERVICE_ACCOUNT_ID requires key\n"); + + ExpectFailWithError(R"( + USE plato; + CREATE EXTERNAL DATA SOURCE MyDataSource WITH ( + SOURCE_TYPE="PostgreSQL", + LOCATION="protocol://host:port/", + AUTH_METHOD="BASIC", + LOGIN="admin" + ); + )" , "<main>:7:27: Error: PASSWORD_SECRET_NAME requires key\n"); + + ExpectFailWithError(R"( + USE plato; + CREATE EXTERNAL DATA SOURCE MyDataSource WITH ( + SOURCE_TYPE="PostgreSQL", + LOCATION="protocol://host:port/", + AUTH_METHOD="BASIC", + PASSWORD_SECRET_NAME="secret_name" + ); + )" , "<main>:7:42: Error: LOGIN requires key\n"); + + ExpectFailWithError(R"( + USE plato; + CREATE EXTERNAL DATA SOURCE MyDataSource WITH ( + SOURCE_TYPE="PostgreSQL", + LOCATION="protocol://host:port/", + AUTH_METHOD="MDB_BASIC", + SERVICE_ACCOUNT_SECRET_NAME="sa_secret_name", + LOGIN="admin", + PASSWORD_SECRET_NAME="secret_name" + ); + )" , "<main>:9:42: Error: SERVICE_ACCOUNT_ID requires key\n"); + + ExpectFailWithError(R"( + USE plato; + CREATE EXTERNAL DATA SOURCE MyDataSource WITH ( + SOURCE_TYPE="PostgreSQL", + LOCATION="protocol://host:port/", + AUTH_METHOD="MDB_BASIC", + SERVICE_ACCOUNT_ID="sa", + LOGIN="admin", + PASSWORD_SECRET_NAME="secret_name" + ); + )" , "<main>:9:42: Error: SERVICE_ACCOUNT_SECRET_NAME requires key\n"); + + ExpectFailWithError(R"( + USE plato; + CREATE EXTERNAL DATA SOURCE MyDataSource WITH ( + SOURCE_TYPE="PostgreSQL", + LOCATION="protocol://host:port/", + AUTH_METHOD="MDB_BASIC", + SERVICE_ACCOUNT_ID="sa", + SERVICE_ACCOUNT_SECRET_NAME="sa_secret_name", + PASSWORD_SECRET_NAME="secret_name" + ); + )" , "<main>:9:42: Error: LOGIN requires key\n"); + + ExpectFailWithError(R"( + USE plato; + CREATE EXTERNAL DATA SOURCE MyDataSource WITH ( + SOURCE_TYPE="PostgreSQL", + LOCATION="protocol://host:port/", + AUTH_METHOD="MDB_BASIC", + SERVICE_ACCOUNT_ID="sa", + SERVICE_ACCOUNT_SECRET_NAME="sa_secret_name", + LOGIN="admin" + ); + )" , "<main>:9:27: Error: PASSWORD_SECRET_NAME requires key\n"); + + ExpectFailWithError(R"( + USE plato; + CREATE EXTERNAL DATA SOURCE MyDataSource WITH ( + SOURCE_TYPE="PostgreSQL", + LOCATION="protocol://host:port/", + AUTH_METHOD="AWS", + AWS_SECRET_ACCESS_KEY_SECRET_NAME="secret_key_name" + ); + )" , "<main>:7:55: Error: AWS_ACCESS_KEY_ID_SECRET_NAME requires key\n"); + + ExpectFailWithError(R"( + USE plato; + CREATE EXTERNAL DATA SOURCE MyDataSource WITH ( + SOURCE_TYPE="PostgreSQL", + LOCATION="protocol://host:port/", + AUTH_METHOD="AWS", + AWS_ACCESS_KEY_ID_SECRET_NAME="secred_id_name" + ); + )" , "<main>:7:51: Error: AWS_SECRET_ACCESS_KEY_SECRET_NAME requires key\n"); } Y_UNIT_TEST(DropExternalDataSourceWithTablePrefix) { @@ -5238,82 +5482,6 @@ Y_UNIT_TEST_SUITE(ExternalDataSource) { UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); } - - Y_UNIT_TEST(CreateAsyncReplicationParseCorrect) { - auto req = R"( - USE plato; - CREATE ASYNC REPLICATION MyReplication - FOR table1 AS table2, table3 AS table4 - WITH ( - ENDPOINT = "localhost:2135", - DATABASE = "/MyDatabase" - ); - )"; - auto res = SqlToYql(req); - UNIT_ASSERT(res.Root); - - TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { - if (word == "Write") { - UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("MyReplication")); - UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("createAsyncReplication")); - UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("table1")); - UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("table2")); - UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("table3")); - UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("table4")); - UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("ENDPOINT")); - UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("localhost:2135")); - UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("DATABASE")); - UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("/MyDatabase")); - } - }; - - TWordCountHive elementStat = { {TString("Write"), 0}}; - VerifyProgram(res, elementStat, verifyLine); - - UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); - } - - Y_UNIT_TEST(DropAsyncReplicationParseCorrect) { - auto req = R"( - USE plato; - DROP ASYNC REPLICATION MyReplication; - )"; - auto res = SqlToYql(req); - UNIT_ASSERT(res.Root); - - TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { - if (word == "Write") { - UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("MyReplication")); - UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("dropAsyncReplication")); - UNIT_ASSERT_VALUES_EQUAL(TString::npos, line.find("cascade")); - } - }; - - TWordCountHive elementStat = { {TString("Write"), 0}}; - VerifyProgram(res, elementStat, verifyLine); - - UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); - } - - Y_UNIT_TEST(DropAsyncReplicationCascade) { - auto req = R"( - USE plato; - DROP ASYNC REPLICATION MyReplication CASCADE; - )"; - auto res = SqlToYql(req); - UNIT_ASSERT(res.Root); - - TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { - if (word == "Write") { - UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("cascade")); - } - }; - - TWordCountHive elementStat = { {TString("Write"), 0}}; - VerifyProgram(res, elementStat, verifyLine); - - UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); - } } Y_UNIT_TEST_SUITE(ExternalTable) { |
