aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/messqueue.cpp
diff options
context:
space:
mode:
authorvladimir <vladimir@yandex-team.ru>2022-02-10 16:50:29 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:29 +0300
commit4bac7bacd041dac72ece081598805d03d2e80a3e (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/messagebus/messqueue.cpp
parent3e7ff6e4ee637c04455854159e84850e613ebc16 (diff)
downloadydb-4bac7bacd041dac72ece081598805d03d2e80a3e.tar.gz
Restoring authorship annotation for <vladimir@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/messqueue.cpp')
-rw-r--r--library/cpp/messagebus/messqueue.cpp30
1 files changed, 15 insertions, 15 deletions
diff --git a/library/cpp/messagebus/messqueue.cpp b/library/cpp/messagebus/messqueue.cpp
index 8040dbf69e..3474d62705 100644
--- a/library/cpp/messagebus/messqueue.cpp
+++ b/library/cpp/messagebus/messqueue.cpp
@@ -3,13 +3,13 @@
#include "remote_client_session.h"
#include "remote_server_session.h"
#include "ybus.h"
-
+
#include <util/generic/singleton.h>
using namespace NBus;
using namespace NBus::NPrivate;
using namespace NActor;
-
+
TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, TExecutorPtr executor, TBusLocator* locator, const char* name) {
return new TBusMessageQueue(config, executor, locator, name);
}
@@ -20,8 +20,8 @@ TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, TBus
executorConfig.Name = name;
TExecutorPtr executor = new TExecutor(executorConfig);
return CreateMessageQueue(config, executor, locator, name);
-}
-
+}
+
TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, const char* name) {
return CreateMessageQueue(config, new TBusLocator, name);
}
@@ -50,15 +50,15 @@ TBusMessageQueue::TBusMessageQueue(const TBusQueueConfig& config, TExecutorPtr e
, Locator(locator)
, WorkQueue(executor)
, Running(1)
-{
+{
InitBusLwtrace();
InitNetworkSubSystem();
-}
-
-TBusMessageQueue::~TBusMessageQueue() {
+}
+
+TBusMessageQueue::~TBusMessageQueue() {
Stop();
-}
-
+}
+
void TBusMessageQueue::Stop() {
if (!AtomicCas(&Running, 0, 1)) {
ShutdownComplete.WaitI();
@@ -128,8 +128,8 @@ TBusClientSessionPtr TBusMessageQueue::CreateSource(TBusProtocol* proto, IBusCli
TRemoteClientSessionPtr session(new TRemoteClientSession(this, proto, handler, config, name));
Add(session.Get());
return session.Get();
-}
-
+}
+
TBusServerSessionPtr TBusMessageQueue::CreateDestination(TBusProtocol* proto, IBusServerHandler* handler, const TBusClientSessionConfig& config, const TString& name) {
TRemoteServerSessionPtr session(new TRemoteServerSession(this, proto, handler, config, name));
try {
@@ -140,7 +140,7 @@ TBusServerSessionPtr TBusMessageQueue::CreateDestination(TBusProtocol* proto, IB
if (port == 0) {
port = proto->GetPort();
}
-
+
session->Listen(port, this);
Add(session.Get());
@@ -164,8 +164,8 @@ TBusServerSessionPtr TBusMessageQueue::CreateDestination(TBusProtocol* proto, IB
void TBusMessageQueue::Add(TIntrusivePtr<TBusSessionImpl> session) {
TGuard<TMutex> scope(Lock);
Sessions.push_back(session);
-}
-
+}
+
void TBusMessageQueue::Remove(TBusSession* session) {
TGuard<TMutex> scope(Lock);
TList<TIntrusivePtr<TBusSessionImpl>>::iterator it = std::find(Sessions.begin(), Sessions.end(), session);