aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/messqueue.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/messqueue.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/messqueue.cpp')
-rw-r--r--library/cpp/messagebus/messqueue.cpp198
1 files changed, 198 insertions, 0 deletions
diff --git a/library/cpp/messagebus/messqueue.cpp b/library/cpp/messagebus/messqueue.cpp
new file mode 100644
index 0000000000..3474d62705
--- /dev/null
+++ b/library/cpp/messagebus/messqueue.cpp
@@ -0,0 +1,198 @@
+#include "key_value_printer.h"
+#include "mb_lwtrace.h"
+#include "remote_client_session.h"
+#include "remote_server_session.h"
+#include "ybus.h"
+
+#include <util/generic/singleton.h>
+
+using namespace NBus;
+using namespace NBus::NPrivate;
+using namespace NActor;
+
+TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, TExecutorPtr executor, TBusLocator* locator, const char* name) {
+ return new TBusMessageQueue(config, executor, locator, name);
+}
+
+TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, TBusLocator* locator, const char* name) {
+ TExecutor::TConfig executorConfig;
+ executorConfig.WorkerCount = config.NumWorkers;
+ executorConfig.Name = name;
+ TExecutorPtr executor = new TExecutor(executorConfig);
+ return CreateMessageQueue(config, executor, locator, name);
+}
+
+TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, const char* name) {
+ return CreateMessageQueue(config, new TBusLocator, name);
+}
+
+TBusMessageQueuePtr NBus::CreateMessageQueue(TExecutorPtr executor, const char* name) {
+ return CreateMessageQueue(TBusQueueConfig(), executor, new TBusLocator, name);
+}
+
+TBusMessageQueuePtr NBus::CreateMessageQueue(const char* name) {
+ TBusQueueConfig config;
+ return CreateMessageQueue(config, name);
+}
+
+namespace {
+ TBusQueueConfig QueueConfigFillDefaults(const TBusQueueConfig& orig, const TString& name) {
+ TBusQueueConfig patched = orig;
+ if (!patched.Name) {
+ patched.Name = name;
+ }
+ return patched;
+ }
+}
+
+TBusMessageQueue::TBusMessageQueue(const TBusQueueConfig& config, TExecutorPtr executor, TBusLocator* locator, const char* name)
+ : Config(QueueConfigFillDefaults(config, name))
+ , Locator(locator)
+ , WorkQueue(executor)
+ , Running(1)
+{
+ InitBusLwtrace();
+ InitNetworkSubSystem();
+}
+
+TBusMessageQueue::~TBusMessageQueue() {
+ Stop();
+}
+
+void TBusMessageQueue::Stop() {
+ if (!AtomicCas(&Running, 0, 1)) {
+ ShutdownComplete.WaitI();
+ return;
+ }
+
+ Scheduler.Stop();
+
+ DestroyAllSessions();
+
+ WorkQueue->Stop();
+
+ ShutdownComplete.Signal();
+}
+
+bool TBusMessageQueue::IsRunning() {
+ return AtomicGet(Running);
+}
+
+TBusMessageQueueStatus TBusMessageQueue::GetStatusRecordInternal() const {
+ TBusMessageQueueStatus r;
+ r.ExecutorStatus = WorkQueue->GetStatusRecordInternal();
+ r.Config = Config;
+ return r;
+}
+
+TString TBusMessageQueue::GetStatusSelf() const {
+ return GetStatusRecordInternal().PrintToString();
+}
+
+TString TBusMessageQueue::GetStatusSingleLine() const {
+ return WorkQueue->GetStatusSingleLine();
+}
+
+TString TBusMessageQueue::GetStatus(ui16 flags) const {
+ TStringStream ss;
+
+ ss << GetStatusSelf();
+
+ TList<TIntrusivePtr<TBusSessionImpl>> sessions;
+ {
+ TGuard<TMutex> scope(Lock);
+ sessions = Sessions;
+ }
+
+ for (TList<TIntrusivePtr<TBusSessionImpl>>::const_iterator session = sessions.begin();
+ session != sessions.end(); ++session) {
+ ss << Endl;
+ ss << (*session)->GetStatus(flags);
+ }
+
+ ss << Endl;
+ ss << "object counts (not necessarily owned by this message queue):" << Endl;
+ TKeyValuePrinter p;
+ p.AddRow("TRemoteClientConnection", TObjectCounter<TRemoteClientConnection>::ObjectCount(), false);
+ p.AddRow("TRemoteServerConnection", TObjectCounter<TRemoteServerConnection>::ObjectCount(), false);
+ p.AddRow("TRemoteClientSession", TObjectCounter<TRemoteClientSession>::ObjectCount(), false);
+ p.AddRow("TRemoteServerSession", TObjectCounter<TRemoteServerSession>::ObjectCount(), false);
+ p.AddRow("NEventLoop::TEventLoop", TObjectCounter<NEventLoop::TEventLoop>::ObjectCount(), false);
+ p.AddRow("NEventLoop::TChannel", TObjectCounter<NEventLoop::TChannel>::ObjectCount(), false);
+ ss << p.PrintToString();
+
+ return ss.Str();
+}
+
+TBusClientSessionPtr TBusMessageQueue::CreateSource(TBusProtocol* proto, IBusClientHandler* handler, const TBusClientSessionConfig& config, const TString& name) {
+ TRemoteClientSessionPtr session(new TRemoteClientSession(this, proto, handler, config, name));
+ Add(session.Get());
+ return session.Get();
+}
+
+TBusServerSessionPtr TBusMessageQueue::CreateDestination(TBusProtocol* proto, IBusServerHandler* handler, const TBusClientSessionConfig& config, const TString& name) {
+ TRemoteServerSessionPtr session(new TRemoteServerSession(this, proto, handler, config, name));
+ try {
+ int port = config.ListenPort;
+ if (port == 0) {
+ port = Locator->GetLocalPort(proto->GetService());
+ }
+ if (port == 0) {
+ port = proto->GetPort();
+ }
+
+ session->Listen(port, this);
+
+ Add(session.Get());
+ return session.Release();
+ } catch (...) {
+ Y_FAIL("create destination failure: %s", CurrentExceptionMessage().c_str());
+ }
+}
+
+TBusServerSessionPtr TBusMessageQueue::CreateDestination(TBusProtocol* proto, IBusServerHandler* handler, const TBusServerSessionConfig& config, const TVector<TBindResult>& bindTo, const TString& name) {
+ TRemoteServerSessionPtr session(new TRemoteServerSession(this, proto, handler, config, name));
+ try {
+ session->Listen(bindTo, this);
+ Add(session.Get());
+ return session.Release();
+ } catch (...) {
+ Y_FAIL("create destination failure: %s", CurrentExceptionMessage().c_str());
+ }
+}
+
+void TBusMessageQueue::Add(TIntrusivePtr<TBusSessionImpl> session) {
+ TGuard<TMutex> scope(Lock);
+ Sessions.push_back(session);
+}
+
+void TBusMessageQueue::Remove(TBusSession* session) {
+ TGuard<TMutex> scope(Lock);
+ TList<TIntrusivePtr<TBusSessionImpl>>::iterator it = std::find(Sessions.begin(), Sessions.end(), session);
+ Y_VERIFY(it != Sessions.end(), "do not destroy session twice");
+ Sessions.erase(it);
+}
+
+void TBusMessageQueue::Destroy(TBusSession* session) {
+ session->Shutdown();
+}
+
+void TBusMessageQueue::DestroyAllSessions() {
+ TList<TIntrusivePtr<TBusSessionImpl>> sessions;
+ {
+ TGuard<TMutex> scope(Lock);
+ sessions = Sessions;
+ }
+
+ for (auto& session : sessions) {
+ Y_VERIFY(session->IsDown(), "Session must be shut down prior to queue shutdown");
+ }
+}
+
+void TBusMessageQueue::Schedule(IScheduleItemAutoPtr i) {
+ Scheduler.Schedule(i);
+}
+
+TString TBusMessageQueue::GetNameInternal() const {
+ return Config.Name;
+}