aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-06-29 14:39:56 +0300
committertesseract <tesseract@yandex-team.com>2023-06-29 14:39:56 +0300
commitee56ba144faaa6e36e9a6f07cebab55aefbba9f7 (patch)
tree37d62ed43b01bbbda0ab027f7f9ff17920ea50dd
parent52f27a5cf0d3d90dd9332fb8e0ad9868146e8490 (diff)
downloadydb-ee56ba144faaa6e36e9a6f07cebab55aefbba9f7.tar.gz
Extract initialization to ServiceInitializer
-rw-r--r--ydb/services/persqueue_v1/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/services/persqueue_v1/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/services/persqueue_v1/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/services/persqueue_v1/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/services/persqueue_v1/persqueue.cpp28
-rw-r--r--ydb/services/persqueue_v1/persqueue.h3
-rw-r--r--ydb/services/persqueue_v1/services_initializer.cpp69
-rw-r--r--ydb/services/persqueue_v1/services_initializer.h39
-rw-r--r--ydb/services/persqueue_v1/topic.cpp28
-rw-r--r--ydb/services/persqueue_v1/topic.h3
-rw-r--r--ydb/services/persqueue_v1/ya.make1
11 files changed, 119 insertions, 56 deletions
diff --git a/ydb/services/persqueue_v1/CMakeLists.darwin-x86_64.txt b/ydb/services/persqueue_v1/CMakeLists.darwin-x86_64.txt
index 60e563ec2ea..32eb38ce846 100644
--- a/ydb/services/persqueue_v1/CMakeLists.darwin-x86_64.txt
+++ b/ydb/services/persqueue_v1/CMakeLists.darwin-x86_64.txt
@@ -40,5 +40,6 @@ target_sources(ydb-services-persqueue_v1 PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/grpc_pq_schema.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/grpc_pq_write.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/services_initializer.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/topic.cpp
)
diff --git a/ydb/services/persqueue_v1/CMakeLists.linux-aarch64.txt b/ydb/services/persqueue_v1/CMakeLists.linux-aarch64.txt
index 35c5bb019ba..dce3c8da1e5 100644
--- a/ydb/services/persqueue_v1/CMakeLists.linux-aarch64.txt
+++ b/ydb/services/persqueue_v1/CMakeLists.linux-aarch64.txt
@@ -41,5 +41,6 @@ target_sources(ydb-services-persqueue_v1 PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/grpc_pq_schema.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/grpc_pq_write.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/services_initializer.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/topic.cpp
)
diff --git a/ydb/services/persqueue_v1/CMakeLists.linux-x86_64.txt b/ydb/services/persqueue_v1/CMakeLists.linux-x86_64.txt
index 35c5bb019ba..dce3c8da1e5 100644
--- a/ydb/services/persqueue_v1/CMakeLists.linux-x86_64.txt
+++ b/ydb/services/persqueue_v1/CMakeLists.linux-x86_64.txt
@@ -41,5 +41,6 @@ target_sources(ydb-services-persqueue_v1 PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/grpc_pq_schema.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/grpc_pq_write.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/services_initializer.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/topic.cpp
)
diff --git a/ydb/services/persqueue_v1/CMakeLists.windows-x86_64.txt b/ydb/services/persqueue_v1/CMakeLists.windows-x86_64.txt
index 60e563ec2ea..32eb38ce846 100644
--- a/ydb/services/persqueue_v1/CMakeLists.windows-x86_64.txt
+++ b/ydb/services/persqueue_v1/CMakeLists.windows-x86_64.txt
@@ -40,5 +40,6 @@ target_sources(ydb-services-persqueue_v1 PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/grpc_pq_schema.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/grpc_pq_write.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/services_initializer.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/topic.cpp
)
diff --git a/ydb/services/persqueue_v1/persqueue.cpp b/ydb/services/persqueue_v1/persqueue.cpp
index 9a319d9d785..4e3d5db184b 100644
--- a/ydb/services/persqueue_v1/persqueue.cpp
+++ b/ydb/services/persqueue_v1/persqueue.cpp
@@ -9,14 +9,12 @@
#include "grpc_pq_read.h"
#include "grpc_pq_write.h"
#include "grpc_pq_schema.h"
+#include "services_initializer.h"
namespace NKikimr {
namespace NGRpcService {
namespace V1 {
-static const ui32 PersQueueWriteSessionsMaxCount = 1000000;
-static const ui32 PersQueueReadSessionsMaxCount = 100000;
-
TGRpcPersQueueService::TGRpcPersQueueService(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const NActors::TActorId& schemeCache,const NActors::TActorId& grpcRequestProxy, bool rlAllowed)
: TGrpcServiceBase<Ydb::PersQueue::V1::PersQueueService>(system, counters, grpcRequestProxy, rlAllowed)
, SchemeCache(schemeCache)
@@ -24,34 +22,14 @@ TGRpcPersQueueService::TGRpcPersQueueService(NActors::TActorSystem *system, TInt
void TGRpcPersQueueService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) {
CQ_ = cq;
- InitNewSchemeCacheActor();
-
- if (ActorSystem_->AppData<TAppData>()->PQConfig.GetEnabled()) {
-
- IActor* writeSvc = NGRpcProxy::V1::CreatePQWriteService(SchemeCache, Counters_, PersQueueWriteSessionsMaxCount);
- TActorId actorId = ActorSystem_->Register(writeSvc, TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId);
- ActorSystem_->RegisterLocalService(NGRpcProxy::V1::GetPQWriteServiceActorID(), actorId);
- IActor* readSvc = NGRpcProxy::V1::CreatePQReadService(SchemeCache, NewSchemeCache, Counters_, PersQueueReadSessionsMaxCount);
- actorId = ActorSystem_->Register(readSvc, TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId);
- ActorSystem_->RegisterLocalService(NGRpcProxy::V1::GetPQReadServiceActorID(), actorId);
-
- IActor* schemaSvc = NGRpcProxy::V1::CreatePQSchemaService(SchemeCache, Counters_);
- actorId = ActorSystem_->Register(schemaSvc, TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId);
- ActorSystem_->RegisterLocalService(NGRpcProxy::V1::GetPQSchemaServiceActorID(), actorId);
+ ServicesInitializer(ActorSystem_, SchemeCache, Counters_).Execute();
+ if (ActorSystem_->AppData<TAppData>()->PQConfig.GetEnabled()) {
SetupIncomingRequests(std::move(logger));
}
}
-void TGRpcPersQueueService::InitNewSchemeCacheActor() {
- auto appData = ActorSystem_->AppData<TAppData>();
- auto cacheCounters = GetServiceCounters(Counters_, "pqproxy|schemecache");
- auto cacheConfig = MakeIntrusive<NSchemeCache::TSchemeCacheConfig>(appData, cacheCounters);
- NewSchemeCache = ActorSystem_->Register(CreateSchemeBoardSchemeCache(cacheConfig.Get()),
- TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId);
-}
-
void TGRpcPersQueueService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
auto getCounterBlock = NKikimr::NGRpcService::CreateCounterCb(Counters_, ActorSystem_);
diff --git a/ydb/services/persqueue_v1/persqueue.h b/ydb/services/persqueue_v1/persqueue.h
index cabcaa18f0f..2627c97afc8 100644
--- a/ydb/services/persqueue_v1/persqueue.h
+++ b/ydb/services/persqueue_v1/persqueue.h
@@ -27,10 +27,7 @@ public:
private:
void SetupIncomingRequests(NGrpc::TLoggerPtr logger) override;
- void InitNewSchemeCacheActor();
-
NActors::TActorId SchemeCache;
- NActors::TActorId NewSchemeCache;
};
} // namespace V1
diff --git a/ydb/services/persqueue_v1/services_initializer.cpp b/ydb/services/persqueue_v1/services_initializer.cpp
new file mode 100644
index 00000000000..afb18f384bb
--- /dev/null
+++ b/ydb/services/persqueue_v1/services_initializer.cpp
@@ -0,0 +1,69 @@
+#include "services_initializer.h"
+
+#include <ydb/core/base/appdata.h>
+#include <ydb/core/base/counters.h>
+#include <ydb/core/tx/scheme_board/cache.h>
+
+#include "grpc_pq_read.h"
+#include "grpc_pq_write.h"
+#include "grpc_pq_schema.h"
+
+namespace NKikimr {
+namespace NGRpcService {
+namespace V1 {
+
+static const ui32 TopicWriteSessionsMaxCount = 1000000;
+static const ui32 TopicReadSessionsMaxCount = 100000;
+
+void ServicesInitializer::Execute() {
+ InitWriteService();
+ InitReadService();
+ InitSchemaService();
+}
+
+bool ServicesInitializer::RequiresServiceRegistration(const TActorId serviceId) {
+ return ActorSystem->AppData<TAppData>()->PQConfig.GetEnabled() && !ActorSystem->LookupLocalService(serviceId);
+}
+
+void ServicesInitializer::RegisterService(const TActorId serviceId, IActor* service) {
+ auto actorId = ActorSystem->Register(service, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId);
+ ActorSystem->RegisterLocalService(serviceId, actorId);
+}
+
+TActorId ServicesInitializer::InitNewSchemeCacheActor() {
+ auto appData = ActorSystem->AppData<TAppData>();
+ auto cacheCounters = GetServiceCounters(Counters, "pqproxy|schemecache");
+ auto cacheConfig = MakeIntrusive<NSchemeCache::TSchemeCacheConfig>(appData, cacheCounters);
+
+ auto service = CreateSchemeBoardSchemeCache(cacheConfig.Get());
+ return ActorSystem->Register(service, TMailboxType::HTSwap, appData->UserPoolId);
+}
+
+void ServicesInitializer::InitWriteService() {
+ auto serviceId = NGRpcProxy::V1::GetPQWriteServiceActorID();
+ if (RequiresServiceRegistration(serviceId)) {
+ auto service = NGRpcProxy::V1::CreatePQWriteService(SchemeCache, Counters, TopicWriteSessionsMaxCount);
+ RegisterService(serviceId, service);
+ }
+}
+
+void ServicesInitializer::InitReadService() {
+ auto serviceId = NGRpcProxy::V1::GetPQReadServiceActorID();
+ if (RequiresServiceRegistration(serviceId)) {
+ auto NewSchemeCache = InitNewSchemeCacheActor();
+ auto service = NGRpcProxy::V1::CreatePQReadService(SchemeCache, NewSchemeCache, Counters, TopicReadSessionsMaxCount);
+ RegisterService(serviceId, service);
+ }
+}
+
+void ServicesInitializer::InitSchemaService() {
+ auto serviceId = NGRpcProxy::V1::GetPQSchemaServiceActorID();
+ if (RequiresServiceRegistration(serviceId)) {
+ auto service = NGRpcProxy::V1::CreatePQSchemaService(SchemeCache, Counters);
+ RegisterService(serviceId, service);
+ }
+}
+
+} // namespace V1
+} // namespace NGRpcService
+} // namespace NKikimr
diff --git a/ydb/services/persqueue_v1/services_initializer.h b/ydb/services/persqueue_v1/services_initializer.h
new file mode 100644
index 00000000000..1ebd74edcda
--- /dev/null
+++ b/ydb/services/persqueue_v1/services_initializer.h
@@ -0,0 +1,39 @@
+#pragma once
+
+#include <library/cpp/actors/core/actorsystem.h>
+
+namespace NKikimr {
+namespace NGRpcService {
+namespace V1 {
+
+class ServicesInitializer {
+public:
+ ServicesInitializer(NActors::TActorSystem* actorSystem,
+ NActors::TActorId schemeCache,
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters)
+ : ActorSystem(actorSystem)
+ , SchemeCache(schemeCache)
+ , Counters(counters) {
+ }
+
+ void Execute();
+
+private:
+ bool RequiresServiceRegistration(const NActors::TActorId serviceId);
+ void RegisterService(const NActors::TActorId serviceId, NActors::IActor* service);
+
+ NActors::TActorId InitNewSchemeCacheActor();
+ void InitWriteService();
+ void InitReadService();
+ void InitSchemaService();
+
+private:
+ NActors::TActorSystem* ActorSystem;
+ NActors::TActorId SchemeCache;
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters;
+};
+
+
+} // namespace V1
+} // namespace NGRpcService
+} // namespace NKikimr
diff --git a/ydb/services/persqueue_v1/topic.cpp b/ydb/services/persqueue_v1/topic.cpp
index cb286e2fa62..a74b98de949 100644
--- a/ydb/services/persqueue_v1/topic.cpp
+++ b/ydb/services/persqueue_v1/topic.cpp
@@ -12,12 +12,10 @@
#include "grpc_pq_read.h"
#include "grpc_pq_write.h"
#include "grpc_pq_schema.h"
+#include "services_initializer.h"
namespace NKikimr::NGRpcService::V1 {
-static const ui32 TopicWriteSessionsMaxCount = 1000000;
-static const ui32 TopicReadSessionsMaxCount = 100000;
-
TGRpcTopicService::TGRpcTopicService(NActors::TActorSystem *system,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
const NActors::TActorId& schemeCache,
@@ -30,34 +28,14 @@ TGRpcTopicService::TGRpcTopicService(NActors::TActorSystem *system,
void TGRpcTopicService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) {
CQ_ = cq;
- InitNewSchemeCacheActor();
-
- if (ActorSystem_->AppData<TAppData>()->PQConfig.GetEnabled()) {
-
- IActor* writeSvc = NGRpcProxy::V1::CreatePQWriteService(SchemeCache, Counters_, TopicWriteSessionsMaxCount);
- TActorId actorId = ActorSystem_->Register(writeSvc, TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId);
- ActorSystem_->RegisterLocalService(NGRpcProxy::V1::GetPQWriteServiceActorID(), actorId);
- IActor* readSvc = NGRpcProxy::V1::CreatePQReadService(SchemeCache, NewSchemeCache, Counters_, TopicReadSessionsMaxCount);
- actorId = ActorSystem_->Register(readSvc, TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId);
- ActorSystem_->RegisterLocalService(NGRpcProxy::V1::GetPQReadServiceActorID(), actorId);
-
- IActor* schemaSvc = NGRpcProxy::V1::CreatePQSchemaService(SchemeCache, Counters_);
- actorId = ActorSystem_->Register(schemaSvc, TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId);
- ActorSystem_->RegisterLocalService(NGRpcProxy::V1::GetPQSchemaServiceActorID(), actorId);
+ ServicesInitializer(ActorSystem_, SchemeCache, Counters_).Execute();
+ if (ActorSystem_->AppData<TAppData>()->PQConfig.GetEnabled()) {
SetupIncomingRequests(std::move(logger));
}
}
-void TGRpcTopicService::InitNewSchemeCacheActor() {
- auto appData = ActorSystem_->AppData<TAppData>();
- auto cacheCounters = GetServiceCounters(Counters_, "pqproxy|schemecache");
- auto cacheConfig = MakeIntrusive<NSchemeCache::TSchemeCacheConfig>(appData, cacheCounters);
- NewSchemeCache = ActorSystem_->Register(CreateSchemeBoardSchemeCache(cacheConfig.Get()),
- TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId);
-}
-
void TGRpcTopicService::DoUpdateOffsetsInTransaction(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &)
{
TActivationContext::AsActorContext().Register(new TUpdateOffsetsInTransactionActor(p.release()));
diff --git a/ydb/services/persqueue_v1/topic.h b/ydb/services/persqueue_v1/topic.h
index ba26ae6a6c1..6873d11dd9a 100644
--- a/ydb/services/persqueue_v1/topic.h
+++ b/ydb/services/persqueue_v1/topic.h
@@ -26,13 +26,10 @@ public:
private:
void SetupIncomingRequests(NGrpc::TLoggerPtr logger) override;
- void InitNewSchemeCacheActor();
-
static void DoUpdateOffsetsInTransaction(std::unique_ptr<IRequestOpCtx> p,
const IFacilityProvider &);
NActors::TActorId SchemeCache;
- NActors::TActorId NewSchemeCache;
TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters;
};
diff --git a/ydb/services/persqueue_v1/ya.make b/ydb/services/persqueue_v1/ya.make
index 49ba698b26a..aeb1026c03a 100644
--- a/ydb/services/persqueue_v1/ya.make
+++ b/ydb/services/persqueue_v1/ya.make
@@ -9,6 +9,7 @@ SRCS(
grpc_pq_write.h
persqueue.cpp
persqueue.h
+ services_initializer.cpp
topic.cpp
)