diff options
author | nga <nga@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
commit | c2a1af049e9deca890e9923abe64fe6c59060348 (patch) | |
tree | b222e5ac2e2e98872661c51ccceee5da0d291e13 /library/cpp/messagebus/messqueue.cpp | |
parent | 1f553f46fb4f3c5eec631352cdd900a0709016af (diff) | |
download | ydb-c2a1af049e9deca890e9923abe64fe6c59060348.tar.gz |
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/messqueue.cpp')
-rw-r--r-- | library/cpp/messagebus/messqueue.cpp | 222 |
1 files changed, 111 insertions, 111 deletions
diff --git a/library/cpp/messagebus/messqueue.cpp b/library/cpp/messagebus/messqueue.cpp index f77e54b548..3474d62705 100644 --- a/library/cpp/messagebus/messqueue.cpp +++ b/library/cpp/messagebus/messqueue.cpp @@ -1,58 +1,58 @@ #include "key_value_printer.h" #include "mb_lwtrace.h" -#include "remote_client_session.h" -#include "remote_server_session.h" +#include "remote_client_session.h" +#include "remote_server_session.h" #include "ybus.h" #include <util/generic/singleton.h> -using namespace NBus; -using namespace NBus::NPrivate; -using namespace NActor; +using namespace NBus; +using namespace NBus::NPrivate; +using namespace NActor; -TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, TExecutorPtr executor, TBusLocator* locator, const char* name) { - return new TBusMessageQueue(config, executor, locator, name); -} - -TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, TBusLocator* locator, const char* name) { - TExecutor::TConfig executorConfig; - executorConfig.WorkerCount = config.NumWorkers; - executorConfig.Name = name; - TExecutorPtr executor = new TExecutor(executorConfig); - return CreateMessageQueue(config, executor, locator, name); +TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, TExecutorPtr executor, TBusLocator* locator, const char* name) { + return new TBusMessageQueue(config, executor, locator, name); } -TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, const char* name) { - return CreateMessageQueue(config, new TBusLocator, name); +TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, TBusLocator* locator, const char* name) { + TExecutor::TConfig executorConfig; + executorConfig.WorkerCount = config.NumWorkers; + executorConfig.Name = name; + TExecutorPtr executor = new TExecutor(executorConfig); + return CreateMessageQueue(config, executor, locator, name); } -TBusMessageQueuePtr NBus::CreateMessageQueue(TExecutorPtr executor, const char* name) { - return CreateMessageQueue(TBusQueueConfig(), executor, new TBusLocator, name); -} - -TBusMessageQueuePtr NBus::CreateMessageQueue(const char* name) { +TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, const char* name) { + return CreateMessageQueue(config, new TBusLocator, name); +} + +TBusMessageQueuePtr NBus::CreateMessageQueue(TExecutorPtr executor, const char* name) { + return CreateMessageQueue(TBusQueueConfig(), executor, new TBusLocator, name); +} + +TBusMessageQueuePtr NBus::CreateMessageQueue(const char* name) { TBusQueueConfig config; - return CreateMessageQueue(config, name); + return CreateMessageQueue(config, name); } -namespace { +namespace { TBusQueueConfig QueueConfigFillDefaults(const TBusQueueConfig& orig, const TString& name) { - TBusQueueConfig patched = orig; - if (!patched.Name) { - patched.Name = name; - } - return patched; - } -} - -TBusMessageQueue::TBusMessageQueue(const TBusQueueConfig& config, TExecutorPtr executor, TBusLocator* locator, const char* name) - : Config(QueueConfigFillDefaults(config, name)) - , Locator(locator) - , WorkQueue(executor) - , Running(1) + TBusQueueConfig patched = orig; + if (!patched.Name) { + patched.Name = name; + } + return patched; + } +} + +TBusMessageQueue::TBusMessageQueue(const TBusQueueConfig& config, TExecutorPtr executor, TBusLocator* locator, const char* name) + : Config(QueueConfigFillDefaults(config, name)) + , Locator(locator) + , WorkQueue(executor) + , Running(1) { - InitBusLwtrace(); - InitNetworkSubSystem(); + InitBusLwtrace(); + InitNetworkSubSystem(); } TBusMessageQueue::~TBusMessageQueue() { @@ -60,73 +60,73 @@ TBusMessageQueue::~TBusMessageQueue() { } void TBusMessageQueue::Stop() { - if (!AtomicCas(&Running, 0, 1)) { - ShutdownComplete.WaitI(); - return; - } - - Scheduler.Stop(); - - DestroyAllSessions(); - - WorkQueue->Stop(); - - ShutdownComplete.Signal(); -} - -bool TBusMessageQueue::IsRunning() { - return AtomicGet(Running); -} - + if (!AtomicCas(&Running, 0, 1)) { + ShutdownComplete.WaitI(); + return; + } + + Scheduler.Stop(); + + DestroyAllSessions(); + + WorkQueue->Stop(); + + ShutdownComplete.Signal(); +} + +bool TBusMessageQueue::IsRunning() { + return AtomicGet(Running); +} + TBusMessageQueueStatus TBusMessageQueue::GetStatusRecordInternal() const { - TBusMessageQueueStatus r; - r.ExecutorStatus = WorkQueue->GetStatusRecordInternal(); - r.Config = Config; - return r; -} - + TBusMessageQueueStatus r; + r.ExecutorStatus = WorkQueue->GetStatusRecordInternal(); + r.Config = Config; + return r; +} + TString TBusMessageQueue::GetStatusSelf() const { - return GetStatusRecordInternal().PrintToString(); -} - + return GetStatusRecordInternal().PrintToString(); +} + TString TBusMessageQueue::GetStatusSingleLine() const { - return WorkQueue->GetStatusSingleLine(); -} - + return WorkQueue->GetStatusSingleLine(); +} + TString TBusMessageQueue::GetStatus(ui16 flags) const { - TStringStream ss; - - ss << GetStatusSelf(); - + TStringStream ss; + + ss << GetStatusSelf(); + TList<TIntrusivePtr<TBusSessionImpl>> sessions; - { - TGuard<TMutex> scope(Lock); - sessions = Sessions; - } - + { + TGuard<TMutex> scope(Lock); + sessions = Sessions; + } + for (TList<TIntrusivePtr<TBusSessionImpl>>::const_iterator session = sessions.begin(); session != sessions.end(); ++session) { - ss << Endl; - ss << (*session)->GetStatus(flags); - } - - ss << Endl; - ss << "object counts (not necessarily owned by this message queue):" << Endl; - TKeyValuePrinter p; - p.AddRow("TRemoteClientConnection", TObjectCounter<TRemoteClientConnection>::ObjectCount(), false); - p.AddRow("TRemoteServerConnection", TObjectCounter<TRemoteServerConnection>::ObjectCount(), false); - p.AddRow("TRemoteClientSession", TObjectCounter<TRemoteClientSession>::ObjectCount(), false); - p.AddRow("TRemoteServerSession", TObjectCounter<TRemoteServerSession>::ObjectCount(), false); - p.AddRow("NEventLoop::TEventLoop", TObjectCounter<NEventLoop::TEventLoop>::ObjectCount(), false); - p.AddRow("NEventLoop::TChannel", TObjectCounter<NEventLoop::TChannel>::ObjectCount(), false); - ss << p.PrintToString(); - - return ss.Str(); -} - + ss << Endl; + ss << (*session)->GetStatus(flags); + } + + ss << Endl; + ss << "object counts (not necessarily owned by this message queue):" << Endl; + TKeyValuePrinter p; + p.AddRow("TRemoteClientConnection", TObjectCounter<TRemoteClientConnection>::ObjectCount(), false); + p.AddRow("TRemoteServerConnection", TObjectCounter<TRemoteServerConnection>::ObjectCount(), false); + p.AddRow("TRemoteClientSession", TObjectCounter<TRemoteClientSession>::ObjectCount(), false); + p.AddRow("TRemoteServerSession", TObjectCounter<TRemoteServerSession>::ObjectCount(), false); + p.AddRow("NEventLoop::TEventLoop", TObjectCounter<NEventLoop::TEventLoop>::ObjectCount(), false); + p.AddRow("NEventLoop::TChannel", TObjectCounter<NEventLoop::TChannel>::ObjectCount(), false); + ss << p.PrintToString(); + + return ss.Str(); +} + TBusClientSessionPtr TBusMessageQueue::CreateSource(TBusProtocol* proto, IBusClientHandler* handler, const TBusClientSessionConfig& config, const TString& name) { - TRemoteClientSessionPtr session(new TRemoteClientSession(this, proto, handler, config, name)); - Add(session.Get()); + TRemoteClientSessionPtr session(new TRemoteClientSession(this, proto, handler, config, name)); + Add(session.Get()); return session.Get(); } @@ -161,27 +161,27 @@ TBusServerSessionPtr TBusMessageQueue::CreateDestination(TBusProtocol* proto, IB } } -void TBusMessageQueue::Add(TIntrusivePtr<TBusSessionImpl> session) { +void TBusMessageQueue::Add(TIntrusivePtr<TBusSessionImpl> session) { TGuard<TMutex> scope(Lock); Sessions.push_back(session); } -void TBusMessageQueue::Remove(TBusSession* session) { - TGuard<TMutex> scope(Lock); +void TBusMessageQueue::Remove(TBusSession* session) { + TGuard<TMutex> scope(Lock); TList<TIntrusivePtr<TBusSessionImpl>>::iterator it = std::find(Sessions.begin(), Sessions.end(), session); Y_VERIFY(it != Sessions.end(), "do not destroy session twice"); - Sessions.erase(it); -} - + Sessions.erase(it); +} + void TBusMessageQueue::Destroy(TBusSession* session) { - session->Shutdown(); -} - -void TBusMessageQueue::DestroyAllSessions() { + session->Shutdown(); +} + +void TBusMessageQueue::DestroyAllSessions() { TList<TIntrusivePtr<TBusSessionImpl>> sessions; { TGuard<TMutex> scope(Lock); - sessions = Sessions; + sessions = Sessions; } for (auto& session : sessions) { @@ -194,5 +194,5 @@ void TBusMessageQueue::Schedule(IScheduleItemAutoPtr i) { } TString TBusMessageQueue::GetNameInternal() const { - return Config.Name; -} + return Config.Name; +} |