aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVasily Gerasimov <UgnineSirdis@gmail.com>2022-03-23 12:30:48 +0300
committerVasily Gerasimov <UgnineSirdis@gmail.com>2022-03-23 12:30:48 +0300
commit3db4a2c7361fe6e945546319a890ef1ef84e44d6 (patch)
tree4bcfa4fbf9f0d454353b44bacbd58370dd896d55
parent97b4fd5e68d79891f0705fd4f5953d5a03a44309 (diff)
downloadydb-3db4a2c7361fe6e945546319a890ef1ef84e44d6.tar.gz
YQ-911 Use YDB driver from YQ shared resources in internal YDB connections for CP and checkpointing
Use CaCert setting for client Refactor YDB connection Refactor ydb lib Split two drivers ref:b529cad63e7e5aa22cb8d32f77f150e75135f71a
-rw-r--r--ydb/core/testlib/CMakeLists.txt1
-rw-r--r--ydb/core/testlib/test_client.cpp10
-rw-r--r--ydb/core/testlib/test_client.h2
-rw-r--r--ydb/core/yq/libs/actors/nodes_manager.cpp2
-rw-r--r--ydb/core/yq/libs/actors/pending_fetcher.cpp4
-rw-r--r--ydb/core/yq/libs/actors/run_actor.cpp12
-rw-r--r--ydb/core/yq/libs/actors/run_actor_params.cpp4
-rw-r--r--ydb/core/yq/libs/actors/run_actor_params.h5
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/CMakeLists.txt1
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp19
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/storage_proxy.h4
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/storage_service.cpp5
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/storage_service.h4
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp46
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.h4
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.cpp28
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.h4
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp2
-rw-r--r--ydb/core/yq/libs/init/init.cpp10
-rw-r--r--ydb/core/yq/libs/shared_resources/shared_resources.cpp8
-rw-r--r--ydb/core/yq/libs/shared_resources/shared_resources.h6
-rw-r--r--ydb/core/yq/libs/ydb/create_schema.cpp6
-rw-r--r--ydb/core/yq/libs/ydb/ydb.cpp71
-rw-r--r--ydb/core/yq/libs/ydb/ydb.h15
24 files changed, 155 insertions, 118 deletions
diff --git a/ydb/core/testlib/CMakeLists.txt b/ydb/core/testlib/CMakeLists.txt
index c55d5096f8..d58dbacfe0 100644
--- a/ydb/core/testlib/CMakeLists.txt
+++ b/ydb/core/testlib/CMakeLists.txt
@@ -62,6 +62,7 @@ target_link_libraries(ydb-core-testlib PUBLIC
libs-audit-mock
yq-libs-init
yq-libs-mock
+ yq-libs-shared_resources
ydb-library-aclib
library-folder_service-mock
library-mkql_proto-protos
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 67799128cb..4dcaa93f4e 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -804,7 +804,7 @@ namespace Tests {
const auto ydbCredFactory = NKikimr::CreateYdbCredentialsProviderFactory;
auto counters = MakeIntrusive<NMonitoring::TDynamicCounters>();
- auto yqSharedResources = NYq::CreateYqSharedResources(protoConfig, ydbCredFactory, counters);
+ YqSharedResources = NYq::CreateYqSharedResources(protoConfig, ydbCredFactory, counters);
NYq::Init(
protoConfig,
Runtime->GetNodeId(nodeIdx),
@@ -812,13 +812,13 @@ namespace Tests {
&appData,
"TestTenant",
nullptr, // MakeIntrusive<NPq::NConfigurationManager::TConnections>(),
- yqSharedResources,
+ YqSharedResources,
NKikimr::NFolderService::CreateMockFolderServiceActor,
NYq::CreateMockYqAuditServiceActor,
ydbCredFactory,
/*IcPort = */0
);
- NYq::InitTest(Runtime.Get(), port, Settings->GrpcPort, yqSharedResources);
+ NYq::InitTest(Runtime.Get(), port, Settings->GrpcPort, YqSharedResources);
}
}
@@ -885,6 +885,10 @@ namespace Tests {
GRpcServer->Stop();
}
+ if (YqSharedResources) {
+ YqSharedResources->Stop();
+ }
+
if (Runtime) {
Runtime.Destroy();
}
diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h
index 0f1bfa7a46..b3673907ae 100644
--- a/ydb/core/testlib/test_client.h
+++ b/ydb/core/testlib/test_client.h
@@ -24,6 +24,7 @@
#include <ydb/core/kesus/tablet/events.h>
#include <ydb/core/security/ticket_parser.h>
#include <ydb/core/base/grpc_service_factory.h>
+#include <ydb/core/yq/libs/shared_resources/interface/shared_resources.h>
#include <google/protobuf/text_format.h>
@@ -260,6 +261,7 @@ namespace Tests {
TAutoPtr<NMsgBusProxy::IMessageBusServer> BusServer;
std::unique_ptr<NGrpc::TGRpcServer> GRpcServer;
TIntrusivePtr<NMonitoring::TDynamicCounters> GRpcServerRootCounters;
+ NYq::IYqSharedResources::TPtr YqSharedResources;
};
class TClient {
diff --git a/ydb/core/yq/libs/actors/nodes_manager.cpp b/ydb/core/yq/libs/actors/nodes_manager.cpp
index 30c1a01343..fbeab5a647 100644
--- a/ydb/core/yq/libs/actors/nodes_manager.cpp
+++ b/ydb/core/yq/libs/actors/nodes_manager.cpp
@@ -73,7 +73,7 @@ public:
, YqSharedResources(yqSharedResources)
, IcPort(icPort)
, Client(
- YqSharedResources->YdbDriver,
+ YqSharedResources->CoreYdbDriver,
NYdb::TCommonClientSettings()
.DiscoveryEndpoint(PrivateApiConfig.GetTaskServiceEndpoint())
.EnableSsl(PrivateApiConfig.GetSecureTaskService())
diff --git a/ydb/core/yq/libs/actors/pending_fetcher.cpp b/ydb/core/yq/libs/actors/pending_fetcher.cpp
index 9c40cd890f..f24225aaf2 100644
--- a/ydb/core/yq/libs/actors/pending_fetcher.cpp
+++ b/ydb/core/yq/libs/actors/pending_fetcher.cpp
@@ -160,7 +160,7 @@ public:
, Guid(CreateGuidAsString())
, ClientCounters(clientCounters)
, Client(
- YqSharedResources->YdbDriver,
+ YqSharedResources->CoreYdbDriver,
NYdb::TCommonClientSettings()
.DiscoveryEndpoint(PrivateApiConfig.GetTaskServiceEndpoint())
.EnableSsl(PrivateApiConfig.GetSecureTaskService())
@@ -336,7 +336,7 @@ private:
const auto createdAt = TInstant::Now();
TRunActorParams params(
- YqSharedResources->YdbDriver, S3Gateway,
+ YqSharedResources, S3Gateway,
FunctionRegistry, RandomProvider,
ModuleResolver, ModuleResolver->GetNextUniqueId(),
DqCompFactory, PqCmConnections,
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp
index 451a9f7db1..ab28285641 100644
--- a/ydb/core/yq/libs/actors/run_actor.cpp
+++ b/ydb/core/yq/libs/actors/run_actor.cpp
@@ -113,7 +113,7 @@ public:
Params.QueryId,
Params.Owner,
TPrivateClient(
- Params.Driver,
+ Params.YqSharedResources->CoreYdbDriver,
NYdb::TCommonClientSettings()
.DiscoveryEndpoint(Params.PrivateApiConfig.GetTaskServiceEndpoint())
.EnableSsl(Params.PrivateApiConfig.GetSecureTaskService())
@@ -412,7 +412,7 @@ private:
::NYq::MakeReadRuleCreatorActor(
SelfId(),
Params.QueryId,
- Params.Driver,
+ Params.YqSharedResources->UserSpaceYdbDriver,
std::move(TopicsForConsumersCreation),
std::move(CredentialsForConsumersCreation)
)
@@ -771,7 +771,7 @@ private:
::NYq::MakeReadRuleDeleterActor(
SelfId(),
Params.QueryId,
- Params.Driver,
+ Params.YqSharedResources->UserSpaceYdbDriver,
Params.CreatedTopicConsumers,
std::move(credentials)
)
@@ -832,7 +832,7 @@ private:
}
resultId = NActors::TActivationContext::Register(
CreateResultWriter(
- Params.Driver, ExecuterId, dqGraphParams.GetResultType(), Params.PrivateApiConfig,
+ Params.YqSharedResources->UserSpaceYdbDriver, ExecuterId, dqGraphParams.GetResultType(), Params.PrivateApiConfig,
writerResultId, columns, dqGraphParams.GetSession(), Params.Deadline, Params.ClientCounters));
} else {
LOG_D("ResultWriter was NOT CREATED since ResultType is empty");
@@ -1096,7 +1096,7 @@ private:
}
{
- dataProvidersInit.push_back(GetYdbDataProviderInitializer(Params.Driver, Params.CredentialsFactory, dbResolver));
+ dataProvidersInit.push_back(GetYdbDataProviderInitializer(Params.YqSharedResources->UserSpaceYdbDriver, Params.CredentialsFactory, dbResolver));
}
{
@@ -1109,7 +1109,7 @@ private:
{
NYql::TPqGatewayServices pqServices(
- Params.Driver,
+ Params.YqSharedResources->UserSpaceYdbDriver,
Params.PqCmConnections,
Params.CredentialsFactory,
std::make_shared<NYql::TPqGatewayConfig>(gatewaysConfig.GetPq()),
diff --git a/ydb/core/yq/libs/actors/run_actor_params.cpp b/ydb/core/yq/libs/actors/run_actor_params.cpp
index 14933dd6e7..09d9003d11 100644
--- a/ydb/core/yq/libs/actors/run_actor_params.cpp
+++ b/ydb/core/yq/libs/actors/run_actor_params.cpp
@@ -5,7 +5,7 @@ namespace NYq {
using namespace NActors;
TRunActorParams::TRunActorParams(
- NYdb::TDriver driver,
+ TYqSharedResources::TPtr yqSharedResources,
NYql::IHTTPGateway::TPtr s3Gateway,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
TIntrusivePtr<IRandomProvider> randomProvider,
@@ -48,7 +48,7 @@ TRunActorParams::TRunActorParams(
TInstant createdAt,
const TString& tenantName
)
- : Driver(driver)
+ : YqSharedResources(yqSharedResources)
, S3Gateway(s3Gateway)
, FunctionRegistry(functionRegistry)
, RandomProvider(randomProvider)
diff --git a/ydb/core/yq/libs/actors/run_actor_params.h b/ydb/core/yq/libs/actors/run_actor_params.h
index fd907234f3..1f25e807ff 100644
--- a/ydb/core/yq/libs/actors/run_actor_params.h
+++ b/ydb/core/yq/libs/actors/run_actor_params.h
@@ -3,6 +3,7 @@
#include <ydb/core/yq/libs/config/protos/pinger.pb.h>
#include <ydb/core/yq/libs/config/protos/yq_config.pb.h>
#include <ydb/core/yq/libs/events/events.h>
+#include <ydb/core/yq/libs/shared_resources/shared_resources.h>
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
@@ -19,7 +20,7 @@ namespace NYq {
struct TRunActorParams { // TODO2 : Change name
TRunActorParams(
- NYdb::TDriver driver,
+ TYqSharedResources::TPtr yqSharedResources,
NYql::IHTTPGateway::TPtr s3Gateway,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
TIntrusivePtr<IRandomProvider> randomProvider,
@@ -66,7 +67,7 @@ struct TRunActorParams { // TODO2 : Change name
TRunActorParams(const TRunActorParams& params) = default;
TRunActorParams(TRunActorParams&& params) = default;
- NYdb::TDriver Driver;
+ TYqSharedResources::TPtr YqSharedResources;
NYql::IHTTPGateway::TPtr S3Gateway;
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry;
TIntrusivePtr<IRandomProvider> RandomProvider;
diff --git a/ydb/core/yq/libs/checkpoint_storage/CMakeLists.txt b/ydb/core/yq/libs/checkpoint_storage/CMakeLists.txt
index f2ab9f4661..735c28fa79 100644
--- a/ydb/core/yq/libs/checkpoint_storage/CMakeLists.txt
+++ b/ydb/core/yq/libs/checkpoint_storage/CMakeLists.txt
@@ -22,6 +22,7 @@ target_link_libraries(yq-libs-checkpoint_storage PUBLIC
libs-checkpoint_storage-events
libs-checkpoint_storage-proto
yq-libs-checkpointing_common
+ yq-libs-shared_resources
ydb-library-security
cpp-client-ydb_scheme
cpp-client-ydb_table
diff --git a/ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp b/ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp
index 07c8d5101a..f504653e6f 100644
--- a/ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp
+++ b/ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp
@@ -41,12 +41,14 @@ class TStorageProxy : public TActorBootstrapped<TStorageProxy> {
TStateStoragePtr StateStorage;
TActorId ActorGC;
NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
+ TYqSharedResources::TPtr YqSharedResources;
public:
explicit TStorageProxy(
const NConfig::TCheckpointCoordinatorConfig& config,
const NConfig::TCommonConfig& commonConfig,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory);
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
+ const TYqSharedResources::TPtr& yqSharedResources);
void Bootstrap();
@@ -100,22 +102,24 @@ static void FillDefaultParameters(NConfig::TCheckpointCoordinatorConfig& checkpo
TStorageProxy::TStorageProxy(
const NConfig::TCheckpointCoordinatorConfig& config,
const NConfig::TCommonConfig& commonConfig,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory)
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
+ const TYqSharedResources::TPtr& yqSharedResources)
: Config(config)
, CommonConfig(commonConfig)
, StorageConfig(Config.GetStorage())
- , CredentialsProviderFactory(credentialsProviderFactory) {
+ , CredentialsProviderFactory(credentialsProviderFactory)
+ , YqSharedResources(yqSharedResources) {
FillDefaultParameters(Config, StorageConfig);
}
void TStorageProxy::Bootstrap() {
- CheckpointStorage = NewYdbCheckpointStorage(StorageConfig, CredentialsProviderFactory, CreateEntityIdGenerator(CommonConfig.GetIdsPrefix()));
+ CheckpointStorage = NewYdbCheckpointStorage(StorageConfig, CredentialsProviderFactory, CreateEntityIdGenerator(CommonConfig.GetIdsPrefix()), YqSharedResources);
auto issues = CheckpointStorage->Init().GetValueSync();
if (!issues.Empty()) {
LOG_STREAMS_STORAGE_SERVICE_ERROR("Failed to init checkpoint storage: " << issues.ToOneLineString());
}
- StateStorage = NewYdbStateStorage(StorageConfig, CredentialsProviderFactory);
+ StateStorage = NewYdbStateStorage(StorageConfig, CredentialsProviderFactory, YqSharedResources);
issues = StateStorage->Init().GetValueSync();
if (!issues.Empty()) {
LOG_STREAMS_STORAGE_SERVICE_ERROR("Failed to init checkpoint state storage: " << issues.ToOneLineString());
@@ -393,9 +397,10 @@ void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvGetTaskState::TPtr& ev) {
std::unique_ptr<NActors::IActor> NewStorageProxy(
const NConfig::TCheckpointCoordinatorConfig& config,
const NConfig::TCommonConfig& commonConfig,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory)
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
+ const TYqSharedResources::TPtr& yqSharedResources)
{
- return std::unique_ptr<NActors::IActor>(new TStorageProxy(config, commonConfig, credentialsProviderFactory));
+ return std::unique_ptr<NActors::IActor>(new TStorageProxy(config, commonConfig, credentialsProviderFactory, yqSharedResources));
}
} // namespace NYq
diff --git a/ydb/core/yq/libs/checkpoint_storage/storage_proxy.h b/ydb/core/yq/libs/checkpoint_storage/storage_proxy.h
index c67d928a15..f487024867 100644
--- a/ydb/core/yq/libs/checkpoint_storage/storage_proxy.h
+++ b/ydb/core/yq/libs/checkpoint_storage/storage_proxy.h
@@ -2,6 +2,7 @@
#include <ydb/core/yq/libs/config/protos/checkpoint_coordinator.pb.h>
#include <ydb/core/yq/libs/config/protos/common.pb.h>
+#include <ydb/core/yq/libs/shared_resources/shared_resources.h>
#include <ydb/library/security/ydb_credentials_provider_factory.h>
@@ -14,6 +15,7 @@ namespace NYq {
std::unique_ptr<NActors::IActor> NewStorageProxy(
const NConfig::TCheckpointCoordinatorConfig& config,
const NConfig::TCommonConfig& commonConfig,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory);
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
+ const TYqSharedResources::TPtr& yqSharedResources);
} // namespace NYq
diff --git a/ydb/core/yq/libs/checkpoint_storage/storage_service.cpp b/ydb/core/yq/libs/checkpoint_storage/storage_service.cpp
index 421c19d7fd..aa45745711 100644
--- a/ydb/core/yq/libs/checkpoint_storage/storage_service.cpp
+++ b/ydb/core/yq/libs/checkpoint_storage/storage_service.cpp
@@ -11,9 +11,10 @@ using namespace NActors;
std::unique_ptr<NActors::IActor> NewCheckpointStorageService(
const NConfig::TCheckpointCoordinatorConfig& config,
const NConfig::TCommonConfig& commonConfig,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory)
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
+ const TYqSharedResources::TPtr& yqSharedResources)
{
- return NewStorageProxy(config, commonConfig, credentialsProviderFactory);
+ return NewStorageProxy(config, commonConfig, credentialsProviderFactory, yqSharedResources);
}
} // namespace NYq
diff --git a/ydb/core/yq/libs/checkpoint_storage/storage_service.h b/ydb/core/yq/libs/checkpoint_storage/storage_service.h
index 5eab844545..44c426ef05 100644
--- a/ydb/core/yq/libs/checkpoint_storage/storage_service.h
+++ b/ydb/core/yq/libs/checkpoint_storage/storage_service.h
@@ -2,6 +2,7 @@
#include <ydb/core/yq/libs/config/protos/checkpoint_coordinator.pb.h>
#include <ydb/core/yq/libs/config/protos/common.pb.h>
+#include <ydb/core/yq/libs/shared_resources/shared_resources.h>
#include <ydb/library/security/ydb_credentials_provider_factory.h>
@@ -14,6 +15,7 @@ namespace NYq {
std::unique_ptr<NActors::IActor> NewCheckpointStorageService(
const NConfig::TCheckpointCoordinatorConfig& config,
const NConfig::TCommonConfig& commonConfig,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory);
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
+ const TYqSharedResources::TPtr& yqSharedResources);
} // namespace NYq
diff --git a/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp b/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp
index a5349371db..a3308b9206 100644
--- a/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp
+++ b/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp
@@ -548,6 +548,7 @@ TFuture<TStatus> UpdateCheckpointWithCheckWrapper(
////////////////////////////////////////////////////////////////////////////////
class TCheckpointStorage : public ICheckpointStorage {
+ TYqSharedResources::TPtr YqSharedResources;
TYdbConnectionPtr YdbConnection;
const NConfig::TYdbStorageConfig Config;
@@ -555,7 +556,8 @@ public:
explicit TCheckpointStorage(
const NConfig::TYdbStorageConfig& config,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
- const IEntityIdGenerator::TPtr& entityIdGenerator);
+ const IEntityIdGenerator::TPtr& entityIdGenerator,
+ const TYqSharedResources::TPtr& yqSharedResources);
~TCheckpointStorage() = default;
@@ -624,8 +626,10 @@ private:
TCheckpointStorage::TCheckpointStorage(
const NConfig::TYdbStorageConfig& config,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
- const IEntityIdGenerator::TPtr& entityIdGenerator)
- : YdbConnection(NewYdbConnection(config, credentialsProviderFactory))
+ const IEntityIdGenerator::TPtr& entityIdGenerator,
+ const TYqSharedResources::TPtr& yqSharedResources)
+ : YqSharedResources(yqSharedResources)
+ , YdbConnection(NewYdbConnection(config, credentialsProviderFactory, YqSharedResources->CoreYdbDriver))
, Config(config)
, EntityIdGenerator(entityIdGenerator)
{
@@ -637,8 +641,7 @@ TFuture<TIssues> TCheckpointStorage::Init()
// TODO: list at first?
if (YdbConnection->DB != YdbConnection->TablePathPrefix) {
- auto schemeClient = NScheme::TSchemeClient(YdbConnection->Driver);
- auto status = schemeClient.MakeDirectory(YdbConnection->TablePathPrefix).GetValueSync();
+ auto status = YdbConnection->SchemeClient.MakeDirectory(YdbConnection->TablePathPrefix).GetValueSync();
if (!status.IsSuccess() && status.GetStatus() != EStatus::ALREADY_EXISTS) {
issues = status.GetIssues();
@@ -713,7 +716,7 @@ TFuture<TIssues> TCheckpointStorage::Init()
TFuture<TIssues> TCheckpointStorage::RegisterGraphCoordinator(const TCoordinatorId& coordinator)
{
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, coordinator] (TSession session) {
auto context = MakeIntrusive<TGenerationContext>(
session,
@@ -734,7 +737,7 @@ TFuture<TIssues> TCheckpointStorage::RegisterGraphCoordinator(const TCoordinator
TFuture<ICheckpointStorage::TGetCoordinatorsResult> TCheckpointStorage::GetCoordinators() {
auto getContext = MakeIntrusive<TGetCoordinatorsContext>();
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, getContext] (TSession session) {
auto generationContext = MakeIntrusive<TGenerationContext>(
session,
@@ -788,7 +791,7 @@ TFuture<ICheckpointStorage::TCreateCheckpointResult> TCheckpointStorage::CreateC
TFuture<ICheckpointStorage::TCreateCheckpointResult> TCheckpointStorage::CreateCheckpointImpl(const TCoordinatorId& coordinator, const TCheckpointContextPtr& checkpointContext) {
Y_VERIFY(checkpointContext->CheckpointGraphDescriptionContext->GraphDescId || checkpointContext->EntityIdGenerator);
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, coordinator, checkpointContext] (TSession session) {
auto generationContext = MakeIntrusive<TGenerationContext>(
session,
@@ -823,7 +826,7 @@ TFuture<TIssues> TCheckpointStorage::UpdateCheckpointStatus(
ECheckpointStatus prevStatus)
{
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, newStatus, prevStatus);
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, coordinator, checkpointContext] (TSession session) {
auto generationContext = MakeIntrusive<TGenerationContext>(
session,
@@ -849,7 +852,7 @@ TFuture<TIssues> TCheckpointStorage::AbortCheckpoint(
const TCheckpointId& checkpointId)
{
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, ECheckpointStatus::Aborted);
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, coordinator, checkpointContext] (TSession session) {
auto generationContext = MakeIntrusive<TGenerationContext>(
session,
@@ -879,7 +882,7 @@ TFuture<ICheckpointStorage::TGetCheckpointsResult> TCheckpointStorage::GetCheckp
{
auto getContext = MakeIntrusive<TGetCheckpointsContext>();
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, graph, getContext, statuses, limit, loadGraphDescription, settings = DefaultExecDataQuerySettings()] (TSession session) {
auto generationContext = MakeIntrusive<TGenerationContext>(
session,
@@ -906,7 +909,7 @@ TFuture<ICheckpointStorage::TGetCheckpointsResult> TCheckpointStorage::GetCheckp
}
TFuture<TIssues> TCheckpointStorage::DeleteGraph(const TString& graphId) {
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, graphId] (TSession session) {
// TODO: use prepared queries
auto query = Sprintf(R"(
@@ -944,7 +947,7 @@ TFuture<TIssues> TCheckpointStorage::MarkCheckpointsGC(
const TString& graphId,
const TCheckpointId& checkpointUpperBound)
{
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, graphId, checkpointUpperBound] (TSession session) {
// TODO: use prepared queries
auto query = Sprintf(R"(
@@ -984,7 +987,7 @@ TFuture<TIssues> TCheckpointStorage::DeleteMarkedCheckpoints(
const TString& graphId,
const TCheckpointId& checkpointUpperBound)
{
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, graphId, checkpointUpperBound] (TSession session) {
// TODO: use prepared queries
using namespace fmt::literals;
@@ -1055,9 +1058,9 @@ TFuture<TIssues> TCheckpointStorage::DeleteMarkedCheckpoints(
return StatusToIssues(future);
}
-TFuture<ICheckpointStorage::TAddToStateSizeResult> NYq::TCheckpointStorage::AddToStateSize(const TString& graphId, const TCheckpointId& checkpointId, ui64 size) {
+TFuture<ICheckpointStorage::TAddToStateSizeResult> TCheckpointStorage::AddToStateSize(const TString& graphId, const TCheckpointId& checkpointId, ui64 size) {
auto result = MakeIntrusive<TAddToStateSizeContext>();
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, graphId, checkpointId, size, result, thisPtr = TIntrusivePtr(this)](TSession session) {
NYdb::TParamsBuilder paramsBuilder;
paramsBuilder.AddParam("$graph_id").String(graphId).Build();
@@ -1120,9 +1123,9 @@ TFuture<ICheckpointStorage::TAddToStateSizeResult> NYq::TCheckpointStorage::AddT
});
}
-TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult> NYq::TCheckpointStorage::GetTotalCheckpointsStateSize(const TString& graphId) {
+TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult> TCheckpointStorage::GetTotalCheckpointsStateSize(const TString& graphId) {
auto result = MakeIntrusive<TGetTotalCheckpointsStateSizeContext>();
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, graphId, thisPtr = TIntrusivePtr(this), result](TSession session) {
NYdb::TParamsBuilder paramsBuilder;
paramsBuilder.AddParam("$graph_id").String(graphId).Build();
@@ -1169,7 +1172,7 @@ TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult> NYq::TCheckpoin
});
}
-TExecDataQuerySettings NYq::TCheckpointStorage::DefaultExecDataQuerySettings() {
+TExecDataQuerySettings TCheckpointStorage::DefaultExecDataQuerySettings() {
return TExecDataQuerySettings()
.KeepInQueryCache(true)
.ClientTimeout(TDuration::Seconds(Config.GetClientTimeoutSec()))
@@ -1184,10 +1187,11 @@ TExecDataQuerySettings NYq::TCheckpointStorage::DefaultExecDataQuerySettings() {
TCheckpointStoragePtr NewYdbCheckpointStorage(
const NConfig::TYdbStorageConfig& config,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
- const IEntityIdGenerator::TPtr& entityIdGenerator)
+ const IEntityIdGenerator::TPtr& entityIdGenerator,
+ const TYqSharedResources::TPtr& yqSharedResources)
{
Y_VERIFY(entityIdGenerator);
- return new TCheckpointStorage(config, credentialsProviderFactory, entityIdGenerator);
+ return new TCheckpointStorage(config, credentialsProviderFactory, entityIdGenerator, yqSharedResources);
}
} // namespace NYq
diff --git a/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.h b/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.h
index c52b006f91..aac3ca84c3 100644
--- a/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.h
+++ b/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.h
@@ -5,6 +5,7 @@
#include <ydb/library/security/ydb_credentials_provider_factory.h>
#include <ydb/core/yq/libs/common/entity_id.h>
#include <ydb/core/yq/libs/config/protos/storage.pb.h>
+#include <ydb/core/yq/libs/shared_resources/shared_resources.h>
namespace NYq {
@@ -13,6 +14,7 @@ namespace NYq {
TCheckpointStoragePtr NewYdbCheckpointStorage(
const NConfig::TYdbStorageConfig& config,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
- const IEntityIdGenerator::TPtr& entityIdGenerator);
+ const IEntityIdGenerator::TPtr& entityIdGenerator,
+ const TYqSharedResources::TPtr& yqSharedResources);
} // namespace NYq
diff --git a/ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.cpp b/ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.cpp
index ab209ec1e8..516ce8a210 100644
--- a/ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.cpp
+++ b/ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.cpp
@@ -146,13 +146,15 @@ TFuture<TStatus> ProcessState(
////////////////////////////////////////////////////////////////////////////////
class TStateStorage : public IStateStorage {
+ TYqSharedResources::TPtr YqSharedResources;
TYdbConnectionPtr YdbConnection;
const NConfig::TYdbStorageConfig Config;
public:
explicit TStateStorage(
const NConfig::TYdbStorageConfig& config,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory);
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
+ const TYqSharedResources::TPtr& yqSharedResources);
~TStateStorage() = default;
TFuture<TIssues> Init() override;
@@ -188,8 +190,10 @@ public:
TStateStorage::TStateStorage(
const NConfig::TYdbStorageConfig& config,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory)
- : YdbConnection(NewYdbConnection(config, credentialsProviderFactory))
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
+ const TYqSharedResources::TPtr& yqSharedResources)
+ : YqSharedResources(yqSharedResources)
+ , YdbConnection(NewYdbConnection(config, credentialsProviderFactory, YqSharedResources->UserSpaceYdbDriver))
, Config(config)
{
}
@@ -201,8 +205,7 @@ TFuture<TIssues> TStateStorage::Init()
// TODO: list at first?
if (YdbConnection->DB != YdbConnection->TablePathPrefix) {
//LOG_STREAMS_STORAGE_SERVICE_INFO("Creating directory: " << YdbConnection->TablePathPrefix);
- auto schemeClient = NScheme::TSchemeClient(YdbConnection->Driver);
- auto status = schemeClient.MakeDirectory(YdbConnection->TablePathPrefix).GetValueSync();
+ auto status = YdbConnection->SchemeClient.MakeDirectory(YdbConnection->TablePathPrefix).GetValueSync();
if (!status.IsSuccess() && status.GetStatus() != EStatus::ALREADY_EXISTS) {
issues = status.GetIssues();
@@ -250,7 +253,7 @@ TFuture<TIssues> TStateStorage::SaveState(
const TCheckpointId& checkpointId,
const NYql::NDqProto::TComputeActorState& state)
{
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, taskId, graphId, checkpointId, state, thisPtr = TIntrusivePtr(this)] (TSession session) {
auto context = MakeIntrusive<TContext>(
prefix,
@@ -289,7 +292,7 @@ TFuture<IStateStorage::TGetStateResult> TStateStorage::GetState(
graphId,
checkpointId);
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[context, thisPtr = TIntrusivePtr(this)] (TSession session) {
context->Session = session;
auto future = thisPtr->SelectState(context);
@@ -314,7 +317,7 @@ TFuture<IStateStorage::TCountStatesResult> TStateStorage::CountStates(
{
auto context = MakeIntrusive<TCountStateContext>();
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, graphId, checkpointId, context, thisPtr = TIntrusivePtr(this)] (TSession session) {
// publish nodes
@@ -379,7 +382,7 @@ TExecDataQuerySettings TStateStorage::DefaultExecDataQuerySettings() {
}
TFuture<TIssues> TStateStorage::DeleteGraph(const TString& graphId) {
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, graphId, thisPtr = TIntrusivePtr(this)] (TSession session) {
// publish nodes
@@ -419,7 +422,7 @@ TFuture<TIssues> TStateStorage::DeleteCheckpoints(
const TString& graphId,
const TCheckpointId& checkpointUpperBound)
{
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, graphId, checkpointUpperBound, thisPtr = TIntrusivePtr(this)] (TSession session) {
// publish nodes
@@ -556,9 +559,10 @@ TFuture<TStatus> TStateStorage::UpsertState(const TContextPtr& context) {
TStateStoragePtr NewYdbStateStorage(
const NConfig::TYdbStorageConfig& config,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory)
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
+ const TYqSharedResources::TPtr& yqSharedResources)
{
- return new TStateStorage(config, credentialsProviderFactory);
+ return new TStateStorage(config, credentialsProviderFactory, yqSharedResources);
}
} // namespace NYq
diff --git a/ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.h b/ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.h
index 61dba973de..c9e5e73de6 100644
--- a/ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.h
+++ b/ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.h
@@ -4,6 +4,7 @@
#include <ydb/library/security/ydb_credentials_provider_factory.h>
#include <ydb/core/yq/libs/config/protos/storage.pb.h>
+#include <ydb/core/yq/libs/shared_resources/shared_resources.h>
namespace NYq {
@@ -11,6 +12,7 @@ namespace NYq {
TStateStoragePtr NewYdbStateStorage(
const NConfig::TYdbStorageConfig& config,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory);
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
+ const TYqSharedResources::TPtr& yqSharedResources);
} // namespace NYq
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
index 9ec0900952..160865be64 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
@@ -48,7 +48,7 @@ void TYdbControlPlaneStorageActor::Bootstrap() {
NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(YQ_CONTROL_PLANE_STORAGE_PROVIDER));
DbPool = YqSharedResources->DbPoolHolder->GetOrCreate(EDbPoolId::MAIN, 10);
- YdbConnection = NewYdbConnection(Config.Proto.GetStorage(), CredProviderFactory);
+ YdbConnection = NewYdbConnection(Config.Proto.GetStorage(), CredProviderFactory, YqSharedResources->CoreYdbDriver);
CreateDirectory();
CreateQueriesTable();
CreatePendingSmallTable();
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index 22149a7865..ae8c7bf7d2 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -97,7 +97,7 @@ void Init(
}
if (protoConfig.GetCheckpointCoordinator().GetEnabled()) {
- auto checkpointStorage = NYq::NewCheckpointStorageService(protoConfig.GetCheckpointCoordinator(), protoConfig.GetCommon(), credentialsProviderFactory);
+ auto checkpointStorage = NYq::NewCheckpointStorageService(protoConfig.GetCheckpointCoordinator(), protoConfig.GetCommon(), credentialsProviderFactory, yqSharedResources);
actorRegistrator(NYql::NDq::MakeCheckpointStorageID(), checkpointStorage.release());
}
@@ -106,7 +106,7 @@ void Init(
NKikimr::NMiniKQL::TComputationNodeFactory dqCompFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory({
NYql::GetCommonDqFactory(),
- NYql::GetDqYdbFactory(yqSharedResources->YdbDriver),
+ NYql::GetDqYdbFactory(yqSharedResources->UserSpaceYdbDriver),
NKikimr::NMiniKQL::GetYqlFactory()
});
@@ -132,13 +132,13 @@ void Init(
}
credentialsFactory = NYql::CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory(tokenAccessorConfig.GetEndpoint(), tokenAccessorConfig.GetUseSsl(), caContent);
- RegisterDqPqReadActorFactory(*sourceActorFactory, yqSharedResources->YdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode());
- RegisterYdbReadActorFactory(*sourceActorFactory, yqSharedResources->YdbDriver, credentialsFactory);
+ RegisterDqPqReadActorFactory(*sourceActorFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode());
+ RegisterYdbReadActorFactory(*sourceActorFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
RegisterS3ReadActorFactory(*sourceActorFactory, credentialsFactory,
httpGateway, std::make_shared<NYql::NS3::TRetryConfig>(protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetRetryConfig()));
RegisterClickHouseReadActorFactory(*sourceActorFactory, credentialsFactory, httpGateway);
- RegisterDqPqWriteActorFactory(*sinkActorFactory, yqSharedResources->YdbDriver, credentialsFactory);
+ RegisterDqPqWriteActorFactory(*sinkActorFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
RegisterDQSolomonWriteActorFactory(*sinkActorFactory, credentialsFactory);
}
diff --git a/ydb/core/yq/libs/shared_resources/shared_resources.cpp b/ydb/core/yq/libs/shared_resources/shared_resources.cpp
index 3dfd566232..131d67d2bd 100644
--- a/ydb/core/yq/libs/shared_resources/shared_resources.cpp
+++ b/ydb/core/yq/libs/shared_resources/shared_resources.cpp
@@ -99,7 +99,8 @@ struct TYqSharedResourcesImpl : public TActorSystemPtrMixin, public TYqSharedRes
}
void Stop() override {
- YdbDriver.Stop(true);
+ CoreYdbDriver.Stop(true);
+ // UserSpaceYdbDriver.Stop(true); // For now it points to the same driver as CoreYdbDriver, so don't call Stop
}
NYdb::TDriverConfig GetYdbDriverConfig(const NYq::NConfig::TYdbDriverConfig& config) {
@@ -122,7 +123,7 @@ struct TYqSharedResourcesImpl : public TActorSystemPtrMixin, public TYqSharedRes
const NYq::NConfig::TDbPoolConfig& config,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
const NMonitoring::TDynamicCounterPtr& counters) {
- DbPoolHolder = MakeIntrusive<NYq::TDbPoolHolder>(config, YdbDriver, credentialsProviderFactory, counters);
+ DbPoolHolder = MakeIntrusive<NYq::TDbPoolHolder>(config, CoreYdbDriver, credentialsProviderFactory, counters);
}
};
@@ -136,7 +137,8 @@ TYqSharedResources::TPtr CreateYqSharedResourcesImpl(
}
TYqSharedResources::TYqSharedResources(NYdb::TDriver driver)
- : YdbDriver(std::move(driver))
+ : CoreYdbDriver(driver)
+ , UserSpaceYdbDriver(std::move(driver))
{
}
diff --git a/ydb/core/yq/libs/shared_resources/shared_resources.h b/ydb/core/yq/libs/shared_resources/shared_resources.h
index bfd1ec6695..bd8e5e3950 100644
--- a/ydb/core/yq/libs/shared_resources/shared_resources.h
+++ b/ydb/core/yq/libs/shared_resources/shared_resources.h
@@ -16,7 +16,11 @@ struct TYqSharedResources : public IYqSharedResources {
static TPtr Cast(const IYqSharedResources::TPtr& ptr);
// Resources
- NYdb::TDriver YdbDriver;
+
+ // Separated YDB drivers for user queries execution and for YQ core usage.
+ // For now they are actually point to the same driver, but it can be changed in the future.
+ NYdb::TDriver CoreYdbDriver;
+ NYdb::TDriver UserSpaceYdbDriver;
TDbPoolHolder::TPtr DbPoolHolder;
protected:
diff --git a/ydb/core/yq/libs/ydb/create_schema.cpp b/ydb/core/yq/libs/ydb/create_schema.cpp
index 51af951a06..a0ce2efe22 100644
--- a/ydb/core/yq/libs/ydb/create_schema.cpp
+++ b/ydb/core/yq/libs/ydb/create_schema.cpp
@@ -165,7 +165,7 @@ protected:
}
void CreateSession() {
- Connection->Client.GetSession().Subscribe(
+ Connection->TableClient.GetSession().Subscribe(
[actorId = SelfId(), actorSystem = NActors::TActivationContext::ActorSystem()](const NYdb::NTable::TAsyncCreateSessionResult& result) {
actorSystem->Send(actorId, new TEvPrivate::TEvCreateSessionResult(result.GetValue()));
}
@@ -234,7 +234,6 @@ public:
ui64 cookie)
: TCreateActorBase(parent, logComponent, std::move(connection), std::move(retryPolicy), cookie)
, DirectoryPath(directoryPath)
- , SchemeClient(Connection->Driver)
{
}
@@ -245,12 +244,11 @@ private:
NYdb::TAsyncStatus CallYdbSdk() override {
SCHEMA_LOG_DEBUG("Call create directory \"" << DirectoryPath << "\"");
- return SchemeClient.MakeDirectory(DirectoryPath);
+ return Connection->SchemeClient.MakeDirectory(DirectoryPath);
}
private:
const TString DirectoryPath;
- NYdb::NScheme::TSchemeClient SchemeClient;
};
NActors::IActor* MakeCreateTableActor(
diff --git a/ydb/core/yq/libs/ydb/ydb.cpp b/ydb/core/yq/libs/ydb/ydb.cpp
index d6096abce3..190efa13df 100644
--- a/ydb/core/yq/libs/ydb/ydb.cpp
+++ b/ydb/core/yq/libs/ydb/ydb.cpp
@@ -22,6 +22,9 @@ namespace {
////////////////////////////////////////////////////////////////////////////////
+template <class TSettings>
+TSettings GetClientSettings(const NConfig::TYdbStorageConfig& config, const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory);
+
TFuture<TDataQueryResult> SelectGeneration(const TGenerationContextPtr& context) {
// TODO: use prepared queries
@@ -166,29 +169,9 @@ TFuture<TStatus> RegisterGenerationWrapper(
});
}
-} // namespace
-
-////////////////////////////////////////////////////////////////////////////////
-
-TYdbConnection::TYdbConnection(const TDriverConfig& driverConfig,
- const NConfig::TYdbStorageConfig& config)
- : Driver(driverConfig)
- , Client(Driver)
- , DB(config.GetDatabase())
- , TablePathPrefix(JoinPath(DB, config.GetTablePrefix()))
-{
-}
-
-TYdbConnection::~TYdbConnection()
-{
- Driver.Stop(true);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-
-NYdb::TDriverConfig GetDriverConfig(const NConfig::TYdbStorageConfig& config,
- const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory) {
+template <class TSettings>
+TSettings GetClientSettings(const NConfig::TYdbStorageConfig& config,
+ const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory) {
TString oauth;
if (config.GetToken()) {
oauth = config.GetToken();
@@ -201,9 +184,10 @@ NYdb::TDriverConfig GetDriverConfig(const NConfig::TYdbStorageConfig& config,
const TString iamEndpoint = config.GetIamEndpoint();
const TString saKeyFile = config.GetSaKeyFile();
- auto driverConfig = TDriverConfig()
- .SetEndpoint(config.GetEndpoint())
- .SetDatabase(config.GetDatabase());
+ TSettings settings;
+ settings
+ .DiscoveryEndpoint(config.GetEndpoint())
+ .Database(config.GetDatabase());
NKikimr::TYdbCredentialsSettings credSettings;
credSettings.UseLocalMetadata = config.GetUseLocalMetadataService();
@@ -211,24 +195,43 @@ NYdb::TDriverConfig GetDriverConfig(const NConfig::TYdbStorageConfig& config,
credSettings.SaKeyFile = config.GetSaKeyFile();
credSettings.IamEndpoint = config.GetIamEndpoint();
- driverConfig.SetCredentialsProviderFactory(credProviderFactory(credSettings));
+ settings.CredentialsProviderFactory(credProviderFactory(credSettings));
if (config.GetUseLocalMetadataService()) {
- driverConfig.UseSecureConnection();
+ settings.EnableSsl(true);
}
if (config.GetCertificateFile()) {
auto cert = StripString(TFileInput(config.GetCertificateFile()).ReadAll());
- driverConfig.UseSecureConnection(cert);
+ settings
+ .EnableSsl(true)
+ .CaCert(cert);
}
- return driverConfig;
+ return settings;
}
+} // namespace
+
+////////////////////////////////////////////////////////////////////////////////
+
+TYdbConnection::TYdbConnection(const NConfig::TYdbStorageConfig& config,
+ const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory,
+ const NYdb::TDriver& driver)
+ : Driver(driver)
+ , TableClient(Driver, GetClientSettings<NYdb::NTable::TClientSettings>(config, credProviderFactory))
+ , SchemeClient(Driver, GetClientSettings<NYdb::TCommonClientSettings>(config, credProviderFactory))
+ , DB(config.GetDatabase())
+ , TablePathPrefix(JoinPath(DB, config.GetTablePrefix()))
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
TYdbConnectionPtr NewYdbConnection(const NConfig::TYdbStorageConfig& config,
- const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory) {
- auto driverConfig = GetDriverConfig(config, credProviderFactory);
- return MakeIntrusive<TYdbConnection>(driverConfig, config);
+ const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory,
+ const NYdb::TDriver& driver) {
+ return MakeIntrusive<TYdbConnection>(config, credProviderFactory, driver);
}
TStatus MakeErrorStatus(
@@ -266,7 +269,7 @@ TFuture<TStatus> CreateTable(
{
auto tablePath = JoinPath(ydbConnection->TablePathPrefix, name.c_str());
- return ydbConnection->Client.RetryOperation(
+ return ydbConnection->TableClient.RetryOperation(
[tablePath = std::move(tablePath), description = std::move(description)] (TSession session) mutable {
return session.CreateTable(tablePath, TTableDescription(description));
});
diff --git a/ydb/core/yq/libs/ydb/ydb.h b/ydb/core/yq/libs/ydb/ydb.h
index 14c0742288..9675dd8181 100644
--- a/ydb/core/yq/libs/ydb/ydb.h
+++ b/ydb/core/yq/libs/ydb/ydb.h
@@ -4,6 +4,7 @@
#include <ydb/core/yq/libs/config/protos/storage.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
namespace NYq {
@@ -11,15 +12,15 @@ namespace NYq {
struct TYdbConnection : public TThrRefBase {
NYdb::TDriver Driver;
- NYdb::NTable::TTableClient Client;
+ NYdb::NTable::TTableClient TableClient;
+ NYdb::NScheme::TSchemeClient SchemeClient;
const TString DB;
const TString TablePathPrefix;
TYdbConnection(
- const NYdb::TDriverConfig& driverConfig,
- const NConfig::TYdbStorageConfig& config);
-
- ~TYdbConnection();
+ const NConfig::TYdbStorageConfig& config,
+ const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory,
+ const NYdb::TDriver& driver);
};
using TYdbConnectionPtr = TIntrusivePtr<TYdbConnection>;
@@ -94,9 +95,7 @@ using TGenerationContextPtr = TIntrusivePtr<TGenerationContext>;
////////////////////////////////////////////////////////////////////////////////
-NYdb::TDriverConfig GetDriverConfig(const NConfig::TYdbStorageConfig& config, const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory);
-
-TYdbConnectionPtr NewYdbConnection(const NConfig::TYdbStorageConfig& config, const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory);
+TYdbConnectionPtr NewYdbConnection(const NConfig::TYdbStorageConfig& config, const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory, const NYdb::TDriver& driver);
NYdb::TStatus MakeErrorStatus(
NYdb::EStatus code,