diff options
author | tesseract <tesseract@yandex-team.com> | 2023-06-29 14:39:56 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-06-29 14:39:56 +0300 |
commit | ee56ba144faaa6e36e9a6f07cebab55aefbba9f7 (patch) | |
tree | 37d62ed43b01bbbda0ab027f7f9ff17920ea50dd | |
parent | 52f27a5cf0d3d90dd9332fb8e0ad9868146e8490 (diff) | |
download | ydb-ee56ba144faaa6e36e9a6f07cebab55aefbba9f7.tar.gz |
Extract initialization to ServiceInitializer
-rw-r--r-- | ydb/services/persqueue_v1/CMakeLists.darwin-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/CMakeLists.linux-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/CMakeLists.windows-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue.cpp | 28 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue.h | 3 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/services_initializer.cpp | 69 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/services_initializer.h | 39 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/topic.cpp | 28 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/topic.h | 3 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/ya.make | 1 |
11 files changed, 119 insertions, 56 deletions
diff --git a/ydb/services/persqueue_v1/CMakeLists.darwin-x86_64.txt b/ydb/services/persqueue_v1/CMakeLists.darwin-x86_64.txt index 60e563ec2ea..32eb38ce846 100644 --- a/ydb/services/persqueue_v1/CMakeLists.darwin-x86_64.txt +++ b/ydb/services/persqueue_v1/CMakeLists.darwin-x86_64.txt @@ -40,5 +40,6 @@ target_sources(ydb-services-persqueue_v1 PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/grpc_pq_schema.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/grpc_pq_write.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/services_initializer.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/topic.cpp ) diff --git a/ydb/services/persqueue_v1/CMakeLists.linux-aarch64.txt b/ydb/services/persqueue_v1/CMakeLists.linux-aarch64.txt index 35c5bb019ba..dce3c8da1e5 100644 --- a/ydb/services/persqueue_v1/CMakeLists.linux-aarch64.txt +++ b/ydb/services/persqueue_v1/CMakeLists.linux-aarch64.txt @@ -41,5 +41,6 @@ target_sources(ydb-services-persqueue_v1 PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/grpc_pq_schema.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/grpc_pq_write.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/services_initializer.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/topic.cpp ) diff --git a/ydb/services/persqueue_v1/CMakeLists.linux-x86_64.txt b/ydb/services/persqueue_v1/CMakeLists.linux-x86_64.txt index 35c5bb019ba..dce3c8da1e5 100644 --- a/ydb/services/persqueue_v1/CMakeLists.linux-x86_64.txt +++ b/ydb/services/persqueue_v1/CMakeLists.linux-x86_64.txt @@ -41,5 +41,6 @@ target_sources(ydb-services-persqueue_v1 PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/grpc_pq_schema.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/grpc_pq_write.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/services_initializer.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/topic.cpp ) diff --git a/ydb/services/persqueue_v1/CMakeLists.windows-x86_64.txt b/ydb/services/persqueue_v1/CMakeLists.windows-x86_64.txt index 60e563ec2ea..32eb38ce846 100644 --- a/ydb/services/persqueue_v1/CMakeLists.windows-x86_64.txt +++ b/ydb/services/persqueue_v1/CMakeLists.windows-x86_64.txt @@ -40,5 +40,6 @@ target_sources(ydb-services-persqueue_v1 PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/grpc_pq_schema.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/grpc_pq_write.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/services_initializer.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/topic.cpp ) diff --git a/ydb/services/persqueue_v1/persqueue.cpp b/ydb/services/persqueue_v1/persqueue.cpp index 9a319d9d785..4e3d5db184b 100644 --- a/ydb/services/persqueue_v1/persqueue.cpp +++ b/ydb/services/persqueue_v1/persqueue.cpp @@ -9,14 +9,12 @@ #include "grpc_pq_read.h" #include "grpc_pq_write.h" #include "grpc_pq_schema.h" +#include "services_initializer.h" namespace NKikimr { namespace NGRpcService { namespace V1 { -static const ui32 PersQueueWriteSessionsMaxCount = 1000000; -static const ui32 PersQueueReadSessionsMaxCount = 100000; - TGRpcPersQueueService::TGRpcPersQueueService(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const NActors::TActorId& schemeCache,const NActors::TActorId& grpcRequestProxy, bool rlAllowed) : TGrpcServiceBase<Ydb::PersQueue::V1::PersQueueService>(system, counters, grpcRequestProxy, rlAllowed) , SchemeCache(schemeCache) @@ -24,34 +22,14 @@ TGRpcPersQueueService::TGRpcPersQueueService(NActors::TActorSystem *system, TInt void TGRpcPersQueueService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) { CQ_ = cq; - InitNewSchemeCacheActor(); - - if (ActorSystem_->AppData<TAppData>()->PQConfig.GetEnabled()) { - - IActor* writeSvc = NGRpcProxy::V1::CreatePQWriteService(SchemeCache, Counters_, PersQueueWriteSessionsMaxCount); - TActorId actorId = ActorSystem_->Register(writeSvc, TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId); - ActorSystem_->RegisterLocalService(NGRpcProxy::V1::GetPQWriteServiceActorID(), actorId); - IActor* readSvc = NGRpcProxy::V1::CreatePQReadService(SchemeCache, NewSchemeCache, Counters_, PersQueueReadSessionsMaxCount); - actorId = ActorSystem_->Register(readSvc, TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId); - ActorSystem_->RegisterLocalService(NGRpcProxy::V1::GetPQReadServiceActorID(), actorId); - - IActor* schemaSvc = NGRpcProxy::V1::CreatePQSchemaService(SchemeCache, Counters_); - actorId = ActorSystem_->Register(schemaSvc, TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId); - ActorSystem_->RegisterLocalService(NGRpcProxy::V1::GetPQSchemaServiceActorID(), actorId); + ServicesInitializer(ActorSystem_, SchemeCache, Counters_).Execute(); + if (ActorSystem_->AppData<TAppData>()->PQConfig.GetEnabled()) { SetupIncomingRequests(std::move(logger)); } } -void TGRpcPersQueueService::InitNewSchemeCacheActor() { - auto appData = ActorSystem_->AppData<TAppData>(); - auto cacheCounters = GetServiceCounters(Counters_, "pqproxy|schemecache"); - auto cacheConfig = MakeIntrusive<NSchemeCache::TSchemeCacheConfig>(appData, cacheCounters); - NewSchemeCache = ActorSystem_->Register(CreateSchemeBoardSchemeCache(cacheConfig.Get()), - TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId); -} - void TGRpcPersQueueService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = NKikimr::NGRpcService::CreateCounterCb(Counters_, ActorSystem_); diff --git a/ydb/services/persqueue_v1/persqueue.h b/ydb/services/persqueue_v1/persqueue.h index cabcaa18f0f..2627c97afc8 100644 --- a/ydb/services/persqueue_v1/persqueue.h +++ b/ydb/services/persqueue_v1/persqueue.h @@ -27,10 +27,7 @@ public: private: void SetupIncomingRequests(NGrpc::TLoggerPtr logger) override; - void InitNewSchemeCacheActor(); - NActors::TActorId SchemeCache; - NActors::TActorId NewSchemeCache; }; } // namespace V1 diff --git a/ydb/services/persqueue_v1/services_initializer.cpp b/ydb/services/persqueue_v1/services_initializer.cpp new file mode 100644 index 00000000000..afb18f384bb --- /dev/null +++ b/ydb/services/persqueue_v1/services_initializer.cpp @@ -0,0 +1,69 @@ +#include "services_initializer.h" + +#include <ydb/core/base/appdata.h> +#include <ydb/core/base/counters.h> +#include <ydb/core/tx/scheme_board/cache.h> + +#include "grpc_pq_read.h" +#include "grpc_pq_write.h" +#include "grpc_pq_schema.h" + +namespace NKikimr { +namespace NGRpcService { +namespace V1 { + +static const ui32 TopicWriteSessionsMaxCount = 1000000; +static const ui32 TopicReadSessionsMaxCount = 100000; + +void ServicesInitializer::Execute() { + InitWriteService(); + InitReadService(); + InitSchemaService(); +} + +bool ServicesInitializer::RequiresServiceRegistration(const TActorId serviceId) { + return ActorSystem->AppData<TAppData>()->PQConfig.GetEnabled() && !ActorSystem->LookupLocalService(serviceId); +} + +void ServicesInitializer::RegisterService(const TActorId serviceId, IActor* service) { + auto actorId = ActorSystem->Register(service, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId); + ActorSystem->RegisterLocalService(serviceId, actorId); +} + +TActorId ServicesInitializer::InitNewSchemeCacheActor() { + auto appData = ActorSystem->AppData<TAppData>(); + auto cacheCounters = GetServiceCounters(Counters, "pqproxy|schemecache"); + auto cacheConfig = MakeIntrusive<NSchemeCache::TSchemeCacheConfig>(appData, cacheCounters); + + auto service = CreateSchemeBoardSchemeCache(cacheConfig.Get()); + return ActorSystem->Register(service, TMailboxType::HTSwap, appData->UserPoolId); +} + +void ServicesInitializer::InitWriteService() { + auto serviceId = NGRpcProxy::V1::GetPQWriteServiceActorID(); + if (RequiresServiceRegistration(serviceId)) { + auto service = NGRpcProxy::V1::CreatePQWriteService(SchemeCache, Counters, TopicWriteSessionsMaxCount); + RegisterService(serviceId, service); + } +} + +void ServicesInitializer::InitReadService() { + auto serviceId = NGRpcProxy::V1::GetPQReadServiceActorID(); + if (RequiresServiceRegistration(serviceId)) { + auto NewSchemeCache = InitNewSchemeCacheActor(); + auto service = NGRpcProxy::V1::CreatePQReadService(SchemeCache, NewSchemeCache, Counters, TopicReadSessionsMaxCount); + RegisterService(serviceId, service); + } +} + +void ServicesInitializer::InitSchemaService() { + auto serviceId = NGRpcProxy::V1::GetPQSchemaServiceActorID(); + if (RequiresServiceRegistration(serviceId)) { + auto service = NGRpcProxy::V1::CreatePQSchemaService(SchemeCache, Counters); + RegisterService(serviceId, service); + } +} + +} // namespace V1 +} // namespace NGRpcService +} // namespace NKikimr diff --git a/ydb/services/persqueue_v1/services_initializer.h b/ydb/services/persqueue_v1/services_initializer.h new file mode 100644 index 00000000000..1ebd74edcda --- /dev/null +++ b/ydb/services/persqueue_v1/services_initializer.h @@ -0,0 +1,39 @@ +#pragma once + +#include <library/cpp/actors/core/actorsystem.h> + +namespace NKikimr { +namespace NGRpcService { +namespace V1 { + +class ServicesInitializer { +public: + ServicesInitializer(NActors::TActorSystem* actorSystem, + NActors::TActorId schemeCache, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) + : ActorSystem(actorSystem) + , SchemeCache(schemeCache) + , Counters(counters) { + } + + void Execute(); + +private: + bool RequiresServiceRegistration(const NActors::TActorId serviceId); + void RegisterService(const NActors::TActorId serviceId, NActors::IActor* service); + + NActors::TActorId InitNewSchemeCacheActor(); + void InitWriteService(); + void InitReadService(); + void InitSchemaService(); + +private: + NActors::TActorSystem* ActorSystem; + NActors::TActorId SchemeCache; + TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters; +}; + + +} // namespace V1 +} // namespace NGRpcService +} // namespace NKikimr diff --git a/ydb/services/persqueue_v1/topic.cpp b/ydb/services/persqueue_v1/topic.cpp index cb286e2fa62..a74b98de949 100644 --- a/ydb/services/persqueue_v1/topic.cpp +++ b/ydb/services/persqueue_v1/topic.cpp @@ -12,12 +12,10 @@ #include "grpc_pq_read.h" #include "grpc_pq_write.h" #include "grpc_pq_schema.h" +#include "services_initializer.h" namespace NKikimr::NGRpcService::V1 { -static const ui32 TopicWriteSessionsMaxCount = 1000000; -static const ui32 TopicReadSessionsMaxCount = 100000; - TGRpcTopicService::TGRpcTopicService(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const NActors::TActorId& schemeCache, @@ -30,34 +28,14 @@ TGRpcTopicService::TGRpcTopicService(NActors::TActorSystem *system, void TGRpcTopicService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) { CQ_ = cq; - InitNewSchemeCacheActor(); - - if (ActorSystem_->AppData<TAppData>()->PQConfig.GetEnabled()) { - - IActor* writeSvc = NGRpcProxy::V1::CreatePQWriteService(SchemeCache, Counters_, TopicWriteSessionsMaxCount); - TActorId actorId = ActorSystem_->Register(writeSvc, TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId); - ActorSystem_->RegisterLocalService(NGRpcProxy::V1::GetPQWriteServiceActorID(), actorId); - IActor* readSvc = NGRpcProxy::V1::CreatePQReadService(SchemeCache, NewSchemeCache, Counters_, TopicReadSessionsMaxCount); - actorId = ActorSystem_->Register(readSvc, TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId); - ActorSystem_->RegisterLocalService(NGRpcProxy::V1::GetPQReadServiceActorID(), actorId); - - IActor* schemaSvc = NGRpcProxy::V1::CreatePQSchemaService(SchemeCache, Counters_); - actorId = ActorSystem_->Register(schemaSvc, TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId); - ActorSystem_->RegisterLocalService(NGRpcProxy::V1::GetPQSchemaServiceActorID(), actorId); + ServicesInitializer(ActorSystem_, SchemeCache, Counters_).Execute(); + if (ActorSystem_->AppData<TAppData>()->PQConfig.GetEnabled()) { SetupIncomingRequests(std::move(logger)); } } -void TGRpcTopicService::InitNewSchemeCacheActor() { - auto appData = ActorSystem_->AppData<TAppData>(); - auto cacheCounters = GetServiceCounters(Counters_, "pqproxy|schemecache"); - auto cacheConfig = MakeIntrusive<NSchemeCache::TSchemeCacheConfig>(appData, cacheCounters); - NewSchemeCache = ActorSystem_->Register(CreateSchemeBoardSchemeCache(cacheConfig.Get()), - TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId); -} - void TGRpcTopicService::DoUpdateOffsetsInTransaction(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { TActivationContext::AsActorContext().Register(new TUpdateOffsetsInTransactionActor(p.release())); diff --git a/ydb/services/persqueue_v1/topic.h b/ydb/services/persqueue_v1/topic.h index ba26ae6a6c1..6873d11dd9a 100644 --- a/ydb/services/persqueue_v1/topic.h +++ b/ydb/services/persqueue_v1/topic.h @@ -26,13 +26,10 @@ public: private: void SetupIncomingRequests(NGrpc::TLoggerPtr logger) override; - void InitNewSchemeCacheActor(); - static void DoUpdateOffsetsInTransaction(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &); NActors::TActorId SchemeCache; - NActors::TActorId NewSchemeCache; TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters; }; diff --git a/ydb/services/persqueue_v1/ya.make b/ydb/services/persqueue_v1/ya.make index 49ba698b26a..aeb1026c03a 100644 --- a/ydb/services/persqueue_v1/ya.make +++ b/ydb/services/persqueue_v1/ya.make @@ -9,6 +9,7 @@ SRCS( grpc_pq_write.h persqueue.cpp persqueue.h + services_initializer.cpp topic.cpp ) |