diff options
author | vitalyisaev <vitalyisaev@yandex-team.com> | 2023-06-27 19:00:22 +0300 |
---|---|---|
committer | vitalyisaev <vitalyisaev@yandex-team.com> | 2023-06-27 19:00:22 +0300 |
commit | cd23cf14ba0e02916da11dafb03202741f855ab3 (patch) | |
tree | 05cb957a0d40e2b1de5adec6701c61f3da164d4c | |
parent | c560c220eadb1368069c19fca7ebc4dad43a30b6 (diff) | |
download | ydb-cd23cf14ba0e02916da11dafb03202741f855ab3.tar.gz |
Add access to MDB API for Generic provider
1. В dqrun добавлен класс, который создаёт акторную систему. В дальнейшем можно будет отрефакторить `TServiceNode` так, чтобы и он её переиспользовал.
2. В Generic провайдер добавлены изменения для корректной работы с MDB API.
19 files changed, 176 insertions, 141 deletions
diff --git a/ydb/core/fq/libs/actors/database_resolver.cpp b/ydb/core/fq/libs/actors/database_resolver.cpp index 53dfc18ebbe..5f93243f9eb 100644 --- a/ydb/core/fq/libs/actors/database_resolver.cpp +++ b/ydb/core/fq/libs/actors/database_resolver.cpp @@ -313,6 +313,7 @@ private: .AddUrlParam("databaseId", databaseId) .Build(); } else { + YQL_ENSURE(ev->Get()->MdbGateway, "empty MDB Gateway"); url = TUrlBuilder(ev->Get()->MdbGateway + "/managed-clickhouse/v1/clusters/") .AddPathComponent(databaseId) .AddPathComponent("hosts") diff --git a/ydb/core/fq/libs/actors/run_actor.cpp b/ydb/core/fq/libs/actors/run_actor.cpp index c69cae91bc2..561f80e100b 100644 --- a/ydb/core/fq/libs/actors/run_actor.cpp +++ b/ydb/core/fq/libs/actors/run_actor.cpp @@ -1835,7 +1835,7 @@ private: } { - dataProvidersInit.push_back(GetYdbDataProviderInitializer(Params.YqSharedResources->UserSpaceYdbDriver, Params.CredentialsFactory, dbResolver)); + dataProvidersInit.push_back(GetYdbDataProviderInitializer(Params.YqSharedResources->UserSpaceYdbDriver, Params.CredentialsFactory, dbResolver)); } { diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp b/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp index 2231401c426..82c9e8f90bf 100644 --- a/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp +++ b/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp @@ -18,11 +18,17 @@ TDatabaseAsyncResolverImpl::TDatabaseAsyncResolverImpl( , MdbGateway(mdbGateway) , MdbTransformHost(mdbTransformHost) , TraceId(traceId) -{} +{ +} -TFuture<NYql::TDbResolverResponse> TDatabaseAsyncResolverImpl::ResolveIds( - const THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& ids) const +TFuture<NYql::TDbResolverResponse> TDatabaseAsyncResolverImpl::ResolveIds(const DatabaseIds& ids) const { + // Cloud database ids validataion + for (const auto& kv: ids) { + // empty cluster name is not good + YQL_ENSURE(kv.first.first, "empty cluster name"); + } + auto promise = NewPromise<NYql::TDbResolverResponse>(); TDuration timeout = TDuration::Seconds(40); auto callback = MakeHolder<NYql::TRichActorFutureCallback<TEvents::TEvEndpointResponse>>( @@ -40,6 +46,7 @@ TFuture<NYql::TDbResolverResponse> TDatabaseAsyncResolverImpl::ResolveIds( ActorSystem->Send(new NActors::IEventHandle(Recipient, callbackId, new TEvents::TEvEndpointRequest(ids, YdbMvpEndpoint, MdbGateway, TraceId, MdbTransformHost))); + return promise.GetFuture(); } diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h b/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h index db317bc493f..66f7cf50ffd 100644 --- a/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h +++ b/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h @@ -16,8 +16,7 @@ public: const TString& traceId = "" ); - NThreading::TFuture<NYql::TDbResolverResponse> ResolveIds( - const THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& ids) const override; + NThreading::TFuture<NYql::TDbResolverResponse> ResolveIds(const DatabaseIds& ids) const override; private: NActors::TActorSystem* ActorSystem; const NActors::TActorId Recipient; diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h index 6a9385996f5..e3a00c02a84 100644 --- a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h +++ b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h @@ -13,6 +13,7 @@ enum class DatabaseType { Generic }; + struct TDatabaseAuth { TString StructuredToken; bool AddBearerToToken = false; @@ -49,8 +50,9 @@ struct TDbResolverResponse { class IDatabaseAsyncResolver { public: - virtual NThreading::TFuture<NYql::TDbResolverResponse> ResolveIds( - const THashMap<std::pair<TString, DatabaseType>, NYql::TDatabaseAuth>& ids) const = 0; + using DatabaseIds = THashMap<std::pair<TString, DatabaseType>, NYql::TDatabaseAuth>; + + virtual NThreading::TFuture<NYql::TDbResolverResponse> ResolveIds(const DatabaseIds& ids) const = 0; virtual ~IDatabaseAsyncResolver() = default; }; diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto index 1a96f12b172..9054e853a94 100644 --- a/ydb/library/yql/providers/common/proto/gateways_config.proto +++ b/ydb/library/yql/providers/common/proto/gateways_config.proto @@ -548,19 +548,31 @@ message TGenericClusterConfig { oneof Location { // Endpoint must be used for on-premise deployments. NYql.Connector.API.Endpoint Endpoint = 3; - // DatabaseID must be used when the data source is deployed in cloud. + // DatabaseId must be used when the data source is deployed in cloud. // Data source FQDN and port will be resolved by MDB service. - string DatabaseID = 4; + string DatabaseId = 4; } + // Credentials used to access data source instance required NYql.Connector.API.Credentials Credentials = 5; + + // Credentials used to access MDB API. + // When working with data source instances deployed in a cloud, + // you should either set (ServiceAccountId, ServiceAccountIdSignature) pair, + // or set IAM Token via env variable / credential table. + optional string ServiceAccountId = 6; + optional string ServiceAccountIdSignature = 7; } message TGenericGatewayConfig { // TODO: replace with map<DataSourceKind, Endpoint> required string Endpoint = 1; + reserved 2; repeated TGenericClusterConfig ClusterMapping = 3; + + // MDB API endpoint (do not fill in case of on-prem deployment) + optional string MdbGateway = 4; } /////////////////////////////// Root /////////////////////////////// diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp index 70345916a69..82af2dbec2b 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp @@ -60,13 +60,13 @@ namespace NYql::NDq { TGenericReadActor(ui64 inputIndex, Connector::IClient::TPtr genericClient, const NYql::Connector::API::Select& select, const NYql::Connector::API::DataSourceInstance& dataSourceInstance, const NActors::TActorId& computeActorId, const NKikimr::NMiniKQL::THolderFactory& holderFactory) - : InputIndex(inputIndex) - , ComputeActorId(computeActorId) - , ActorSystem(TActivationContext::ActorSystem()) - , ConnectorClient(genericClient) - , HolderFactory(holderFactory) - , Select(select) - , DataSourceInstance(dataSourceInstance) + : InputIndex_(inputIndex) + , ComputeActorId_(computeActorId) + , ActorSystem_(TActivationContext::ActorSystem()) + , ConnectorClient_(genericClient) + , HolderFactory_(holderFactory) + , Select_(select) + , DataSourceInstance_(dataSourceInstance) { } @@ -74,13 +74,13 @@ namespace NYql::NDq { Become(&TGenericReadActor::StateFunc); Connector::API::ListSplitsRequest listSplitsRequest; - listSplitsRequest.mutable_selects()->Add()->CopyFrom(Select); - listSplitsRequest.mutable_data_source_instance()->CopyFrom(DataSourceInstance); + listSplitsRequest.mutable_selects()->Add()->CopyFrom(Select_); + listSplitsRequest.mutable_data_source_instance()->CopyFrom(DataSourceInstance_); - auto listSplitsResult = ConnectorClient->ListSplits(listSplitsRequest); + auto listSplitsResult = ConnectorClient_->ListSplits(listSplitsRequest); if (!Connector::ErrorIsSuccess(listSplitsResult->Error)) { YQL_CLOG(ERROR, ProviderGeneric) << "ListSplits failure" << listSplitsResult->Error.DebugString(); - ActorSystem->Send(new IEventHandle( + ActorSystem_->Send(new IEventHandle( SelfId(), TActorId(), new TEvPrivate::TEvReadError(Connector::ErrorToIssues(listSplitsResult->Error)))); return; } @@ -93,12 +93,12 @@ namespace NYql::NDq { std::for_each( listSplitsResult->Splits.cbegin(), listSplitsResult->Splits.cend(), [&](const Connector::API::Split& split) { readSplitsRequest.mutable_splits()->Add()->CopyFrom(split); }); - readSplitsRequest.mutable_data_source_instance()->CopyFrom(DataSourceInstance); + readSplitsRequest.mutable_data_source_instance()->CopyFrom(DataSourceInstance_); - auto readSplitsResult = ConnectorClient->ReadSplits(readSplitsRequest); + auto readSplitsResult = ConnectorClient_->ReadSplits(readSplitsRequest); if (!Connector::ErrorIsSuccess(listSplitsResult->Error)) { YQL_CLOG(ERROR, ProviderGeneric) << "ReadSplits failure" << readSplitsResult->Error.DebugString(); - ActorSystem->Send(new IEventHandle( + ActorSystem_->Send(new IEventHandle( SelfId(), TActorId(), new TEvPrivate::TEvReadError(Connector::ErrorToIssues(listSplitsResult->Error)))); return; } @@ -106,7 +106,7 @@ namespace NYql::NDq { YQL_CLOG(INFO, ProviderGeneric) << "ReadSplits succeess, total batches: " << readSplitsResult->RecordBatches.size(); - ActorSystem->Send(new IEventHandle(SelfId(), TActorId(), new TEvPrivate::TEvReadResult(readSplitsResult))); + ActorSystem_->Send(new IEventHandle(SelfId(), TActorId(), new TEvPrivate::TEvReadResult(readSplitsResult))); } static constexpr char ActorName[] = "Generic_READ_ACTOR"; @@ -119,7 +119,7 @@ namespace NYql::NDq { void CommitState(const NDqProto::TCheckpoint&) final { } ui64 GetInputIndex() const final { - return InputIndex; + return InputIndex_; } STRICT_STFUNC(StateFunc, @@ -129,12 +129,12 @@ namespace NYql::NDq { i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe<TInstant>&, bool& finished, i64 /*freeSpace*/) final { YQL_ENSURE(!buffer.IsWide(), "Wide stream is not supported"); - if (Result) { + if (Result_) { NUdf::TUnboxedValue value; ui64 total = 0; - for (const auto& batch : Result->RecordBatches) { + for (const auto& batch : Result_->RecordBatches) { total += NUdf::GetSizeOfArrowBatchInBytes(*batch); YQL_CLOG(TRACE, ProviderGeneric) << "Converting arrow::RecordBatch into NUdf::UnboxedValue:\n" @@ -151,14 +151,14 @@ namespace NYql::NDq { } NUdf::TUnboxedValue* structItems = nullptr; - auto structObj = ArrowRowContainerCache.NewArray(HolderFactory, 1 + batch->num_columns(), structItems); + auto structObj = ArrowRowContainerCache_.NewArray(HolderFactory_, 1 + batch->num_columns(), structItems); for (int i = 0; i < batch->num_columns(); ++i) { const auto& columnName = batch->schema()->field(i)->name(); const auto ix = fieldNameOrder[columnName]; - structItems[ix] = HolderFactory.CreateArrowBlock(arrow::Datum(batch->column(i))); + structItems[ix] = HolderFactory_.CreateArrowBlock(arrow::Datum(batch->column(i))); } - structItems[fieldNameOrder[std::string(BlockLengthColumnName)]] = HolderFactory.CreateArrowBlock( + structItems[fieldNameOrder[std::string(BlockLengthColumnName)]] = HolderFactory_.CreateArrowBlock( arrow::Datum(std::make_shared<arrow::UInt64Scalar>(batch->num_rows()))); value = structObj; @@ -167,11 +167,11 @@ namespace NYql::NDq { // freeSpace -= size; finished = true; - Result.reset(); + Result_.reset(); // TODO: check it, because in S3 the generic cache clearing happens only when LastFileWasProcessed: // https://a.yandex-team.ru/arcadia/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp?rev=r11543410#L2497 - ArrowRowContainerCache.Clear(); + ArrowRowContainerCache_.Clear(); return total; } @@ -180,13 +180,13 @@ namespace NYql::NDq { } void Handle(TEvPrivate::TEvReadResult::TPtr& evReadResult) { - Result = evReadResult->Get()->Result; - Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); + Result_ = evReadResult->Get()->Result; + Send(ComputeActorId_, new TEvNewAsyncInputDataArrived(InputIndex_)); } void Handle(TEvPrivate::TEvReadError::TPtr& result) { - Send(ComputeActorId, - new TEvAsyncInputError(InputIndex, result->Get()->Error, NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); + Send(ComputeActorId_, + new TEvAsyncInputError(InputIndex_, result->Get()->Error, NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); } // IActor & IDqComputeActorAsyncInput @@ -194,18 +194,18 @@ namespace NYql::NDq { TActorBootstrapped<TGenericReadActor>::PassAway(); } - const ui64 InputIndex; - const NActors::TActorId ComputeActorId; + const ui64 InputIndex_; + const NActors::TActorId ComputeActorId_; - TActorSystem* const ActorSystem; + TActorSystem* const ActorSystem_; // Changed: - Connector::IClient::TPtr ConnectorClient; - Connector::ReadSplitsResult::TPtr Result; - NKikimr::NMiniKQL::TPlainContainerCache ArrowRowContainerCache; - const NKikimr::NMiniKQL::THolderFactory& HolderFactory; - const NYql::Connector::API::Select Select; - const NYql::Connector::API::DataSourceInstance DataSourceInstance; + Connector::IClient::TPtr ConnectorClient_; + Connector::ReadSplitsResult::TPtr Result_; + NKikimr::NMiniKQL::TPlainContainerCache ArrowRowContainerCache_; + const NKikimr::NMiniKQL::THolderFactory& HolderFactory_; + const NYql::Connector::API::Select Select_; + const NYql::Connector::API::DataSourceInstance DataSourceInstance_; }; std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> diff --git a/ydb/library/yql/providers/generic/connector/libcpp/cli/main.cpp b/ydb/library/yql/providers/generic/connector/libcpp/cli/main.cpp index 7942485feb9..644141825df 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/cli/main.cpp +++ b/ydb/library/yql/providers/generic/connector/libcpp/cli/main.cpp @@ -12,7 +12,7 @@ void SetDatabaseSourceInstance(NYql::Connector::API::DataSourceInstance* dsi) { dsi->mutable_credentials()->mutable_basic()->set_password("qwerty12345"); } -std::shared_ptr<NYql::Connector::DescribeTableResult> describeTable(NYql::Connector::IClient::TPtr client) { +std::shared_ptr<NYql::Connector::DescribeTableResult> DescribeTable(NYql::Connector::IClient::TPtr client) { NYql::Connector::API::DescribeTableRequest request; request.set_table(TableName); SetDatabaseSourceInstance(request.mutable_data_source_instance()); @@ -31,7 +31,7 @@ std::shared_ptr<NYql::Connector::DescribeTableResult> describeTable(NYql::Connec } std::shared_ptr<NYql::Connector::ListSplitsResult> -listSplits(NYql::Connector::IClient::TPtr client, const google::protobuf::RepeatedPtrField<Ydb::Column>& columns) { +ListSplits(NYql::Connector::IClient::TPtr client, const google::protobuf::RepeatedPtrField<Ydb::Column>& columns) { NYql::Connector::API::ListSplitsRequest request; SetDatabaseSourceInstance(request.mutable_data_source_instance()); @@ -61,7 +61,7 @@ listSplits(NYql::Connector::IClient::TPtr client, const google::protobuf::Repeat return result; } -std::shared_ptr<NYql::Connector::ReadSplitsResult> readSplits(NYql::Connector::IClient::TPtr client, +std::shared_ptr<NYql::Connector::ReadSplitsResult> ReadSplits(NYql::Connector::IClient::TPtr client, const std::vector<NYql::Connector::API::Split>& splits) { NYql::Connector::API::ReadSplitsRequest request; SetDatabaseSourceInstance(request.mutable_data_source_instance()); @@ -91,9 +91,9 @@ int main() { auto client = NYql::Connector::MakeClientGRPC("localhost:50051"); try { - auto columns = describeTable(client)->Schema.columns(); - auto splits = listSplits(client, columns)->Splits; - readSplits(client, splits); + auto columns = DescribeTable(client)->Schema.columns(); + auto splits = ListSplits(client, columns)->Splits; + ReadSplits(client, splits); } catch (const std::runtime_error& e) { YQL_LOG(ERROR) << e.what(); } diff --git a/ydb/library/yql/providers/generic/connector/libcpp/utils.cpp b/ydb/library/yql/providers/generic/connector/libcpp/utils.cpp index fc4da3b6aec..928a23283af 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/utils.cpp +++ b/ydb/library/yql/providers/generic/connector/libcpp/utils.cpp @@ -3,6 +3,7 @@ #include <arrow/io/api.h> #include <arrow/ipc/api.h> #include <util/string/builder.h> +#include <util/system/type_name.h> #include <ydb/core/formats/arrow/serializer/batch_only.h> #include <ydb/core/formats/arrow/serializer/full.h> #include <ydb/library/yql/utils/log/log.h> @@ -47,7 +48,7 @@ namespace NYql::Connector { } default: ythrow yexception() << "unexpected type: " << Ydb::Type_PrimitiveTypeId_Name(t) << " (" - << typeid(t).name() << ")"; + << TypeName(t) << ")"; } return arrow::Status::OK(); } diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt index b45684fa272..d41aad8c1c6 100644 --- a/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt @@ -30,6 +30,7 @@ target_link_libraries(providers-generic-provider PUBLIC providers-common-mkql providers-common-proto providers-common-provider + providers-common-structured_token providers-common-transform providers-dq-common providers-dq-expr_nodes @@ -50,5 +51,4 @@ target_sources(providers-generic-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_mkql_compiler.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp ) diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt index 190f243c44f..712f9b54de7 100644 --- a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt @@ -31,6 +31,7 @@ target_link_libraries(providers-generic-provider PUBLIC providers-common-mkql providers-common-proto providers-common-provider + providers-common-structured_token providers-common-transform providers-dq-common providers-dq-expr_nodes @@ -51,5 +52,4 @@ target_sources(providers-generic-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_mkql_compiler.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp ) diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt index 190f243c44f..712f9b54de7 100644 --- a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt @@ -31,6 +31,7 @@ target_link_libraries(providers-generic-provider PUBLIC providers-common-mkql providers-common-proto providers-common-provider + providers-common-structured_token providers-common-transform providers-dq-common providers-dq-expr_nodes @@ -51,5 +52,4 @@ target_sources(providers-generic-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_mkql_compiler.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp ) diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt index b45684fa272..d41aad8c1c6 100644 --- a/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt @@ -30,6 +30,7 @@ target_link_libraries(providers-generic-provider PUBLIC providers-common-mkql providers-common-proto providers-common-provider + providers-common-structured_token providers-common-transform providers-dq-common providers-dq-expr_nodes @@ -50,5 +51,4 @@ target_sources(providers-generic-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_mkql_compiler.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp ) diff --git a/ydb/library/yql/providers/generic/provider/ya.make b/ydb/library/yql/providers/generic/provider/ya.make index c788882efbf..9129151fcd4 100644 --- a/ydb/library/yql/providers/generic/provider/ya.make +++ b/ydb/library/yql/providers/generic/provider/ya.make @@ -15,7 +15,6 @@ SRCS( yql_generic_provider.cpp yql_generic_provider.h yql_generic_provider_impl.h - yql_generic_settings.cpp yql_generic_settings.h ) @@ -38,6 +37,7 @@ PEERDIR( ydb/library/yql/providers/common/mkql ydb/library/yql/providers/common/proto ydb/library/yql/providers/common/provider + ydb/library/yql/providers/common/structured_token ydb/library/yql/providers/common/transform ydb/library/yql/providers/dq/common ydb/library/yql/providers/dq/expr_nodes diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp index 81c2aef829b..064519d2371 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp @@ -47,24 +47,29 @@ namespace NYql { !reads.empty()) { for (auto& node : reads) { const TGenRead read(node); - const auto cluster = read.DataSource().Cluster().StringValue(); - YQL_CLOG(DEBUG, ProviderGeneric) << "Found cluster: " << cluster; - auto databaseID = State_->Configuration->ClusterConfigs[cluster].GetDatabaseID(); - YQL_CLOG(DEBUG, ProviderGeneric) << "Found cloudID: " << databaseID; - const auto idKey = std::make_pair(databaseID, NYql::DatabaseType::Generic); - const auto iter = State_->DatabaseIds.find(idKey); - if (iter != State_->DatabaseIds.end()) { - YQL_CLOG(DEBUG, ProviderGeneric) << "Resolve CloudID: " << databaseID; - ids[idKey] = iter->second; + const auto clusterName = read.DataSource().Cluster().StringValue(); + YQL_CLOG(DEBUG, ProviderGeneric) << "found cluster name: " << clusterName; + + auto databaseId = State_->Configuration->ClusterConfigs[clusterName].GetDatabaseId(); + if (databaseId) { + YQL_CLOG(DEBUG, ProviderGeneric) << "found database id: " << databaseId; + const auto idKey = std::make_pair(databaseId, NYql::DatabaseType::Generic); + const auto iter = State_->DatabaseIds.find(idKey); + if (iter != State_->DatabaseIds.end()) { + YQL_CLOG(DEBUG, ProviderGeneric) << "resolve database id: " << databaseId; + ids[idKey] = iter->second; + } } } } - YQL_CLOG(DEBUG, ProviderGeneric) << "Ids to resolve: " << ids.size(); + YQL_CLOG(DEBUG, ProviderGeneric) << "total database ids to resolve: " << ids.size(); if (ids.empty()) { return TStatus::Ok; } + // FIXME: overengineered code - instead of using weak_ptr, directly copy shared_ptr in callback in this way: + // Apply([response = DbResolverResponse_](...)) const std::weak_ptr<NYql::TDbResolverResponse> response = DbResolverResponse_; AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future) { if (const auto res = response.lock()) @@ -88,7 +93,7 @@ namespace NYql { DbResolverResponse_->DatabaseId2Endpoint.end()); DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); YQL_CLOG(DEBUG, ProviderGeneric) << "ResolvedIds: " << FullResolvedIds_.size(); - const auto& id2Clusters = State_->Configuration->DbId2Clusters; + const auto& id2Clusters = State_->Configuration->DatabaseIdsToClusterNames; for (const auto& [dbIdWithType, info] : FullResolvedIds_) { const auto& dbId = dbIdWithType.first; const auto iter = id2Clusters.find(dbId); diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp index 4a061c552e9..fe88e66c3e5 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp @@ -27,7 +27,7 @@ namespace NYql { state->FunctionRegistry = functionRegistry; state->DbResolver = dbResolver; if (gatewaysConfig) { - state->Configuration->Init(gatewaysConfig->GetGeneric(), state->DbResolver, state->DatabaseIds); + state->Configuration->Init(gatewaysConfig->GetGeneric(), state->DbResolver, state->DatabaseIds, typeCtx->Credentials); } TDataProviderInfo info; diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_provider.h b/ydb/library/yql/providers/generic/provider/yql_generic_provider.h index 3e41f056a0f..69d902894a1 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_provider.h +++ b/ydb/library/yql/providers/generic/provider/yql_generic_provider.h @@ -49,16 +49,7 @@ namespace NYql { ythrow yexception() << "unknown (" << cluster << ", " << table << ") pair"; }; - THashMap<std::pair<TString, TString>, TTableMeta> Tables; - std::unordered_map<std::string_view, std::string_view> Timezones; - - TTypeAnnotationContext* Types = nullptr; - TGenericConfiguration::TPtr Configuration = MakeIntrusive<TGenericConfiguration>(); - const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr; - THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> DatabaseIds; - std::shared_ptr<NYql::IDatabaseAsyncResolver> DbResolver; - - TString DumpToString() const { + TString ToString() const { TStringBuilder sb; if (Tables) { for (const auto& kv : Tables) { @@ -68,6 +59,17 @@ namespace NYql { } return sb; } + + THashMap<std::pair<TString, TString>, TTableMeta> Tables; + std::unordered_map<std::string_view, std::string_view> Timezones; + + TTypeAnnotationContext* Types = nullptr; + TGenericConfiguration::TPtr Configuration = MakeIntrusive<TGenericConfiguration>(); + const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr; + + // key - database id, value - credentials to access MDB API + NYql::IDatabaseAsyncResolver::DatabaseIds DatabaseIds; + std::shared_ptr<NYql::IDatabaseAsyncResolver> DbResolver; }; TDataProviderInitializer diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp deleted file mode 100644 index 5c1086b81e2..00000000000 --- a/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp +++ /dev/null @@ -1,19 +0,0 @@ -#include "yql_generic_settings.h" - -namespace NYql { - - using namespace NCommon; - - TGenericConfiguration::TGenericConfiguration() - { - } - - TGenericSettings::TConstPtr TGenericConfiguration::Snapshot() const { - return std::make_shared<const TGenericSettings>(*this); - } - - bool TGenericConfiguration::HasCluster(TStringBuf cluster) const { - return ValidClusters.contains(cluster); - } - -} diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h index 3d6f36d722c..1274488f5e0 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h +++ b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h @@ -4,6 +4,7 @@ #include <ydb/library/yql/providers/common/config/yql_setting.h> #include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h> #include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> +#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h> #include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h> #include <ydb/library/yql/utils/log/log.h> @@ -13,25 +14,17 @@ namespace NYql { using TConstPtr = std::shared_ptr<const TGenericSettings>; }; - struct TGenericURL { - TString Host; - ui16 Port; - EHostScheme Scheme; - - TString Endpoint() const { - return Host + ':' + ToString(Port); - }; - }; - struct TGenericConfiguration: public TGenericSettings, public NCommon::TSettingDispatcher { using TPtr = TIntrusivePtr<TGenericConfiguration>; - TGenericConfiguration(); + TGenericConfiguration(){}; TGenericConfiguration(const TGenericConfiguration&) = delete; template <typename TProtoConfig> - void Init(const TProtoConfig& config, const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver, - THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds) + void Init(const TProtoConfig& config, + const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver, + NYql::IDatabaseAsyncResolver::DatabaseIds& databaseIds, + const TCredentials::TPtr& credentials) { TVector<TString> clusterNames(Reserve(config.ClusterMappingSize())); @@ -39,42 +32,74 @@ namespace NYql { clusterNames.push_back(cluster.GetName()); ClusterConfigs[cluster.GetName()] = cluster; } - this->SetValidClusters(clusterNames); - // TODO: support data sources other than ClickHouse here - for (auto& cluster : config.GetClusterMapping()) { - if (dbResolver) { - const auto& databaseID = cluster.GetDatabaseID(); - YQL_CLOG(DEBUG, ProviderGeneric) - << "Settings: clusterName = " << cluster.GetName() << ", cluster cloud id = " << databaseID - << ", cluster.GetEndpoint(): " << cluster.GetEndpoint() - << ", HasEndpoint: " << (cluster.HasEndpoint() ? "TRUE" : "FALSE"); - - // TODO: recover logic with structured tokens - databaseIds[std::make_pair(databaseID, NYql::DatabaseType::Generic)] = - NYql::TDatabaseAuth{"", /*AddBearer=*/false}; - if (databaseID) { - DbId2Clusters[databaseID].emplace_back(cluster.GetName()); - YQL_CLOG(DEBUG, ProviderGeneric) << "Add cluster cloud id: " << databaseID << " to DbId2Clusters"; - } - } - - // NOTE: Tokens map is left because of legacy. - // There are no reasons for provider to store these tokens other than - // to keep compatibility with YQL engine. - const auto& basic = cluster.GetCredentials().basic(); - Tokens[cluster.GetName()] = TStringBuilder() << "basic#" << basic.username() << "#" << basic.password(); + for (const auto& cluster : config.GetClusterMapping()) { + InitCluster(cluster, dbResolver, databaseIds, credentials); } + this->FreezeDefaults(); } - bool HasCluster(TStringBuf cluster) const; + private: + void InitCluster(const TGenericClusterConfig& cluster, + const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver, + NYql::IDatabaseAsyncResolver::DatabaseIds& databaseIds, + const TCredentials::TPtr& credentials) { + const auto& clusterName = cluster.GetName(); + const auto& databaseId = cluster.GetDatabaseId(); + const auto& endpoint = cluster.GetEndpoint(); + + YQL_CLOG(DEBUG, ProviderGeneric) + << "initialize cluster" + << ": name = " << clusterName + << ", database id = " << databaseId + << ", endpoint = " << endpoint; + + if (dbResolver && databaseId) { + const auto token = MakeStructuredToken(cluster, credentials); + + databaseIds[std::make_pair(databaseId, NYql::DatabaseType::Generic)] = NYql::TDatabaseAuth{token, /*AddBearer=*/true}; + + DatabaseIdsToClusterNames[databaseId].emplace_back(clusterName); + YQL_CLOG(DEBUG, ProviderGeneric) << "database id '" << databaseId << "' added to mapping"; + } + + // NOTE: Tokens map is filled just because it's required by YQL legacy code. + // The only reason for provider to store these tokens is + // to keep compatibility with YQL engine. + // Real credentials are stored in TGenericClusterConfig. + Tokens[cluster.GetName()] = " "; + } + + // Structured tokens are used to access MDB API. They can be constructed from two + TString MakeStructuredToken(const TGenericClusterConfig& cluster, const TCredentials::TPtr& credentials) { + TStructuredTokenBuilder b; + + const auto iamToken = credentials->FindCredentialContent("default_" + cluster.name(), "default_generic", ""); + if (iamToken) { + return b.SetIAMToken(iamToken).ToJson(); + } + + if (cluster.HasServiceAccountId() && cluster.HasServiceAccountIdSignature()) { + return b.SetServiceAccountIdAuth(cluster.GetServiceAccountId(), cluster.GetServiceAccountIdSignature()).ToJson(); + } + + ythrow yexception() << "you should either provide IAM Token via credential system, " + "or set (ServiceAccountId && ServiceAccountIdSignature) in cluster config"; + } + + public: + TGenericSettings::TConstPtr Snapshot() const { + return std::make_shared<const TGenericSettings>(*this); + } + + bool HasCluster(TStringBuf cluster) const { + return ValidClusters.contains(cluster); + } - TGenericSettings::TConstPtr Snapshot() const; THashMap<TString, TString> Tokens; - THashMap<TString, TGenericClusterConfig> ClusterConfigs; - THashMap<TString, TVector<TString>> DbId2Clusters; // DatabaseId -> ClusterNames + THashMap<TString, TGenericClusterConfig> ClusterConfigs; // cluster name -> cluster config + THashMap<TString, TVector<TString>> DatabaseIdsToClusterNames; // database id -> cluster name }; - } |