aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/messqueue.cpp
diff options
context:
space:
mode:
authornga <nga@yandex-team.ru>2022-02-10 16:48:09 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:09 +0300
commit1f553f46fb4f3c5eec631352cdd900a0709016af (patch)
treea231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/messqueue.cpp
parentc4de7efdedc25b49cbea74bd589eecb61b55b60a (diff)
downloadydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/messqueue.cpp')
-rw-r--r--library/cpp/messagebus/messqueue.cpp222
1 files changed, 111 insertions, 111 deletions
diff --git a/library/cpp/messagebus/messqueue.cpp b/library/cpp/messagebus/messqueue.cpp
index 3474d62705..f77e54b548 100644
--- a/library/cpp/messagebus/messqueue.cpp
+++ b/library/cpp/messagebus/messqueue.cpp
@@ -1,58 +1,58 @@
#include "key_value_printer.h"
#include "mb_lwtrace.h"
-#include "remote_client_session.h"
-#include "remote_server_session.h"
+#include "remote_client_session.h"
+#include "remote_server_session.h"
#include "ybus.h"
#include <util/generic/singleton.h>
-using namespace NBus;
-using namespace NBus::NPrivate;
-using namespace NActor;
+using namespace NBus;
+using namespace NBus::NPrivate;
+using namespace NActor;
-TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, TExecutorPtr executor, TBusLocator* locator, const char* name) {
- return new TBusMessageQueue(config, executor, locator, name);
+TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, TExecutorPtr executor, TBusLocator* locator, const char* name) {
+ return new TBusMessageQueue(config, executor, locator, name);
+}
+
+TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, TBusLocator* locator, const char* name) {
+ TExecutor::TConfig executorConfig;
+ executorConfig.WorkerCount = config.NumWorkers;
+ executorConfig.Name = name;
+ TExecutorPtr executor = new TExecutor(executorConfig);
+ return CreateMessageQueue(config, executor, locator, name);
}
-TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, TBusLocator* locator, const char* name) {
- TExecutor::TConfig executorConfig;
- executorConfig.WorkerCount = config.NumWorkers;
- executorConfig.Name = name;
- TExecutorPtr executor = new TExecutor(executorConfig);
- return CreateMessageQueue(config, executor, locator, name);
+TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, const char* name) {
+ return CreateMessageQueue(config, new TBusLocator, name);
}
-TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, const char* name) {
- return CreateMessageQueue(config, new TBusLocator, name);
-}
-
-TBusMessageQueuePtr NBus::CreateMessageQueue(TExecutorPtr executor, const char* name) {
- return CreateMessageQueue(TBusQueueConfig(), executor, new TBusLocator, name);
-}
-
-TBusMessageQueuePtr NBus::CreateMessageQueue(const char* name) {
+TBusMessageQueuePtr NBus::CreateMessageQueue(TExecutorPtr executor, const char* name) {
+ return CreateMessageQueue(TBusQueueConfig(), executor, new TBusLocator, name);
+}
+
+TBusMessageQueuePtr NBus::CreateMessageQueue(const char* name) {
TBusQueueConfig config;
- return CreateMessageQueue(config, name);
+ return CreateMessageQueue(config, name);
}
-namespace {
+namespace {
TBusQueueConfig QueueConfigFillDefaults(const TBusQueueConfig& orig, const TString& name) {
- TBusQueueConfig patched = orig;
- if (!patched.Name) {
- patched.Name = name;
- }
- return patched;
- }
-}
-
-TBusMessageQueue::TBusMessageQueue(const TBusQueueConfig& config, TExecutorPtr executor, TBusLocator* locator, const char* name)
- : Config(QueueConfigFillDefaults(config, name))
- , Locator(locator)
- , WorkQueue(executor)
- , Running(1)
+ TBusQueueConfig patched = orig;
+ if (!patched.Name) {
+ patched.Name = name;
+ }
+ return patched;
+ }
+}
+
+TBusMessageQueue::TBusMessageQueue(const TBusQueueConfig& config, TExecutorPtr executor, TBusLocator* locator, const char* name)
+ : Config(QueueConfigFillDefaults(config, name))
+ , Locator(locator)
+ , WorkQueue(executor)
+ , Running(1)
{
- InitBusLwtrace();
- InitNetworkSubSystem();
+ InitBusLwtrace();
+ InitNetworkSubSystem();
}
TBusMessageQueue::~TBusMessageQueue() {
@@ -60,73 +60,73 @@ TBusMessageQueue::~TBusMessageQueue() {
}
void TBusMessageQueue::Stop() {
- if (!AtomicCas(&Running, 0, 1)) {
- ShutdownComplete.WaitI();
- return;
- }
-
- Scheduler.Stop();
-
- DestroyAllSessions();
-
- WorkQueue->Stop();
-
- ShutdownComplete.Signal();
-}
-
-bool TBusMessageQueue::IsRunning() {
- return AtomicGet(Running);
-}
-
+ if (!AtomicCas(&Running, 0, 1)) {
+ ShutdownComplete.WaitI();
+ return;
+ }
+
+ Scheduler.Stop();
+
+ DestroyAllSessions();
+
+ WorkQueue->Stop();
+
+ ShutdownComplete.Signal();
+}
+
+bool TBusMessageQueue::IsRunning() {
+ return AtomicGet(Running);
+}
+
TBusMessageQueueStatus TBusMessageQueue::GetStatusRecordInternal() const {
- TBusMessageQueueStatus r;
- r.ExecutorStatus = WorkQueue->GetStatusRecordInternal();
- r.Config = Config;
- return r;
-}
-
+ TBusMessageQueueStatus r;
+ r.ExecutorStatus = WorkQueue->GetStatusRecordInternal();
+ r.Config = Config;
+ return r;
+}
+
TString TBusMessageQueue::GetStatusSelf() const {
- return GetStatusRecordInternal().PrintToString();
-}
-
+ return GetStatusRecordInternal().PrintToString();
+}
+
TString TBusMessageQueue::GetStatusSingleLine() const {
- return WorkQueue->GetStatusSingleLine();
-}
-
+ return WorkQueue->GetStatusSingleLine();
+}
+
TString TBusMessageQueue::GetStatus(ui16 flags) const {
- TStringStream ss;
-
- ss << GetStatusSelf();
-
+ TStringStream ss;
+
+ ss << GetStatusSelf();
+
TList<TIntrusivePtr<TBusSessionImpl>> sessions;
- {
- TGuard<TMutex> scope(Lock);
- sessions = Sessions;
- }
-
+ {
+ TGuard<TMutex> scope(Lock);
+ sessions = Sessions;
+ }
+
for (TList<TIntrusivePtr<TBusSessionImpl>>::const_iterator session = sessions.begin();
session != sessions.end(); ++session) {
- ss << Endl;
- ss << (*session)->GetStatus(flags);
- }
-
- ss << Endl;
- ss << "object counts (not necessarily owned by this message queue):" << Endl;
- TKeyValuePrinter p;
- p.AddRow("TRemoteClientConnection", TObjectCounter<TRemoteClientConnection>::ObjectCount(), false);
- p.AddRow("TRemoteServerConnection", TObjectCounter<TRemoteServerConnection>::ObjectCount(), false);
- p.AddRow("TRemoteClientSession", TObjectCounter<TRemoteClientSession>::ObjectCount(), false);
- p.AddRow("TRemoteServerSession", TObjectCounter<TRemoteServerSession>::ObjectCount(), false);
- p.AddRow("NEventLoop::TEventLoop", TObjectCounter<NEventLoop::TEventLoop>::ObjectCount(), false);
- p.AddRow("NEventLoop::TChannel", TObjectCounter<NEventLoop::TChannel>::ObjectCount(), false);
- ss << p.PrintToString();
-
- return ss.Str();
-}
-
+ ss << Endl;
+ ss << (*session)->GetStatus(flags);
+ }
+
+ ss << Endl;
+ ss << "object counts (not necessarily owned by this message queue):" << Endl;
+ TKeyValuePrinter p;
+ p.AddRow("TRemoteClientConnection", TObjectCounter<TRemoteClientConnection>::ObjectCount(), false);
+ p.AddRow("TRemoteServerConnection", TObjectCounter<TRemoteServerConnection>::ObjectCount(), false);
+ p.AddRow("TRemoteClientSession", TObjectCounter<TRemoteClientSession>::ObjectCount(), false);
+ p.AddRow("TRemoteServerSession", TObjectCounter<TRemoteServerSession>::ObjectCount(), false);
+ p.AddRow("NEventLoop::TEventLoop", TObjectCounter<NEventLoop::TEventLoop>::ObjectCount(), false);
+ p.AddRow("NEventLoop::TChannel", TObjectCounter<NEventLoop::TChannel>::ObjectCount(), false);
+ ss << p.PrintToString();
+
+ return ss.Str();
+}
+
TBusClientSessionPtr TBusMessageQueue::CreateSource(TBusProtocol* proto, IBusClientHandler* handler, const TBusClientSessionConfig& config, const TString& name) {
- TRemoteClientSessionPtr session(new TRemoteClientSession(this, proto, handler, config, name));
- Add(session.Get());
+ TRemoteClientSessionPtr session(new TRemoteClientSession(this, proto, handler, config, name));
+ Add(session.Get());
return session.Get();
}
@@ -161,27 +161,27 @@ TBusServerSessionPtr TBusMessageQueue::CreateDestination(TBusProtocol* proto, IB
}
}
-void TBusMessageQueue::Add(TIntrusivePtr<TBusSessionImpl> session) {
+void TBusMessageQueue::Add(TIntrusivePtr<TBusSessionImpl> session) {
TGuard<TMutex> scope(Lock);
Sessions.push_back(session);
}
-void TBusMessageQueue::Remove(TBusSession* session) {
- TGuard<TMutex> scope(Lock);
+void TBusMessageQueue::Remove(TBusSession* session) {
+ TGuard<TMutex> scope(Lock);
TList<TIntrusivePtr<TBusSessionImpl>>::iterator it = std::find(Sessions.begin(), Sessions.end(), session);
Y_VERIFY(it != Sessions.end(), "do not destroy session twice");
- Sessions.erase(it);
-}
-
+ Sessions.erase(it);
+}
+
void TBusMessageQueue::Destroy(TBusSession* session) {
- session->Shutdown();
-}
-
-void TBusMessageQueue::DestroyAllSessions() {
+ session->Shutdown();
+}
+
+void TBusMessageQueue::DestroyAllSessions() {
TList<TIntrusivePtr<TBusSessionImpl>> sessions;
{
TGuard<TMutex> scope(Lock);
- sessions = Sessions;
+ sessions = Sessions;
}
for (auto& session : sessions) {
@@ -194,5 +194,5 @@ void TBusMessageQueue::Schedule(IScheduleItemAutoPtr i) {
}
TString TBusMessageQueue::GetNameInternal() const {
- return Config.Name;
-}
+ return Config.Name;
+}