diff options
author | Alexey Bogolyubskiy <i@bogolyubskiyalexey.ru> | 2022-03-24 15:30:29 +0300 |
---|---|---|
committer | Alexey Bogolyubskiy <i@bogolyubskiyalexey.ru> | 2022-03-24 15:30:29 +0300 |
commit | 04156efbfd06d47a7b749159b1a00a756fa5c41b (patch) | |
tree | 11eb26006a9707677bb22f0aa9576a34d712a6f6 | |
parent | 0a35af177414ff232a34dba196112ffaeb7dfb6d (diff) | |
download | ydb-04156efbfd06d47a7b749159b1a00a756fa5c41b.tar.gz |
easy forwarding of PQReadSessionsInfoWorkerFactory
fix
ref:50b6df6cb339e4ffc71f422f8b9614d7376cd9ce
23 files changed, 51 insertions, 125 deletions
diff --git a/ydb/core/base/appdata.h b/ydb/core/base/appdata.h index c48d712dae..d6c0075e62 100644 --- a/ydb/core/base/appdata.h +++ b/ydb/core/base/appdata.h @@ -58,6 +58,10 @@ namespace NSQS { class IAuthFactory; } +namespace NMsgBusProxy { + class IPersQueueGetReadSessionsInfoWorkerFactory; +} + namespace NPQ { class IPersQueueMirrorReaderFactory; } @@ -84,6 +88,7 @@ struct TAppData { IActor*(*FolderServiceFactory)(const NKikimrProto::NFolderService::TFolderServiceConfig&); + const NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory* PersQueueGetReadSessionsInfoWorkerFactory = nullptr; const NPQ::IPersQueueMirrorReaderFactory* PersQueueMirrorReaderFactory = nullptr; NYdb::TDriver* YdbDriver = nullptr; const NPDisk::IIoContextFactory* IoContextFactory = nullptr; diff --git a/ydb/core/client/server/msgbus_server.cpp b/ydb/core/client/server/msgbus_server.cpp index 8a4d8a3bbd..cee4716396 100644 --- a/ydb/core/client/server/msgbus_server.cpp +++ b/ydb/core/client/server/msgbus_server.cpp @@ -438,12 +438,10 @@ protected: TMessageBusServer::TMessageBusServer( const NBus::TBusServerSessionConfig &sessionConfig, NBus::TBusMessageQueue *busQueue, - ui32 bindPort, - std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory + ui32 bindPort ) : SessionConfig(sessionConfig) , BusQueue(busQueue) - , PQReadSessionsInfoWorkerFactory(pqReadSessionsInfoWorkerFactory) , Protocol(bindPort) {} @@ -599,7 +597,7 @@ void TMessageBusServer::UnknownMessage(TBusMessageContext &msg) { } IActor* TMessageBusServer::CreateProxy() { - return CreateMessageBusServerProxy(this, PQReadSessionsInfoWorkerFactory); + return CreateMessageBusServerProxy(this); } IActor* TMessageBusServer::CreateMessageBusTraceService() { @@ -609,10 +607,9 @@ IActor* TMessageBusServer::CreateMessageBusTraceService() { IMessageBusServer* CreateMsgBusServer( NBus::TBusMessageQueue *queue, const NBus::TBusServerSessionConfig &config, - std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory, ui32 bindPort ) { - return new TMessageBusServer(config, queue, bindPort, pqReadSessionsInfoWorkerFactory); + return new TMessageBusServer(config, queue, bindPort); } } diff --git a/ydb/core/client/server/msgbus_server.h b/ydb/core/client/server/msgbus_server.h index 3d40fc9001..71212fb5df 100644 --- a/ydb/core/client/server/msgbus_server.h +++ b/ydb/core/client/server/msgbus_server.h @@ -136,7 +136,6 @@ class TMessageBusServer : public IMessageBusServer, public NBus::IBusServerHandl NBus::TBusServerSessionPtr Session; TActorId Proxy; TActorId Monitor; - std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> PQReadSessionsInfoWorkerFactory; protected: TProtocol Protocol; TActorSystem *ActorSystem = nullptr; @@ -145,8 +144,7 @@ public: TMessageBusServer( const NBus::TBusServerSessionConfig &sessionConfig, NBus::TBusMessageQueue *busQueue, - ui32 bindPort, - std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory + ui32 bindPort ); ~TMessageBusServer(); @@ -272,12 +270,8 @@ void CopyProtobufsByFieldName(TProtoTo& protoTo, const TProtoFrom& protoFrom) { } } -class IPersQueueGetReadSessionsInfoWorkerFactory; -IActor* CreateMessageBusServerProxy( - TMessageBusServer *server, - std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory -); +IActor* CreateMessageBusServerProxy(TMessageBusServer *server); //IActor* CreateMessageBusDatashardSetConfig(TBusMessageContext &msg); IActor* CreateMessageBusTabletCountersRequest(TBusMessageContext &msg); diff --git a/ydb/core/client/server/msgbus_server_persqueue.cpp b/ydb/core/client/server/msgbus_server_persqueue.cpp index 347471e787..ac3487369f 100644 --- a/ydb/core/client/server/msgbus_server_persqueue.cpp +++ b/ydb/core/client/server/msgbus_server_persqueue.cpp @@ -1391,7 +1391,6 @@ public: template <template <class TImpl, class... TArgs> class TSenderImpl, class... T> IActor* CreatePersQueueRequestProcessor( const NKikimrClient::TPersQueueRequest& request, - std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory, T&&... constructorParams ) { try { @@ -1415,8 +1414,7 @@ IActor* CreatePersQueueRequestProcessor( return new TSenderImpl<TPersQueueGetPartitionStatusProcessor>(std::forward<T>(constructorParams)...); } else if (meta.HasCmdGetReadSessionsInfo()) { return new TSenderImpl<TPersQueueGetReadSessionsInfoProcessor>( - std::forward<T>(constructorParams)..., - pqReadSessionsInfoWorkerFactory + std::forward<T>(constructorParams)... ); } else { throw std::runtime_error("Not implemented yet"); @@ -1461,13 +1459,11 @@ public: IActor* CreateMessageBusServerPersQueue( TBusMessageContext& msg, - const TActorId& schemeCache, - std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory + const TActorId& schemeCache ) { const NKikimrClient::TPersQueueRequest& request = static_cast<TBusPersQueue*>(msg.GetMessage())->Record; return CreatePersQueueRequestProcessor<TMessageBusServerPersQueue>( request, - pqReadSessionsInfoWorkerFactory, msg, schemeCache ); @@ -1476,12 +1472,10 @@ IActor* CreateMessageBusServerPersQueue( IActor* CreateActorServerPersQueue( const TActorId& parentId, const NKikimrClient::TPersQueueRequest& request, - const TActorId& schemeCache, - std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory + const TActorId& schemeCache ) { return CreatePersQueueRequestProcessor<TReplierToParent>( request, - pqReadSessionsInfoWorkerFactory, parentId, request, schemeCache diff --git a/ydb/core/client/server/msgbus_server_persqueue.h b/ydb/core/client/server/msgbus_server_persqueue.h index 1a915080e0..fe3c38f1b0 100644 --- a/ydb/core/client/server/msgbus_server_persqueue.h +++ b/ydb/core/client/server/msgbus_server_persqueue.h @@ -30,14 +30,12 @@ TProcessingResult ProcessMetaCacheSingleTopicsResponse(const NSchemeCache::TSche // Worker actor creation IActor* CreateMessageBusServerPersQueue( TBusMessageContext& msg, - const TActorId& schemeCache, - std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory = nullptr + const TActorId& schemeCache ); IActor* CreateActorServerPersQueue( const TActorId& parentId, const NKikimrClient::TPersQueueRequest& request, - const TActorId& schemeCache, - std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory = nullptr + const TActorId& schemeCache ); diff --git a/ydb/core/client/server/msgbus_server_pq_metarequest.cpp b/ydb/core/client/server/msgbus_server_pq_metarequest.cpp index 32f37251cc..11d9153e77 100644 --- a/ydb/core/client/server/msgbus_server_pq_metarequest.cpp +++ b/ydb/core/client/server/msgbus_server_pq_metarequest.cpp @@ -440,11 +440,9 @@ void TPersQueueGetPartitionLocationsTopicWorker::Answer( TPersQueueGetReadSessionsInfoProcessor::TPersQueueGetReadSessionsInfoProcessor( const NKikimrClient::TPersQueueRequest& request, - const TActorId& schemeCache, - std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory + const TActorId& schemeCache ) : TPersQueueBaseRequestProcessor(request, schemeCache, true) - , PQReadSessionsInfoWorkerFactory(pqReadSessionsInfoWorkerFactory) { const auto& cmd = RequestProto->GetMetaRequest().GetCmdGetReadSessionsInfo(); const auto& topics = cmd.GetTopic(); @@ -532,10 +530,12 @@ bool TPersQueueGetReadSessionsInfoTopicWorker::WaitAllPipeEvents(const TActorCon } THolder<IActor> TPersQueueGetReadSessionsInfoProcessor::CreateSessionsSubactor( - const THashMap<TString, TActorId>&& readSessions + const THashMap<TString, TActorId>&& readSessions, + const TActorContext& ctx ) { - if (PQReadSessionsInfoWorkerFactory) { - return PQReadSessionsInfoWorkerFactory->Create(SelfId(), std::move(readSessions), NodesInfo); + auto factory = AppData(ctx)->PersQueueGetReadSessionsInfoWorkerFactory; + if (factory) { + return factory->Create(SelfId(), std::move(readSessions), NodesInfo); } return MakeHolder<TPersQueueGetReadSessionsInfoWorker>(SelfId(), std::move(readSessions), NodesInfo); } diff --git a/ydb/core/client/server/msgbus_server_pq_metarequest.h b/ydb/core/client/server/msgbus_server_pq_metarequest.h index 1d56c28dc0..0c98cdde39 100644 --- a/ydb/core/client/server/msgbus_server_pq_metarequest.h +++ b/ydb/core/client/server/msgbus_server_pq_metarequest.h @@ -133,8 +133,7 @@ class TPersQueueGetReadSessionsInfoProcessor : public TPersQueueBaseRequestProce public: TPersQueueGetReadSessionsInfoProcessor( const NKikimrClient::TPersQueueRequest& request, - const TActorId& schemeCache, - std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory + const TActorId& schemeCache ); bool ReadyForAnswer(const TActorContext& ctx) override { @@ -143,7 +142,7 @@ public: return true; } HasSessionsRequest = true; - auto actorId = ctx.Register(CreateSessionsSubactor(std::move(ReadSessions)).Release()); + auto actorId = ctx.Register(CreateSessionsSubactor(std::move(ReadSessions), ctx).Release()); Children.emplace(actorId, MakeHolder<TPerTopicInfo>()); } return false; @@ -170,9 +169,8 @@ public: private: THolder<IActor> CreateTopicSubactor(const TSchemeEntry& topicEntry, const TString& name) override; - THolder<IActor> CreateSessionsSubactor(const THashMap<TString, TActorId>&& readSessions); + THolder<IActor> CreateSessionsSubactor(const THashMap<TString, TActorId>&& readSessions, const TActorContext& ctx); - std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> PQReadSessionsInfoWorkerFactory; mutable bool HasSessionsRequest = false; THashMap<TString, TActorId> ReadSessions; }; diff --git a/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp b/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp index 237bba147b..d11372f6f1 100644 --- a/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp +++ b/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp @@ -228,8 +228,7 @@ protected: Actor = CreateActorServerPersQueue( EdgeActorId, request, - GetMockPQMetaCache().SelfId(), - nullptr + GetMockPQMetaCache().SelfId() ); TestMainActorId = Runtime->Register(Actor); TestActors.insert(TestMainActorId); diff --git a/ydb/core/client/server/msgbus_server_pq_read_session_info.h b/ydb/core/client/server/msgbus_server_pq_read_session_info.h index 03a5b37234..374abbf7e5 100644 --- a/ydb/core/client/server/msgbus_server_pq_read_session_info.h +++ b/ydb/core/client/server/msgbus_server_pq_read_session_info.h @@ -91,7 +91,7 @@ public: const TActorId& parentId, const THashMap<TString, TActorId>& readSessions, std::shared_ptr<const TPersQueueBaseRequestProcessor::TNodesInfo> nodesInfo - ) = 0; + ) const = 0; }; class TPersQueueGetReadSessionsInfoWorkerFactory : public IPersQueueGetReadSessionsInfoWorkerFactory { @@ -100,7 +100,7 @@ public: const TActorId& parentId, const THashMap<TString, TActorId>& readSessions, std::shared_ptr<const TPersQueueBaseRequestProcessor::TNodesInfo> nodesInfo - ) override { + ) const override { return MakeHolder<TPersQueueGetReadSessionsInfoWorker>(parentId, readSessions, nodesInfo); } }; diff --git a/ydb/core/client/server/msgbus_server_proxy.cpp b/ydb/core/client/server/msgbus_server_proxy.cpp index 7f3146d21a..92c6c68b2a 100644 --- a/ydb/core/client/server/msgbus_server_proxy.cpp +++ b/ydb/core/client/server/msgbus_server_proxy.cpp @@ -100,11 +100,8 @@ public: } }; -IActor* CreateMessageBusServerProxy( - TMessageBusServer* server, - std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory -) { - return new TMessageBusServerProxy(server, pqReadSessionsInfoWorkerFactory); +IActor* CreateMessageBusServerProxy(TMessageBusServer* server) { + return new TMessageBusServerProxy(server); } TBusResponse* ProposeTransactionStatusToResponse(EResponseStatus status, diff --git a/ydb/core/client/server/msgbus_server_proxy.h b/ydb/core/client/server/msgbus_server_proxy.h index cb9f9c19d3..d042698bec 100644 --- a/ydb/core/client/server/msgbus_server_proxy.h +++ b/ydb/core/client/server/msgbus_server_proxy.h @@ -32,7 +32,6 @@ struct TMessageBusDbOpsCounters : TAtomicRefCount<TMessageBusDbOpsCounters> { class TMessageBusServerProxy : public TActorBootstrapped<TMessageBusServerProxy> { TMessageBusServer* const Server; - std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> PQReadSessionsInfoWorkerFactory; TIntrusivePtr<NMonitoring::TDynamicCounters> SchemeCacheCounters; @@ -61,12 +60,8 @@ public: return NKikimrServices::TActivity::MSGBUS_PROXY_ACTOR; } - TMessageBusServerProxy( - TMessageBusServer* server, - std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory - ) + TMessageBusServerProxy(TMessageBusServer* server) : Server(server) - , PQReadSessionsInfoWorkerFactory(pqReadSessionsInfoWorkerFactory) { } diff --git a/ydb/core/client/server/msgbus_server_scheme_request.cpp b/ydb/core/client/server/msgbus_server_scheme_request.cpp index 7df5e262d0..0bb3deb33a 100644 --- a/ydb/core/client/server/msgbus_server_scheme_request.cpp +++ b/ydb/core/client/server/msgbus_server_scheme_request.cpp @@ -218,7 +218,7 @@ void TMessageBusServerProxy::Handle(TEvBusProxy::TEvPersQueue::TPtr& ev, const T ctx.Register(new TMessageBusServerSchemeRequest<TBusPersQueue>(ev->Get()), TMailboxType::HTSwap, AppData()->UserPoolId); return; } - ctx.Register(CreateMessageBusServerPersQueue(msg->MsgContext, PqMetaCache, PQReadSessionsInfoWorkerFactory)); + ctx.Register(CreateMessageBusServerPersQueue(msg->MsgContext, PqMetaCache)); } void TMessageBusServerProxy::Handle(TEvBusProxy::TEvFlatTxRequest::TPtr& ev, const TActorContext& ctx) { diff --git a/ydb/core/client/server/msgbus_server_tracer.cpp b/ydb/core/client/server/msgbus_server_tracer.cpp index ff93f8d1cb..5cfe51305a 100644 --- a/ydb/core/client/server/msgbus_server_tracer.cpp +++ b/ydb/core/client/server/msgbus_server_tracer.cpp @@ -10,10 +10,9 @@ TMessageBusTracingServer::TMessageBusTracingServer( const NBus::TBusServerSessionConfig &sessionConfig, NBus::TBusMessageQueue *busQueue, const TString& tracePath, - ui32 bindPort, - std::shared_ptr<NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory + ui32 bindPort ) - : NKikimr::NMsgBusProxy::TMessageBusServer(sessionConfig, busQueue, bindPort, pqReadSessionsInfoWorkerFactory) + : NKikimr::NMsgBusProxy::TMessageBusServer(sessionConfig, busQueue, bindPort) , MessageBusTracerActorID(MakeMessageBusTraceServiceID()) , TraceActive(false) , TracePath(tracePath) @@ -243,15 +242,13 @@ IMessageBusServer* CreateMsgBusTracingServer( NBus::TBusMessageQueue *queue, const NBus::TBusServerSessionConfig &config, const TString &tracePath, - std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory, ui32 bindPort ) { return new NMessageBusTracer::TMessageBusTracingServer( config, queue, tracePath, - bindPort, - pqReadSessionsInfoWorkerFactory + bindPort ); } diff --git a/ydb/core/client/server/msgbus_server_tracer.h b/ydb/core/client/server/msgbus_server_tracer.h index 57a3812f5d..6c4c847530 100644 --- a/ydb/core/client/server/msgbus_server_tracer.h +++ b/ydb/core/client/server/msgbus_server_tracer.h @@ -21,8 +21,7 @@ public: const NBus::TBusServerSessionConfig &sessionConfig, NBus::TBusMessageQueue *busQueue, const TString &tracePath, - ui32 bindPort, - std::shared_ptr<NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory + ui32 bindPort ); IActor* CreateMessageBusTraceService() override; protected: diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 15a004a5e2..e7b56b6c60 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -1524,10 +1524,7 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se { if (!IsServiceInitialized(setup, NMsgBusProxy::CreateMsgBusProxyId()) && Config.HasGRpcConfig() && Config.GetGRpcConfig().GetStartGRpcProxy()) { - IActor * proxy = NMsgBusProxy::CreateMessageBusServerProxy( - nullptr, - Factories ? Factories->PQReadSessionsInfoWorkerFactory : nullptr - ); + IActor * proxy = NMsgBusProxy::CreateMessageBusServerProxy(nullptr); Y_VERIFY(proxy); setup->LocalServices.emplace_back( NMsgBusProxy::CreateMsgBusProxyId(), @@ -1802,7 +1799,7 @@ void TViewerInitializer::InitializeServices( const NKikimr::TAppData* appData ) { using namespace NViewer; - IActor* viewer = CreateViewer(KikimrRunConfig, Factories->PQReadSessionsInfoWorkerFactory); + IActor* viewer = CreateViewer(KikimrRunConfig); SetupPQVirtualHandlers(dynamic_cast<IViewer*>(viewer)); SetupDBVirtualHandlers(dynamic_cast<IViewer*>(viewer)); setup->LocalServices.push_back(std::pair<TActorId, TActorSetupCmd>(MakeViewerID(NodeId), diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index 5aa6bba9a2..06639fdb15 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -399,10 +399,7 @@ void TKikimrRunner::InitializeControlBoard(const TKikimrRunConfig& runConfig) } } -void TKikimrRunner::InitializeMessageBus( - const TKikimrRunConfig& runConfig, - std::shared_ptr<TModuleFactories> factories -) { +void TKikimrRunner::InitializeMessageBus(const TKikimrRunConfig& runConfig) { if (runConfig.AppConfig.HasMessageBusConfig() && runConfig.AppConfig.GetMessageBusConfig().GetStartBusProxy()) { const auto& msgbusConfig = runConfig.AppConfig.GetMessageBusConfig(); @@ -445,7 +442,6 @@ void TKikimrRunner::InitializeMessageBus( Bus.Get(), ProxyBusSessionConfig, msgbusConfig.GetTracePath(), - factories ? factories->PQReadSessionsInfoWorkerFactory : nullptr, msgbusConfig.GetBusProxyPort()) ); } @@ -453,7 +449,6 @@ void TKikimrRunner::InitializeMessageBus( BusServer.Reset(NMsgBusProxy::CreateMsgBusServer( Bus.Get(), ProxyBusSessionConfig, - factories ? factories->PQReadSessionsInfoWorkerFactory : nullptr, msgbusConfig.GetBusProxyPort() )); } @@ -866,6 +861,7 @@ void TKikimrRunner::InitializeAppData(const TKikimrRunConfig& runConfig) AppData->DataShardExportFactory = ModuleFactories ? ModuleFactories->DataShardExportFactory.get() : nullptr; AppData->SqsEventsWriterFactory = ModuleFactories ? ModuleFactories->SqsEventsWriterFactory.get() : nullptr; AppData->PersQueueMirrorReaderFactory = ModuleFactories ? ModuleFactories->PersQueueMirrorReaderFactory.get() : nullptr; + AppData->PersQueueGetReadSessionsInfoWorkerFactory = ModuleFactories ? ModuleFactories->PQReadSessionsInfoWorkerFactory.get() : nullptr; AppData->IoContextFactory = ModuleFactories ? ModuleFactories->IoContextFactory.get() : nullptr; AppData->SqsAuthFactory = ModuleFactories @@ -1583,7 +1579,7 @@ TIntrusivePtr<TKikimrRunner> TKikimrRunner::CreateKikimrRunner( runner->InitializeRegistries(runConfig); runner->InitializeMonitoring(runConfig); runner->InitializeControlBoard(runConfig); - runner->InitializeMessageBus(runConfig, factories); + runner->InitializeMessageBus(runConfig); runner->InitializeAppData(runConfig); runner->InitializeLogSettings(runConfig); TIntrusivePtr<TServiceInitializersList> sil(runner->CreateServiceInitializersList(runConfig, runConfig.ServicesMask)); diff --git a/ydb/core/driver_lib/run/run.h b/ydb/core/driver_lib/run/run.h index dfd93a24e7..21c7e47d03 100644 --- a/ydb/core/driver_lib/run/run.h +++ b/ydb/core/driver_lib/run/run.h @@ -81,10 +81,7 @@ protected: void InitializeMonitoringLogin(const TKikimrRunConfig& runConfig); - void InitializeMessageBus( - const TKikimrRunConfig& runConfig, - std::shared_ptr<TModuleFactories> factories = nullptr - ); + void InitializeMessageBus(const TKikimrRunConfig& runConfig); void InitializeGRpc(const TKikimrRunConfig& runConfig); diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 4dcaa93f4e..9623365974 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -219,6 +219,7 @@ namespace Tests { Runtime->GetAppData(nodeIdx).StreamingConfig.MergeFrom(Settings->AppConfig.GetGRpcConfig().GetStreamingConfig()); Runtime->GetAppData(nodeIdx).EnforceUserTokenRequirement = Settings->AppConfig.GetDomainsConfig().GetSecurityConfig().GetEnforceUserTokenRequirement(); Runtime->GetAppData(nodeIdx).DomainsConfig.MergeFrom(Settings->AppConfig.GetDomainsConfig()); + Runtime->GetAppData(nodeIdx).PersQueueGetReadSessionsInfoWorkerFactory = Settings->PersQueueGetReadSessionsInfoWorkerFactory.get(); SetupConfigurators(nodeIdx); SetupProxies(nodeIdx); } @@ -232,14 +233,12 @@ namespace Tests { Bus.Get(), BusServerSessionConfig, tracePath, - Settings->PersQueueGetReadSessionsInfoWorkerFactory, port )); } else { BusServer.Reset(NMsgBusProxy::CreateMsgBusServer( Bus.Get(), BusServerSessionConfig, - Settings->PersQueueGetReadSessionsInfoWorkerFactory, port )); } diff --git a/ydb/core/viewer/json_pqconsumerinfo.h b/ydb/core/viewer/json_pqconsumerinfo.h index 5ccabdb378..b7526edf7d 100644 --- a/ydb/core/viewer/json_pqconsumerinfo.h +++ b/ydb/core/viewer/json_pqconsumerinfo.h @@ -19,7 +19,6 @@ class TJsonPQConsumerInfo : public TActorBootstrapped<TJsonPQConsumerInfo> { using TBase = TActorBootstrapped<TJsonPQConsumerInfo>; IViewer* Viewer; NMon::TEvHttpInfo::TPtr Event; - std::shared_ptr<NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> PQReadSessionsInfoWorkerFactory; NKikimrClient::TResponse Result; TJsonSettings JsonSettings; TString Topic; @@ -37,12 +36,10 @@ public: TJsonPQConsumerInfo( IViewer* viewer, - NMon::TEvHttpInfo::TPtr& ev, - std::shared_ptr<NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory = nullptr + NMon::TEvHttpInfo::TPtr& ev ) : Viewer(viewer) , Event(ev) - , PQReadSessionsInfoWorkerFactory(pqReadSessionsInfoWorkerFactory) {} void Bootstrap(const TActorContext& ctx) { @@ -71,8 +68,7 @@ public: ctx.Register(NMsgBusProxy::CreateActorServerPersQueue( ctx.SelfID, request, - NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), - PQReadSessionsInfoWorkerFactory + NMsgBusProxy::CreatePersQueueMetaCacheV2Id() )); ++Requests; } @@ -83,8 +79,7 @@ public: ctx.Register(NMsgBusProxy::CreateActorServerPersQueue( ctx.SelfID, request, - NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), - PQReadSessionsInfoWorkerFactory + NMsgBusProxy::CreatePersQueueMetaCacheV2Id() )); ++Requests; } diff --git a/ydb/core/viewer/viewer.cpp b/ydb/core/viewer/viewer.cpp index f70f84276a..796fbc069d 100644 --- a/ydb/core/viewer/viewer.cpp +++ b/ydb/core/viewer/viewer.cpp @@ -92,23 +92,6 @@ public: } }; -template <typename ActorRequestType> -class TPersQueueJsonHandler : public TJsonHandler<ActorRequestType> { -public: - TPersQueueJsonHandler( - std::shared_ptr<NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory - ) - : PQReadSessionsInfoWorkerFactory(pqReadSessionsInfoWorkerFactory) - {} - - IActor* CreateRequestActor(IViewer* viewer, NMon::TEvHttpInfo::TPtr& event) override { - return new ActorRequestType(viewer, event, PQReadSessionsInfoWorkerFactory); - } - -private: - std::shared_ptr<NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> PQReadSessionsInfoWorkerFactory; -}; - void SetupPQVirtualHandlers(IViewer* viewer) { viewer->RegisterVirtualHandler( NKikimrViewer::EObjectType::Root, @@ -146,12 +129,8 @@ public: return NKikimrServices::TActivity::TABLET_MONITORING_PROXY; } - TViewer( - const TKikimrRunConfig &kikimrRunConfig, - std::shared_ptr<NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory - ) + TViewer(const TKikimrRunConfig &kikimrRunConfig) : KikimrRunConfig(kikimrRunConfig) - , PQReadSessionsInfoWorkerFactory(pqReadSessionsInfoWorkerFactory) {} void Bootstrap(const TActorContext &ctx) { @@ -212,9 +191,7 @@ public: JsonHandlers["/json/config"] = new TJsonHandler<TJsonConfig>; JsonHandlers["/json/counters"] = new TJsonHandler<TJsonCounters>; JsonHandlers["/json/topicinfo"] = new TJsonHandler<TJsonTopicInfo>; - JsonHandlers["/json/pqconsumerinfo"] = new TPersQueueJsonHandler<TJsonPQConsumerInfo>( - PQReadSessionsInfoWorkerFactory - ); + JsonHandlers["/json/pqconsumerinfo"] = new TJsonHandler<TJsonPQConsumerInfo>(); JsonHandlers["/json/tabletcounters"] = new TJsonHandler<TJsonTabletCounters>; JsonHandlers["/json/storage"] = new TJsonHandler<TJsonStorage>; JsonHandlers["/json/metainfo"] = new TJsonHandler<TJsonMetaInfo>; @@ -272,7 +249,6 @@ public: private: THashMap<TString, TAutoPtr<TJsonHandlerBase>> JsonHandlers; const TKikimrRunConfig KikimrRunConfig; - std::shared_ptr<NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> PQReadSessionsInfoWorkerFactory; std::unordered_multimap<NKikimrViewer::EObjectType, TVirtualHandler> VirtualHandlersByParentType; std::unordered_map<NKikimrViewer::EObjectType, TContentHandler> ContentHandlers; TString AllowOrigin; @@ -531,11 +507,10 @@ TString IViewer::TContentRequestContext::Dump() const ui32 CurrentMonitoringPort = 8765; IActor* CreateViewer( - const TKikimrRunConfig &kikimrRunConfig, - std::shared_ptr<NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory + const TKikimrRunConfig &kikimrRunConfig ) { CurrentMonitoringPort = kikimrRunConfig.AppConfig.GetMonitoringConfig().GetMonitoringPort(); - return new TViewer(kikimrRunConfig, pqReadSessionsInfoWorkerFactory); + return new TViewer(kikimrRunConfig); } TString TViewer::GetHTTPOKJSON(const NMon::TEvHttpInfo* request) { diff --git a/ydb/core/viewer/viewer.h b/ydb/core/viewer/viewer.h index f5544c78f8..5b14a9e4a5 100644 --- a/ydb/core/viewer/viewer.h +++ b/ydb/core/viewer/viewer.h @@ -18,10 +18,7 @@ inline TActorId MakeViewerID(ui32 node = 0) { return TActorId(node, TStringBuf(x, 12)); } -IActor* CreateViewer( - const TKikimrRunConfig &kikimrRunConfig, - std::shared_ptr<NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory -); +IActor* CreateViewer(const TKikimrRunConfig &kikimrRunConfig); class IViewer { public: diff --git a/ydb/public/lib/base/msgbus.h b/ydb/public/lib/base/msgbus.h index f11bd9771f..3662bed19a 100644 --- a/ydb/public/lib/base/msgbus.h +++ b/ydb/public/lib/base/msgbus.h @@ -263,14 +263,12 @@ class IPersQueueGetReadSessionsInfoWorkerFactory; IMessageBusServer* CreateMsgBusServer( NBus::TBusMessageQueue *queue, const NBus::TBusServerSessionConfig &config, - std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory, ui32 bindPort = TProtocol::DefaultPort ); IMessageBusServer* CreateMsgBusTracingServer( NBus::TBusMessageQueue *queue, const NBus::TBusServerSessionConfig &config, const TString &tracePath, - std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory, ui32 bindPort = TProtocol::DefaultPort ); diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp index ba6b9fe5ef..3e46415a89 100644 --- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp @@ -2907,8 +2907,7 @@ void TReadInfoActor::Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TActorC ctx.Register(NMsgBusProxy::CreateActorServerPersQueue( ctx.SelfID, proto, - SchemeCache, - std::make_shared<NMsgBusProxy::TPersQueueGetReadSessionsInfoWorkerFactory>() + SchemeCache )); } |