aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/oldmodule/module.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/oldmodule/module.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/oldmodule/module.cpp')
-rw-r--r--library/cpp/messagebus/oldmodule/module.cpp881
1 files changed, 881 insertions, 0 deletions
diff --git a/library/cpp/messagebus/oldmodule/module.cpp b/library/cpp/messagebus/oldmodule/module.cpp
new file mode 100644
index 0000000000..24bd778799
--- /dev/null
+++ b/library/cpp/messagebus/oldmodule/module.cpp
@@ -0,0 +1,881 @@
+#include "module.h"
+
+#include <library/cpp/messagebus/scheduler_actor.h>
+#include <library/cpp/messagebus/thread_extra.h>
+#include <library/cpp/messagebus/actor/actor.h>
+#include <library/cpp/messagebus/actor/queue_in_actor.h>
+#include <library/cpp/messagebus/actor/what_thread_does.h>
+#include <library/cpp/messagebus/actor/what_thread_does_guard.h>
+
+#include <util/generic/singleton.h>
+#include <util/string/printf.h>
+#include <util/system/event.h>
+
+using namespace NActor;
+using namespace NBus;
+using namespace NBus::NPrivate;
+
+namespace {
+ Y_POD_STATIC_THREAD(TBusJob*)
+ ThreadCurrentJob;
+
+ struct TThreadCurrentJobGuard {
+ TBusJob* Prev;
+
+ TThreadCurrentJobGuard(TBusJob* job)
+ : Prev(ThreadCurrentJob)
+ {
+ Y_ASSERT(!ThreadCurrentJob || ThreadCurrentJob == job);
+ ThreadCurrentJob = job;
+ }
+ ~TThreadCurrentJobGuard() {
+ ThreadCurrentJob = Prev;
+ }
+ };
+
+ void ClearState(NBus::TJobState* state) {
+ /// skip sendbacks handlers
+ if (state->Message != state->Reply) {
+ if (state->Message) {
+ delete state->Message;
+ state->Message = nullptr;
+ }
+
+ if (state->Reply) {
+ delete state->Reply;
+ state->Reply = nullptr;
+ }
+ }
+ }
+
+ void ClearJobStateVector(NBus::TJobStateVec* vec) {
+ Y_ASSERT(vec);
+
+ for (auto& call : *vec) {
+ ClearState(&call);
+ }
+
+ vec->clear();
+ }
+
+}
+
+namespace NBus {
+ namespace NPrivate {
+ class TJobStorage {
+ };
+
+ struct TModuleClientHandler
+ : public IBusClientHandler {
+ TModuleClientHandler(TBusModuleImpl* module)
+ : Module(module)
+ {
+ }
+
+ void OnReply(TAutoPtr<TBusMessage> req, TAutoPtr<TBusMessage> reply) override;
+ void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage) override;
+ void OnError(TAutoPtr<TBusMessage> msg, EMessageStatus status) override;
+ void OnClientConnectionEvent(const TClientConnectionEvent& event) override;
+
+ TBusModuleImpl* const Module;
+ };
+
+ struct TModuleServerHandler
+ : public IBusServerHandler {
+ TModuleServerHandler(TBusModuleImpl* module)
+ : Module(module)
+ {
+ }
+
+ void OnMessage(TOnMessageContext& msg) override;
+
+ TBusModuleImpl* const Module;
+ };
+
+ struct TBusModuleImpl: public TBusModuleInternal {
+ TBusModule* const Module;
+
+ TBusMessageQueue* Queue;
+
+ TScheduler Scheduler;
+
+ const char* const Name;
+
+ typedef TList<TJobRunner*> TBusJobList;
+ /// jobs currently in-flight on this module
+ TBusJobList Jobs;
+ /// module level mutex
+ TMutex Lock;
+ TCondVar ShutdownCondVar;
+ TAtomic JobCount;
+
+ enum EState {
+ CREATED,
+ RUNNING,
+ STOPPED,
+ };
+
+ TAtomic State;
+ TBusModuleConfig ModuleConfig;
+ TBusServerSessionPtr ExternalSession;
+ /// protocol for local proxy session
+ THolder<IBusClientHandler> ModuleClientHandler;
+ THolder<IBusServerHandler> ModuleServerHandler;
+ TVector<TSimpleSharedPtr<TBusStarter>> Starters;
+
+ // Sessions must be destroyed before
+ // ModuleClientHandler / ModuleServerHandler
+ TVector<TBusClientSessionPtr> ClientSessions;
+ TVector<TBusServerSessionPtr> ServerSessions;
+
+ TBusModuleImpl(TBusModule* module, const char* name)
+ : Module(module)
+ , Queue()
+ , Name(name)
+ , JobCount(0)
+ , State(CREATED)
+ , ExternalSession(nullptr)
+ , ModuleClientHandler(new TModuleClientHandler(this))
+ , ModuleServerHandler(new TModuleServerHandler(this))
+ {
+ }
+
+ ~TBusModuleImpl() override {
+ // Shutdown cannot be called from destructor,
+ // because module has virtual methods.
+ Y_VERIFY(State != RUNNING, "if running, must explicitly call Shutdown() before destructor");
+
+ Scheduler.Stop();
+
+ while (!Jobs.empty()) {
+ DestroyJob(Jobs.front());
+ }
+ Y_VERIFY(JobCount == 0, "state check");
+ }
+
+ void OnMessageReceived(TAutoPtr<TBusMessage> msg, TOnMessageContext&);
+
+ void AddJob(TJobRunner* jobRunner);
+
+ void DestroyJob(TJobRunner* job);
+
+ /// terminate job on this message
+ void CancelJob(TBusJob* job, EMessageStatus status);
+ /// prints statuses of jobs
+ TString GetStatus(unsigned flags);
+
+ size_t Size() const {
+ return AtomicGet(JobCount);
+ }
+
+ void Shutdown();
+
+ TVector<TBusClientSessionPtr> GetClientSessionsInternal() override {
+ return ClientSessions;
+ }
+
+ TVector<TBusServerSessionPtr> GetServerSessionsInternal() override {
+ return ServerSessions;
+ }
+
+ TBusMessageQueue* GetQueue() override {
+ return Queue;
+ }
+
+ TString GetNameInternal() override {
+ return Name;
+ }
+
+ TString GetStatusSingleLine() override {
+ TStringStream ss;
+ ss << "jobs: " << Size();
+ return ss.Str();
+ }
+
+ void OnClientConnectionEvent(const TClientConnectionEvent& event) {
+ Module->OnClientConnectionEvent(event);
+ }
+ };
+
+ struct TJobResponseMessage {
+ TBusMessage* Request;
+ TBusMessage* Response;
+ EMessageStatus Status;
+
+ TJobResponseMessage(TBusMessage* request, TBusMessage* response, EMessageStatus status)
+ : Request(request)
+ , Response(response)
+ , Status(status)
+ {
+ }
+ };
+
+ struct TJobRunner: public TAtomicRefCount<TJobRunner>,
+ public NActor::TActor<TJobRunner>,
+ public NActor::TQueueInActor<TJobRunner, TJobResponseMessage>,
+ public TScheduleActor<TJobRunner> {
+ THolder<TBusJob> Job;
+
+ TList<TJobRunner*>::iterator JobStorageIterator;
+
+ TJobRunner(TAutoPtr<TBusJob> job)
+ : NActor::TActor<TJobRunner>(job->ModuleImpl->Queue->GetExecutor())
+ , TScheduleActor<TJobRunner>(&job->ModuleImpl->Scheduler)
+ , Job(job.Release())
+ , JobStorageIterator()
+ {
+ Job->Runner = this;
+ }
+
+ ~TJobRunner() override {
+ Y_ASSERT(JobStorageIterator == TList<TJobRunner*>::iterator());
+ }
+
+ void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, const TJobResponseMessage& message) {
+ Job->CallReplyHandler(message.Status, message.Request, message.Response);
+ }
+
+ void Destroy() {
+ if (!!Job->OnMessageContext) {
+ if (!Job->ReplySent) {
+ Job->OnMessageContext.ForgetRequest();
+ }
+ }
+ Job->ModuleImpl->DestroyJob(this);
+ }
+
+ void Act(NActor::TDefaultTag) {
+ if (JobStorageIterator == TList<TJobRunner*>::iterator()) {
+ return;
+ }
+
+ if (Job->SleepUntil != 0) {
+ if (AtomicGet(Job->ModuleImpl->State) == TBusModuleImpl::STOPPED) {
+ Destroy();
+ return;
+ }
+ }
+
+ TThreadCurrentJobGuard g(Job.Get());
+
+ NActor::TQueueInActor<TJobRunner, TJobResponseMessage>::DequeueAll();
+
+ if (Alarm.FetchTask()) {
+ if (Job->AnyPendingToSend()) {
+ Y_ASSERT(Job->SleepUntil == 0);
+ Job->SendPending();
+ if (Job->AnyPendingToSend()) {
+ }
+ } else {
+ // regular alarm
+ Y_ASSERT(Job->Pending.empty());
+ Y_ASSERT(Job->SleepUntil != 0);
+ Job->SleepUntil = 0;
+ }
+ }
+
+ for (;;) {
+ if (Job->Pending.empty() && !!Job->Handler && Job->Status == MESSAGE_OK) {
+ TWhatThreadDoesPushPop pp("do call job handler (do not confuse with reply handler)");
+
+ Job->Handler = Job->Handler(Job->Module, Job.Get(), Job->Message);
+ }
+
+ if (Job->SleepUntil != 0) {
+ ScheduleAt(TInstant::MilliSeconds(Job->SleepUntil));
+ return;
+ }
+
+ Job->SendPending();
+
+ if (Job->AnyPendingToSend()) {
+ ScheduleAt(TInstant::Now() + TDuration::Seconds(1));
+ return;
+ }
+
+ if (!Job->Pending.empty()) {
+ // waiting replies
+ return;
+ }
+
+ if (Job->IsDone()) {
+ Destroy();
+ return;
+ }
+ }
+ }
+ };
+
+ }
+
+ static inline TJobRunner* GetJob(TBusMessage* message) {
+ return (TJobRunner*)message->Data;
+ }
+
+ static inline void SetJob(TBusMessage* message, TJobRunner* job) {
+ message->Data = job;
+ }
+
+ TBusJob::TBusJob(TBusModule* module, TBusMessage* message)
+ : Status(MESSAGE_OK)
+ , Runner()
+ , Message(message)
+ , ReplySent(false)
+ , Module(module)
+ , ModuleImpl(module->Impl.Get())
+ , SleepUntil(0)
+ {
+ Handler = TJobHandler(&TBusModule::Start);
+ }
+
+ TBusJob::~TBusJob() {
+ Y_ASSERT(Pending.size() == 0);
+ //Y_ASSERT(SleepUntil == 0);
+
+ ClearAllMessageStates();
+ }
+
+ TNetAddr TBusJob::GetPeerAddrNetAddr() const {
+ Y_VERIFY(!!OnMessageContext);
+ return OnMessageContext.GetPeerAddrNetAddr();
+ }
+
+ void TBusJob::CheckThreadCurrentJob() {
+ Y_ASSERT(ThreadCurrentJob == this);
+ }
+
+ /////////////////////////////////////////////////////////
+ /// \brief Send messages in pending list
+
+ /// If at least one message is gone return true
+ /// If message has not been send, move it to Finished with appropriate error code
+ bool TBusJob::SendPending() {
+ // Iterator type must be size_t, not vector::iterator,
+ // because `DoCallReplyHandler` may call `Send` that modifies `Pending` vector,
+ // that in turn invalidates iterator.
+ // Implementation assumes that `DoCallReplyHandler` only pushes back to `Pending`
+ // (not erases, and not inserts) so iteration by index is valid.
+ size_t it = 0;
+ while (it != Pending.size()) {
+ TJobState& call = Pending[it];
+
+ if (call.Status == MESSAGE_DONT_ASK) {
+ EMessageStatus getAddressStatus = MESSAGE_OK;
+ TNetAddr addr;
+ if (call.UseAddr) {
+ addr = call.Addr;
+ } else {
+ getAddressStatus = const_cast<TBusProtocol*>(call.Session->GetProto())->GetDestination(call.Session, call.Message, call.Session->GetQueue()->GetLocator(), &addr);
+ }
+
+ if (getAddressStatus == MESSAGE_OK) {
+ // hold extra reference for each request in flight
+ Runner->Ref();
+
+ if (call.OneWay) {
+ call.Status = call.Session->SendMessageOneWay(call.Message, &addr);
+ } else {
+ call.Status = call.Session->SendMessage(call.Message, &addr);
+ }
+
+ if (call.Status != MESSAGE_OK) {
+ Runner->UnRef();
+ }
+
+ } else {
+ call.Status = getAddressStatus;
+ }
+ }
+
+ if (call.Status == MESSAGE_OK) {
+ ++it; // keep pending list until we get reply
+ } else if (call.Status == MESSAGE_BUSY) {
+ Y_FAIL("MESSAGE_BUSY is prohibited in modules. Please increase MaxInFlight");
+ } else if (call.Status == MESSAGE_CONNECT_FAILED && call.NumRetries < call.MaxRetries) {
+ ++it; // try up to call.MaxRetries times to send message
+ call.NumRetries++;
+ DoCallReplyHandler(call);
+ call.Status = MESSAGE_DONT_ASK;
+ call.Message->Reset(); // generate new Id
+ } else {
+ Finished.push_back(call);
+ DoCallReplyHandler(call);
+ Pending.erase(Pending.begin() + it);
+ }
+ }
+ return Pending.size() > 0;
+ }
+
+ bool TBusJob::AnyPendingToSend() {
+ for (unsigned i = 0; i < Pending.size(); ++i) {
+ if (Pending[i].Status == MESSAGE_DONT_ASK) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ bool TBusJob::IsDone() {
+ bool r = (SleepUntil == 0 && Pending.size() == 0 && (Handler == nullptr || Status != MESSAGE_OK));
+ return r;
+ }
+
+ void TBusJob::CallJobHandlerOnly() {
+ TThreadCurrentJobGuard threadCurrentJobGuard(this);
+ TWhatThreadDoesPushPop pp("do call job handler (do not confuse with reply handler)");
+
+ Handler = Handler(ModuleImpl->Module, this, Message);
+ }
+
+ bool TBusJob::CallJobHandler() {
+ /// go on as far as we can go without waiting
+ while (!IsDone()) {
+ /// call the handler
+ CallJobHandlerOnly();
+
+ /// quit if job is canceled
+ if (Status != MESSAGE_OK) {
+ break;
+ }
+
+ /// there are messages to send and wait for reply
+ SendPending();
+
+ if (!Pending.empty()) {
+ break;
+ }
+
+ /// asked to sleep
+ if (SleepUntil) {
+ break;
+ }
+ }
+
+ Y_VERIFY(!(Pending.size() == 0 && Handler == nullptr && Status == MESSAGE_OK && !ReplySent),
+ "Handler returned NULL without Cancel() or SendReply() for message=%016" PRIx64 " type=%d",
+ Message->GetHeader()->Id, Message->GetHeader()->Type);
+
+ return IsDone();
+ }
+
+ void TBusJob::DoCallReplyHandler(TJobState& call) {
+ if (call.Handler) {
+ TWhatThreadDoesPushPop pp("do call reply handler (do not confuse with job handler)");
+
+ TThreadCurrentJobGuard threadCurrentJobGuard(this);
+ (Module->*(call.Handler))(this, call.Status, call.Message, call.Reply);
+ }
+ }
+
+ int TBusJob::CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply) {
+ /// find handler for given message and update it's status
+ size_t i = 0;
+ for (; i < Pending.size(); ++i) {
+ TJobState& call = Pending[i];
+ if (call.Message == mess) {
+ break;
+ }
+ }
+
+ /// if not found, report error
+ if (i == Pending.size()) {
+ Y_FAIL("must not happen");
+ }
+
+ /// fill in response into job state
+ TJobState& call = Pending[i];
+ call.Status = status;
+ Y_ASSERT(call.Message == mess);
+ call.Reply = reply;
+
+ if ((status == MESSAGE_TIMEOUT || status == MESSAGE_DELIVERY_FAILED) && call.NumRetries < call.MaxRetries) {
+ call.NumRetries++;
+ call.Status = MESSAGE_DONT_ASK;
+ call.Message->Reset(); // generate new Id
+ DoCallReplyHandler(call);
+ return 0;
+ }
+
+ /// call the handler if provided
+ DoCallReplyHandler(call);
+
+ /// move job state into the finished stack
+ Finished.push_back(Pending[i]);
+ Pending.erase(Pending.begin() + i);
+
+ return 0;
+ }
+
+ ///////////////////////////////////////////////////////////////
+ /// send message to any other session or application
+ void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries) {
+ CheckThreadCurrentJob();
+
+ SetJob(mess.Get(), Runner);
+ Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, nullptr, false));
+ }
+
+ void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr) {
+ CheckThreadCurrentJob();
+
+ SetJob(mess.Get(), Runner);
+ Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, &addr, false));
+ }
+
+ void TBusJob::SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr) {
+ CheckThreadCurrentJob();
+
+ SetJob(req.Get(), Runner);
+ Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, &addr, true));
+ }
+
+ void TBusJob::SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session) {
+ CheckThreadCurrentJob();
+
+ SetJob(req.Get(), Runner);
+ Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, nullptr, true));
+ }
+
+ ///////////////////////////////////////////////////////////////
+ /// send reply to the starter message
+ void TBusJob::SendReply(TBusMessageAutoPtr reply) {
+ CheckThreadCurrentJob();
+
+ Y_VERIFY(!ReplySent, "cannot call SendReply twice");
+ ReplySent = true;
+ if (!OnMessageContext)
+ return;
+
+ EMessageStatus ok = OnMessageContext.SendReplyMove(reply);
+ if (ok != MESSAGE_OK) {
+ // TODO: count errors
+ }
+ }
+
+ /// set the flag to terminate job at the earliest convenience
+ void TBusJob::Cancel(EMessageStatus status) {
+ CheckThreadCurrentJob();
+
+ Status = status;
+ }
+
+ void TBusJob::ClearState(TJobState& call) {
+ TJobStateVec::iterator it;
+ for (it = Finished.begin(); it != Finished.end(); ++it) {
+ TJobState& state = *it;
+ if (&call == &state) {
+ ::ClearState(&call);
+ Finished.erase(it);
+ return;
+ }
+ }
+ Y_ASSERT(0);
+ }
+
+ void TBusJob::ClearAllMessageStates() {
+ ClearJobStateVector(&Finished);
+ ClearJobStateVector(&Pending);
+ }
+
+ void TBusJob::Sleep(int milliSeconds) {
+ CheckThreadCurrentJob();
+
+ Y_VERIFY(Pending.empty(), "sleep is not allowed when there are pending job");
+ Y_VERIFY(SleepUntil == 0, "must not override sleep");
+
+ SleepUntil = Now() + milliSeconds;
+ }
+
+ TString TBusJob::GetStatus(unsigned flags) {
+ TString strReturn;
+ strReturn += Sprintf(" job=%016" PRIx64 " type=%d sent=%d pending=%d (%d) %s\n",
+ Message->GetHeader()->Id,
+ (int)Message->GetHeader()->Type,
+ (int)(Now() - Message->GetHeader()->SendTime) / 1000,
+ (int)Pending.size(),
+ (int)Finished.size(),
+ Status != MESSAGE_OK ? ToString(Status).data() : "");
+
+ TJobStateVec::iterator it;
+ for (it = Pending.begin(); it != Pending.end(); ++it) {
+ TJobState& call = *it;
+ strReturn += call.GetStatus(flags);
+ }
+ return strReturn;
+ }
+
+ TString TJobState::GetStatus(unsigned flags) {
+ Y_UNUSED(flags);
+ TString strReturn;
+ strReturn += Sprintf(" pending=%016" PRIx64 " type=%d (%s) sent=%d %s\n",
+ Message->GetHeader()->Id,
+ (int)Message->GetHeader()->Type,
+ Session->GetProto()->GetService(),
+ (int)(Now() - Message->GetHeader()->SendTime) / 1000,
+ ToString(Status).data());
+ return strReturn;
+ }
+
+ //////////////////////////////////////////////////////////////////////
+
+ void TBusModuleImpl::CancelJob(TBusJob* job, EMessageStatus status) {
+ TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for CancelJob");
+ if (job) {
+ job->Cancel(status);
+ }
+ }
+
+ TString TBusModuleImpl::GetStatus(unsigned flags) {
+ Y_UNUSED(flags);
+ TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for GetStatus");
+ TString strReturn = Sprintf("JobsInFlight=%d\n", (int)Jobs.size());
+ for (auto job : Jobs) {
+ //strReturn += job->Job->GetStatus(flags);
+ Y_UNUSED(job);
+ strReturn += "TODO\n";
+ }
+ return strReturn;
+ }
+
+ TBusModuleConfig::TBusModuleConfig()
+ : StarterMaxInFlight(1000)
+ {
+ }
+
+ TBusModuleConfig::TSecret::TSecret()
+ : SchedulePeriod(TDuration::Seconds(1))
+ {
+ }
+
+ TBusModule::TBusModule(const char* name)
+ : Impl(new TBusModuleImpl(this, name))
+ {
+ }
+
+ TBusModule::~TBusModule() {
+ }
+
+ const char* TBusModule::GetName() const {
+ return Impl->Name;
+ }
+
+ void TBusModule::SetConfig(const TBusModuleConfig& config) {
+ Impl->ModuleConfig = config;
+ }
+
+ bool TBusModule::StartInput() {
+ Y_VERIFY(Impl->State == TBusModuleImpl::CREATED, "state check");
+ Y_VERIFY(!!Impl->Queue, "state check");
+ Impl->State = TBusModuleImpl::RUNNING;
+
+ Y_ASSERT(!Impl->ExternalSession);
+ TBusServerSessionPtr extSession = CreateExtSession(*Impl->Queue);
+ if (extSession != nullptr) {
+ Impl->ExternalSession = extSession;
+ }
+
+ return true;
+ }
+
+ bool TBusModule::Shutdown() {
+ Impl->Shutdown();
+
+ return true;
+ }
+
+ TBusJob* TBusModule::CreateJobInstance(TBusMessage* message) {
+ TBusJob* job = new TBusJob(this, message);
+ return job;
+ }
+
+ /**
+Example for external session creation:
+
+TBusSession* TMyModule::CreateExtSession(TBusMessageQueue& queue) {
+ TBusSession* session = CreateDefaultDestination(queue, &ExternalProto, ExternalConfig);
+ session->RegisterService(hostname, begin, end);
+ return session;
+*/
+
+ bool TBusModule::CreatePrivateSessions(TBusMessageQueue* queue) {
+ Impl->Queue = queue;
+ return true;
+ }
+
+ int TBusModule::GetModuleSessionInFlight() const {
+ return Impl->Size();
+ }
+
+ TIntrusivePtr<TBusModuleInternal> TBusModule::GetInternal() {
+ return Impl.Get();
+ }
+
+ TBusServerSessionPtr TBusModule::CreateDefaultDestination(
+ TBusMessageQueue& queue, TBusProtocol* proto, const TBusServerSessionConfig& config, const TString& name) {
+ TBusServerSessionConfig patchedConfig = config;
+ patchedConfig.ExecuteOnMessageInWorkerPool = false;
+ if (!patchedConfig.Name) {
+ patchedConfig.Name = name;
+ }
+ if (!patchedConfig.Name) {
+ patchedConfig.Name = Impl->Name;
+ }
+ TBusServerSessionPtr session =
+ TBusServerSession::Create(proto, Impl->ModuleServerHandler.Get(), patchedConfig, &queue);
+ Impl->ServerSessions.push_back(session);
+ return session;
+ }
+
+ TBusClientSessionPtr TBusModule::CreateDefaultSource(
+ TBusMessageQueue& queue, TBusProtocol* proto, const TBusClientSessionConfig& config, const TString& name) {
+ TBusClientSessionConfig patchedConfig = config;
+ patchedConfig.ExecuteOnReplyInWorkerPool = false;
+ if (!patchedConfig.Name) {
+ patchedConfig.Name = name;
+ }
+ if (!patchedConfig.Name) {
+ patchedConfig.Name = Impl->Name;
+ }
+ TBusClientSessionPtr session =
+ TBusClientSession::Create(proto, Impl->ModuleClientHandler.Get(), patchedConfig, &queue);
+ Impl->ClientSessions.push_back(session);
+ return session;
+ }
+
+ TBusStarter* TBusModule::CreateDefaultStarter(TBusMessageQueue&, const TBusSessionConfig& config) {
+ TBusStarter* session = new TBusStarter(this, config);
+ Impl->Starters.push_back(session);
+ return session;
+ }
+
+ void TBusModule::OnClientConnectionEvent(const TClientConnectionEvent& event) {
+ Y_UNUSED(event);
+ }
+
+ TString TBusModule::GetStatus(unsigned flags) {
+ TString strReturn = Sprintf("%s\n", Impl->Name);
+ strReturn += Impl->GetStatus(flags);
+ return strReturn;
+ }
+
+}
+
+void TBusModuleImpl::AddJob(TJobRunner* jobRunner) {
+ TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for AddJob");
+ Jobs.push_back(jobRunner);
+ jobRunner->JobStorageIterator = Jobs.end();
+ --jobRunner->JobStorageIterator;
+}
+
+void TBusModuleImpl::DestroyJob(TJobRunner* job) {
+ Y_ASSERT(job->JobStorageIterator != TList<TJobRunner*>::iterator());
+
+ {
+ TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for DestroyJob");
+ int jobCount = AtomicDecrement(JobCount);
+ Y_VERIFY(jobCount >= 0, "decremented too much");
+ Jobs.erase(job->JobStorageIterator);
+
+ if (AtomicGet(State) == STOPPED) {
+ if (jobCount == 0) {
+ ShutdownCondVar.BroadCast();
+ }
+ }
+ }
+
+ job->JobStorageIterator = TList<TJobRunner*>::iterator();
+}
+
+void TBusModuleImpl::OnMessageReceived(TAutoPtr<TBusMessage> msg0, TOnMessageContext& context) {
+ TBusMessage* msg = !!msg0 ? msg0.Get() : context.GetMessage();
+ Y_VERIFY(!!msg);
+
+ THolder<TJobRunner> jobRunner(new TJobRunner(Module->CreateJobInstance(msg)));
+ jobRunner->Job->MessageHolder.Reset(msg0.Release());
+ jobRunner->Job->OnMessageContext.Swap(context);
+ SetJob(jobRunner->Job->Message, jobRunner.Get());
+
+ AtomicIncrement(JobCount);
+
+ AddJob(jobRunner.Get());
+
+ jobRunner.Release()->Schedule();
+}
+
+void TBusModuleImpl::Shutdown() {
+ if (AtomicGet(State) != TBusModuleImpl::RUNNING) {
+ AtomicSet(State, TBusModuleImpl::STOPPED);
+ return;
+ }
+ AtomicSet(State, TBusModuleImpl::STOPPED);
+
+ for (auto& clientSession : ClientSessions) {
+ clientSession->Shutdown();
+ }
+ for (auto& serverSession : ServerSessions) {
+ serverSession->Shutdown();
+ }
+
+ for (size_t starter = 0; starter < Starters.size(); ++starter) {
+ Starters[starter]->Shutdown();
+ }
+
+ {
+ TWhatThreadDoesAcquireGuard<TMutex> guard(Lock, "modules: acquiring lock for Shutdown");
+ for (auto& Job : Jobs) {
+ Job->Schedule();
+ }
+
+ while (!Jobs.empty()) {
+ ShutdownCondVar.WaitI(Lock);
+ }
+ }
+}
+
+EMessageStatus TBusModule::StartJob(TAutoPtr<TBusMessage> message) {
+ Y_VERIFY(Impl->State == TBusModuleImpl::RUNNING);
+ Y_VERIFY(!!Impl->Queue);
+
+ if ((unsigned)AtomicGet(Impl->JobCount) >= Impl->ModuleConfig.StarterMaxInFlight) {
+ return MESSAGE_BUSY;
+ }
+
+ TOnMessageContext dummy;
+ Impl->OnMessageReceived(message.Release(), dummy);
+
+ return MESSAGE_OK;
+}
+
+void TModuleServerHandler::OnMessage(TOnMessageContext& msg) {
+ Module->OnMessageReceived(nullptr, msg);
+}
+
+void TModuleClientHandler::OnReply(TAutoPtr<TBusMessage> req, TAutoPtr<TBusMessage> resp) {
+ TJobRunner* job = GetJob(req.Get());
+ Y_ASSERT(job);
+ Y_ASSERT(job->Job->Message != req.Get());
+ job->EnqueueAndSchedule(TJobResponseMessage(req.Release(), resp.Release(), MESSAGE_OK));
+ job->UnRef();
+}
+
+void TModuleClientHandler::OnMessageSentOneWay(TAutoPtr<TBusMessage> req) {
+ TJobRunner* job = GetJob(req.Get());
+ Y_ASSERT(job);
+ Y_ASSERT(job->Job->Message != req.Get());
+ job->EnqueueAndSchedule(TJobResponseMessage(req.Release(), nullptr, MESSAGE_OK));
+ job->UnRef();
+}
+
+void TModuleClientHandler::OnError(TAutoPtr<TBusMessage> msg, EMessageStatus status) {
+ TJobRunner* job = GetJob(msg.Get());
+ if (job) {
+ Y_ASSERT(job->Job->Message != msg.Get());
+ job->EnqueueAndSchedule(TJobResponseMessage(msg.Release(), nullptr, status));
+ job->UnRef();
+ }
+}
+
+void TModuleClientHandler::OnClientConnectionEvent(const TClientConnectionEvent& event) {
+ Module->OnClientConnectionEvent(event);
+}