aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordinmukhammed <dinmukhammed@yandex-team.ru>2022-02-15 18:04:37 +0300
committerdinmukhammed <dinmukhammed@yandex-team.ru>2022-02-15 18:04:37 +0300
commit457c97d555bfbe3f9351eb22a932b9965e6816a6 (patch)
tree64b6f6aef25dbac3a4c60d33eedf2aac519102ab
parentfb226f95af53bfc8b810a8d1e80f3d09daecfba3 (diff)
downloadydb-457c97d555bfbe3f9351eb22a932b9965e6816a6.tar.gz
YQ-831 Refactoring DatabaseAsyncResolver
Refactoring ref:99c3eae012ffa2985b4824a0b9aeebff6f5efb3c
-rw-r--r--ydb/core/yq/libs/actors/pending_fetcher.cpp1
-rw-r--r--ydb/core/yq/libs/actors/run_actor.cpp7
-rw-r--r--ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp2
-rw-r--r--ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.cpp6
-rw-r--r--ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h9
-rw-r--r--ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h12
-rw-r--r--ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h8
-rw-r--r--ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.cpp17
-rw-r--r--ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.h11
-rw-r--r--ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.cpp72
-rw-r--r--ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.h44
-rw-r--r--ydb/library/yql/providers/common/db_id_async_resolver/ya.make4
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp2
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp2
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp8
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_provider.h6
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp14
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_settings.h5
-rw-r--r--ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp2
-rw-r--r--ydb/library/yql/providers/ydb/provider/yql_ydb_provider.cpp6
-rw-r--r--ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h6
-rw-r--r--ydb/library/yql/providers/ydb/provider/yql_ydb_settings.cpp11
-rw-r--r--ydb/library/yql/providers/ydb/provider/yql_ydb_settings.h5
23 files changed, 76 insertions, 184 deletions
diff --git a/ydb/core/yq/libs/actors/pending_fetcher.cpp b/ydb/core/yq/libs/actors/pending_fetcher.cpp
index 0095963c8f1..0084f9c37d2 100644
--- a/ydb/core/yq/libs/actors/pending_fetcher.cpp
+++ b/ydb/core/yq/libs/actors/pending_fetcher.cpp
@@ -67,6 +67,7 @@
namespace NYq {
using namespace NActors;
+using namespace NYql;
namespace {
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp
index ada7b0d464c..ef272b0404b 100644
--- a/ydb/core/yq/libs/actors/run_actor.cpp
+++ b/ydb/core/yq/libs/actors/run_actor.cpp
@@ -69,7 +69,6 @@
#include <ydb/core/yq/libs/checkpointing_common/defs.h>
#include <ydb/core/yq/libs/checkpoint_storage/storage_service.h>
#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.h>
-#include <ydb/core/yq/libs/common/database_token_builder.h>
#include <ydb/core/yq/libs/private_client/private_client.h>
#define LOG_E(stream) \
@@ -283,7 +282,6 @@ private:
LOG_D("Connection with empty name " << connection.meta().id());
continue;
}
- Connections[connection.content().name()] = connection; // Necessary for TDatabaseAsyncResolverWithMeta
YqConnections.emplace(connection.meta().id(), connection);
}
}
@@ -1138,8 +1136,8 @@ private:
clusters);
TVector<TDataProviderInitializer> dataProvidersInit;
- const auto dbResolver = std::make_shared<TDatabaseAsyncResolverWithMeta>(TDatabaseAsyncResolverWithMeta(NActors::TActivationContext::ActorSystem(), Params.DatabaseResolver,
- Params.CommonConfig.GetYdbMvpCloudEndpoint(), Params.CommonConfig.GetMdbGateway(), Params.CommonConfig.GetMdbTransformHost(), Params.QueryId, Params.AuthToken, Params.AccountIdSignatures, Connections));
+ const std::shared_ptr<IDatabaseAsyncResolver> dbResolver = std::make_shared<TDatabaseAsyncResolverImpl>(NActors::TActivationContext::ActorSystem(), Params.DatabaseResolver,
+ Params.CommonConfig.GetYdbMvpCloudEndpoint(), Params.CommonConfig.GetMdbGateway(), Params.CommonConfig.GetMdbTransformHost(), Params.QueryId);
{
// TBD: move init to better place
QueryStateUpdateRequest.set_scope(Params.Scope.ToString());
@@ -1350,7 +1348,6 @@ private:
bool EnableCheckpointCoordinator = false;
bool RetryNeeded = false;
Yq::Private::PingTaskRequest QueryStateUpdateRequest;
- THashMap<TString, YandexQuery::Connection> Connections; // Necessary for DbAsyncResolver
const ui64 MaxTasksPerOperation = 100;
diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp
index 8f1ac3839e8..704431b1fb0 100644
--- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp
+++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp
@@ -66,7 +66,7 @@ public:
}
const std::weak_ptr<NYq::TEvents::TDbResolverResponse> response = DbResolverResponse_;
- AsyncFuture_ = State_->DbResolver->ResolveIds({ids, State_->DbResolver->GetTraceId()}).Apply([response](auto future) {
+ AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future) {
if (const auto res = response.lock())
*res = std::move(future.ExtractValue());
});
diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.cpp b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.cpp
index 15c4d834e3c..3cdfb17866e 100644
--- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.cpp
+++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.cpp
@@ -6,9 +6,9 @@ namespace NYql {
TDataProviderInitializer GetClickHouseDataProviderInitializer(
IHTTPGateway::TPtr gateway,
- const std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> dbResolverWithMeta)
+ const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver)
{
- return [gateway, dbResolverWithMeta] (
+ return [gateway, dbResolver] (
const TString& userName,
const TString& sessionId,
const TGatewaysConfig* gatewaysConfig,
@@ -29,7 +29,7 @@ TDataProviderInitializer GetClickHouseDataProviderInitializer(
state->Types = typeCtx.Get();
state->FunctionRegistry = functionRegistry;
- state->DbResolver = dbResolverWithMeta;
+ state->DbResolver = dbResolver;
if (gatewaysConfig) {
state->Configuration->Init(gatewaysConfig->GetClickHouse(), state->DbResolver, state->DatabaseIds);
}
diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h
index 28ac8b244b7..17ebb38b7b3 100644
--- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h
+++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h
@@ -1,9 +1,10 @@
#pragma once
+#include "yql_clickhouse_settings.h"
+
#include <ydb/library/yql/core/yql_data_provider.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
-#include "yql_clickhouse_settings.h"
-#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.h>
+#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
namespace NKikimr::NMiniKQL {
class IFunctionRegistry;
@@ -27,12 +28,12 @@ struct TClickHouseState : public TThrRefBase
TClickHouseConfiguration::TPtr Configuration = MakeIntrusive<TClickHouseConfiguration>();
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr;
THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth> DatabaseIds;
- std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> DbResolver;
+ std::shared_ptr<NYq::IDatabaseAsyncResolver> DbResolver;
};
TDataProviderInitializer GetClickHouseDataProviderInitializer(
IHTTPGateway::TPtr gateway,
- std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> dbResolverWithMeta = nullptr
+ std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver = nullptr
);
TIntrusivePtr<IDataProvider> CreateClickHouseDataSource(TClickHouseState::TPtr state, IHTTPGateway::TPtr gateway);
diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h
index 8f6eecf9382..c206844b4f9 100644
--- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h
+++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h
@@ -1,12 +1,13 @@
#pragma once
+#include <ydb/library/yql/utils/log/log.h>
#include <ydb/library/yql/providers/common/config/yql_dispatch.h>
#include <ydb/library/yql/providers/common/config/yql_setting.h>
+#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
#include <ydb/core/yq/libs/events/events.h>
-#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.h>
-#include <ydb/core/yq/libs/common/database_token_builder.h>
+
namespace NYql {
@@ -23,7 +24,7 @@ struct TClickHouseConfiguration : public TClickHouseSettings, public NCommon::TS
template <typename TProtoConfig>
void Init(
const TProtoConfig& config,
- const std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> dbResolver,
+ const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver,
THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth>& databaseIds)
{
TVector<TString> clusters(Reserve(config.ClusterMappingSize()));
@@ -37,10 +38,11 @@ struct TClickHouseConfiguration : public TClickHouseSettings, public NCommon::TS
for (auto& cluster: config.GetClusterMapping()) {
this->Dispatch(cluster.GetName(), cluster.GetSettings());
- if (dbResolver) { //TODO: change log level
+ if (dbResolver) {
YQL_CLOG(DEBUG, ProviderClickHouse) << "Settings: clusterName = " << cluster.GetName()
<< ", clusterDbId = " << cluster.GetId() << ", cluster.GetCluster(): " << cluster.GetCluster() << ", HasCluster: " << (cluster.HasCluster() ? "TRUE" : "FALSE") ;
- dbResolver->TryAddDbIdToResolve(cluster.HasCluster(), cluster.GetName(), cluster.GetId(), NYq::DatabaseType::ClickHouse, databaseIds);
+ databaseIds[std::make_pair(cluster.GetId(), NYq::DatabaseType::ClickHouse)] =
+ NYq::TEvents::TDatabaseAuth{cluster.GetCHToken(), /*AddBearer=*/false};
if (cluster.GetId()) {
DbId2Clusters[cluster.GetId()].emplace_back(cluster.GetName());
YQL_CLOG(DEBUG, ProviderClickHouse) << "Add dbId: " << cluster.GetId() << " to DbId2Clusters";
diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h
index a1e4ab18a1d..733bed54b02 100644
--- a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h
+++ b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h
@@ -5,14 +5,10 @@
namespace NYq {
-struct TResolveParams {
- THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth> Ids;
- TString TraceId;
-};
-
class IDatabaseAsyncResolver {
public:
- virtual NThreading::TFuture<TEvents::TDbResolverResponse> ResolveIds(const TResolveParams& params) const = 0;
+ virtual NThreading::TFuture<TEvents::TDbResolverResponse> ResolveIds(
+ const THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth>& ids) const = 0;
virtual ~IDatabaseAsyncResolver() = default;
};
diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.cpp b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.cpp
index d4fcc6e6dcb..e13093b4637 100644
--- a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.cpp
+++ b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.cpp
@@ -1,22 +1,27 @@
#include "db_async_resolver_impl.h"
namespace NYq {
+using namespace NThreading;
-TDatabaseAsyncResolver::TDatabaseAsyncResolver(
+TDatabaseAsyncResolverImpl::TDatabaseAsyncResolverImpl(
NActors::TActorSystem* actorSystem,
const NActors::TActorId& recipient,
const TString& ydbMvpEndpoint,
const TString& mdbGateway,
- const bool mdbTransformHost)
+ bool mdbTransformHost,
+ const TString& traceId)
: ActorSystem(actorSystem)
, Recipient(recipient)
, YdbMvpEndpoint(ydbMvpEndpoint)
, MdbGateway(mdbGateway)
, MdbTransformHost(mdbTransformHost)
+ , TraceId(traceId)
{}
-NThreading::TFuture<TEvents::TDbResolverResponse> TDatabaseAsyncResolver::ResolveIds(const TResolveParams& params) const {
- auto promise = NThreading::NewPromise<TEvents::TDbResolverResponse>();
+TFuture<TEvents::TDbResolverResponse> TDatabaseAsyncResolverImpl::ResolveIds(
+ const THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth>& ids) const
+{
+ auto promise = NewPromise<TEvents::TDbResolverResponse>();
TDuration timeout = TDuration::Seconds(40);
auto callback = MakeHolder<NYql::TRichActorFutureCallback<TEvents::TEvEndpointResponse>>(
[promise] (TAutoPtr<NActors::TEventHandle<TEvents::TEvEndpointResponse>>& event) mutable {
@@ -31,8 +36,8 @@ NThreading::TFuture<TEvents::TDbResolverResponse> TDatabaseAsyncResolver::Resolv
NActors::TActorId callbackId = ActorSystem->Register(callback.Release());
ActorSystem->Send(new NActors::IEventHandle(Recipient, callbackId,
- new TEvents::TEvEndpointRequest(params.Ids, YdbMvpEndpoint, MdbGateway,
- params.TraceId, MdbTransformHost)));
+ new TEvents::TEvEndpointRequest(ids, YdbMvpEndpoint, MdbGateway,
+ TraceId, MdbTransformHost)));
return promise.GetFuture();
}
diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.h b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.h
index 27b8f3a6f3a..10ca0c956e4 100644
--- a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.h
+++ b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.h
@@ -4,23 +4,26 @@
namespace NYq {
-class TDatabaseAsyncResolver : public IDatabaseAsyncResolver {
+class TDatabaseAsyncResolverImpl : public IDatabaseAsyncResolver {
public:
- TDatabaseAsyncResolver(
+ TDatabaseAsyncResolverImpl(
NActors::TActorSystem* actorSystem,
const NActors::TActorId& recipient,
const TString& ydbMvpEndpoint,
const TString& mdbGateway,
- const bool mdbTransformHost
+ bool mdbTransformHost = false,
+ const TString& traceId = ""
);
- NThreading::TFuture<TEvents::TDbResolverResponse> ResolveIds(const TResolveParams& params) const override;
+ NThreading::TFuture<TEvents::TDbResolverResponse> ResolveIds(
+ const THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth>& ids) const override;
private:
NActors::TActorSystem* ActorSystem;
const NActors::TActorId Recipient;
const TString YdbMvpEndpoint;
const TString MdbGateway;
const bool MdbTransformHost = false;
+ const TString TraceId;
};
} // NYq
diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.cpp b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.cpp
deleted file mode 100644
index 027415d9d78..00000000000
--- a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.cpp
+++ /dev/null
@@ -1,72 +0,0 @@
-#include "db_async_resolver_with_meta.h"
-#include <ydb/core/yq/libs/common/database_token_builder.h>
-
-namespace NYq {
-
- TDatabaseAsyncResolverWithMeta::TDatabaseAsyncResolverWithMeta(
- NActors::TActorSystem* actorSystem,
- const NActors::TActorId& recipient,
- const TString& ydbMvpEndpoint,
- const TString& mdbGateway,
- const bool mdbTransformHost,
- const TString& traceId,
- const TString& token,
- const THashMap<TString, TString>& accountIdSignatures,
- const THashMap<TString, YandexQuery::Connection>& connections)
- : DbResolver(actorSystem, recipient, ydbMvpEndpoint, mdbGateway, mdbTransformHost)
- , TraceId(traceId)
- , Token(token)
- , AccountIdSignatures(accountIdSignatures)
- , Connections(connections)
- {}
-
- NThreading::TFuture<TEvents::TDbResolverResponse> TDatabaseAsyncResolverWithMeta::ResolveIds(const TResolveParams& params) const {
- return DbResolver.ResolveIds(params);
- }
-
- TString TDatabaseAsyncResolverWithMeta::GetTraceId() const {
- return TraceId;
- }
-
- TString TDatabaseAsyncResolverWithMeta::GetToken() const {
- return Token;
- }
-
- const THashMap<TString, TString>& TDatabaseAsyncResolverWithMeta::GetAccountIdSignatures() const {
- return AccountIdSignatures;
- }
-
- void TDatabaseAsyncResolverWithMeta::TryAddDbIdToResolve(
- const bool isEndpoint,
- const TString& clusterName,
- const TString& dbId,
- const DatabaseType type,
- THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth>& databaseIds) const {
- if (isEndpoint) {
- return;
- }
- const auto iter = Connections.find(clusterName);
- if (iter == Connections.end()) {
- return;
- }
- const auto& conn = iter->second;
- const auto& setting = conn.content().setting();
- YandexQuery::IamAuth auth;
- switch (type) {
- case DatabaseType::Ydb:
- auth = setting.ydb_database().auth();
- break;
- case DatabaseType::ClickHouse:
- auth = setting.clickhouse_cluster().auth();
- break;
- case DatabaseType::DataStreams:
- auth = setting.data_streams().auth();
- break;
- default:
- return;
- }
- TryAddDatabaseToResolve(auth, dbId, type, Token, AccountIdSignatures, databaseIds);
- }
-
-} // NYq
-
diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.h b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.h
deleted file mode 100644
index 48fa9df0412..00000000000
--- a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.h
+++ /dev/null
@@ -1,44 +0,0 @@
-#pragma once
-#include "db_async_resolver_impl.h"
-
-namespace NYq {
-
-class TDatabaseAsyncResolverWithMeta : public IDatabaseAsyncResolver {
-public:
- TDatabaseAsyncResolverWithMeta(
- NActors::TActorSystem* actorSystem,
- const NActors::TActorId& recipient,
- const TString& ydbMvpEndpoint,
- const TString& mdbGateway,
- const bool mdbTransformHost,
- const TString& traceId,
- const TString& token,
- const THashMap<TString, TString>& accountIdSignatures,
- const THashMap<TString, YandexQuery::Connection>& connections
- );
-
- NThreading::TFuture<TEvents::TDbResolverResponse> ResolveIds(const TResolveParams& params) const override;
-
- TString GetTraceId() const;
-
- TString GetToken() const;
-
- const THashMap<TString, TString>& GetAccountIdSignatures() const;
-
- void TryAddDbIdToResolve(
- const bool isEndpoint,
- const TString& clusterName,
- const TString& dbId,
- const DatabaseType type,
- THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth>& databaseIds
- ) const;
-
-private:
- TDatabaseAsyncResolver DbResolver;
- const TString TraceId;
- const TString Token;
- const THashMap<TString, TString> AccountIdSignatures;
- const THashMap<TString, YandexQuery::Connection> Connections;
-};
-
-} // NYq \ No newline at end of file
diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/ya.make b/ydb/library/yql/providers/common/db_id_async_resolver/ya.make
index 2262759e360..659fb003039 100644
--- a/ydb/library/yql/providers/common/db_id_async_resolver/ya.make
+++ b/ydb/library/yql/providers/common/db_id_async_resolver/ya.make
@@ -4,12 +4,12 @@ LIBRARY()
SRCS(
db_async_resolver_impl.cpp
- db_async_resolver_with_meta.cpp
)
PEERDIR(
- ydb/core/yq/libs/common
+ library/cpp/threading/future
ydb/core/yq/libs/events
+ ydb/library/yql/providers/dq/actors
)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp
index 976469de6aa..349798c9310 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp
@@ -35,7 +35,7 @@ public:
return TStatus::Ok;
const std::weak_ptr<NYq::TEvents::TDbResolverResponse> response = DbResolverResponse_;
- AsyncFuture_ = State_->DbResolver->ResolveIds({ids, State_->DbResolver->GetTraceId()}).Apply([response](auto future)
+ AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future)
{
if (const auto res = response.lock())
*res = std::move(future.ExtractValue());
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp
index bb9650d5cc9..ef26fc7a82f 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp
@@ -37,7 +37,7 @@ public:
return TStatus::Ok;
const std::weak_ptr<NYq::TEvents::TDbResolverResponse> response = DbResolverResponse_;
- AsyncFuture_ = State_->DbResolver->ResolveIds({ids, State_->DbResolver->GetTraceId()}).Apply([response](auto future)
+ AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future)
{
if (const auto res = response.lock())
*res = std::move(future.ExtractValue());
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp
index b3f700c1475..5b51d4c0f8c 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp
@@ -12,8 +12,8 @@ namespace NYql {
TDataProviderInitializer GetPqDataProviderInitializer(
IPqGateway::TPtr gateway,
bool supportRtmrMode,
- std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> dbResolverWithMeta) {
- return [gateway, supportRtmrMode, dbResolverWithMeta] (
+ std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver) {
+ return [gateway, supportRtmrMode, dbResolver] (
const TString& userName,
const TString& sessionId,
const TGatewaysConfig* gatewaysConfig,
@@ -33,9 +33,9 @@ TDataProviderInitializer GetPqDataProviderInitializer(
state->SupportRtmrMode = supportRtmrMode;
state->Types = typeCtx.Get();
state->FunctionRegistry = functionRegistry;
- state->DbResolver = dbResolverWithMeta;
+ state->DbResolver = dbResolver;
if (gatewaysConfig) {
- state->Configuration->Init(gatewaysConfig->GetPq(), typeCtx, dbResolverWithMeta, state->DatabaseIds);
+ state->Configuration->Init(gatewaysConfig->GetPq(), typeCtx, dbResolver, state->DatabaseIds);
}
state->Gateway = gateway;
state->DqIntegration = CreatePqDqIntegration(state);
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_provider.h b/ydb/library/yql/providers/pq/provider/yql_pq_provider.h
index 6b86fd29a6e..7e2e39b746c 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_provider.h
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_provider.h
@@ -5,7 +5,7 @@
#include <ydb/library/yql/core/yql_data_provider.h>
#include <ydb/library/yql/providers/dq/interface/yql_dq_integration.h>
#include <ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.h>
-#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.h>
+#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
namespace NKikimr::NMiniKQL {
class IFunctionRegistry;
@@ -53,13 +53,13 @@ public:
IPqGateway::TPtr Gateway;
THolder<IDqIntegration> DqIntegration;
THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth> DatabaseIds;
- std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> DbResolver;
+ std::shared_ptr<NYq::IDatabaseAsyncResolver> DbResolver;
};
TDataProviderInitializer GetPqDataProviderInitializer(
IPqGateway::TPtr gateway,
bool supportRtmrMode = false,
- std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> dbResolverWithMeta = nullptr
+ std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver = nullptr
);
} // namespace NYql
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp
index 278db980f41..e4dead34134 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp
@@ -17,7 +17,7 @@ TPqSettings::TConstPtr TPqConfiguration::Snapshot() const {
void TPqConfiguration::Init(
const TPqGatewayConfig& config,
TIntrusivePtr<TTypeAnnotationContext> typeCtx,
- const std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> dbResolver,
+ const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver,
THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth>& databaseIds)
{
TVector<TString> clusters(Reserve(config.ClusterMappingSize()));
@@ -43,19 +43,21 @@ void TPqConfiguration::Init(
clusterSettings.UseSsl = cluster.GetUseSsl();
clusterSettings.AddBearerToToken = cluster.GetAddBearerToToken();
+ const TString authToken = typeCtx->FindCredentialContent("cluster:default_" + clusterSettings.ClusterName, "default_pq", cluster.GetToken());
+ clusterSettings.AuthToken = authToken;
+ const auto structuredTokenJson = ComposeStructuredTokenJsonForServiceAccount(cluster.GetServiceAccountId(), cluster.GetServiceAccountIdSignature(), authToken);
+ Tokens[clusterSettings.ClusterName] = structuredTokenJson;
+
if (dbResolver) {
YQL_CLOG(DEBUG, ProviderPq) << "Settings: clusterName = " << cluster.GetName()
<< ", clusterDbId = " << cluster.GetDatabaseId() << ", cluster.GetEndpoint(): " << cluster.GetEndpoint() << ", HasEndpoint = " << (cluster.HasEndpoint() ? "TRUE" : "FALSE") ;
- dbResolver->TryAddDbIdToResolve(cluster.HasEndpoint(), cluster.GetName(), cluster.GetDatabaseId(), NYq::DatabaseType::DataStreams, databaseIds);
if (cluster.GetDatabaseId()) {
+ databaseIds[std::make_pair(cluster.GetDatabaseId(), NYq::DatabaseType::DataStreams)] =
+ NYq::TEvents::TDatabaseAuth{structuredTokenJson, cluster.GetAddBearerToToken()};
DbId2Clusters[cluster.GetDatabaseId()].emplace_back(cluster.GetName());
YQL_CLOG(DEBUG, ProviderPq) << "Add dbId: " << cluster.GetDatabaseId() << " to DbId2Clusters";
}
}
-
- const TString authToken = typeCtx->FindCredentialContent("cluster:default_" + clusterSettings.ClusterName, "default_pq", cluster.GetToken());
- clusterSettings.AuthToken = authToken;
- Tokens[clusterSettings.ClusterName] = ComposeStructuredTokenJsonForServiceAccount(cluster.GetServiceAccountId(), cluster.GetServiceAccountIdSignature(), authToken);
}
FreezeDefaults();
}
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_settings.h b/ydb/library/yql/providers/pq/provider/yql_pq_settings.h
index cdf55e9d2bd..1337d7dac24 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_settings.h
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_settings.h
@@ -7,8 +7,7 @@
#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
#include <ydb/core/yq/libs/events/events.h>
-#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.h>
-#include <ydb/core/yq/libs/common/database_token_builder.h>
+#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
namespace NYql {
@@ -42,7 +41,7 @@ struct TPqConfiguration : public TPqSettings, public NCommon::TSettingDispatcher
void Init(
const TPqGatewayConfig& config,
TIntrusivePtr<TTypeAnnotationContext> typeCtx,
- const std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> dbResolver,
+ const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver,
THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth>& databaseIds);
TString GetDatabaseForTopic(const TString& cluster) const;
diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp
index b837a61aa60..ff0c5fd5087 100644
--- a/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp
+++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp
@@ -60,7 +60,7 @@ public:
return TStatus::Ok;
}
const std::weak_ptr<NYq::TEvents::TDbResolverResponse> response = DbResolverResponse_;
- AsyncFuture_ = State_->DbResolver->ResolveIds({ids, State_->DbResolver->GetTraceId()}).Apply([response](auto future)
+ AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future)
{
if (const auto res = response.lock())
*res = std::move(future.ExtractValue());
diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.cpp
index d17e353d3ed..11aa5060a22 100644
--- a/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.cpp
+++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.cpp
@@ -7,8 +7,8 @@ namespace NYql {
TDataProviderInitializer GetYdbDataProviderInitializer(
NYdb::TDriver driver,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
- std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> dbResolverWithMeta) {
- return [driver, credentialsFactory, dbResolverWithMeta] (
+ std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver) {
+ return [driver, credentialsFactory, dbResolver] (
const TString& userName,
const TString& sessionId,
const TGatewaysConfig* gatewaysConfig,
@@ -30,7 +30,7 @@ TDataProviderInitializer GetYdbDataProviderInitializer(
state->Types = typeCtx.Get();
state->FunctionRegistry = functionRegistry;
state->CredentialsFactory = credentialsFactory;
- state->DbResolver = dbResolverWithMeta;
+ state->DbResolver = dbResolver;
if (gatewaysConfig) {
state->Configuration->Init(gatewaysConfig->GetYdb(), typeCtx, state->DbResolver, state->DatabaseIds);
}
diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h b/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h
index 41c55868f7e..56cd6e93314 100644
--- a/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h
+++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h
@@ -6,7 +6,7 @@
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/public/lib/experimental/ydb_clickhouse_internal.h>
-#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.h>
+#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
namespace NKikimr::NMiniKQL {
class IFunctionRegistry;
@@ -34,13 +34,13 @@ struct TYdbState : public TThrRefBase
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr;
ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth> DatabaseIds;
- std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> DbResolver;
+ std::shared_ptr<NYq::IDatabaseAsyncResolver> DbResolver;
};
TDataProviderInitializer GetYdbDataProviderInitializer(
NYdb::TDriver driver,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory = nullptr,
- const std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> dbResolverWithMeta = nullptr
+ const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver = nullptr
);
TIntrusivePtr<IDataProvider> CreateYdbDataSource(
diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.cpp
index 06d1919027c..a582395c3c6 100644
--- a/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.cpp
+++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.cpp
@@ -20,7 +20,7 @@ bool TYdbConfiguration::HasCluster(TStringBuf cluster) const {
void TYdbConfiguration::Init(
const TYdbGatewayConfig& config,
TIntrusivePtr<TTypeAnnotationContext> typeCtx,
- const std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> dbResolver,
+ const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver,
THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth>& databaseIds)
{
TVector<TString> clusters(Reserve(config.ClusterMappingSize()));
@@ -37,9 +37,14 @@ void TYdbConfiguration::Init(
for (const auto& cluster : config.GetClusterMapping()) {
this->Dispatch(cluster.GetName(), cluster.GetSettings());
+ const TString authToken = typeCtx->FindCredentialContent("cluster:default_" + cluster.GetName(), "default_ydb", cluster.GetToken());
+ const auto structuredTokenJson = ComposeStructuredTokenJsonForServiceAccount(cluster.GetServiceAccountId(), cluster.GetServiceAccountIdSignature(), authToken);
+ Tokens[cluster.GetName()] = structuredTokenJson;
+
if (dbResolver) {
- dbResolver->TryAddDbIdToResolve(cluster.HasEndpoint(), cluster.GetName(), cluster.GetId(), NYq::DatabaseType::Ydb, databaseIds);
if (cluster.GetId()) {
+ databaseIds[std::make_pair(cluster.GetId(), NYq::DatabaseType::Ydb)] =
+ NYq::TEvents::TDatabaseAuth{structuredTokenJson, cluster.GetAddBearerToToken()};
DbId2Clusters[cluster.GetId()].emplace_back(cluster.GetName());
}
}
@@ -54,8 +59,6 @@ void TYdbConfiguration::Init(
if (cluster.HasAddBearerToToken())
settings.AddBearerToToken = cluster.GetAddBearerToToken();
- const TString authToken = typeCtx->FindCredentialContent("cluster:default_" + cluster.GetName(), "default_ydb", cluster.GetToken());
- Tokens[cluster.GetName()] = ComposeStructuredTokenJsonForServiceAccount(cluster.GetServiceAccountId(), cluster.GetServiceAccountIdSignature(), authToken);
settings.Raw = cluster;
}
diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.h b/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.h
index 9bb003df99e..7a27452cc59 100644
--- a/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.h
+++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.h
@@ -6,8 +6,7 @@
#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
#include <ydb/core/yq/libs/events/events.h>
-#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_with_meta.h>
-#include <ydb/core/yq/libs/common/database_token_builder.h>
+#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
namespace NYql {
@@ -35,7 +34,7 @@ struct TYdbConfiguration : public TYdbSettings, public NCommon::TSettingDispatch
void Init(
const TYdbGatewayConfig& config,
TIntrusivePtr<TTypeAnnotationContext> typeCtx,
- const std::shared_ptr<NYq::TDatabaseAsyncResolverWithMeta> dbResolver,
+ const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver,
THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth>& databaseIds
);