aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Bogolyubskiy <i@bogolyubskiyalexey.ru>2022-03-24 15:30:29 +0300
committerAlexey Bogolyubskiy <i@bogolyubskiyalexey.ru>2022-03-24 15:30:29 +0300
commit04156efbfd06d47a7b749159b1a00a756fa5c41b (patch)
tree11eb26006a9707677bb22f0aa9576a34d712a6f6
parent0a35af177414ff232a34dba196112ffaeb7dfb6d (diff)
downloadydb-04156efbfd06d47a7b749159b1a00a756fa5c41b.tar.gz
easy forwarding of PQReadSessionsInfoWorkerFactory
fix ref:50b6df6cb339e4ffc71f422f8b9614d7376cd9ce
-rw-r--r--ydb/core/base/appdata.h5
-rw-r--r--ydb/core/client/server/msgbus_server.cpp9
-rw-r--r--ydb/core/client/server/msgbus_server.h10
-rw-r--r--ydb/core/client/server/msgbus_server_persqueue.cpp12
-rw-r--r--ydb/core/client/server/msgbus_server_persqueue.h6
-rw-r--r--ydb/core/client/server/msgbus_server_pq_metarequest.cpp12
-rw-r--r--ydb/core/client/server/msgbus_server_pq_metarequest.h8
-rw-r--r--ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp3
-rw-r--r--ydb/core/client/server/msgbus_server_pq_read_session_info.h4
-rw-r--r--ydb/core/client/server/msgbus_server_proxy.cpp7
-rw-r--r--ydb/core/client/server/msgbus_server_proxy.h7
-rw-r--r--ydb/core/client/server/msgbus_server_scheme_request.cpp2
-rw-r--r--ydb/core/client/server/msgbus_server_tracer.cpp9
-rw-r--r--ydb/core/client/server/msgbus_server_tracer.h3
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp7
-rw-r--r--ydb/core/driver_lib/run/run.cpp10
-rw-r--r--ydb/core/driver_lib/run/run.h5
-rw-r--r--ydb/core/testlib/test_client.cpp3
-rw-r--r--ydb/core/viewer/json_pqconsumerinfo.h11
-rw-r--r--ydb/core/viewer/viewer.cpp33
-rw-r--r--ydb/core/viewer/viewer.h5
-rw-r--r--ydb/public/lib/base/msgbus.h2
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_read_actor.cpp3
23 files changed, 51 insertions, 125 deletions
diff --git a/ydb/core/base/appdata.h b/ydb/core/base/appdata.h
index c48d712dae..d6c0075e62 100644
--- a/ydb/core/base/appdata.h
+++ b/ydb/core/base/appdata.h
@@ -58,6 +58,10 @@ namespace NSQS {
class IAuthFactory;
}
+namespace NMsgBusProxy {
+ class IPersQueueGetReadSessionsInfoWorkerFactory;
+}
+
namespace NPQ {
class IPersQueueMirrorReaderFactory;
}
@@ -84,6 +88,7 @@ struct TAppData {
IActor*(*FolderServiceFactory)(const NKikimrProto::NFolderService::TFolderServiceConfig&);
+ const NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory* PersQueueGetReadSessionsInfoWorkerFactory = nullptr;
const NPQ::IPersQueueMirrorReaderFactory* PersQueueMirrorReaderFactory = nullptr;
NYdb::TDriver* YdbDriver = nullptr;
const NPDisk::IIoContextFactory* IoContextFactory = nullptr;
diff --git a/ydb/core/client/server/msgbus_server.cpp b/ydb/core/client/server/msgbus_server.cpp
index 8a4d8a3bbd..cee4716396 100644
--- a/ydb/core/client/server/msgbus_server.cpp
+++ b/ydb/core/client/server/msgbus_server.cpp
@@ -438,12 +438,10 @@ protected:
TMessageBusServer::TMessageBusServer(
const NBus::TBusServerSessionConfig &sessionConfig,
NBus::TBusMessageQueue *busQueue,
- ui32 bindPort,
- std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory
+ ui32 bindPort
)
: SessionConfig(sessionConfig)
, BusQueue(busQueue)
- , PQReadSessionsInfoWorkerFactory(pqReadSessionsInfoWorkerFactory)
, Protocol(bindPort)
{}
@@ -599,7 +597,7 @@ void TMessageBusServer::UnknownMessage(TBusMessageContext &msg) {
}
IActor* TMessageBusServer::CreateProxy() {
- return CreateMessageBusServerProxy(this, PQReadSessionsInfoWorkerFactory);
+ return CreateMessageBusServerProxy(this);
}
IActor* TMessageBusServer::CreateMessageBusTraceService() {
@@ -609,10 +607,9 @@ IActor* TMessageBusServer::CreateMessageBusTraceService() {
IMessageBusServer* CreateMsgBusServer(
NBus::TBusMessageQueue *queue,
const NBus::TBusServerSessionConfig &config,
- std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory,
ui32 bindPort
) {
- return new TMessageBusServer(config, queue, bindPort, pqReadSessionsInfoWorkerFactory);
+ return new TMessageBusServer(config, queue, bindPort);
}
}
diff --git a/ydb/core/client/server/msgbus_server.h b/ydb/core/client/server/msgbus_server.h
index 3d40fc9001..71212fb5df 100644
--- a/ydb/core/client/server/msgbus_server.h
+++ b/ydb/core/client/server/msgbus_server.h
@@ -136,7 +136,6 @@ class TMessageBusServer : public IMessageBusServer, public NBus::IBusServerHandl
NBus::TBusServerSessionPtr Session;
TActorId Proxy;
TActorId Monitor;
- std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> PQReadSessionsInfoWorkerFactory;
protected:
TProtocol Protocol;
TActorSystem *ActorSystem = nullptr;
@@ -145,8 +144,7 @@ public:
TMessageBusServer(
const NBus::TBusServerSessionConfig &sessionConfig,
NBus::TBusMessageQueue *busQueue,
- ui32 bindPort,
- std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory
+ ui32 bindPort
);
~TMessageBusServer();
@@ -272,12 +270,8 @@ void CopyProtobufsByFieldName(TProtoTo& protoTo, const TProtoFrom& protoFrom) {
}
}
-class IPersQueueGetReadSessionsInfoWorkerFactory;
-IActor* CreateMessageBusServerProxy(
- TMessageBusServer *server,
- std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory
-);
+IActor* CreateMessageBusServerProxy(TMessageBusServer *server);
//IActor* CreateMessageBusDatashardSetConfig(TBusMessageContext &msg);
IActor* CreateMessageBusTabletCountersRequest(TBusMessageContext &msg);
diff --git a/ydb/core/client/server/msgbus_server_persqueue.cpp b/ydb/core/client/server/msgbus_server_persqueue.cpp
index 347471e787..ac3487369f 100644
--- a/ydb/core/client/server/msgbus_server_persqueue.cpp
+++ b/ydb/core/client/server/msgbus_server_persqueue.cpp
@@ -1391,7 +1391,6 @@ public:
template <template <class TImpl, class... TArgs> class TSenderImpl, class... T>
IActor* CreatePersQueueRequestProcessor(
const NKikimrClient::TPersQueueRequest& request,
- std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory,
T&&... constructorParams
) {
try {
@@ -1415,8 +1414,7 @@ IActor* CreatePersQueueRequestProcessor(
return new TSenderImpl<TPersQueueGetPartitionStatusProcessor>(std::forward<T>(constructorParams)...);
} else if (meta.HasCmdGetReadSessionsInfo()) {
return new TSenderImpl<TPersQueueGetReadSessionsInfoProcessor>(
- std::forward<T>(constructorParams)...,
- pqReadSessionsInfoWorkerFactory
+ std::forward<T>(constructorParams)...
);
} else {
throw std::runtime_error("Not implemented yet");
@@ -1461,13 +1459,11 @@ public:
IActor* CreateMessageBusServerPersQueue(
TBusMessageContext& msg,
- const TActorId& schemeCache,
- std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory
+ const TActorId& schemeCache
) {
const NKikimrClient::TPersQueueRequest& request = static_cast<TBusPersQueue*>(msg.GetMessage())->Record;
return CreatePersQueueRequestProcessor<TMessageBusServerPersQueue>(
request,
- pqReadSessionsInfoWorkerFactory,
msg,
schemeCache
);
@@ -1476,12 +1472,10 @@ IActor* CreateMessageBusServerPersQueue(
IActor* CreateActorServerPersQueue(
const TActorId& parentId,
const NKikimrClient::TPersQueueRequest& request,
- const TActorId& schemeCache,
- std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory
+ const TActorId& schemeCache
) {
return CreatePersQueueRequestProcessor<TReplierToParent>(
request,
- pqReadSessionsInfoWorkerFactory,
parentId,
request,
schemeCache
diff --git a/ydb/core/client/server/msgbus_server_persqueue.h b/ydb/core/client/server/msgbus_server_persqueue.h
index 1a915080e0..fe3c38f1b0 100644
--- a/ydb/core/client/server/msgbus_server_persqueue.h
+++ b/ydb/core/client/server/msgbus_server_persqueue.h
@@ -30,14 +30,12 @@ TProcessingResult ProcessMetaCacheSingleTopicsResponse(const NSchemeCache::TSche
// Worker actor creation
IActor* CreateMessageBusServerPersQueue(
TBusMessageContext& msg,
- const TActorId& schemeCache,
- std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory = nullptr
+ const TActorId& schemeCache
);
IActor* CreateActorServerPersQueue(
const TActorId& parentId,
const NKikimrClient::TPersQueueRequest& request,
- const TActorId& schemeCache,
- std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory = nullptr
+ const TActorId& schemeCache
);
diff --git a/ydb/core/client/server/msgbus_server_pq_metarequest.cpp b/ydb/core/client/server/msgbus_server_pq_metarequest.cpp
index 32f37251cc..11d9153e77 100644
--- a/ydb/core/client/server/msgbus_server_pq_metarequest.cpp
+++ b/ydb/core/client/server/msgbus_server_pq_metarequest.cpp
@@ -440,11 +440,9 @@ void TPersQueueGetPartitionLocationsTopicWorker::Answer(
TPersQueueGetReadSessionsInfoProcessor::TPersQueueGetReadSessionsInfoProcessor(
const NKikimrClient::TPersQueueRequest& request,
- const TActorId& schemeCache,
- std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory
+ const TActorId& schemeCache
)
: TPersQueueBaseRequestProcessor(request, schemeCache, true)
- , PQReadSessionsInfoWorkerFactory(pqReadSessionsInfoWorkerFactory)
{
const auto& cmd = RequestProto->GetMetaRequest().GetCmdGetReadSessionsInfo();
const auto& topics = cmd.GetTopic();
@@ -532,10 +530,12 @@ bool TPersQueueGetReadSessionsInfoTopicWorker::WaitAllPipeEvents(const TActorCon
}
THolder<IActor> TPersQueueGetReadSessionsInfoProcessor::CreateSessionsSubactor(
- const THashMap<TString, TActorId>&& readSessions
+ const THashMap<TString, TActorId>&& readSessions,
+ const TActorContext& ctx
) {
- if (PQReadSessionsInfoWorkerFactory) {
- return PQReadSessionsInfoWorkerFactory->Create(SelfId(), std::move(readSessions), NodesInfo);
+ auto factory = AppData(ctx)->PersQueueGetReadSessionsInfoWorkerFactory;
+ if (factory) {
+ return factory->Create(SelfId(), std::move(readSessions), NodesInfo);
}
return MakeHolder<TPersQueueGetReadSessionsInfoWorker>(SelfId(), std::move(readSessions), NodesInfo);
}
diff --git a/ydb/core/client/server/msgbus_server_pq_metarequest.h b/ydb/core/client/server/msgbus_server_pq_metarequest.h
index 1d56c28dc0..0c98cdde39 100644
--- a/ydb/core/client/server/msgbus_server_pq_metarequest.h
+++ b/ydb/core/client/server/msgbus_server_pq_metarequest.h
@@ -133,8 +133,7 @@ class TPersQueueGetReadSessionsInfoProcessor : public TPersQueueBaseRequestProce
public:
TPersQueueGetReadSessionsInfoProcessor(
const NKikimrClient::TPersQueueRequest& request,
- const TActorId& schemeCache,
- std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory
+ const TActorId& schemeCache
);
bool ReadyForAnswer(const TActorContext& ctx) override {
@@ -143,7 +142,7 @@ public:
return true;
}
HasSessionsRequest = true;
- auto actorId = ctx.Register(CreateSessionsSubactor(std::move(ReadSessions)).Release());
+ auto actorId = ctx.Register(CreateSessionsSubactor(std::move(ReadSessions), ctx).Release());
Children.emplace(actorId, MakeHolder<TPerTopicInfo>());
}
return false;
@@ -170,9 +169,8 @@ public:
private:
THolder<IActor> CreateTopicSubactor(const TSchemeEntry& topicEntry, const TString& name) override;
- THolder<IActor> CreateSessionsSubactor(const THashMap<TString, TActorId>&& readSessions);
+ THolder<IActor> CreateSessionsSubactor(const THashMap<TString, TActorId>&& readSessions, const TActorContext& ctx);
- std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> PQReadSessionsInfoWorkerFactory;
mutable bool HasSessionsRequest = false;
THashMap<TString, TActorId> ReadSessions;
};
diff --git a/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp b/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp
index 237bba147b..d11372f6f1 100644
--- a/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp
+++ b/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp
@@ -228,8 +228,7 @@ protected:
Actor = CreateActorServerPersQueue(
EdgeActorId,
request,
- GetMockPQMetaCache().SelfId(),
- nullptr
+ GetMockPQMetaCache().SelfId()
);
TestMainActorId = Runtime->Register(Actor);
TestActors.insert(TestMainActorId);
diff --git a/ydb/core/client/server/msgbus_server_pq_read_session_info.h b/ydb/core/client/server/msgbus_server_pq_read_session_info.h
index 03a5b37234..374abbf7e5 100644
--- a/ydb/core/client/server/msgbus_server_pq_read_session_info.h
+++ b/ydb/core/client/server/msgbus_server_pq_read_session_info.h
@@ -91,7 +91,7 @@ public:
const TActorId& parentId,
const THashMap<TString, TActorId>& readSessions,
std::shared_ptr<const TPersQueueBaseRequestProcessor::TNodesInfo> nodesInfo
- ) = 0;
+ ) const = 0;
};
class TPersQueueGetReadSessionsInfoWorkerFactory : public IPersQueueGetReadSessionsInfoWorkerFactory {
@@ -100,7 +100,7 @@ public:
const TActorId& parentId,
const THashMap<TString, TActorId>& readSessions,
std::shared_ptr<const TPersQueueBaseRequestProcessor::TNodesInfo> nodesInfo
- ) override {
+ ) const override {
return MakeHolder<TPersQueueGetReadSessionsInfoWorker>(parentId, readSessions, nodesInfo);
}
};
diff --git a/ydb/core/client/server/msgbus_server_proxy.cpp b/ydb/core/client/server/msgbus_server_proxy.cpp
index 7f3146d21a..92c6c68b2a 100644
--- a/ydb/core/client/server/msgbus_server_proxy.cpp
+++ b/ydb/core/client/server/msgbus_server_proxy.cpp
@@ -100,11 +100,8 @@ public:
}
};
-IActor* CreateMessageBusServerProxy(
- TMessageBusServer* server,
- std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory
-) {
- return new TMessageBusServerProxy(server, pqReadSessionsInfoWorkerFactory);
+IActor* CreateMessageBusServerProxy(TMessageBusServer* server) {
+ return new TMessageBusServerProxy(server);
}
TBusResponse* ProposeTransactionStatusToResponse(EResponseStatus status,
diff --git a/ydb/core/client/server/msgbus_server_proxy.h b/ydb/core/client/server/msgbus_server_proxy.h
index cb9f9c19d3..d042698bec 100644
--- a/ydb/core/client/server/msgbus_server_proxy.h
+++ b/ydb/core/client/server/msgbus_server_proxy.h
@@ -32,7 +32,6 @@ struct TMessageBusDbOpsCounters : TAtomicRefCount<TMessageBusDbOpsCounters> {
class TMessageBusServerProxy : public TActorBootstrapped<TMessageBusServerProxy> {
TMessageBusServer* const Server;
- std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> PQReadSessionsInfoWorkerFactory;
TIntrusivePtr<NMonitoring::TDynamicCounters> SchemeCacheCounters;
@@ -61,12 +60,8 @@ public:
return NKikimrServices::TActivity::MSGBUS_PROXY_ACTOR;
}
- TMessageBusServerProxy(
- TMessageBusServer* server,
- std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory
- )
+ TMessageBusServerProxy(TMessageBusServer* server)
: Server(server)
- , PQReadSessionsInfoWorkerFactory(pqReadSessionsInfoWorkerFactory)
{
}
diff --git a/ydb/core/client/server/msgbus_server_scheme_request.cpp b/ydb/core/client/server/msgbus_server_scheme_request.cpp
index 7df5e262d0..0bb3deb33a 100644
--- a/ydb/core/client/server/msgbus_server_scheme_request.cpp
+++ b/ydb/core/client/server/msgbus_server_scheme_request.cpp
@@ -218,7 +218,7 @@ void TMessageBusServerProxy::Handle(TEvBusProxy::TEvPersQueue::TPtr& ev, const T
ctx.Register(new TMessageBusServerSchemeRequest<TBusPersQueue>(ev->Get()), TMailboxType::HTSwap, AppData()->UserPoolId);
return;
}
- ctx.Register(CreateMessageBusServerPersQueue(msg->MsgContext, PqMetaCache, PQReadSessionsInfoWorkerFactory));
+ ctx.Register(CreateMessageBusServerPersQueue(msg->MsgContext, PqMetaCache));
}
void TMessageBusServerProxy::Handle(TEvBusProxy::TEvFlatTxRequest::TPtr& ev, const TActorContext& ctx) {
diff --git a/ydb/core/client/server/msgbus_server_tracer.cpp b/ydb/core/client/server/msgbus_server_tracer.cpp
index ff93f8d1cb..5cfe51305a 100644
--- a/ydb/core/client/server/msgbus_server_tracer.cpp
+++ b/ydb/core/client/server/msgbus_server_tracer.cpp
@@ -10,10 +10,9 @@ TMessageBusTracingServer::TMessageBusTracingServer(
const NBus::TBusServerSessionConfig &sessionConfig,
NBus::TBusMessageQueue *busQueue,
const TString& tracePath,
- ui32 bindPort,
- std::shared_ptr<NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory
+ ui32 bindPort
)
- : NKikimr::NMsgBusProxy::TMessageBusServer(sessionConfig, busQueue, bindPort, pqReadSessionsInfoWorkerFactory)
+ : NKikimr::NMsgBusProxy::TMessageBusServer(sessionConfig, busQueue, bindPort)
, MessageBusTracerActorID(MakeMessageBusTraceServiceID())
, TraceActive(false)
, TracePath(tracePath)
@@ -243,15 +242,13 @@ IMessageBusServer* CreateMsgBusTracingServer(
NBus::TBusMessageQueue *queue,
const NBus::TBusServerSessionConfig &config,
const TString &tracePath,
- std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory,
ui32 bindPort
) {
return new NMessageBusTracer::TMessageBusTracingServer(
config,
queue,
tracePath,
- bindPort,
- pqReadSessionsInfoWorkerFactory
+ bindPort
);
}
diff --git a/ydb/core/client/server/msgbus_server_tracer.h b/ydb/core/client/server/msgbus_server_tracer.h
index 57a3812f5d..6c4c847530 100644
--- a/ydb/core/client/server/msgbus_server_tracer.h
+++ b/ydb/core/client/server/msgbus_server_tracer.h
@@ -21,8 +21,7 @@ public:
const NBus::TBusServerSessionConfig &sessionConfig,
NBus::TBusMessageQueue *busQueue,
const TString &tracePath,
- ui32 bindPort,
- std::shared_ptr<NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory
+ ui32 bindPort
);
IActor* CreateMessageBusTraceService() override;
protected:
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index 15a004a5e2..e7b56b6c60 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -1524,10 +1524,7 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se
{
if (!IsServiceInitialized(setup, NMsgBusProxy::CreateMsgBusProxyId())
&& Config.HasGRpcConfig() && Config.GetGRpcConfig().GetStartGRpcProxy()) {
- IActor * proxy = NMsgBusProxy::CreateMessageBusServerProxy(
- nullptr,
- Factories ? Factories->PQReadSessionsInfoWorkerFactory : nullptr
- );
+ IActor * proxy = NMsgBusProxy::CreateMessageBusServerProxy(nullptr);
Y_VERIFY(proxy);
setup->LocalServices.emplace_back(
NMsgBusProxy::CreateMsgBusProxyId(),
@@ -1802,7 +1799,7 @@ void TViewerInitializer::InitializeServices(
const NKikimr::TAppData* appData
) {
using namespace NViewer;
- IActor* viewer = CreateViewer(KikimrRunConfig, Factories->PQReadSessionsInfoWorkerFactory);
+ IActor* viewer = CreateViewer(KikimrRunConfig);
SetupPQVirtualHandlers(dynamic_cast<IViewer*>(viewer));
SetupDBVirtualHandlers(dynamic_cast<IViewer*>(viewer));
setup->LocalServices.push_back(std::pair<TActorId, TActorSetupCmd>(MakeViewerID(NodeId),
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index 5aa6bba9a2..06639fdb15 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -399,10 +399,7 @@ void TKikimrRunner::InitializeControlBoard(const TKikimrRunConfig& runConfig)
}
}
-void TKikimrRunner::InitializeMessageBus(
- const TKikimrRunConfig& runConfig,
- std::shared_ptr<TModuleFactories> factories
-) {
+void TKikimrRunner::InitializeMessageBus(const TKikimrRunConfig& runConfig) {
if (runConfig.AppConfig.HasMessageBusConfig() && runConfig.AppConfig.GetMessageBusConfig().GetStartBusProxy()) {
const auto& msgbusConfig = runConfig.AppConfig.GetMessageBusConfig();
@@ -445,7 +442,6 @@ void TKikimrRunner::InitializeMessageBus(
Bus.Get(),
ProxyBusSessionConfig,
msgbusConfig.GetTracePath(),
- factories ? factories->PQReadSessionsInfoWorkerFactory : nullptr,
msgbusConfig.GetBusProxyPort())
);
}
@@ -453,7 +449,6 @@ void TKikimrRunner::InitializeMessageBus(
BusServer.Reset(NMsgBusProxy::CreateMsgBusServer(
Bus.Get(),
ProxyBusSessionConfig,
- factories ? factories->PQReadSessionsInfoWorkerFactory : nullptr,
msgbusConfig.GetBusProxyPort()
));
}
@@ -866,6 +861,7 @@ void TKikimrRunner::InitializeAppData(const TKikimrRunConfig& runConfig)
AppData->DataShardExportFactory = ModuleFactories ? ModuleFactories->DataShardExportFactory.get() : nullptr;
AppData->SqsEventsWriterFactory = ModuleFactories ? ModuleFactories->SqsEventsWriterFactory.get() : nullptr;
AppData->PersQueueMirrorReaderFactory = ModuleFactories ? ModuleFactories->PersQueueMirrorReaderFactory.get() : nullptr;
+ AppData->PersQueueGetReadSessionsInfoWorkerFactory = ModuleFactories ? ModuleFactories->PQReadSessionsInfoWorkerFactory.get() : nullptr;
AppData->IoContextFactory = ModuleFactories ? ModuleFactories->IoContextFactory.get() : nullptr;
AppData->SqsAuthFactory = ModuleFactories
@@ -1583,7 +1579,7 @@ TIntrusivePtr<TKikimrRunner> TKikimrRunner::CreateKikimrRunner(
runner->InitializeRegistries(runConfig);
runner->InitializeMonitoring(runConfig);
runner->InitializeControlBoard(runConfig);
- runner->InitializeMessageBus(runConfig, factories);
+ runner->InitializeMessageBus(runConfig);
runner->InitializeAppData(runConfig);
runner->InitializeLogSettings(runConfig);
TIntrusivePtr<TServiceInitializersList> sil(runner->CreateServiceInitializersList(runConfig, runConfig.ServicesMask));
diff --git a/ydb/core/driver_lib/run/run.h b/ydb/core/driver_lib/run/run.h
index dfd93a24e7..21c7e47d03 100644
--- a/ydb/core/driver_lib/run/run.h
+++ b/ydb/core/driver_lib/run/run.h
@@ -81,10 +81,7 @@ protected:
void InitializeMonitoringLogin(const TKikimrRunConfig& runConfig);
- void InitializeMessageBus(
- const TKikimrRunConfig& runConfig,
- std::shared_ptr<TModuleFactories> factories = nullptr
- );
+ void InitializeMessageBus(const TKikimrRunConfig& runConfig);
void InitializeGRpc(const TKikimrRunConfig& runConfig);
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 4dcaa93f4e..9623365974 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -219,6 +219,7 @@ namespace Tests {
Runtime->GetAppData(nodeIdx).StreamingConfig.MergeFrom(Settings->AppConfig.GetGRpcConfig().GetStreamingConfig());
Runtime->GetAppData(nodeIdx).EnforceUserTokenRequirement = Settings->AppConfig.GetDomainsConfig().GetSecurityConfig().GetEnforceUserTokenRequirement();
Runtime->GetAppData(nodeIdx).DomainsConfig.MergeFrom(Settings->AppConfig.GetDomainsConfig());
+ Runtime->GetAppData(nodeIdx).PersQueueGetReadSessionsInfoWorkerFactory = Settings->PersQueueGetReadSessionsInfoWorkerFactory.get();
SetupConfigurators(nodeIdx);
SetupProxies(nodeIdx);
}
@@ -232,14 +233,12 @@ namespace Tests {
Bus.Get(),
BusServerSessionConfig,
tracePath,
- Settings->PersQueueGetReadSessionsInfoWorkerFactory,
port
));
} else {
BusServer.Reset(NMsgBusProxy::CreateMsgBusServer(
Bus.Get(),
BusServerSessionConfig,
- Settings->PersQueueGetReadSessionsInfoWorkerFactory,
port
));
}
diff --git a/ydb/core/viewer/json_pqconsumerinfo.h b/ydb/core/viewer/json_pqconsumerinfo.h
index 5ccabdb378..b7526edf7d 100644
--- a/ydb/core/viewer/json_pqconsumerinfo.h
+++ b/ydb/core/viewer/json_pqconsumerinfo.h
@@ -19,7 +19,6 @@ class TJsonPQConsumerInfo : public TActorBootstrapped<TJsonPQConsumerInfo> {
using TBase = TActorBootstrapped<TJsonPQConsumerInfo>;
IViewer* Viewer;
NMon::TEvHttpInfo::TPtr Event;
- std::shared_ptr<NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> PQReadSessionsInfoWorkerFactory;
NKikimrClient::TResponse Result;
TJsonSettings JsonSettings;
TString Topic;
@@ -37,12 +36,10 @@ public:
TJsonPQConsumerInfo(
IViewer* viewer,
- NMon::TEvHttpInfo::TPtr& ev,
- std::shared_ptr<NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory = nullptr
+ NMon::TEvHttpInfo::TPtr& ev
)
: Viewer(viewer)
, Event(ev)
- , PQReadSessionsInfoWorkerFactory(pqReadSessionsInfoWorkerFactory)
{}
void Bootstrap(const TActorContext& ctx) {
@@ -71,8 +68,7 @@ public:
ctx.Register(NMsgBusProxy::CreateActorServerPersQueue(
ctx.SelfID,
request,
- NMsgBusProxy::CreatePersQueueMetaCacheV2Id(),
- PQReadSessionsInfoWorkerFactory
+ NMsgBusProxy::CreatePersQueueMetaCacheV2Id()
));
++Requests;
}
@@ -83,8 +79,7 @@ public:
ctx.Register(NMsgBusProxy::CreateActorServerPersQueue(
ctx.SelfID,
request,
- NMsgBusProxy::CreatePersQueueMetaCacheV2Id(),
- PQReadSessionsInfoWorkerFactory
+ NMsgBusProxy::CreatePersQueueMetaCacheV2Id()
));
++Requests;
}
diff --git a/ydb/core/viewer/viewer.cpp b/ydb/core/viewer/viewer.cpp
index f70f84276a..796fbc069d 100644
--- a/ydb/core/viewer/viewer.cpp
+++ b/ydb/core/viewer/viewer.cpp
@@ -92,23 +92,6 @@ public:
}
};
-template <typename ActorRequestType>
-class TPersQueueJsonHandler : public TJsonHandler<ActorRequestType> {
-public:
- TPersQueueJsonHandler(
- std::shared_ptr<NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory
- )
- : PQReadSessionsInfoWorkerFactory(pqReadSessionsInfoWorkerFactory)
- {}
-
- IActor* CreateRequestActor(IViewer* viewer, NMon::TEvHttpInfo::TPtr& event) override {
- return new ActorRequestType(viewer, event, PQReadSessionsInfoWorkerFactory);
- }
-
-private:
- std::shared_ptr<NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> PQReadSessionsInfoWorkerFactory;
-};
-
void SetupPQVirtualHandlers(IViewer* viewer) {
viewer->RegisterVirtualHandler(
NKikimrViewer::EObjectType::Root,
@@ -146,12 +129,8 @@ public:
return NKikimrServices::TActivity::TABLET_MONITORING_PROXY;
}
- TViewer(
- const TKikimrRunConfig &kikimrRunConfig,
- std::shared_ptr<NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory
- )
+ TViewer(const TKikimrRunConfig &kikimrRunConfig)
: KikimrRunConfig(kikimrRunConfig)
- , PQReadSessionsInfoWorkerFactory(pqReadSessionsInfoWorkerFactory)
{}
void Bootstrap(const TActorContext &ctx) {
@@ -212,9 +191,7 @@ public:
JsonHandlers["/json/config"] = new TJsonHandler<TJsonConfig>;
JsonHandlers["/json/counters"] = new TJsonHandler<TJsonCounters>;
JsonHandlers["/json/topicinfo"] = new TJsonHandler<TJsonTopicInfo>;
- JsonHandlers["/json/pqconsumerinfo"] = new TPersQueueJsonHandler<TJsonPQConsumerInfo>(
- PQReadSessionsInfoWorkerFactory
- );
+ JsonHandlers["/json/pqconsumerinfo"] = new TJsonHandler<TJsonPQConsumerInfo>();
JsonHandlers["/json/tabletcounters"] = new TJsonHandler<TJsonTabletCounters>;
JsonHandlers["/json/storage"] = new TJsonHandler<TJsonStorage>;
JsonHandlers["/json/metainfo"] = new TJsonHandler<TJsonMetaInfo>;
@@ -272,7 +249,6 @@ public:
private:
THashMap<TString, TAutoPtr<TJsonHandlerBase>> JsonHandlers;
const TKikimrRunConfig KikimrRunConfig;
- std::shared_ptr<NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> PQReadSessionsInfoWorkerFactory;
std::unordered_multimap<NKikimrViewer::EObjectType, TVirtualHandler> VirtualHandlersByParentType;
std::unordered_map<NKikimrViewer::EObjectType, TContentHandler> ContentHandlers;
TString AllowOrigin;
@@ -531,11 +507,10 @@ TString IViewer::TContentRequestContext::Dump() const
ui32 CurrentMonitoringPort = 8765;
IActor* CreateViewer(
- const TKikimrRunConfig &kikimrRunConfig,
- std::shared_ptr<NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory
+ const TKikimrRunConfig &kikimrRunConfig
) {
CurrentMonitoringPort = kikimrRunConfig.AppConfig.GetMonitoringConfig().GetMonitoringPort();
- return new TViewer(kikimrRunConfig, pqReadSessionsInfoWorkerFactory);
+ return new TViewer(kikimrRunConfig);
}
TString TViewer::GetHTTPOKJSON(const NMon::TEvHttpInfo* request) {
diff --git a/ydb/core/viewer/viewer.h b/ydb/core/viewer/viewer.h
index f5544c78f8..5b14a9e4a5 100644
--- a/ydb/core/viewer/viewer.h
+++ b/ydb/core/viewer/viewer.h
@@ -18,10 +18,7 @@ inline TActorId MakeViewerID(ui32 node = 0) {
return TActorId(node, TStringBuf(x, 12));
}
-IActor* CreateViewer(
- const TKikimrRunConfig &kikimrRunConfig,
- std::shared_ptr<NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory
-);
+IActor* CreateViewer(const TKikimrRunConfig &kikimrRunConfig);
class IViewer {
public:
diff --git a/ydb/public/lib/base/msgbus.h b/ydb/public/lib/base/msgbus.h
index f11bd9771f..3662bed19a 100644
--- a/ydb/public/lib/base/msgbus.h
+++ b/ydb/public/lib/base/msgbus.h
@@ -263,14 +263,12 @@ class IPersQueueGetReadSessionsInfoWorkerFactory;
IMessageBusServer* CreateMsgBusServer(
NBus::TBusMessageQueue *queue,
const NBus::TBusServerSessionConfig &config,
- std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory,
ui32 bindPort = TProtocol::DefaultPort
);
IMessageBusServer* CreateMsgBusTracingServer(
NBus::TBusMessageQueue *queue,
const NBus::TBusServerSessionConfig &config,
const TString &tracePath,
- std::shared_ptr<IPersQueueGetReadSessionsInfoWorkerFactory> pqReadSessionsInfoWorkerFactory,
ui32 bindPort = TProtocol::DefaultPort
);
diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
index ba6b9fe5ef..3e46415a89 100644
--- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
@@ -2907,8 +2907,7 @@ void TReadInfoActor::Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TActorC
ctx.Register(NMsgBusProxy::CreateActorServerPersQueue(
ctx.SelfID,
proto,
- SchemeCache,
- std::make_shared<NMsgBusProxy::TPersQueueGetReadSessionsInfoWorkerFactory>()
+ SchemeCache
));
}