diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/oldmodule/startsession.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/oldmodule/startsession.cpp')
-rw-r--r-- | library/cpp/messagebus/oldmodule/startsession.cpp | 65 |
1 files changed, 65 insertions, 0 deletions
diff --git a/library/cpp/messagebus/oldmodule/startsession.cpp b/library/cpp/messagebus/oldmodule/startsession.cpp new file mode 100644 index 0000000000..7c38801d62 --- /dev/null +++ b/library/cpp/messagebus/oldmodule/startsession.cpp @@ -0,0 +1,65 @@ +/////////////////////////////////////////////////////////// +/// \file +/// \brief Starter session implementation + +/// Starter session will generate emtpy message to insert +/// into local session that are registered under same protocol + +/// Starter (will one day) automatically adjust number +/// of message inflight to make sure that at least one of source +/// sessions within message queue is at the limit (bottle neck) + +/// Maximum number of messages that starter will instert into +/// the pipeline is configured by NBus::TBusSessionConfig::MaxInFlight + +#include "startsession.h" + +#include "module.h" + +#include <library/cpp/messagebus/ybus.h> + +namespace NBus { + void* TBusStarter::_starter(void* data) { + TBusStarter* pThis = static_cast<TBusStarter*>(data); + pThis->Starter(); + return nullptr; + } + + TBusStarter::TBusStarter(TBusModule* module, const TBusSessionConfig& config) + : Module(module) + , Config(config) + , StartThread(_starter, this) + , Exiting(false) + { + StartThread.Start(); + } + + TBusStarter::~TBusStarter() { + Shutdown(); + } + + void TBusStarter::Shutdown() { + { + TGuard<TMutex> g(ExitLock); + Exiting = true; + ExitSignal.Signal(); + } + StartThread.Join(); + } + + void TBusStarter::Starter() { + TGuard<TMutex> g(ExitLock); + while (!Exiting) { + TAutoPtr<TBusMessage> empty(new TBusMessage(0)); + + EMessageStatus status = Module->StartJob(empty); + + if (Config.SendTimeout > 0) { + ExitSignal.WaitT(ExitLock, TDuration::MilliSeconds(Config.SendTimeout)); + } else { + ExitSignal.WaitT(ExitLock, (status == MESSAGE_BUSY) ? TDuration::MilliSeconds(1) : TDuration::Zero()); + } + } + } + +} |