diff options
author | dinmukhammed <dinmukhammed@yandex-team.ru> | 2022-02-15 18:04:37 +0300 |
---|---|---|
committer | dinmukhammed <dinmukhammed@yandex-team.ru> | 2022-02-15 18:04:37 +0300 |
commit | 457c97d555bfbe3f9351eb22a932b9965e6816a6 (patch) | |
tree | 64b6f6aef25dbac3a4c60d33eedf2aac519102ab | |
parent | fb226f95af53bfc8b810a8d1e80f3d09daecfba3 (diff) | |
download | ydb-457c97d555bfbe3f9351eb22a932b9965e6816a6.tar.gz |
YQ-831 Refactoring DatabaseAsyncResolver
Refactoring
ref:99c3eae012ffa2985b4824a0b9aeebff6f5efb3c
23 files changed, 76 insertions, 184 deletions
diff --git a/ydb/core/yq/libs/actors/pending_fetcher.cpp b/ydb/core/yq/libs/actors/pending_fetcher.cpp index 0095963c8f1..0084f9c37d2 100644 --- a/ydb/core/yq/libs/actors/pending_fetcher.cpp +++ b/ydb/core/yq/libs/actors/pending_fetcher.cpp @@ -67,6 +67,7 @@ namespace NYq { using namespace NActors; +using namespace NYql; namespace { diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp index ada7b0d464c..ef272b0404b 100644 --- a/ydb/core/yq/libs/actors/run_actor.cpp +++ b/ydb/core/yq/libs/actors/run_actor.cpp @@ -69,7 +69,6 @@ #include <ydb/core/yq/libs/checkpointing_common/defs.h> #include <ydb/core/yq/libs/checkpoint_storage/storage_service.h> #include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.h> -#include <ydb/core/yq/libs/common/database_token_builder.h> #include <ydb/core/yq/libs/private_client/private_client.h> #define LOG_E(stream) \ @@ -283,7 +282,6 @@ private: LOG_D("Connection with empty name " << connection.meta().id()); continue; } - Connections[connection.content().name()] = connection; // Necessary for TDatabaseAsyncResolverWithMeta YqConnections.emplace(connection.meta().id(), connection); } } @@ -1138,8 +1136,8 @@ private: clusters); TVector<TDataProviderInitializer> dataProvidersInit; - const auto dbResolver = std::make_shared<TDatabaseAsyncResolverWithMeta>(TDatabaseAsyncResolverWithMeta(NActors::TActivationContext::ActorSystem(), Params.DatabaseResolver, - Params.CommonConfig.GetYdbMvpCloudEndpoint(), Params.CommonConfig.GetMdbGateway(), Params.CommonConfig.GetMdbTransformHost(), Params.QueryId, Params.AuthToken, Params.AccountIdSignatures, Connections)); + const std::shared_ptr<IDatabaseAsyncResolver> dbResolver = std::make_shared<TDatabaseAsyncResolverImpl>(NActors::TActivationContext::ActorSystem(), Params.DatabaseResolver, + Params.CommonConfig.GetYdbMvpCloudEndpoint(), Params.CommonConfig.GetMdbGateway(), Params.CommonConfig.GetMdbTransformHost(), Params.QueryId); { // TBD: move init to better place QueryStateUpdateRequest.set_scope(Params.Scope.ToString()); @@ -1350,7 +1348,6 @@ private: bool EnableCheckpointCoordinator = false; bool RetryNeeded = false; Yq::Private::PingTaskRequest QueryStateUpdateRequest; - THashMap<TString, YandexQuery::Connection> Connections; // Necessary for DbAsyncResolver const ui64 MaxTasksPerOperation = 100; diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp index 8f1ac3839e8..704431b1fb0 100644 --- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp +++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp @@ -66,7 +66,7 @@ public: } const std::weak_ptr<NYq::TEvents::TDbResolverResponse> response = DbResolverResponse_; - AsyncFuture_ = State_->DbResolver->ResolveIds({ids, State_->DbResolver->GetTraceId()}).Apply([response](auto future) { + AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future) { if (const auto res = response.lock()) *res = std::move(future.ExtractValue()); }); diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.cpp b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.cpp index 15c4d834e3c..3cdfb17866e 100644 --- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.cpp +++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.cpp @@ -6,9 +6,9 @@ namespace NYql { TDataProviderInitializer GetClickHouseDataProviderInitializer( IHTTPGateway::TPtr gateway, - const std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> dbResolverWithMeta) + const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver) { - return [gateway, dbResolverWithMeta] ( + return [gateway, dbResolver] ( const TString& userName, const TString& sessionId, const TGatewaysConfig* gatewaysConfig, @@ -29,7 +29,7 @@ TDataProviderInitializer GetClickHouseDataProviderInitializer( state->Types = typeCtx.Get(); state->FunctionRegistry = functionRegistry; - state->DbResolver = dbResolverWithMeta; + state->DbResolver = dbResolver; if (gatewaysConfig) { state->Configuration->Init(gatewaysConfig->GetClickHouse(), state->DbResolver, state->DatabaseIds); } diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h index 28ac8b244b7..17ebb38b7b3 100644 --- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h +++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h @@ -1,9 +1,10 @@ #pragma once +#include "yql_clickhouse_settings.h" + #include <ydb/library/yql/core/yql_data_provider.h> #include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> -#include "yql_clickhouse_settings.h" -#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.h> +#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h> namespace NKikimr::NMiniKQL { class IFunctionRegistry; @@ -27,12 +28,12 @@ struct TClickHouseState : public TThrRefBase TClickHouseConfiguration::TPtr Configuration = MakeIntrusive<TClickHouseConfiguration>(); const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr; THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth> DatabaseIds; - std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> DbResolver; + std::shared_ptr<NYq::IDatabaseAsyncResolver> DbResolver; }; TDataProviderInitializer GetClickHouseDataProviderInitializer( IHTTPGateway::TPtr gateway, - std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> dbResolverWithMeta = nullptr + std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver = nullptr ); TIntrusivePtr<IDataProvider> CreateClickHouseDataSource(TClickHouseState::TPtr state, IHTTPGateway::TPtr gateway); diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h index 8f6eecf9382..c206844b4f9 100644 --- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h +++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h @@ -1,12 +1,13 @@ #pragma once +#include <ydb/library/yql/utils/log/log.h> #include <ydb/library/yql/providers/common/config/yql_dispatch.h> #include <ydb/library/yql/providers/common/config/yql_setting.h> +#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h> #include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> #include <ydb/core/yq/libs/events/events.h> -#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.h> -#include <ydb/core/yq/libs/common/database_token_builder.h> + namespace NYql { @@ -23,7 +24,7 @@ struct TClickHouseConfiguration : public TClickHouseSettings, public NCommon::TS template <typename TProtoConfig> void Init( const TProtoConfig& config, - const std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> dbResolver, + const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver, THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth>& databaseIds) { TVector<TString> clusters(Reserve(config.ClusterMappingSize())); @@ -37,10 +38,11 @@ struct TClickHouseConfiguration : public TClickHouseSettings, public NCommon::TS for (auto& cluster: config.GetClusterMapping()) { this->Dispatch(cluster.GetName(), cluster.GetSettings()); - if (dbResolver) { //TODO: change log level + if (dbResolver) { YQL_CLOG(DEBUG, ProviderClickHouse) << "Settings: clusterName = " << cluster.GetName() << ", clusterDbId = " << cluster.GetId() << ", cluster.GetCluster(): " << cluster.GetCluster() << ", HasCluster: " << (cluster.HasCluster() ? "TRUE" : "FALSE") ; - dbResolver->TryAddDbIdToResolve(cluster.HasCluster(), cluster.GetName(), cluster.GetId(), NYq::DatabaseType::ClickHouse, databaseIds); + databaseIds[std::make_pair(cluster.GetId(), NYq::DatabaseType::ClickHouse)] = + NYq::TEvents::TDatabaseAuth{cluster.GetCHToken(), /*AddBearer=*/false}; if (cluster.GetId()) { DbId2Clusters[cluster.GetId()].emplace_back(cluster.GetName()); YQL_CLOG(DEBUG, ProviderClickHouse) << "Add dbId: " << cluster.GetId() << " to DbId2Clusters"; diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h index a1e4ab18a1d..733bed54b02 100644 --- a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h +++ b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h @@ -5,14 +5,10 @@ namespace NYq { -struct TResolveParams { - THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth> Ids; - TString TraceId; -}; - class IDatabaseAsyncResolver { public: - virtual NThreading::TFuture<TEvents::TDbResolverResponse> ResolveIds(const TResolveParams& params) const = 0; + virtual NThreading::TFuture<TEvents::TDbResolverResponse> ResolveIds( + const THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth>& ids) const = 0; virtual ~IDatabaseAsyncResolver() = default; }; diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.cpp b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.cpp index d4fcc6e6dcb..e13093b4637 100644 --- a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.cpp +++ b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.cpp @@ -1,22 +1,27 @@ #include "db_async_resolver_impl.h" namespace NYq { +using namespace NThreading; -TDatabaseAsyncResolver::TDatabaseAsyncResolver( +TDatabaseAsyncResolverImpl::TDatabaseAsyncResolverImpl( NActors::TActorSystem* actorSystem, const NActors::TActorId& recipient, const TString& ydbMvpEndpoint, const TString& mdbGateway, - const bool mdbTransformHost) + bool mdbTransformHost, + const TString& traceId) : ActorSystem(actorSystem) , Recipient(recipient) , YdbMvpEndpoint(ydbMvpEndpoint) , MdbGateway(mdbGateway) , MdbTransformHost(mdbTransformHost) + , TraceId(traceId) {} -NThreading::TFuture<TEvents::TDbResolverResponse> TDatabaseAsyncResolver::ResolveIds(const TResolveParams& params) const { - auto promise = NThreading::NewPromise<TEvents::TDbResolverResponse>(); +TFuture<TEvents::TDbResolverResponse> TDatabaseAsyncResolverImpl::ResolveIds( + const THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth>& ids) const +{ + auto promise = NewPromise<TEvents::TDbResolverResponse>(); TDuration timeout = TDuration::Seconds(40); auto callback = MakeHolder<NYql::TRichActorFutureCallback<TEvents::TEvEndpointResponse>>( [promise] (TAutoPtr<NActors::TEventHandle<TEvents::TEvEndpointResponse>>& event) mutable { @@ -31,8 +36,8 @@ NThreading::TFuture<TEvents::TDbResolverResponse> TDatabaseAsyncResolver::Resolv NActors::TActorId callbackId = ActorSystem->Register(callback.Release()); ActorSystem->Send(new NActors::IEventHandle(Recipient, callbackId, - new TEvents::TEvEndpointRequest(params.Ids, YdbMvpEndpoint, MdbGateway, - params.TraceId, MdbTransformHost))); + new TEvents::TEvEndpointRequest(ids, YdbMvpEndpoint, MdbGateway, + TraceId, MdbTransformHost))); return promise.GetFuture(); } diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.h b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.h index 27b8f3a6f3a..10ca0c956e4 100644 --- a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.h +++ b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.h @@ -4,23 +4,26 @@ namespace NYq { -class TDatabaseAsyncResolver : public IDatabaseAsyncResolver { +class TDatabaseAsyncResolverImpl : public IDatabaseAsyncResolver { public: - TDatabaseAsyncResolver( + TDatabaseAsyncResolverImpl( NActors::TActorSystem* actorSystem, const NActors::TActorId& recipient, const TString& ydbMvpEndpoint, const TString& mdbGateway, - const bool mdbTransformHost + bool mdbTransformHost = false, + const TString& traceId = "" ); - NThreading::TFuture<TEvents::TDbResolverResponse> ResolveIds(const TResolveParams& params) const override; + NThreading::TFuture<TEvents::TDbResolverResponse> ResolveIds( + const THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth>& ids) const override; private: NActors::TActorSystem* ActorSystem; const NActors::TActorId Recipient; const TString YdbMvpEndpoint; const TString MdbGateway; const bool MdbTransformHost = false; + const TString TraceId; }; } // NYq diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.cpp b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.cpp deleted file mode 100644 index 027415d9d78..00000000000 --- a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.cpp +++ /dev/null @@ -1,72 +0,0 @@ -#include "db_async_resolver_with_meta.h" -#include <ydb/core/yq/libs/common/database_token_builder.h> - -namespace NYq { - - TDatabaseAsyncResolverWithMeta::TDatabaseAsyncResolverWithMeta( - NActors::TActorSystem* actorSystem, - const NActors::TActorId& recipient, - const TString& ydbMvpEndpoint, - const TString& mdbGateway, - const bool mdbTransformHost, - const TString& traceId, - const TString& token, - const THashMap<TString, TString>& accountIdSignatures, - const THashMap<TString, YandexQuery::Connection>& connections) - : DbResolver(actorSystem, recipient, ydbMvpEndpoint, mdbGateway, mdbTransformHost) - , TraceId(traceId) - , Token(token) - , AccountIdSignatures(accountIdSignatures) - , Connections(connections) - {} - - NThreading::TFuture<TEvents::TDbResolverResponse> TDatabaseAsyncResolverWithMeta::ResolveIds(const TResolveParams& params) const { - return DbResolver.ResolveIds(params); - } - - TString TDatabaseAsyncResolverWithMeta::GetTraceId() const { - return TraceId; - } - - TString TDatabaseAsyncResolverWithMeta::GetToken() const { - return Token; - } - - const THashMap<TString, TString>& TDatabaseAsyncResolverWithMeta::GetAccountIdSignatures() const { - return AccountIdSignatures; - } - - void TDatabaseAsyncResolverWithMeta::TryAddDbIdToResolve( - const bool isEndpoint, - const TString& clusterName, - const TString& dbId, - const DatabaseType type, - THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth>& databaseIds) const { - if (isEndpoint) { - return; - } - const auto iter = Connections.find(clusterName); - if (iter == Connections.end()) { - return; - } - const auto& conn = iter->second; - const auto& setting = conn.content().setting(); - YandexQuery::IamAuth auth; - switch (type) { - case DatabaseType::Ydb: - auth = setting.ydb_database().auth(); - break; - case DatabaseType::ClickHouse: - auth = setting.clickhouse_cluster().auth(); - break; - case DatabaseType::DataStreams: - auth = setting.data_streams().auth(); - break; - default: - return; - } - TryAddDatabaseToResolve(auth, dbId, type, Token, AccountIdSignatures, databaseIds); - } - -} // NYq - diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.h b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.h deleted file mode 100644 index 48fa9df0412..00000000000 --- a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.h +++ /dev/null @@ -1,44 +0,0 @@ -#pragma once -#include "db_async_resolver_impl.h" - -namespace NYq { - -class TDatabaseAsyncResolverWithMeta : public IDatabaseAsyncResolver { -public: - TDatabaseAsyncResolverWithMeta( - NActors::TActorSystem* actorSystem, - const NActors::TActorId& recipient, - const TString& ydbMvpEndpoint, - const TString& mdbGateway, - const bool mdbTransformHost, - const TString& traceId, - const TString& token, - const THashMap<TString, TString>& accountIdSignatures, - const THashMap<TString, YandexQuery::Connection>& connections - ); - - NThreading::TFuture<TEvents::TDbResolverResponse> ResolveIds(const TResolveParams& params) const override; - - TString GetTraceId() const; - - TString GetToken() const; - - const THashMap<TString, TString>& GetAccountIdSignatures() const; - - void TryAddDbIdToResolve( - const bool isEndpoint, - const TString& clusterName, - const TString& dbId, - const DatabaseType type, - THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth>& databaseIds - ) const; - -private: - TDatabaseAsyncResolver DbResolver; - const TString TraceId; - const TString Token; - const THashMap<TString, TString> AccountIdSignatures; - const THashMap<TString, YandexQuery::Connection> Connections; -}; - -} // NYq
\ No newline at end of file diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/ya.make b/ydb/library/yql/providers/common/db_id_async_resolver/ya.make index 2262759e360..659fb003039 100644 --- a/ydb/library/yql/providers/common/db_id_async_resolver/ya.make +++ b/ydb/library/yql/providers/common/db_id_async_resolver/ya.make @@ -4,12 +4,12 @@ LIBRARY() SRCS( db_async_resolver_impl.cpp - db_async_resolver_with_meta.cpp ) PEERDIR( - ydb/core/yq/libs/common + library/cpp/threading/future ydb/core/yq/libs/events + ydb/library/yql/providers/dq/actors ) YQL_LAST_ABI_VERSION() diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp index 976469de6aa..349798c9310 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp @@ -35,7 +35,7 @@ public: return TStatus::Ok; const std::weak_ptr<NYq::TEvents::TDbResolverResponse> response = DbResolverResponse_; - AsyncFuture_ = State_->DbResolver->ResolveIds({ids, State_->DbResolver->GetTraceId()}).Apply([response](auto future) + AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future) { if (const auto res = response.lock()) *res = std::move(future.ExtractValue()); diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp index bb9650d5cc9..ef26fc7a82f 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp @@ -37,7 +37,7 @@ public: return TStatus::Ok; const std::weak_ptr<NYq::TEvents::TDbResolverResponse> response = DbResolverResponse_; - AsyncFuture_ = State_->DbResolver->ResolveIds({ids, State_->DbResolver->GetTraceId()}).Apply([response](auto future) + AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future) { if (const auto res = response.lock()) *res = std::move(future.ExtractValue()); diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp index b3f700c1475..5b51d4c0f8c 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp @@ -12,8 +12,8 @@ namespace NYql { TDataProviderInitializer GetPqDataProviderInitializer( IPqGateway::TPtr gateway, bool supportRtmrMode, - std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> dbResolverWithMeta) { - return [gateway, supportRtmrMode, dbResolverWithMeta] ( + std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver) { + return [gateway, supportRtmrMode, dbResolver] ( const TString& userName, const TString& sessionId, const TGatewaysConfig* gatewaysConfig, @@ -33,9 +33,9 @@ TDataProviderInitializer GetPqDataProviderInitializer( state->SupportRtmrMode = supportRtmrMode; state->Types = typeCtx.Get(); state->FunctionRegistry = functionRegistry; - state->DbResolver = dbResolverWithMeta; + state->DbResolver = dbResolver; if (gatewaysConfig) { - state->Configuration->Init(gatewaysConfig->GetPq(), typeCtx, dbResolverWithMeta, state->DatabaseIds); + state->Configuration->Init(gatewaysConfig->GetPq(), typeCtx, dbResolver, state->DatabaseIds); } state->Gateway = gateway; state->DqIntegration = CreatePqDqIntegration(state); diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_provider.h b/ydb/library/yql/providers/pq/provider/yql_pq_provider.h index 6b86fd29a6e..7e2e39b746c 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_provider.h +++ b/ydb/library/yql/providers/pq/provider/yql_pq_provider.h @@ -5,7 +5,7 @@ #include <ydb/library/yql/core/yql_data_provider.h> #include <ydb/library/yql/providers/dq/interface/yql_dq_integration.h> #include <ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.h> -#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.h> +#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h> namespace NKikimr::NMiniKQL { class IFunctionRegistry; @@ -53,13 +53,13 @@ public: IPqGateway::TPtr Gateway; THolder<IDqIntegration> DqIntegration; THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth> DatabaseIds; - std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> DbResolver; + std::shared_ptr<NYq::IDatabaseAsyncResolver> DbResolver; }; TDataProviderInitializer GetPqDataProviderInitializer( IPqGateway::TPtr gateway, bool supportRtmrMode = false, - std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> dbResolverWithMeta = nullptr + std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver = nullptr ); } // namespace NYql diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp index 278db980f41..e4dead34134 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp @@ -17,7 +17,7 @@ TPqSettings::TConstPtr TPqConfiguration::Snapshot() const { void TPqConfiguration::Init( const TPqGatewayConfig& config, TIntrusivePtr<TTypeAnnotationContext> typeCtx, - const std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> dbResolver, + const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver, THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth>& databaseIds) { TVector<TString> clusters(Reserve(config.ClusterMappingSize())); @@ -43,19 +43,21 @@ void TPqConfiguration::Init( clusterSettings.UseSsl = cluster.GetUseSsl(); clusterSettings.AddBearerToToken = cluster.GetAddBearerToToken(); + const TString authToken = typeCtx->FindCredentialContent("cluster:default_" + clusterSettings.ClusterName, "default_pq", cluster.GetToken()); + clusterSettings.AuthToken = authToken; + const auto structuredTokenJson = ComposeStructuredTokenJsonForServiceAccount(cluster.GetServiceAccountId(), cluster.GetServiceAccountIdSignature(), authToken); + Tokens[clusterSettings.ClusterName] = structuredTokenJson; + if (dbResolver) { YQL_CLOG(DEBUG, ProviderPq) << "Settings: clusterName = " << cluster.GetName() << ", clusterDbId = " << cluster.GetDatabaseId() << ", cluster.GetEndpoint(): " << cluster.GetEndpoint() << ", HasEndpoint = " << (cluster.HasEndpoint() ? "TRUE" : "FALSE") ; - dbResolver->TryAddDbIdToResolve(cluster.HasEndpoint(), cluster.GetName(), cluster.GetDatabaseId(), NYq::DatabaseType::DataStreams, databaseIds); if (cluster.GetDatabaseId()) { + databaseIds[std::make_pair(cluster.GetDatabaseId(), NYq::DatabaseType::DataStreams)] = + NYq::TEvents::TDatabaseAuth{structuredTokenJson, cluster.GetAddBearerToToken()}; DbId2Clusters[cluster.GetDatabaseId()].emplace_back(cluster.GetName()); YQL_CLOG(DEBUG, ProviderPq) << "Add dbId: " << cluster.GetDatabaseId() << " to DbId2Clusters"; } } - - const TString authToken = typeCtx->FindCredentialContent("cluster:default_" + clusterSettings.ClusterName, "default_pq", cluster.GetToken()); - clusterSettings.AuthToken = authToken; - Tokens[clusterSettings.ClusterName] = ComposeStructuredTokenJsonForServiceAccount(cluster.GetServiceAccountId(), cluster.GetServiceAccountIdSignature(), authToken); } FreezeDefaults(); } diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_settings.h b/ydb/library/yql/providers/pq/provider/yql_pq_settings.h index cdf55e9d2bd..1337d7dac24 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_settings.h +++ b/ydb/library/yql/providers/pq/provider/yql_pq_settings.h @@ -7,8 +7,7 @@ #include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> #include <ydb/core/yq/libs/events/events.h> -#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.h> -#include <ydb/core/yq/libs/common/database_token_builder.h> +#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h> namespace NYql { @@ -42,7 +41,7 @@ struct TPqConfiguration : public TPqSettings, public NCommon::TSettingDispatcher void Init( const TPqGatewayConfig& config, TIntrusivePtr<TTypeAnnotationContext> typeCtx, - const std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> dbResolver, + const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver, THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth>& databaseIds); TString GetDatabaseForTopic(const TString& cluster) const; diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp index b837a61aa60..ff0c5fd5087 100644 --- a/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp +++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp @@ -60,7 +60,7 @@ public: return TStatus::Ok; } const std::weak_ptr<NYq::TEvents::TDbResolverResponse> response = DbResolverResponse_; - AsyncFuture_ = State_->DbResolver->ResolveIds({ids, State_->DbResolver->GetTraceId()}).Apply([response](auto future) + AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future) { if (const auto res = response.lock()) *res = std::move(future.ExtractValue()); diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.cpp index d17e353d3ed..11aa5060a22 100644 --- a/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.cpp +++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.cpp @@ -7,8 +7,8 @@ namespace NYql { TDataProviderInitializer GetYdbDataProviderInitializer( NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, - std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> dbResolverWithMeta) { - return [driver, credentialsFactory, dbResolverWithMeta] ( + std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver) { + return [driver, credentialsFactory, dbResolver] ( const TString& userName, const TString& sessionId, const TGatewaysConfig* gatewaysConfig, @@ -30,7 +30,7 @@ TDataProviderInitializer GetYdbDataProviderInitializer( state->Types = typeCtx.Get(); state->FunctionRegistry = functionRegistry; state->CredentialsFactory = credentialsFactory; - state->DbResolver = dbResolverWithMeta; + state->DbResolver = dbResolver; if (gatewaysConfig) { state->Configuration->Init(gatewaysConfig->GetYdb(), typeCtx, state->DbResolver, state->DatabaseIds); } diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h b/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h index 41c55868f7e..56cd6e93314 100644 --- a/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h +++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h @@ -6,7 +6,7 @@ #include <ydb/library/yql/providers/common/token_accessor/client/factory.h> #include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> #include <ydb/public/lib/experimental/ydb_clickhouse_internal.h> -#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.h> +#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h> namespace NKikimr::NMiniKQL { class IFunctionRegistry; @@ -34,13 +34,13 @@ struct TYdbState : public TThrRefBase const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr; ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth> DatabaseIds; - std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> DbResolver; + std::shared_ptr<NYq::IDatabaseAsyncResolver> DbResolver; }; TDataProviderInitializer GetYdbDataProviderInitializer( NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory = nullptr, - const std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> dbResolverWithMeta = nullptr + const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver = nullptr ); TIntrusivePtr<IDataProvider> CreateYdbDataSource( diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.cpp index 06d1919027c..a582395c3c6 100644 --- a/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.cpp +++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.cpp @@ -20,7 +20,7 @@ bool TYdbConfiguration::HasCluster(TStringBuf cluster) const { void TYdbConfiguration::Init( const TYdbGatewayConfig& config, TIntrusivePtr<TTypeAnnotationContext> typeCtx, - const std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> dbResolver, + const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver, THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth>& databaseIds) { TVector<TString> clusters(Reserve(config.ClusterMappingSize())); @@ -37,9 +37,14 @@ void TYdbConfiguration::Init( for (const auto& cluster : config.GetClusterMapping()) { this->Dispatch(cluster.GetName(), cluster.GetSettings()); + const TString authToken = typeCtx->FindCredentialContent("cluster:default_" + cluster.GetName(), "default_ydb", cluster.GetToken()); + const auto structuredTokenJson = ComposeStructuredTokenJsonForServiceAccount(cluster.GetServiceAccountId(), cluster.GetServiceAccountIdSignature(), authToken); + Tokens[cluster.GetName()] = structuredTokenJson; + if (dbResolver) { - dbResolver->TryAddDbIdToResolve(cluster.HasEndpoint(), cluster.GetName(), cluster.GetId(), NYq::DatabaseType::Ydb, databaseIds); if (cluster.GetId()) { + databaseIds[std::make_pair(cluster.GetId(), NYq::DatabaseType::Ydb)] = + NYq::TEvents::TDatabaseAuth{structuredTokenJson, cluster.GetAddBearerToToken()}; DbId2Clusters[cluster.GetId()].emplace_back(cluster.GetName()); } } @@ -54,8 +59,6 @@ void TYdbConfiguration::Init( if (cluster.HasAddBearerToToken()) settings.AddBearerToToken = cluster.GetAddBearerToToken(); - const TString authToken = typeCtx->FindCredentialContent("cluster:default_" + cluster.GetName(), "default_ydb", cluster.GetToken()); - Tokens[cluster.GetName()] = ComposeStructuredTokenJsonForServiceAccount(cluster.GetServiceAccountId(), cluster.GetServiceAccountIdSignature(), authToken); settings.Raw = cluster; } diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.h b/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.h index 9bb003df99e..7a27452cc59 100644 --- a/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.h +++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.h @@ -6,8 +6,7 @@ #include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> #include <ydb/core/yq/libs/events/events.h> -#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.h> -#include <ydb/core/yq/libs/common/database_token_builder.h> +#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h> namespace NYql { @@ -35,7 +34,7 @@ struct TYdbConfiguration : public TYdbSettings, public NCommon::TSettingDispatch void Init( const TYdbGatewayConfig& config, TIntrusivePtr<TTypeAnnotationContext> typeCtx, - const std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> dbResolver, + const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver, THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth>& databaseIds ); |