diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/oldmodule/module.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/oldmodule/module.cpp')
-rw-r--r-- | library/cpp/messagebus/oldmodule/module.cpp | 881 |
1 files changed, 881 insertions, 0 deletions
diff --git a/library/cpp/messagebus/oldmodule/module.cpp b/library/cpp/messagebus/oldmodule/module.cpp new file mode 100644 index 0000000000..24bd778799 --- /dev/null +++ b/library/cpp/messagebus/oldmodule/module.cpp @@ -0,0 +1,881 @@ +#include "module.h" + +#include <library/cpp/messagebus/scheduler_actor.h> +#include <library/cpp/messagebus/thread_extra.h> +#include <library/cpp/messagebus/actor/actor.h> +#include <library/cpp/messagebus/actor/queue_in_actor.h> +#include <library/cpp/messagebus/actor/what_thread_does.h> +#include <library/cpp/messagebus/actor/what_thread_does_guard.h> + +#include <util/generic/singleton.h> +#include <util/string/printf.h> +#include <util/system/event.h> + +using namespace NActor; +using namespace NBus; +using namespace NBus::NPrivate; + +namespace { + Y_POD_STATIC_THREAD(TBusJob*) + ThreadCurrentJob; + + struct TThreadCurrentJobGuard { + TBusJob* Prev; + + TThreadCurrentJobGuard(TBusJob* job) + : Prev(ThreadCurrentJob) + { + Y_ASSERT(!ThreadCurrentJob || ThreadCurrentJob == job); + ThreadCurrentJob = job; + } + ~TThreadCurrentJobGuard() { + ThreadCurrentJob = Prev; + } + }; + + void ClearState(NBus::TJobState* state) { + /// skip sendbacks handlers + if (state->Message != state->Reply) { + if (state->Message) { + delete state->Message; + state->Message = nullptr; + } + + if (state->Reply) { + delete state->Reply; + state->Reply = nullptr; + } + } + } + + void ClearJobStateVector(NBus::TJobStateVec* vec) { + Y_ASSERT(vec); + + for (auto& call : *vec) { + ClearState(&call); + } + + vec->clear(); + } + +} + +namespace NBus { + namespace NPrivate { + class TJobStorage { + }; + + struct TModuleClientHandler + : public IBusClientHandler { + TModuleClientHandler(TBusModuleImpl* module) + : Module(module) + { + } + + void OnReply(TAutoPtr<TBusMessage> req, TAutoPtr<TBusMessage> reply) override; + void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage) override; + void OnError(TAutoPtr<TBusMessage> msg, EMessageStatus status) override; + void OnClientConnectionEvent(const TClientConnectionEvent& event) override; + + TBusModuleImpl* const Module; + }; + + struct TModuleServerHandler + : public IBusServerHandler { + TModuleServerHandler(TBusModuleImpl* module) + : Module(module) + { + } + + void OnMessage(TOnMessageContext& msg) override; + + TBusModuleImpl* const Module; + }; + + struct TBusModuleImpl: public TBusModuleInternal { + TBusModule* const Module; + + TBusMessageQueue* Queue; + + TScheduler Scheduler; + + const char* const Name; + + typedef TList<TJobRunner*> TBusJobList; + /// jobs currently in-flight on this module + TBusJobList Jobs; + /// module level mutex + TMutex Lock; + TCondVar ShutdownCondVar; + TAtomic JobCount; + + enum EState { + CREATED, + RUNNING, + STOPPED, + }; + + TAtomic State; + TBusModuleConfig ModuleConfig; + TBusServerSessionPtr ExternalSession; + /// protocol for local proxy session + THolder<IBusClientHandler> ModuleClientHandler; + THolder<IBusServerHandler> ModuleServerHandler; + TVector<TSimpleSharedPtr<TBusStarter>> Starters; + + // Sessions must be destroyed before + // ModuleClientHandler / ModuleServerHandler + TVector<TBusClientSessionPtr> ClientSessions; + TVector<TBusServerSessionPtr> ServerSessions; + + TBusModuleImpl(TBusModule* module, const char* name) + : Module(module) + , Queue() + , Name(name) + , JobCount(0) + , State(CREATED) + , ExternalSession(nullptr) + , ModuleClientHandler(new TModuleClientHandler(this)) + , ModuleServerHandler(new TModuleServerHandler(this)) + { + } + + ~TBusModuleImpl() override { + // Shutdown cannot be called from destructor, + // because module has virtual methods. + Y_VERIFY(State != RUNNING, "if running, must explicitly call Shutdown() before destructor"); + + Scheduler.Stop(); + + while (!Jobs.empty()) { + DestroyJob(Jobs.front()); + } + Y_VERIFY(JobCount == 0, "state check"); + } + + void OnMessageReceived(TAutoPtr<TBusMessage> msg, TOnMessageContext&); + + void AddJob(TJobRunner* jobRunner); + + void DestroyJob(TJobRunner* job); + + /// terminate job on this message + void CancelJob(TBusJob* job, EMessageStatus status); + /// prints statuses of jobs + TString GetStatus(unsigned flags); + + size_t Size() const { + return AtomicGet(JobCount); + } + + void Shutdown(); + + TVector<TBusClientSessionPtr> GetClientSessionsInternal() override { + return ClientSessions; + } + + TVector<TBusServerSessionPtr> GetServerSessionsInternal() override { + return ServerSessions; + } + + TBusMessageQueue* GetQueue() override { + return Queue; + } + + TString GetNameInternal() override { + return Name; + } + + TString GetStatusSingleLine() override { + TStringStream ss; + ss << "jobs: " << Size(); + return ss.Str(); + } + + void OnClientConnectionEvent(const TClientConnectionEvent& event) { + Module->OnClientConnectionEvent(event); + } + }; + + struct TJobResponseMessage { + TBusMessage* Request; + TBusMessage* Response; + EMessageStatus Status; + + TJobResponseMessage(TBusMessage* request, TBusMessage* response, EMessageStatus status) + : Request(request) + , Response(response) + , Status(status) + { + } + }; + + struct TJobRunner: public TAtomicRefCount<TJobRunner>, + public NActor::TActor<TJobRunner>, + public NActor::TQueueInActor<TJobRunner, TJobResponseMessage>, + public TScheduleActor<TJobRunner> { + THolder<TBusJob> Job; + + TList<TJobRunner*>::iterator JobStorageIterator; + + TJobRunner(TAutoPtr<TBusJob> job) + : NActor::TActor<TJobRunner>(job->ModuleImpl->Queue->GetExecutor()) + , TScheduleActor<TJobRunner>(&job->ModuleImpl->Scheduler) + , Job(job.Release()) + , JobStorageIterator() + { + Job->Runner = this; + } + + ~TJobRunner() override { + Y_ASSERT(JobStorageIterator == TList<TJobRunner*>::iterator()); + } + + void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, const TJobResponseMessage& message) { + Job->CallReplyHandler(message.Status, message.Request, message.Response); + } + + void Destroy() { + if (!!Job->OnMessageContext) { + if (!Job->ReplySent) { + Job->OnMessageContext.ForgetRequest(); + } + } + Job->ModuleImpl->DestroyJob(this); + } + + void Act(NActor::TDefaultTag) { + if (JobStorageIterator == TList<TJobRunner*>::iterator()) { + return; + } + + if (Job->SleepUntil != 0) { + if (AtomicGet(Job->ModuleImpl->State) == TBusModuleImpl::STOPPED) { + Destroy(); + return; + } + } + + TThreadCurrentJobGuard g(Job.Get()); + + NActor::TQueueInActor<TJobRunner, TJobResponseMessage>::DequeueAll(); + + if (Alarm.FetchTask()) { + if (Job->AnyPendingToSend()) { + Y_ASSERT(Job->SleepUntil == 0); + Job->SendPending(); + if (Job->AnyPendingToSend()) { + } + } else { + // regular alarm + Y_ASSERT(Job->Pending.empty()); + Y_ASSERT(Job->SleepUntil != 0); + Job->SleepUntil = 0; + } + } + + for (;;) { + if (Job->Pending.empty() && !!Job->Handler && Job->Status == MESSAGE_OK) { + TWhatThreadDoesPushPop pp("do call job handler (do not confuse with reply handler)"); + + Job->Handler = Job->Handler(Job->Module, Job.Get(), Job->Message); + } + + if (Job->SleepUntil != 0) { + ScheduleAt(TInstant::MilliSeconds(Job->SleepUntil)); + return; + } + + Job->SendPending(); + + if (Job->AnyPendingToSend()) { + ScheduleAt(TInstant::Now() + TDuration::Seconds(1)); + return; + } + + if (!Job->Pending.empty()) { + // waiting replies + return; + } + + if (Job->IsDone()) { + Destroy(); + return; + } + } + } + }; + + } + + static inline TJobRunner* GetJob(TBusMessage* message) { + return (TJobRunner*)message->Data; + } + + static inline void SetJob(TBusMessage* message, TJobRunner* job) { + message->Data = job; + } + + TBusJob::TBusJob(TBusModule* module, TBusMessage* message) + : Status(MESSAGE_OK) + , Runner() + , Message(message) + , ReplySent(false) + , Module(module) + , ModuleImpl(module->Impl.Get()) + , SleepUntil(0) + { + Handler = TJobHandler(&TBusModule::Start); + } + + TBusJob::~TBusJob() { + Y_ASSERT(Pending.size() == 0); + //Y_ASSERT(SleepUntil == 0); + + ClearAllMessageStates(); + } + + TNetAddr TBusJob::GetPeerAddrNetAddr() const { + Y_VERIFY(!!OnMessageContext); + return OnMessageContext.GetPeerAddrNetAddr(); + } + + void TBusJob::CheckThreadCurrentJob() { + Y_ASSERT(ThreadCurrentJob == this); + } + + ///////////////////////////////////////////////////////// + /// \brief Send messages in pending list + + /// If at least one message is gone return true + /// If message has not been send, move it to Finished with appropriate error code + bool TBusJob::SendPending() { + // Iterator type must be size_t, not vector::iterator, + // because `DoCallReplyHandler` may call `Send` that modifies `Pending` vector, + // that in turn invalidates iterator. + // Implementation assumes that `DoCallReplyHandler` only pushes back to `Pending` + // (not erases, and not inserts) so iteration by index is valid. + size_t it = 0; + while (it != Pending.size()) { + TJobState& call = Pending[it]; + + if (call.Status == MESSAGE_DONT_ASK) { + EMessageStatus getAddressStatus = MESSAGE_OK; + TNetAddr addr; + if (call.UseAddr) { + addr = call.Addr; + } else { + getAddressStatus = const_cast<TBusProtocol*>(call.Session->GetProto())->GetDestination(call.Session, call.Message, call.Session->GetQueue()->GetLocator(), &addr); + } + + if (getAddressStatus == MESSAGE_OK) { + // hold extra reference for each request in flight + Runner->Ref(); + + if (call.OneWay) { + call.Status = call.Session->SendMessageOneWay(call.Message, &addr); + } else { + call.Status = call.Session->SendMessage(call.Message, &addr); + } + + if (call.Status != MESSAGE_OK) { + Runner->UnRef(); + } + + } else { + call.Status = getAddressStatus; + } + } + + if (call.Status == MESSAGE_OK) { + ++it; // keep pending list until we get reply + } else if (call.Status == MESSAGE_BUSY) { + Y_FAIL("MESSAGE_BUSY is prohibited in modules. Please increase MaxInFlight"); + } else if (call.Status == MESSAGE_CONNECT_FAILED && call.NumRetries < call.MaxRetries) { + ++it; // try up to call.MaxRetries times to send message + call.NumRetries++; + DoCallReplyHandler(call); + call.Status = MESSAGE_DONT_ASK; + call.Message->Reset(); // generate new Id + } else { + Finished.push_back(call); + DoCallReplyHandler(call); + Pending.erase(Pending.begin() + it); + } + } + return Pending.size() > 0; + } + + bool TBusJob::AnyPendingToSend() { + for (unsigned i = 0; i < Pending.size(); ++i) { + if (Pending[i].Status == MESSAGE_DONT_ASK) { + return true; + } + } + + return false; + } + + bool TBusJob::IsDone() { + bool r = (SleepUntil == 0 && Pending.size() == 0 && (Handler == nullptr || Status != MESSAGE_OK)); + return r; + } + + void TBusJob::CallJobHandlerOnly() { + TThreadCurrentJobGuard threadCurrentJobGuard(this); + TWhatThreadDoesPushPop pp("do call job handler (do not confuse with reply handler)"); + + Handler = Handler(ModuleImpl->Module, this, Message); + } + + bool TBusJob::CallJobHandler() { + /// go on as far as we can go without waiting + while (!IsDone()) { + /// call the handler + CallJobHandlerOnly(); + + /// quit if job is canceled + if (Status != MESSAGE_OK) { + break; + } + + /// there are messages to send and wait for reply + SendPending(); + + if (!Pending.empty()) { + break; + } + + /// asked to sleep + if (SleepUntil) { + break; + } + } + + Y_VERIFY(!(Pending.size() == 0 && Handler == nullptr && Status == MESSAGE_OK && !ReplySent), + "Handler returned NULL without Cancel() or SendReply() for message=%016" PRIx64 " type=%d", + Message->GetHeader()->Id, Message->GetHeader()->Type); + + return IsDone(); + } + + void TBusJob::DoCallReplyHandler(TJobState& call) { + if (call.Handler) { + TWhatThreadDoesPushPop pp("do call reply handler (do not confuse with job handler)"); + + TThreadCurrentJobGuard threadCurrentJobGuard(this); + (Module->*(call.Handler))(this, call.Status, call.Message, call.Reply); + } + } + + int TBusJob::CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply) { + /// find handler for given message and update it's status + size_t i = 0; + for (; i < Pending.size(); ++i) { + TJobState& call = Pending[i]; + if (call.Message == mess) { + break; + } + } + + /// if not found, report error + if (i == Pending.size()) { + Y_FAIL("must not happen"); + } + + /// fill in response into job state + TJobState& call = Pending[i]; + call.Status = status; + Y_ASSERT(call.Message == mess); + call.Reply = reply; + + if ((status == MESSAGE_TIMEOUT || status == MESSAGE_DELIVERY_FAILED) && call.NumRetries < call.MaxRetries) { + call.NumRetries++; + call.Status = MESSAGE_DONT_ASK; + call.Message->Reset(); // generate new Id + DoCallReplyHandler(call); + return 0; + } + + /// call the handler if provided + DoCallReplyHandler(call); + + /// move job state into the finished stack + Finished.push_back(Pending[i]); + Pending.erase(Pending.begin() + i); + + return 0; + } + + /////////////////////////////////////////////////////////////// + /// send message to any other session or application + void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries) { + CheckThreadCurrentJob(); + + SetJob(mess.Get(), Runner); + Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, nullptr, false)); + } + + void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr) { + CheckThreadCurrentJob(); + + SetJob(mess.Get(), Runner); + Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, &addr, false)); + } + + void TBusJob::SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr) { + CheckThreadCurrentJob(); + + SetJob(req.Get(), Runner); + Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, &addr, true)); + } + + void TBusJob::SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session) { + CheckThreadCurrentJob(); + + SetJob(req.Get(), Runner); + Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, nullptr, true)); + } + + /////////////////////////////////////////////////////////////// + /// send reply to the starter message + void TBusJob::SendReply(TBusMessageAutoPtr reply) { + CheckThreadCurrentJob(); + + Y_VERIFY(!ReplySent, "cannot call SendReply twice"); + ReplySent = true; + if (!OnMessageContext) + return; + + EMessageStatus ok = OnMessageContext.SendReplyMove(reply); + if (ok != MESSAGE_OK) { + // TODO: count errors + } + } + + /// set the flag to terminate job at the earliest convenience + void TBusJob::Cancel(EMessageStatus status) { + CheckThreadCurrentJob(); + + Status = status; + } + + void TBusJob::ClearState(TJobState& call) { + TJobStateVec::iterator it; + for (it = Finished.begin(); it != Finished.end(); ++it) { + TJobState& state = *it; + if (&call == &state) { + ::ClearState(&call); + Finished.erase(it); + return; + } + } + Y_ASSERT(0); + } + + void TBusJob::ClearAllMessageStates() { + ClearJobStateVector(&Finished); + ClearJobStateVector(&Pending); + } + + void TBusJob::Sleep(int milliSeconds) { + CheckThreadCurrentJob(); + + Y_VERIFY(Pending.empty(), "sleep is not allowed when there are pending job"); + Y_VERIFY(SleepUntil == 0, "must not override sleep"); + + SleepUntil = Now() + milliSeconds; + } + + TString TBusJob::GetStatus(unsigned flags) { + TString strReturn; + strReturn += Sprintf(" job=%016" PRIx64 " type=%d sent=%d pending=%d (%d) %s\n", + Message->GetHeader()->Id, + (int)Message->GetHeader()->Type, + (int)(Now() - Message->GetHeader()->SendTime) / 1000, + (int)Pending.size(), + (int)Finished.size(), + Status != MESSAGE_OK ? ToString(Status).data() : ""); + + TJobStateVec::iterator it; + for (it = Pending.begin(); it != Pending.end(); ++it) { + TJobState& call = *it; + strReturn += call.GetStatus(flags); + } + return strReturn; + } + + TString TJobState::GetStatus(unsigned flags) { + Y_UNUSED(flags); + TString strReturn; + strReturn += Sprintf(" pending=%016" PRIx64 " type=%d (%s) sent=%d %s\n", + Message->GetHeader()->Id, + (int)Message->GetHeader()->Type, + Session->GetProto()->GetService(), + (int)(Now() - Message->GetHeader()->SendTime) / 1000, + ToString(Status).data()); + return strReturn; + } + + ////////////////////////////////////////////////////////////////////// + + void TBusModuleImpl::CancelJob(TBusJob* job, EMessageStatus status) { + TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for CancelJob"); + if (job) { + job->Cancel(status); + } + } + + TString TBusModuleImpl::GetStatus(unsigned flags) { + Y_UNUSED(flags); + TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for GetStatus"); + TString strReturn = Sprintf("JobsInFlight=%d\n", (int)Jobs.size()); + for (auto job : Jobs) { + //strReturn += job->Job->GetStatus(flags); + Y_UNUSED(job); + strReturn += "TODO\n"; + } + return strReturn; + } + + TBusModuleConfig::TBusModuleConfig() + : StarterMaxInFlight(1000) + { + } + + TBusModuleConfig::TSecret::TSecret() + : SchedulePeriod(TDuration::Seconds(1)) + { + } + + TBusModule::TBusModule(const char* name) + : Impl(new TBusModuleImpl(this, name)) + { + } + + TBusModule::~TBusModule() { + } + + const char* TBusModule::GetName() const { + return Impl->Name; + } + + void TBusModule::SetConfig(const TBusModuleConfig& config) { + Impl->ModuleConfig = config; + } + + bool TBusModule::StartInput() { + Y_VERIFY(Impl->State == TBusModuleImpl::CREATED, "state check"); + Y_VERIFY(!!Impl->Queue, "state check"); + Impl->State = TBusModuleImpl::RUNNING; + + Y_ASSERT(!Impl->ExternalSession); + TBusServerSessionPtr extSession = CreateExtSession(*Impl->Queue); + if (extSession != nullptr) { + Impl->ExternalSession = extSession; + } + + return true; + } + + bool TBusModule::Shutdown() { + Impl->Shutdown(); + + return true; + } + + TBusJob* TBusModule::CreateJobInstance(TBusMessage* message) { + TBusJob* job = new TBusJob(this, message); + return job; + } + + /** +Example for external session creation: + +TBusSession* TMyModule::CreateExtSession(TBusMessageQueue& queue) { + TBusSession* session = CreateDefaultDestination(queue, &ExternalProto, ExternalConfig); + session->RegisterService(hostname, begin, end); + return session; +*/ + + bool TBusModule::CreatePrivateSessions(TBusMessageQueue* queue) { + Impl->Queue = queue; + return true; + } + + int TBusModule::GetModuleSessionInFlight() const { + return Impl->Size(); + } + + TIntrusivePtr<TBusModuleInternal> TBusModule::GetInternal() { + return Impl.Get(); + } + + TBusServerSessionPtr TBusModule::CreateDefaultDestination( + TBusMessageQueue& queue, TBusProtocol* proto, const TBusServerSessionConfig& config, const TString& name) { + TBusServerSessionConfig patchedConfig = config; + patchedConfig.ExecuteOnMessageInWorkerPool = false; + if (!patchedConfig.Name) { + patchedConfig.Name = name; + } + if (!patchedConfig.Name) { + patchedConfig.Name = Impl->Name; + } + TBusServerSessionPtr session = + TBusServerSession::Create(proto, Impl->ModuleServerHandler.Get(), patchedConfig, &queue); + Impl->ServerSessions.push_back(session); + return session; + } + + TBusClientSessionPtr TBusModule::CreateDefaultSource( + TBusMessageQueue& queue, TBusProtocol* proto, const TBusClientSessionConfig& config, const TString& name) { + TBusClientSessionConfig patchedConfig = config; + patchedConfig.ExecuteOnReplyInWorkerPool = false; + if (!patchedConfig.Name) { + patchedConfig.Name = name; + } + if (!patchedConfig.Name) { + patchedConfig.Name = Impl->Name; + } + TBusClientSessionPtr session = + TBusClientSession::Create(proto, Impl->ModuleClientHandler.Get(), patchedConfig, &queue); + Impl->ClientSessions.push_back(session); + return session; + } + + TBusStarter* TBusModule::CreateDefaultStarter(TBusMessageQueue&, const TBusSessionConfig& config) { + TBusStarter* session = new TBusStarter(this, config); + Impl->Starters.push_back(session); + return session; + } + + void TBusModule::OnClientConnectionEvent(const TClientConnectionEvent& event) { + Y_UNUSED(event); + } + + TString TBusModule::GetStatus(unsigned flags) { + TString strReturn = Sprintf("%s\n", Impl->Name); + strReturn += Impl->GetStatus(flags); + return strReturn; + } + +} + +void TBusModuleImpl::AddJob(TJobRunner* jobRunner) { + TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for AddJob"); + Jobs.push_back(jobRunner); + jobRunner->JobStorageIterator = Jobs.end(); + --jobRunner->JobStorageIterator; +} + +void TBusModuleImpl::DestroyJob(TJobRunner* job) { + Y_ASSERT(job->JobStorageIterator != TList<TJobRunner*>::iterator()); + + { + TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for DestroyJob"); + int jobCount = AtomicDecrement(JobCount); + Y_VERIFY(jobCount >= 0, "decremented too much"); + Jobs.erase(job->JobStorageIterator); + + if (AtomicGet(State) == STOPPED) { + if (jobCount == 0) { + ShutdownCondVar.BroadCast(); + } + } + } + + job->JobStorageIterator = TList<TJobRunner*>::iterator(); +} + +void TBusModuleImpl::OnMessageReceived(TAutoPtr<TBusMessage> msg0, TOnMessageContext& context) { + TBusMessage* msg = !!msg0 ? msg0.Get() : context.GetMessage(); + Y_VERIFY(!!msg); + + THolder<TJobRunner> jobRunner(new TJobRunner(Module->CreateJobInstance(msg))); + jobRunner->Job->MessageHolder.Reset(msg0.Release()); + jobRunner->Job->OnMessageContext.Swap(context); + SetJob(jobRunner->Job->Message, jobRunner.Get()); + + AtomicIncrement(JobCount); + + AddJob(jobRunner.Get()); + + jobRunner.Release()->Schedule(); +} + +void TBusModuleImpl::Shutdown() { + if (AtomicGet(State) != TBusModuleImpl::RUNNING) { + AtomicSet(State, TBusModuleImpl::STOPPED); + return; + } + AtomicSet(State, TBusModuleImpl::STOPPED); + + for (auto& clientSession : ClientSessions) { + clientSession->Shutdown(); + } + for (auto& serverSession : ServerSessions) { + serverSession->Shutdown(); + } + + for (size_t starter = 0; starter < Starters.size(); ++starter) { + Starters[starter]->Shutdown(); + } + + { + TWhatThreadDoesAcquireGuard<TMutex> guard(Lock, "modules: acquiring lock for Shutdown"); + for (auto& Job : Jobs) { + Job->Schedule(); + } + + while (!Jobs.empty()) { + ShutdownCondVar.WaitI(Lock); + } + } +} + +EMessageStatus TBusModule::StartJob(TAutoPtr<TBusMessage> message) { + Y_VERIFY(Impl->State == TBusModuleImpl::RUNNING); + Y_VERIFY(!!Impl->Queue); + + if ((unsigned)AtomicGet(Impl->JobCount) >= Impl->ModuleConfig.StarterMaxInFlight) { + return MESSAGE_BUSY; + } + + TOnMessageContext dummy; + Impl->OnMessageReceived(message.Release(), dummy); + + return MESSAGE_OK; +} + +void TModuleServerHandler::OnMessage(TOnMessageContext& msg) { + Module->OnMessageReceived(nullptr, msg); +} + +void TModuleClientHandler::OnReply(TAutoPtr<TBusMessage> req, TAutoPtr<TBusMessage> resp) { + TJobRunner* job = GetJob(req.Get()); + Y_ASSERT(job); + Y_ASSERT(job->Job->Message != req.Get()); + job->EnqueueAndSchedule(TJobResponseMessage(req.Release(), resp.Release(), MESSAGE_OK)); + job->UnRef(); +} + +void TModuleClientHandler::OnMessageSentOneWay(TAutoPtr<TBusMessage> req) { + TJobRunner* job = GetJob(req.Get()); + Y_ASSERT(job); + Y_ASSERT(job->Job->Message != req.Get()); + job->EnqueueAndSchedule(TJobResponseMessage(req.Release(), nullptr, MESSAGE_OK)); + job->UnRef(); +} + +void TModuleClientHandler::OnError(TAutoPtr<TBusMessage> msg, EMessageStatus status) { + TJobRunner* job = GetJob(msg.Get()); + if (job) { + Y_ASSERT(job->Job->Message != msg.Get()); + job->EnqueueAndSchedule(TJobResponseMessage(msg.Release(), nullptr, status)); + job->UnRef(); + } +} + +void TModuleClientHandler::OnClientConnectionEvent(const TClientConnectionEvent& event) { + Module->OnClientConnectionEvent(event); +} |