aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVasily Gerasimov <UgnineSirdis@gmail.com>2022-03-23 12:30:48 +0300
committerVasily Gerasimov <UgnineSirdis@gmail.com>2022-03-23 12:30:48 +0300
commit3db4a2c7361fe6e945546319a890ef1ef84e44d6 (patch)
tree4bcfa4fbf9f0d454353b44bacbd58370dd896d55
parent97b4fd5e68d79891f0705fd4f5953d5a03a44309 (diff)
downloadydb-3db4a2c7361fe6e945546319a890ef1ef84e44d6.tar.gz
YQ-911 Use YDB driver from YQ shared resources in internal YDB connections for CP and checkpointing
Use CaCert setting for client Refactor YDB connection Refactor ydb lib Split two drivers ref:b529cad63e7e5aa22cb8d32f77f150e75135f71a
-rw-r--r--ydb/core/testlib/CMakeLists.txt1
-rw-r--r--ydb/core/testlib/test_client.cpp10
-rw-r--r--ydb/core/testlib/test_client.h2
-rw-r--r--ydb/core/yq/libs/actors/nodes_manager.cpp2
-rw-r--r--ydb/core/yq/libs/actors/pending_fetcher.cpp4
-rw-r--r--ydb/core/yq/libs/actors/run_actor.cpp12
-rw-r--r--ydb/core/yq/libs/actors/run_actor_params.cpp4
-rw-r--r--ydb/core/yq/libs/actors/run_actor_params.h5
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/CMakeLists.txt1
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp19
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/storage_proxy.h4
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/storage_service.cpp5
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/storage_service.h4
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp46
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.h4
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.cpp28
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.h4
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp2
-rw-r--r--ydb/core/yq/libs/init/init.cpp10
-rw-r--r--ydb/core/yq/libs/shared_resources/shared_resources.cpp8
-rw-r--r--ydb/core/yq/libs/shared_resources/shared_resources.h6
-rw-r--r--ydb/core/yq/libs/ydb/create_schema.cpp6
-rw-r--r--ydb/core/yq/libs/ydb/ydb.cpp71
-rw-r--r--ydb/core/yq/libs/ydb/ydb.h15
24 files changed, 155 insertions, 118 deletions
diff --git a/ydb/core/testlib/CMakeLists.txt b/ydb/core/testlib/CMakeLists.txt
index c55d5096f89..d58dbacfe05 100644
--- a/ydb/core/testlib/CMakeLists.txt
+++ b/ydb/core/testlib/CMakeLists.txt
@@ -62,6 +62,7 @@ target_link_libraries(ydb-core-testlib PUBLIC
libs-audit-mock
yq-libs-init
yq-libs-mock
+ yq-libs-shared_resources
ydb-library-aclib
library-folder_service-mock
library-mkql_proto-protos
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 67799128cb0..4dcaa93f4e2 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -804,7 +804,7 @@ namespace Tests {
const auto ydbCredFactory = NKikimr::CreateYdbCredentialsProviderFactory;
auto counters = MakeIntrusive<NMonitoring::TDynamicCounters>();
- auto yqSharedResources = NYq::CreateYqSharedResources(protoConfig, ydbCredFactory, counters);
+ YqSharedResources = NYq::CreateYqSharedResources(protoConfig, ydbCredFactory, counters);
NYq::Init(
protoConfig,
Runtime->GetNodeId(nodeIdx),
@@ -812,13 +812,13 @@ namespace Tests {
&appData,
"TestTenant",
nullptr, // MakeIntrusive<NPq::NConfigurationManager::TConnections>(),
- yqSharedResources,
+ YqSharedResources,
NKikimr::NFolderService::CreateMockFolderServiceActor,
NYq::CreateMockYqAuditServiceActor,
ydbCredFactory,
/*IcPort = */0
);
- NYq::InitTest(Runtime.Get(), port, Settings->GrpcPort, yqSharedResources);
+ NYq::InitTest(Runtime.Get(), port, Settings->GrpcPort, YqSharedResources);
}
}
@@ -885,6 +885,10 @@ namespace Tests {
GRpcServer->Stop();
}
+ if (YqSharedResources) {
+ YqSharedResources->Stop();
+ }
+
if (Runtime) {
Runtime.Destroy();
}
diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h
index 0f1bfa7a46b..b3673907ae4 100644
--- a/ydb/core/testlib/test_client.h
+++ b/ydb/core/testlib/test_client.h
@@ -24,6 +24,7 @@
#include <ydb/core/kesus/tablet/events.h>
#include <ydb/core/security/ticket_parser.h>
#include <ydb/core/base/grpc_service_factory.h>
+#include <ydb/core/yq/libs/shared_resources/interface/shared_resources.h>
#include <google/protobuf/text_format.h>
@@ -260,6 +261,7 @@ namespace Tests {
TAutoPtr<NMsgBusProxy::IMessageBusServer> BusServer;
std::unique_ptr<NGrpc::TGRpcServer> GRpcServer;
TIntrusivePtr<NMonitoring::TDynamicCounters> GRpcServerRootCounters;
+ NYq::IYqSharedResources::TPtr YqSharedResources;
};
class TClient {
diff --git a/ydb/core/yq/libs/actors/nodes_manager.cpp b/ydb/core/yq/libs/actors/nodes_manager.cpp
index 30c1a013434..fbeab5a6474 100644
--- a/ydb/core/yq/libs/actors/nodes_manager.cpp
+++ b/ydb/core/yq/libs/actors/nodes_manager.cpp
@@ -73,7 +73,7 @@ public:
, YqSharedResources(yqSharedResources)
, IcPort(icPort)
, Client(
- YqSharedResources->YdbDriver,
+ YqSharedResources->CoreYdbDriver,
NYdb::TCommonClientSettings()
.DiscoveryEndpoint(PrivateApiConfig.GetTaskServiceEndpoint())
.EnableSsl(PrivateApiConfig.GetSecureTaskService())
diff --git a/ydb/core/yq/libs/actors/pending_fetcher.cpp b/ydb/core/yq/libs/actors/pending_fetcher.cpp
index 9c40cd890f1..f24225aaf28 100644
--- a/ydb/core/yq/libs/actors/pending_fetcher.cpp
+++ b/ydb/core/yq/libs/actors/pending_fetcher.cpp
@@ -160,7 +160,7 @@ public:
, Guid(CreateGuidAsString())
, ClientCounters(clientCounters)
, Client(
- YqSharedResources->YdbDriver,
+ YqSharedResources->CoreYdbDriver,
NYdb::TCommonClientSettings()
.DiscoveryEndpoint(PrivateApiConfig.GetTaskServiceEndpoint())
.EnableSsl(PrivateApiConfig.GetSecureTaskService())
@@ -336,7 +336,7 @@ private:
const auto createdAt = TInstant::Now();
TRunActorParams params(
- YqSharedResources->YdbDriver, S3Gateway,
+ YqSharedResources, S3Gateway,
FunctionRegistry, RandomProvider,
ModuleResolver, ModuleResolver->GetNextUniqueId(),
DqCompFactory, PqCmConnections,
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp
index 451a9f7db19..ab282856410 100644
--- a/ydb/core/yq/libs/actors/run_actor.cpp
+++ b/ydb/core/yq/libs/actors/run_actor.cpp
@@ -113,7 +113,7 @@ public:
Params.QueryId,
Params.Owner,
TPrivateClient(
- Params.Driver,
+ Params.YqSharedResources->CoreYdbDriver,
NYdb::TCommonClientSettings()
.DiscoveryEndpoint(Params.PrivateApiConfig.GetTaskServiceEndpoint())
.EnableSsl(Params.PrivateApiConfig.GetSecureTaskService())
@@ -412,7 +412,7 @@ private:
::NYq::MakeReadRuleCreatorActor(
SelfId(),
Params.QueryId,
- Params.Driver,
+ Params.YqSharedResources->UserSpaceYdbDriver,
std::move(TopicsForConsumersCreation),
std::move(CredentialsForConsumersCreation)
)
@@ -771,7 +771,7 @@ private:
::NYq::MakeReadRuleDeleterActor(
SelfId(),
Params.QueryId,
- Params.Driver,
+ Params.YqSharedResources->UserSpaceYdbDriver,
Params.CreatedTopicConsumers,
std::move(credentials)
)
@@ -832,7 +832,7 @@ private:
}
resultId = NActors::TActivationContext::Register(
CreateResultWriter(
- Params.Driver, ExecuterId, dqGraphParams.GetResultType(), Params.PrivateApiConfig,
+ Params.YqSharedResources->UserSpaceYdbDriver, ExecuterId, dqGraphParams.GetResultType(), Params.PrivateApiConfig,
writerResultId, columns, dqGraphParams.GetSession(), Params.Deadline, Params.ClientCounters));
} else {
LOG_D("ResultWriter was NOT CREATED since ResultType is empty");
@@ -1096,7 +1096,7 @@ private:
}
{
- dataProvidersInit.push_back(GetYdbDataProviderInitializer(Params.Driver, Params.CredentialsFactory, dbResolver));
+ dataProvidersInit.push_back(GetYdbDataProviderInitializer(Params.YqSharedResources->UserSpaceYdbDriver, Params.CredentialsFactory, dbResolver));
}
{
@@ -1109,7 +1109,7 @@ private:
{
NYql::TPqGatewayServices pqServices(
- Params.Driver,
+ Params.YqSharedResources->UserSpaceYdbDriver,
Params.PqCmConnections,
Params.CredentialsFactory,
std::make_shared<NYql::TPqGatewayConfig>(gatewaysConfig.GetPq()),
diff --git a/ydb/core/yq/libs/actors/run_actor_params.cpp b/ydb/core/yq/libs/actors/run_actor_params.cpp
index 14933dd6e7c..09d9003d116 100644
--- a/ydb/core/yq/libs/actors/run_actor_params.cpp
+++ b/ydb/core/yq/libs/actors/run_actor_params.cpp
@@ -5,7 +5,7 @@ namespace NYq {
using namespace NActors;
TRunActorParams::TRunActorParams(
- NYdb::TDriver driver,
+ TYqSharedResources::TPtr yqSharedResources,
NYql::IHTTPGateway::TPtr s3Gateway,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
TIntrusivePtr<IRandomProvider> randomProvider,
@@ -48,7 +48,7 @@ TRunActorParams::TRunActorParams(
TInstant createdAt,
const TString& tenantName
)
- : Driver(driver)
+ : YqSharedResources(yqSharedResources)
, S3Gateway(s3Gateway)
, FunctionRegistry(functionRegistry)
, RandomProvider(randomProvider)
diff --git a/ydb/core/yq/libs/actors/run_actor_params.h b/ydb/core/yq/libs/actors/run_actor_params.h
index fd907234f35..1f25e807ffc 100644
--- a/ydb/core/yq/libs/actors/run_actor_params.h
+++ b/ydb/core/yq/libs/actors/run_actor_params.h
@@ -3,6 +3,7 @@
#include <ydb/core/yq/libs/config/protos/pinger.pb.h>
#include <ydb/core/yq/libs/config/protos/yq_config.pb.h>
#include <ydb/core/yq/libs/events/events.h>
+#include <ydb/core/yq/libs/shared_resources/shared_resources.h>
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
@@ -19,7 +20,7 @@ namespace NYq {
struct TRunActorParams { // TODO2 : Change name
TRunActorParams(
- NYdb::TDriver driver,
+ TYqSharedResources::TPtr yqSharedResources,
NYql::IHTTPGateway::TPtr s3Gateway,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
TIntrusivePtr<IRandomProvider> randomProvider,
@@ -66,7 +67,7 @@ struct TRunActorParams { // TODO2 : Change name
TRunActorParams(const TRunActorParams& params) = default;
TRunActorParams(TRunActorParams&& params) = default;
- NYdb::TDriver Driver;
+ TYqSharedResources::TPtr YqSharedResources;
NYql::IHTTPGateway::TPtr S3Gateway;
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry;
TIntrusivePtr<IRandomProvider> RandomProvider;
diff --git a/ydb/core/yq/libs/checkpoint_storage/CMakeLists.txt b/ydb/core/yq/libs/checkpoint_storage/CMakeLists.txt
index f2ab9f4661b..735c28fa79f 100644
--- a/ydb/core/yq/libs/checkpoint_storage/CMakeLists.txt
+++ b/ydb/core/yq/libs/checkpoint_storage/CMakeLists.txt
@@ -22,6 +22,7 @@ target_link_libraries(yq-libs-checkpoint_storage PUBLIC
libs-checkpoint_storage-events
libs-checkpoint_storage-proto
yq-libs-checkpointing_common
+ yq-libs-shared_resources
ydb-library-security
cpp-client-ydb_scheme
cpp-client-ydb_table
diff --git a/ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp b/ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp
index 07c8d5101a0..f504653e6f2 100644
--- a/ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp
+++ b/ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp
@@ -41,12 +41,14 @@ class TStorageProxy : public TActorBootstrapped<TStorageProxy> {
TStateStoragePtr StateStorage;
TActorId ActorGC;
NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
+ TYqSharedResources::TPtr YqSharedResources;
public:
explicit TStorageProxy(
const NConfig::TCheckpointCoordinatorConfig& config,
const NConfig::TCommonConfig& commonConfig,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory);
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
+ const TYqSharedResources::TPtr& yqSharedResources);
void Bootstrap();
@@ -100,22 +102,24 @@ static void FillDefaultParameters(NConfig::TCheckpointCoordinatorConfig& checkpo
TStorageProxy::TStorageProxy(
const NConfig::TCheckpointCoordinatorConfig& config,
const NConfig::TCommonConfig& commonConfig,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory)
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
+ const TYqSharedResources::TPtr& yqSharedResources)
: Config(config)
, CommonConfig(commonConfig)
, StorageConfig(Config.GetStorage())
- , CredentialsProviderFactory(credentialsProviderFactory) {
+ , CredentialsProviderFactory(credentialsProviderFactory)
+ , YqSharedResources(yqSharedResources) {
FillDefaultParameters(Config, StorageConfig);
}
void TStorageProxy::Bootstrap() {
- CheckpointStorage = NewYdbCheckpointStorage(StorageConfig, CredentialsProviderFactory, CreateEntityIdGenerator(CommonConfig.GetIdsPrefix()));
+ CheckpointStorage = NewYdbCheckpointStorage(StorageConfig, CredentialsProviderFactory, CreateEntityIdGenerator(CommonConfig.GetIdsPrefix()), YqSharedResources);
auto issues = CheckpointStorage->Init().GetValueSync();
if (!issues.Empty()) {
LOG_STREAMS_STORAGE_SERVICE_ERROR("Failed to init checkpoint storage: " << issues.ToOneLineString());
}
- StateStorage = NewYdbStateStorage(StorageConfig, CredentialsProviderFactory);
+ StateStorage = NewYdbStateStorage(StorageConfig, CredentialsProviderFactory, YqSharedResources);
issues = StateStorage->Init().GetValueSync();
if (!issues.Empty()) {
LOG_STREAMS_STORAGE_SERVICE_ERROR("Failed to init checkpoint state storage: " << issues.ToOneLineString());
@@ -393,9 +397,10 @@ void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvGetTaskState::TPtr& ev) {
std::unique_ptr<NActors::IActor> NewStorageProxy(
const NConfig::TCheckpointCoordinatorConfig& config,
const NConfig::TCommonConfig& commonConfig,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory)
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
+ const TYqSharedResources::TPtr& yqSharedResources)
{
- return std::unique_ptr<NActors::IActor>(new TStorageProxy(config, commonConfig, credentialsProviderFactory));
+ return std::unique_ptr<NActors::IActor>(new TStorageProxy(config, commonConfig, credentialsProviderFactory, yqSharedResources));
}
} // namespace NYq
diff --git a/ydb/core/yq/libs/checkpoint_storage/storage_proxy.h b/ydb/core/yq/libs/checkpoint_storage/storage_proxy.h
index c67d928a151..f487024867c 100644
--- a/ydb/core/yq/libs/checkpoint_storage/storage_proxy.h
+++ b/ydb/core/yq/libs/checkpoint_storage/storage_proxy.h
@@ -2,6 +2,7 @@
#include <ydb/core/yq/libs/config/protos/checkpoint_coordinator.pb.h>
#include <ydb/core/yq/libs/config/protos/common.pb.h>
+#include <ydb/core/yq/libs/shared_resources/shared_resources.h>
#include <ydb/library/security/ydb_credentials_provider_factory.h>
@@ -14,6 +15,7 @@ namespace NYq {
std::unique_ptr<NActors::IActor> NewStorageProxy(
const NConfig::TCheckpointCoordinatorConfig& config,
const NConfig::TCommonConfig& commonConfig,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory);
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
+ const TYqSharedResources::TPtr& yqSharedResources);
} // namespace NYq
diff --git a/ydb/core/yq/libs/checkpoint_storage/storage_service.cpp b/ydb/core/yq/libs/checkpoint_storage/storage_service.cpp
index 421c19d7fdb..aa45745711b 100644
--- a/ydb/core/yq/libs/checkpoint_storage/storage_service.cpp
+++ b/ydb/core/yq/libs/checkpoint_storage/storage_service.cpp
@@ -11,9 +11,10 @@ using namespace NActors;
std::unique_ptr<NActors::IActor> NewCheckpointStorageService(
const NConfig::TCheckpointCoordinatorConfig& config,
const NConfig::TCommonConfig& commonConfig,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory)
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
+ const TYqSharedResources::TPtr& yqSharedResources)
{
- return NewStorageProxy(config, commonConfig, credentialsProviderFactory);
+ return NewStorageProxy(config, commonConfig, credentialsProviderFactory, yqSharedResources);
}
} // namespace NYq
diff --git a/ydb/core/yq/libs/checkpoint_storage/storage_service.h b/ydb/core/yq/libs/checkpoint_storage/storage_service.h
index 5eab844545a..44c426ef05b 100644
--- a/ydb/core/yq/libs/checkpoint_storage/storage_service.h
+++ b/ydb/core/yq/libs/checkpoint_storage/storage_service.h
@@ -2,6 +2,7 @@
#include <ydb/core/yq/libs/config/protos/checkpoint_coordinator.pb.h>
#include <ydb/core/yq/libs/config/protos/common.pb.h>
+#include <ydb/core/yq/libs/shared_resources/shared_resources.h>
#include <ydb/library/security/ydb_credentials_provider_factory.h>
@@ -14,6 +15,7 @@ namespace NYq {
std::unique_ptr<NActors::IActor> NewCheckpointStorageService(
const NConfig::TCheckpointCoordinatorConfig& config,
const NConfig::TCommonConfig& commonConfig,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory);
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
+ const TYqSharedResources::TPtr& yqSharedResources);
} // namespace NYq
diff --git a/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp b/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp
index a5349371db9..a3308b9206b 100644
--- a/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp
+++ b/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp
@@ -548,6 +548,7 @@ TFuture<TStatus> UpdateCheckpointWithCheckWrapper(
////////////////////////////////////////////////////////////////////////////////
class TCheckpointStorage : public ICheckpointStorage {
+ TYqSharedResources::TPtr YqSharedResources;
TYdbConnectionPtr YdbConnection;
const NConfig::TYdbStorageConfig Config;
@@ -555,7 +556,8 @@ public:
explicit TCheckpointStorage(
const NConfig::TYdbStorageConfig& config,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
- const IEntityIdGenerator::TPtr& entityIdGenerator);
+ const IEntityIdGenerator::TPtr& entityIdGenerator,
+ const TYqSharedResources::TPtr& yqSharedResources);
~TCheckpointStorage() = default;
@@ -624,8 +626,10 @@ private:
TCheckpointStorage::TCheckpointStorage(
const NConfig::TYdbStorageConfig& config,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
- const IEntityIdGenerator::TPtr& entityIdGenerator)
- : YdbConnection(NewYdbConnection(config, credentialsProviderFactory))
+ const IEntityIdGenerator::TPtr& entityIdGenerator,
+ const TYqSharedResources::TPtr& yqSharedResources)
+ : YqSharedResources(yqSharedResources)
+ , YdbConnection(NewYdbConnection(config, credentialsProviderFactory, YqSharedResources->CoreYdbDriver))
, Config(config)
, EntityIdGenerator(entityIdGenerator)
{
@@ -637,8 +641,7 @@ TFuture<TIssues> TCheckpointStorage::Init()
// TODO: list at first?
if (YdbConnection->DB != YdbConnection->TablePathPrefix) {
- auto schemeClient = NScheme::TSchemeClient(YdbConnection->Driver);
- auto status = schemeClient.MakeDirectory(YdbConnection->TablePathPrefix).GetValueSync();
+ auto status = YdbConnection->SchemeClient.MakeDirectory(YdbConnection->TablePathPrefix).GetValueSync();
if (!status.IsSuccess() && status.GetStatus() != EStatus::ALREADY_EXISTS) {
issues = status.GetIssues();
@@ -713,7 +716,7 @@ TFuture<TIssues> TCheckpointStorage::Init()
TFuture<TIssues> TCheckpointStorage::RegisterGraphCoordinator(const TCoordinatorId& coordinator)
{
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, coordinator] (TSession session) {
auto context = MakeIntrusive<TGenerationContext>(
session,
@@ -734,7 +737,7 @@ TFuture<TIssues> TCheckpointStorage::RegisterGraphCoordinator(const TCoordinator
TFuture<ICheckpointStorage::TGetCoordinatorsResult> TCheckpointStorage::GetCoordinators() {
auto getContext = MakeIntrusive<TGetCoordinatorsContext>();
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, getContext] (TSession session) {
auto generationContext = MakeIntrusive<TGenerationContext>(
session,
@@ -788,7 +791,7 @@ TFuture<ICheckpointStorage::TCreateCheckpointResult> TCheckpointStorage::CreateC
TFuture<ICheckpointStorage::TCreateCheckpointResult> TCheckpointStorage::CreateCheckpointImpl(const TCoordinatorId& coordinator, const TCheckpointContextPtr& checkpointContext) {
Y_VERIFY(checkpointContext->CheckpointGraphDescriptionContext->GraphDescId || checkpointContext->EntityIdGenerator);
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, coordinator, checkpointContext] (TSession session) {
auto generationContext = MakeIntrusive<TGenerationContext>(
session,
@@ -823,7 +826,7 @@ TFuture<TIssues> TCheckpointStorage::UpdateCheckpointStatus(
ECheckpointStatus prevStatus)
{
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, newStatus, prevStatus);
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, coordinator, checkpointContext] (TSession session) {
auto generationContext = MakeIntrusive<TGenerationContext>(
session,
@@ -849,7 +852,7 @@ TFuture<TIssues> TCheckpointStorage::AbortCheckpoint(
const TCheckpointId& checkpointId)
{
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, ECheckpointStatus::Aborted);
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, coordinator, checkpointContext] (TSession session) {
auto generationContext = MakeIntrusive<TGenerationContext>(
session,
@@ -879,7 +882,7 @@ TFuture<ICheckpointStorage::TGetCheckpointsResult> TCheckpointStorage::GetCheckp
{
auto getContext = MakeIntrusive<TGetCheckpointsContext>();
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, graph, getContext, statuses, limit, loadGraphDescription, settings = DefaultExecDataQuerySettings()] (TSession session) {
auto generationContext = MakeIntrusive<TGenerationContext>(
session,
@@ -906,7 +909,7 @@ TFuture<ICheckpointStorage::TGetCheckpointsResult> TCheckpointStorage::GetCheckp
}
TFuture<TIssues> TCheckpointStorage::DeleteGraph(const TString& graphId) {
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, graphId] (TSession session) {
// TODO: use prepared queries
auto query = Sprintf(R"(
@@ -944,7 +947,7 @@ TFuture<TIssues> TCheckpointStorage::MarkCheckpointsGC(
const TString& graphId,
const TCheckpointId& checkpointUpperBound)
{
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, graphId, checkpointUpperBound] (TSession session) {
// TODO: use prepared queries
auto query = Sprintf(R"(
@@ -984,7 +987,7 @@ TFuture<TIssues> TCheckpointStorage::DeleteMarkedCheckpoints(
const TString& graphId,
const TCheckpointId& checkpointUpperBound)
{
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, graphId, checkpointUpperBound] (TSession session) {
// TODO: use prepared queries
using namespace fmt::literals;
@@ -1055,9 +1058,9 @@ TFuture<TIssues> TCheckpointStorage::DeleteMarkedCheckpoints(
return StatusToIssues(future);
}
-TFuture<ICheckpointStorage::TAddToStateSizeResult> NYq::TCheckpointStorage::AddToStateSize(const TString& graphId, const TCheckpointId& checkpointId, ui64 size) {
+TFuture<ICheckpointStorage::TAddToStateSizeResult> TCheckpointStorage::AddToStateSize(const TString& graphId, const TCheckpointId& checkpointId, ui64 size) {
auto result = MakeIntrusive<TAddToStateSizeContext>();
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, graphId, checkpointId, size, result, thisPtr = TIntrusivePtr(this)](TSession session) {
NYdb::TParamsBuilder paramsBuilder;
paramsBuilder.AddParam("$graph_id").String(graphId).Build();
@@ -1120,9 +1123,9 @@ TFuture<ICheckpointStorage::TAddToStateSizeResult> NYq::TCheckpointStorage::AddT
});
}
-TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult> NYq::TCheckpointStorage::GetTotalCheckpointsStateSize(const TString& graphId) {
+TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult> TCheckpointStorage::GetTotalCheckpointsStateSize(const TString& graphId) {
auto result = MakeIntrusive<TGetTotalCheckpointsStateSizeContext>();
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, graphId, thisPtr = TIntrusivePtr(this), result](TSession session) {
NYdb::TParamsBuilder paramsBuilder;
paramsBuilder.AddParam("$graph_id").String(graphId).Build();
@@ -1169,7 +1172,7 @@ TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult> NYq::TCheckpoin
});
}
-TExecDataQuerySettings NYq::TCheckpointStorage::DefaultExecDataQuerySettings() {
+TExecDataQuerySettings TCheckpointStorage::DefaultExecDataQuerySettings() {
return TExecDataQuerySettings()
.KeepInQueryCache(true)
.ClientTimeout(TDuration::Seconds(Config.GetClientTimeoutSec()))
@@ -1184,10 +1187,11 @@ TExecDataQuerySettings NYq::TCheckpointStorage::DefaultExecDataQuerySettings() {
TCheckpointStoragePtr NewYdbCheckpointStorage(
const NConfig::TYdbStorageConfig& config,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
- const IEntityIdGenerator::TPtr& entityIdGenerator)
+ const IEntityIdGenerator::TPtr& entityIdGenerator,
+ const TYqSharedResources::TPtr& yqSharedResources)
{
Y_VERIFY(entityIdGenerator);
- return new TCheckpointStorage(config, credentialsProviderFactory, entityIdGenerator);
+ return new TCheckpointStorage(config, credentialsProviderFactory, entityIdGenerator, yqSharedResources);
}
} // namespace NYq
diff --git a/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.h b/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.h
index c52b006f916..aac3ca84c34 100644
--- a/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.h
+++ b/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.h
@@ -5,6 +5,7 @@
#include <ydb/library/security/ydb_credentials_provider_factory.h>
#include <ydb/core/yq/libs/common/entity_id.h>
#include <ydb/core/yq/libs/config/protos/storage.pb.h>
+#include <ydb/core/yq/libs/shared_resources/shared_resources.h>
namespace NYq {
@@ -13,6 +14,7 @@ namespace NYq {
TCheckpointStoragePtr NewYdbCheckpointStorage(
const NConfig::TYdbStorageConfig& config,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
- const IEntityIdGenerator::TPtr& entityIdGenerator);
+ const IEntityIdGenerator::TPtr& entityIdGenerator,
+ const TYqSharedResources::TPtr& yqSharedResources);
} // namespace NYq
diff --git a/ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.cpp b/ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.cpp
index ab209ec1e89..516ce8a2100 100644
--- a/ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.cpp
+++ b/ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.cpp
@@ -146,13 +146,15 @@ TFuture<TStatus> ProcessState(
////////////////////////////////////////////////////////////////////////////////
class TStateStorage : public IStateStorage {
+ TYqSharedResources::TPtr YqSharedResources;
TYdbConnectionPtr YdbConnection;
const NConfig::TYdbStorageConfig Config;
public:
explicit TStateStorage(
const NConfig::TYdbStorageConfig& config,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory);
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
+ const TYqSharedResources::TPtr& yqSharedResources);
~TStateStorage() = default;
TFuture<TIssues> Init() override;
@@ -188,8 +190,10 @@ public:
TStateStorage::TStateStorage(
const NConfig::TYdbStorageConfig& config,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory)
- : YdbConnection(NewYdbConnection(config, credentialsProviderFactory))
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
+ const TYqSharedResources::TPtr& yqSharedResources)
+ : YqSharedResources(yqSharedResources)
+ , YdbConnection(NewYdbConnection(config, credentialsProviderFactory, YqSharedResources->UserSpaceYdbDriver))
, Config(config)
{
}
@@ -201,8 +205,7 @@ TFuture<TIssues> TStateStorage::Init()
// TODO: list at first?
if (YdbConnection->DB != YdbConnection->TablePathPrefix) {
//LOG_STREAMS_STORAGE_SERVICE_INFO("Creating directory: " << YdbConnection->TablePathPrefix);
- auto schemeClient = NScheme::TSchemeClient(YdbConnection->Driver);
- auto status = schemeClient.MakeDirectory(YdbConnection->TablePathPrefix).GetValueSync();
+ auto status = YdbConnection->SchemeClient.MakeDirectory(YdbConnection->TablePathPrefix).GetValueSync();
if (!status.IsSuccess() && status.GetStatus() != EStatus::ALREADY_EXISTS) {
issues = status.GetIssues();
@@ -250,7 +253,7 @@ TFuture<TIssues> TStateStorage::SaveState(
const TCheckpointId& checkpointId,
const NYql::NDqProto::TComputeActorState& state)
{
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, taskId, graphId, checkpointId, state, thisPtr = TIntrusivePtr(this)] (TSession session) {
auto context = MakeIntrusive<TContext>(
prefix,
@@ -289,7 +292,7 @@ TFuture<IStateStorage::TGetStateResult> TStateStorage::GetState(
graphId,
checkpointId);
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[context, thisPtr = TIntrusivePtr(this)] (TSession session) {
context->Session = session;
auto future = thisPtr->SelectState(context);
@@ -314,7 +317,7 @@ TFuture<IStateStorage::TCountStatesResult> TStateStorage::CountStates(
{
auto context = MakeIntrusive<TCountStateContext>();
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, graphId, checkpointId, context, thisPtr = TIntrusivePtr(this)] (TSession session) {
// publish nodes
@@ -379,7 +382,7 @@ TExecDataQuerySettings TStateStorage::DefaultExecDataQuerySettings() {
}
TFuture<TIssues> TStateStorage::DeleteGraph(const TString& graphId) {
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, graphId, thisPtr = TIntrusivePtr(this)] (TSession session) {
// publish nodes
@@ -419,7 +422,7 @@ TFuture<TIssues> TStateStorage::DeleteCheckpoints(
const TString& graphId,
const TCheckpointId& checkpointUpperBound)
{
- auto future = YdbConnection->Client.RetryOperation(
+ auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, graphId, checkpointUpperBound, thisPtr = TIntrusivePtr(this)] (TSession session) {
// publish nodes
@@ -556,9 +559,10 @@ TFuture<TStatus> TStateStorage::UpsertState(const TContextPtr& context) {
TStateStoragePtr NewYdbStateStorage(
const NConfig::TYdbStorageConfig& config,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory)
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
+ const TYqSharedResources::TPtr& yqSharedResources)
{
- return new TStateStorage(config, credentialsProviderFactory);
+ return new TStateStorage(config, credentialsProviderFactory, yqSharedResources);
}
} // namespace NYq
diff --git a/ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.h b/ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.h
index 61dba973de7..c9e5e73de6c 100644
--- a/ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.h
+++ b/ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.h
@@ -4,6 +4,7 @@
#include <ydb/library/security/ydb_credentials_provider_factory.h>
#include <ydb/core/yq/libs/config/protos/storage.pb.h>
+#include <ydb/core/yq/libs/shared_resources/shared_resources.h>
namespace NYq {
@@ -11,6 +12,7 @@ namespace NYq {
TStateStoragePtr NewYdbStateStorage(
const NConfig::TYdbStorageConfig& config,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory);
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
+ const TYqSharedResources::TPtr& yqSharedResources);
} // namespace NYq
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
index 9ec09009527..160865be648 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
@@ -48,7 +48,7 @@ void TYdbControlPlaneStorageActor::Bootstrap() {
NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(YQ_CONTROL_PLANE_STORAGE_PROVIDER));
DbPool = YqSharedResources->DbPoolHolder->GetOrCreate(EDbPoolId::MAIN, 10);
- YdbConnection = NewYdbConnection(Config.Proto.GetStorage(), CredProviderFactory);
+ YdbConnection = NewYdbConnection(Config.Proto.GetStorage(), CredProviderFactory, YqSharedResources->CoreYdbDriver);
CreateDirectory();
CreateQueriesTable();
CreatePendingSmallTable();
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index 22149a78659..ae8c7bf7d23 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -97,7 +97,7 @@ void Init(
}
if (protoConfig.GetCheckpointCoordinator().GetEnabled()) {
- auto checkpointStorage = NYq::NewCheckpointStorageService(protoConfig.GetCheckpointCoordinator(), protoConfig.GetCommon(), credentialsProviderFactory);
+ auto checkpointStorage = NYq::NewCheckpointStorageService(protoConfig.GetCheckpointCoordinator(), protoConfig.GetCommon(), credentialsProviderFactory, yqSharedResources);
actorRegistrator(NYql::NDq::MakeCheckpointStorageID(), checkpointStorage.release());
}
@@ -106,7 +106,7 @@ void Init(
NKikimr::NMiniKQL::TComputationNodeFactory dqCompFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory({
NYql::GetCommonDqFactory(),
- NYql::GetDqYdbFactory(yqSharedResources->YdbDriver),
+ NYql::GetDqYdbFactory(yqSharedResources->UserSpaceYdbDriver),
NKikimr::NMiniKQL::GetYqlFactory()
});
@@ -132,13 +132,13 @@ void Init(
}
credentialsFactory = NYql::CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory(tokenAccessorConfig.GetEndpoint(), tokenAccessorConfig.GetUseSsl(), caContent);
- RegisterDqPqReadActorFactory(*sourceActorFactory, yqSharedResources->YdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode());
- RegisterYdbReadActorFactory(*sourceActorFactory, yqSharedResources->YdbDriver, credentialsFactory);
+ RegisterDqPqReadActorFactory(*sourceActorFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode());
+ RegisterYdbReadActorFactory(*sourceActorFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
RegisterS3ReadActorFactory(*sourceActorFactory, credentialsFactory,
httpGateway, std::make_shared<NYql::NS3::TRetryConfig>(protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetRetryConfig()));
RegisterClickHouseReadActorFactory(*sourceActorFactory, credentialsFactory, httpGateway);
- RegisterDqPqWriteActorFactory(*sinkActorFactory, yqSharedResources->YdbDriver, credentialsFactory);
+ RegisterDqPqWriteActorFactory(*sinkActorFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
RegisterDQSolomonWriteActorFactory(*sinkActorFactory, credentialsFactory);
}
diff --git a/ydb/core/yq/libs/shared_resources/shared_resources.cpp b/ydb/core/yq/libs/shared_resources/shared_resources.cpp
index 3dfd566232f..131d67d2bd1 100644
--- a/ydb/core/yq/libs/shared_resources/shared_resources.cpp
+++ b/ydb/core/yq/libs/shared_resources/shared_resources.cpp
@@ -99,7 +99,8 @@ struct TYqSharedResourcesImpl : public TActorSystemPtrMixin, public TYqSharedRes
}
void Stop() override {
- YdbDriver.Stop(true);
+ CoreYdbDriver.Stop(true);
+ // UserSpaceYdbDriver.Stop(true); // For now it points to the same driver as CoreYdbDriver, so don't call Stop
}
NYdb::TDriverConfig GetYdbDriverConfig(const NYq::NConfig::TYdbDriverConfig& config) {
@@ -122,7 +123,7 @@ struct TYqSharedResourcesImpl : public TActorSystemPtrMixin, public TYqSharedRes
const NYq::NConfig::TDbPoolConfig& config,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
const NMonitoring::TDynamicCounterPtr& counters) {
- DbPoolHolder = MakeIntrusive<NYq::TDbPoolHolder>(config, YdbDriver, credentialsProviderFactory, counters);
+ DbPoolHolder = MakeIntrusive<NYq::TDbPoolHolder>(config, CoreYdbDriver, credentialsProviderFactory, counters);
}
};
@@ -136,7 +137,8 @@ TYqSharedResources::TPtr CreateYqSharedResourcesImpl(
}
TYqSharedResources::TYqSharedResources(NYdb::TDriver driver)
- : YdbDriver(std::move(driver))
+ : CoreYdbDriver(driver)
+ , UserSpaceYdbDriver(std::move(driver))
{
}
diff --git a/ydb/core/yq/libs/shared_resources/shared_resources.h b/ydb/core/yq/libs/shared_resources/shared_resources.h
index bfd1ec66953..bd8e5e3950f 100644
--- a/ydb/core/yq/libs/shared_resources/shared_resources.h
+++ b/ydb/core/yq/libs/shared_resources/shared_resources.h
@@ -16,7 +16,11 @@ struct TYqSharedResources : public IYqSharedResources {
static TPtr Cast(const IYqSharedResources::TPtr& ptr);
// Resources
- NYdb::TDriver YdbDriver;
+
+ // Separated YDB drivers for user queries execution and for YQ core usage.
+ // For now they are actually point to the same driver, but it can be changed in the future.
+ NYdb::TDriver CoreYdbDriver;
+ NYdb::TDriver UserSpaceYdbDriver;
TDbPoolHolder::TPtr DbPoolHolder;
protected:
diff --git a/ydb/core/yq/libs/ydb/create_schema.cpp b/ydb/core/yq/libs/ydb/create_schema.cpp
index 51af951a06f..a0ce2efe221 100644
--- a/ydb/core/yq/libs/ydb/create_schema.cpp
+++ b/ydb/core/yq/libs/ydb/create_schema.cpp
@@ -165,7 +165,7 @@ protected:
}
void CreateSession() {
- Connection->Client.GetSession().Subscribe(
+ Connection->TableClient.GetSession().Subscribe(
[actorId = SelfId(), actorSystem = NActors::TActivationContext::ActorSystem()](const NYdb::NTable::TAsyncCreateSessionResult& result) {
actorSystem->Send(actorId, new TEvPrivate::TEvCreateSessionResult(result.GetValue()));
}
@@ -234,7 +234,6 @@ public:
ui64 cookie)
: TCreateActorBase(parent, logComponent, std::move(connection), std::move(retryPolicy), cookie)
, DirectoryPath(directoryPath)
- , SchemeClient(Connection->Driver)
{
}
@@ -245,12 +244,11 @@ private:
NYdb::TAsyncStatus CallYdbSdk() override {
SCHEMA_LOG_DEBUG("Call create directory \"" << DirectoryPath << "\"");
- return SchemeClient.MakeDirectory(DirectoryPath);
+ return Connection->SchemeClient.MakeDirectory(DirectoryPath);
}
private:
const TString DirectoryPath;
- NYdb::NScheme::TSchemeClient SchemeClient;
};
NActors::IActor* MakeCreateTableActor(
diff --git a/ydb/core/yq/libs/ydb/ydb.cpp b/ydb/core/yq/libs/ydb/ydb.cpp
index d6096abce32..190efa13df6 100644
--- a/ydb/core/yq/libs/ydb/ydb.cpp
+++ b/ydb/core/yq/libs/ydb/ydb.cpp
@@ -22,6 +22,9 @@ namespace {
////////////////////////////////////////////////////////////////////////////////
+template <class TSettings>
+TSettings GetClientSettings(const NConfig::TYdbStorageConfig& config, const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory);
+
TFuture<TDataQueryResult> SelectGeneration(const TGenerationContextPtr& context) {
// TODO: use prepared queries
@@ -166,29 +169,9 @@ TFuture<TStatus> RegisterGenerationWrapper(
});
}
-} // namespace
-
-////////////////////////////////////////////////////////////////////////////////
-
-TYdbConnection::TYdbConnection(const TDriverConfig& driverConfig,
- const NConfig::TYdbStorageConfig& config)
- : Driver(driverConfig)
- , Client(Driver)
- , DB(config.GetDatabase())
- , TablePathPrefix(JoinPath(DB, config.GetTablePrefix()))
-{
-}
-
-TYdbConnection::~TYdbConnection()
-{
- Driver.Stop(true);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-
-NYdb::TDriverConfig GetDriverConfig(const NConfig::TYdbStorageConfig& config,
- const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory) {
+template <class TSettings>
+TSettings GetClientSettings(const NConfig::TYdbStorageConfig& config,
+ const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory) {
TString oauth;
if (config.GetToken()) {
oauth = config.GetToken();
@@ -201,9 +184,10 @@ NYdb::TDriverConfig GetDriverConfig(const NConfig::TYdbStorageConfig& config,
const TString iamEndpoint = config.GetIamEndpoint();
const TString saKeyFile = config.GetSaKeyFile();
- auto driverConfig = TDriverConfig()
- .SetEndpoint(config.GetEndpoint())
- .SetDatabase(config.GetDatabase());
+ TSettings settings;
+ settings
+ .DiscoveryEndpoint(config.GetEndpoint())
+ .Database(config.GetDatabase());
NKikimr::TYdbCredentialsSettings credSettings;
credSettings.UseLocalMetadata = config.GetUseLocalMetadataService();
@@ -211,24 +195,43 @@ NYdb::TDriverConfig GetDriverConfig(const NConfig::TYdbStorageConfig& config,
credSettings.SaKeyFile = config.GetSaKeyFile();
credSettings.IamEndpoint = config.GetIamEndpoint();
- driverConfig.SetCredentialsProviderFactory(credProviderFactory(credSettings));
+ settings.CredentialsProviderFactory(credProviderFactory(credSettings));
if (config.GetUseLocalMetadataService()) {
- driverConfig.UseSecureConnection();
+ settings.EnableSsl(true);
}
if (config.GetCertificateFile()) {
auto cert = StripString(TFileInput(config.GetCertificateFile()).ReadAll());
- driverConfig.UseSecureConnection(cert);
+ settings
+ .EnableSsl(true)
+ .CaCert(cert);
}
- return driverConfig;
+ return settings;
}
+} // namespace
+
+////////////////////////////////////////////////////////////////////////////////
+
+TYdbConnection::TYdbConnection(const NConfig::TYdbStorageConfig& config,
+ const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory,
+ const NYdb::TDriver& driver)
+ : Driver(driver)
+ , TableClient(Driver, GetClientSettings<NYdb::NTable::TClientSettings>(config, credProviderFactory))
+ , SchemeClient(Driver, GetClientSettings<NYdb::TCommonClientSettings>(config, credProviderFactory))
+ , DB(config.GetDatabase())
+ , TablePathPrefix(JoinPath(DB, config.GetTablePrefix()))
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
TYdbConnectionPtr NewYdbConnection(const NConfig::TYdbStorageConfig& config,
- const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory) {
- auto driverConfig = GetDriverConfig(config, credProviderFactory);
- return MakeIntrusive<TYdbConnection>(driverConfig, config);
+ const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory,
+ const NYdb::TDriver& driver) {
+ return MakeIntrusive<TYdbConnection>(config, credProviderFactory, driver);
}
TStatus MakeErrorStatus(
@@ -266,7 +269,7 @@ TFuture<TStatus> CreateTable(
{
auto tablePath = JoinPath(ydbConnection->TablePathPrefix, name.c_str());
- return ydbConnection->Client.RetryOperation(
+ return ydbConnection->TableClient.RetryOperation(
[tablePath = std::move(tablePath), description = std::move(description)] (TSession session) mutable {
return session.CreateTable(tablePath, TTableDescription(description));
});
diff --git a/ydb/core/yq/libs/ydb/ydb.h b/ydb/core/yq/libs/ydb/ydb.h
index 14c07422883..9675dd81816 100644
--- a/ydb/core/yq/libs/ydb/ydb.h
+++ b/ydb/core/yq/libs/ydb/ydb.h
@@ -4,6 +4,7 @@
#include <ydb/core/yq/libs/config/protos/storage.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
namespace NYq {
@@ -11,15 +12,15 @@ namespace NYq {
struct TYdbConnection : public TThrRefBase {
NYdb::TDriver Driver;
- NYdb::NTable::TTableClient Client;
+ NYdb::NTable::TTableClient TableClient;
+ NYdb::NScheme::TSchemeClient SchemeClient;
const TString DB;
const TString TablePathPrefix;
TYdbConnection(
- const NYdb::TDriverConfig& driverConfig,
- const NConfig::TYdbStorageConfig& config);
-
- ~TYdbConnection();
+ const NConfig::TYdbStorageConfig& config,
+ const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory,
+ const NYdb::TDriver& driver);
};
using TYdbConnectionPtr = TIntrusivePtr<TYdbConnection>;
@@ -94,9 +95,7 @@ using TGenerationContextPtr = TIntrusivePtr<TGenerationContext>;
////////////////////////////////////////////////////////////////////////////////
-NYdb::TDriverConfig GetDriverConfig(const NConfig::TYdbStorageConfig& config, const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory);
-
-TYdbConnectionPtr NewYdbConnection(const NConfig::TYdbStorageConfig& config, const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory);
+TYdbConnectionPtr NewYdbConnection(const NConfig::TYdbStorageConfig& config, const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory, const NYdb::TDriver& driver);
NYdb::TStatus MakeErrorStatus(
NYdb::EStatus code,