diff options
author | nga <nga@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
commit | 1f553f46fb4f3c5eec631352cdd900a0709016af (patch) | |
tree | a231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/oldmodule | |
parent | c4de7efdedc25b49cbea74bd589eecb61b55b60a (diff) | |
download | ydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz |
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/oldmodule')
-rw-r--r-- | library/cpp/messagebus/oldmodule/module.cpp | 380 | ||||
-rw-r--r-- | library/cpp/messagebus/oldmodule/module.h | 62 | ||||
-rw-r--r-- | library/cpp/messagebus/oldmodule/startsession.h | 2 | ||||
-rw-r--r-- | library/cpp/messagebus/oldmodule/ya.make | 24 |
4 files changed, 234 insertions, 234 deletions
diff --git a/library/cpp/messagebus/oldmodule/module.cpp b/library/cpp/messagebus/oldmodule/module.cpp index 24bd778799..ccebcfb7cc 100644 --- a/library/cpp/messagebus/oldmodule/module.cpp +++ b/library/cpp/messagebus/oldmodule/module.cpp @@ -1,4 +1,4 @@ -#include "module.h" +#include "module.h" #include <library/cpp/messagebus/scheduler_actor.h> #include <library/cpp/messagebus/thread_extra.h> @@ -9,19 +9,19 @@ #include <util/generic/singleton.h> #include <util/string/printf.h> -#include <util/system/event.h> - -using namespace NActor; -using namespace NBus; -using namespace NBus::NPrivate; +#include <util/system/event.h> +using namespace NActor; +using namespace NBus; +using namespace NBus::NPrivate; + namespace { Y_POD_STATIC_THREAD(TBusJob*) ThreadCurrentJob; struct TThreadCurrentJobGuard { TBusJob* Prev; - + TThreadCurrentJobGuard(TBusJob* job) : Prev(ThreadCurrentJob) { @@ -32,7 +32,7 @@ namespace { ThreadCurrentJob = Prev; } }; - + void ClearState(NBus::TJobState* state) { /// skip sendbacks handlers if (state->Message != state->Reply) { @@ -46,11 +46,11 @@ namespace { state->Reply = nullptr; } } - } - + } + void ClearJobStateVector(NBus::TJobStateVec* vec) { Y_ASSERT(vec); - + for (auto& call : *vec) { ClearState(&call); } @@ -71,12 +71,12 @@ namespace NBus { : Module(module) { } - + void OnReply(TAutoPtr<TBusMessage> req, TAutoPtr<TBusMessage> reply) override; void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage) override; void OnError(TAutoPtr<TBusMessage> msg, EMessageStatus status) override; void OnClientConnectionEvent(const TClientConnectionEvent& event) override; - + TBusModuleImpl* const Module; }; @@ -94,13 +94,13 @@ namespace NBus { struct TBusModuleImpl: public TBusModuleInternal { TBusModule* const Module; - + TBusMessageQueue* Queue; - + TScheduler Scheduler; - + const char* const Name; - + typedef TList<TJobRunner*> TBusJobList; /// jobs currently in-flight on this module TBusJobList Jobs; @@ -108,13 +108,13 @@ namespace NBus { TMutex Lock; TCondVar ShutdownCondVar; TAtomic JobCount; - + enum EState { CREATED, RUNNING, STOPPED, }; - + TAtomic State; TBusModuleConfig ModuleConfig; TBusServerSessionPtr ExternalSession; @@ -122,12 +122,12 @@ namespace NBus { THolder<IBusClientHandler> ModuleClientHandler; THolder<IBusServerHandler> ModuleServerHandler; TVector<TSimpleSharedPtr<TBusStarter>> Starters; - + // Sessions must be destroyed before // ModuleClientHandler / ModuleServerHandler TVector<TBusClientSessionPtr> ClientSessions; TVector<TBusServerSessionPtr> ServerSessions; - + TBusModuleImpl(TBusModule* module, const char* name) : Module(module) , Queue() @@ -139,12 +139,12 @@ namespace NBus { , ModuleServerHandler(new TModuleServerHandler(this)) { } - + ~TBusModuleImpl() override { // Shutdown cannot be called from destructor, // because module has virtual methods. Y_VERIFY(State != RUNNING, "if running, must explicitly call Shutdown() before destructor"); - + Scheduler.Stop(); while (!Jobs.empty()) { @@ -152,56 +152,56 @@ namespace NBus { } Y_VERIFY(JobCount == 0, "state check"); } - + void OnMessageReceived(TAutoPtr<TBusMessage> msg, TOnMessageContext&); - + void AddJob(TJobRunner* jobRunner); - + void DestroyJob(TJobRunner* job); - + /// terminate job on this message void CancelJob(TBusJob* job, EMessageStatus status); /// prints statuses of jobs TString GetStatus(unsigned flags); - + size_t Size() const { return AtomicGet(JobCount); } - + void Shutdown(); - + TVector<TBusClientSessionPtr> GetClientSessionsInternal() override { return ClientSessions; } - + TVector<TBusServerSessionPtr> GetServerSessionsInternal() override { return ServerSessions; } - + TBusMessageQueue* GetQueue() override { return Queue; } - + TString GetNameInternal() override { return Name; } - + TString GetStatusSingleLine() override { TStringStream ss; ss << "jobs: " << Size(); return ss.Str(); } - + void OnClientConnectionEvent(const TClientConnectionEvent& event) { Module->OnClientConnectionEvent(event); } }; - + struct TJobResponseMessage { TBusMessage* Request; TBusMessage* Response; EMessageStatus Status; - + TJobResponseMessage(TBusMessage* request, TBusMessage* response, EMessageStatus status) : Request(request) , Response(response) @@ -215,9 +215,9 @@ namespace NBus { public NActor::TQueueInActor<TJobRunner, TJobResponseMessage>, public TScheduleActor<TJobRunner> { THolder<TBusJob> Job; - + TList<TJobRunner*>::iterator JobStorageIterator; - + TJobRunner(TAutoPtr<TBusJob> job) : NActor::TActor<TJobRunner>(job->ModuleImpl->Queue->GetExecutor()) , TScheduleActor<TJobRunner>(&job->ModuleImpl->Scheduler) @@ -226,15 +226,15 @@ namespace NBus { { Job->Runner = this; } - + ~TJobRunner() override { Y_ASSERT(JobStorageIterator == TList<TJobRunner*>::iterator()); } - + void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, const TJobResponseMessage& message) { Job->CallReplyHandler(message.Status, message.Request, message.Response); } - + void Destroy() { if (!!Job->OnMessageContext) { if (!Job->ReplySent) { @@ -242,24 +242,24 @@ namespace NBus { } } Job->ModuleImpl->DestroyJob(this); - } - + } + void Act(NActor::TDefaultTag) { if (JobStorageIterator == TList<TJobRunner*>::iterator()) { return; } - + if (Job->SleepUntil != 0) { if (AtomicGet(Job->ModuleImpl->State) == TBusModuleImpl::STOPPED) { Destroy(); return; } } - + TThreadCurrentJobGuard g(Job.Get()); - + NActor::TQueueInActor<TJobRunner, TJobResponseMessage>::DequeueAll(); - + if (Alarm.FetchTask()) { if (Job->AnyPendingToSend()) { Y_ASSERT(Job->SleepUntil == 0); @@ -272,50 +272,50 @@ namespace NBus { Y_ASSERT(Job->SleepUntil != 0); Job->SleepUntil = 0; } - } - + } + for (;;) { if (Job->Pending.empty() && !!Job->Handler && Job->Status == MESSAGE_OK) { TWhatThreadDoesPushPop pp("do call job handler (do not confuse with reply handler)"); - + Job->Handler = Job->Handler(Job->Module, Job.Get(), Job->Message); } - + if (Job->SleepUntil != 0) { ScheduleAt(TInstant::MilliSeconds(Job->SleepUntil)); return; } - + Job->SendPending(); - + if (Job->AnyPendingToSend()) { ScheduleAt(TInstant::Now() + TDuration::Seconds(1)); return; } - + if (!Job->Pending.empty()) { // waiting replies return; } - + if (Job->IsDone()) { Destroy(); return; } } - } + } }; - + } - + static inline TJobRunner* GetJob(TBusMessage* message) { return (TJobRunner*)message->Data; - } - + } + static inline void SetJob(TBusMessage* message, TJobRunner* job) { message->Data = job; } - + TBusJob::TBusJob(TBusModule* module, TBusMessage* message) : Status(MESSAGE_OK) , Runner() @@ -346,7 +346,7 @@ namespace NBus { ///////////////////////////////////////////////////////// /// \brief Send messages in pending list - + /// If at least one message is gone return true /// If message has not been send, move it to Finished with appropriate error code bool TBusJob::SendPending() { @@ -358,7 +358,7 @@ namespace NBus { size_t it = 0; while (it != Pending.size()) { TJobState& call = Pending[it]; - + if (call.Status == MESSAGE_DONT_ASK) { EMessageStatus getAddressStatus = MESSAGE_OK; TNetAddr addr; @@ -371,22 +371,22 @@ namespace NBus { if (getAddressStatus == MESSAGE_OK) { // hold extra reference for each request in flight Runner->Ref(); - + if (call.OneWay) { call.Status = call.Session->SendMessageOneWay(call.Message, &addr); } else { call.Status = call.Session->SendMessage(call.Message, &addr); } - + if (call.Status != MESSAGE_OK) { Runner->UnRef(); } - - } else { + + } else { call.Status = getAddressStatus; - } + } } - + if (call.Status == MESSAGE_OK) { ++it; // keep pending list until we get reply } else if (call.Status == MESSAGE_BUSY) { @@ -397,11 +397,11 @@ namespace NBus { DoCallReplyHandler(call); call.Status = MESSAGE_DONT_ASK; call.Message->Reset(); // generate new Id - } else { + } else { Finished.push_back(call); DoCallReplyHandler(call); Pending.erase(Pending.begin() + it); - } + } } return Pending.size() > 0; } @@ -419,12 +419,12 @@ namespace NBus { bool TBusJob::IsDone() { bool r = (SleepUntil == 0 && Pending.size() == 0 && (Handler == nullptr || Status != MESSAGE_OK)); return r; - } - + } + void TBusJob::CallJobHandlerOnly() { TThreadCurrentJobGuard threadCurrentJobGuard(this); TWhatThreadDoesPushPop pp("do call job handler (do not confuse with reply handler)"); - + Handler = Handler(ModuleImpl->Module, this, Message); } @@ -438,31 +438,31 @@ namespace NBus { if (Status != MESSAGE_OK) { break; } - + /// there are messages to send and wait for reply SendPending(); - + if (!Pending.empty()) { break; } - + /// asked to sleep if (SleepUntil) { break; } } - + Y_VERIFY(!(Pending.size() == 0 && Handler == nullptr && Status == MESSAGE_OK && !ReplySent), "Handler returned NULL without Cancel() or SendReply() for message=%016" PRIx64 " type=%d", Message->GetHeader()->Id, Message->GetHeader()->Type); - + return IsDone(); } - + void TBusJob::DoCallReplyHandler(TJobState& call) { if (call.Handler) { TWhatThreadDoesPushPop pp("do call reply handler (do not confuse with job handler)"); - + TThreadCurrentJobGuard threadCurrentJobGuard(this); (Module->*(call.Handler))(this, call.Status, call.Message, call.Reply); } @@ -477,7 +477,7 @@ namespace NBus { break; } } - + /// if not found, report error if (i == Pending.size()) { Y_FAIL("must not happen"); @@ -496,7 +496,7 @@ namespace NBus { DoCallReplyHandler(call); return 0; } - + /// call the handler if provided DoCallReplyHandler(call); @@ -515,50 +515,50 @@ namespace NBus { SetJob(mess.Get(), Runner); Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, nullptr, false)); } - + void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr) { CheckThreadCurrentJob(); SetJob(mess.Get(), Runner); Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, &addr, false)); } - + void TBusJob::SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr) { CheckThreadCurrentJob(); SetJob(req.Get(), Runner); Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, &addr, true)); } - + void TBusJob::SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session) { CheckThreadCurrentJob(); - + SetJob(req.Get(), Runner); Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, nullptr, true)); } - + /////////////////////////////////////////////////////////////// /// send reply to the starter message void TBusJob::SendReply(TBusMessageAutoPtr reply) { CheckThreadCurrentJob(); - + Y_VERIFY(!ReplySent, "cannot call SendReply twice"); ReplySent = true; if (!OnMessageContext) return; - + EMessageStatus ok = OnMessageContext.SendReplyMove(reply); if (ok != MESSAGE_OK) { // TODO: count errors } } - + /// set the flag to terminate job at the earliest convenience void TBusJob::Cancel(EMessageStatus status) { CheckThreadCurrentJob(); - + Status = status; - } + } void TBusJob::ClearState(TJobState& call) { TJobStateVec::iterator it; @@ -580,10 +580,10 @@ namespace NBus { void TBusJob::Sleep(int milliSeconds) { CheckThreadCurrentJob(); - + Y_VERIFY(Pending.empty(), "sleep is not allowed when there are pending job"); Y_VERIFY(SleepUntil == 0, "must not override sleep"); - + SleepUntil = Now() + milliSeconds; } @@ -642,12 +642,12 @@ namespace NBus { : StarterMaxInFlight(1000) { } - + TBusModuleConfig::TSecret::TSecret() : SchedulePeriod(TDuration::Seconds(1)) { } - + TBusModule::TBusModule(const char* name) : Impl(new TBusModuleImpl(this, name)) { @@ -659,16 +659,16 @@ namespace NBus { const char* TBusModule::GetName() const { return Impl->Name; } - + void TBusModule::SetConfig(const TBusModuleConfig& config) { Impl->ModuleConfig = config; } - + bool TBusModule::StartInput() { Y_VERIFY(Impl->State == TBusModuleImpl::CREATED, "state check"); Y_VERIFY(!!Impl->Queue, "state check"); Impl->State = TBusModuleImpl::RUNNING; - + Y_ASSERT(!Impl->ExternalSession); TBusServerSessionPtr extSession = CreateExtSession(*Impl->Queue); if (extSession != nullptr) { @@ -676,14 +676,14 @@ namespace NBus { } return true; - } + } bool TBusModule::Shutdown() { Impl->Shutdown(); return true; } - + TBusJob* TBusModule::CreateJobInstance(TBusMessage* message) { TBusJob* job = new TBusJob(this, message); return job; @@ -706,11 +706,11 @@ TBusSession* TMyModule::CreateExtSession(TBusMessageQueue& queue) { int TBusModule::GetModuleSessionInFlight() const { return Impl->Size(); } - + TIntrusivePtr<TBusModuleInternal> TBusModule::GetInternal() { return Impl.Get(); } - + TBusServerSessionPtr TBusModule::CreateDefaultDestination( TBusMessageQueue& queue, TBusProtocol* proto, const TBusServerSessionConfig& config, const TString& name) { TBusServerSessionConfig patchedConfig = config; @@ -725,7 +725,7 @@ TBusSession* TMyModule::CreateExtSession(TBusMessageQueue& queue) { TBusServerSession::Create(proto, Impl->ModuleServerHandler.Get(), patchedConfig, &queue); Impl->ServerSessions.push_back(session); return session; - } + } TBusClientSessionPtr TBusModule::CreateDefaultSource( TBusMessageQueue& queue, TBusProtocol* proto, const TBusClientSessionConfig& config, const TString& name) { @@ -741,140 +741,140 @@ TBusSession* TMyModule::CreateExtSession(TBusMessageQueue& queue) { TBusClientSession::Create(proto, Impl->ModuleClientHandler.Get(), patchedConfig, &queue); Impl->ClientSessions.push_back(session); return session; - } - + } + TBusStarter* TBusModule::CreateDefaultStarter(TBusMessageQueue&, const TBusSessionConfig& config) { TBusStarter* session = new TBusStarter(this, config); Impl->Starters.push_back(session); return session; - } + } void TBusModule::OnClientConnectionEvent(const TClientConnectionEvent& event) { Y_UNUSED(event); - } - + } + TString TBusModule::GetStatus(unsigned flags) { TString strReturn = Sprintf("%s\n", Impl->Name); strReturn += Impl->GetStatus(flags); return strReturn; } - + } void TBusModuleImpl::AddJob(TJobRunner* jobRunner) { - TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for AddJob"); - Jobs.push_back(jobRunner); - jobRunner->JobStorageIterator = Jobs.end(); - --jobRunner->JobStorageIterator; -} - + TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for AddJob"); + Jobs.push_back(jobRunner); + jobRunner->JobStorageIterator = Jobs.end(); + --jobRunner->JobStorageIterator; +} + void TBusModuleImpl::DestroyJob(TJobRunner* job) { Y_ASSERT(job->JobStorageIterator != TList<TJobRunner*>::iterator()); - - { - TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for DestroyJob"); - int jobCount = AtomicDecrement(JobCount); + + { + TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for DestroyJob"); + int jobCount = AtomicDecrement(JobCount); Y_VERIFY(jobCount >= 0, "decremented too much"); - Jobs.erase(job->JobStorageIterator); - + Jobs.erase(job->JobStorageIterator); + if (AtomicGet(State) == STOPPED) { - if (jobCount == 0) { - ShutdownCondVar.BroadCast(); - } + if (jobCount == 0) { + ShutdownCondVar.BroadCast(); + } } } - + job->JobStorageIterator = TList<TJobRunner*>::iterator(); } -void TBusModuleImpl::OnMessageReceived(TAutoPtr<TBusMessage> msg0, TOnMessageContext& context) { - TBusMessage* msg = !!msg0 ? msg0.Get() : context.GetMessage(); +void TBusModuleImpl::OnMessageReceived(TAutoPtr<TBusMessage> msg0, TOnMessageContext& context) { + TBusMessage* msg = !!msg0 ? msg0.Get() : context.GetMessage(); Y_VERIFY(!!msg); - - THolder<TJobRunner> jobRunner(new TJobRunner(Module->CreateJobInstance(msg))); - jobRunner->Job->MessageHolder.Reset(msg0.Release()); - jobRunner->Job->OnMessageContext.Swap(context); - SetJob(jobRunner->Job->Message, jobRunner.Get()); - - AtomicIncrement(JobCount); - - AddJob(jobRunner.Get()); - - jobRunner.Release()->Schedule(); -} - -void TBusModuleImpl::Shutdown() { + + THolder<TJobRunner> jobRunner(new TJobRunner(Module->CreateJobInstance(msg))); + jobRunner->Job->MessageHolder.Reset(msg0.Release()); + jobRunner->Job->OnMessageContext.Swap(context); + SetJob(jobRunner->Job->Message, jobRunner.Get()); + + AtomicIncrement(JobCount); + + AddJob(jobRunner.Get()); + + jobRunner.Release()->Schedule(); +} + +void TBusModuleImpl::Shutdown() { if (AtomicGet(State) != TBusModuleImpl::RUNNING) { AtomicSet(State, TBusModuleImpl::STOPPED); - return; - } + return; + } AtomicSet(State, TBusModuleImpl::STOPPED); - + for (auto& clientSession : ClientSessions) { clientSession->Shutdown(); - } + } for (auto& serverSession : ServerSessions) { serverSession->Shutdown(); - } - - for (size_t starter = 0; starter < Starters.size(); ++starter) { - Starters[starter]->Shutdown(); - } - - { - TWhatThreadDoesAcquireGuard<TMutex> guard(Lock, "modules: acquiring lock for Shutdown"); + } + + for (size_t starter = 0; starter < Starters.size(); ++starter) { + Starters[starter]->Shutdown(); + } + + { + TWhatThreadDoesAcquireGuard<TMutex> guard(Lock, "modules: acquiring lock for Shutdown"); for (auto& Job : Jobs) { Job->Schedule(); - } - - while (!Jobs.empty()) { - ShutdownCondVar.WaitI(Lock); - } - } + } + + while (!Jobs.empty()) { + ShutdownCondVar.WaitI(Lock); + } + } } -EMessageStatus TBusModule::StartJob(TAutoPtr<TBusMessage> message) { +EMessageStatus TBusModule::StartJob(TAutoPtr<TBusMessage> message) { Y_VERIFY(Impl->State == TBusModuleImpl::RUNNING); Y_VERIFY(!!Impl->Queue); - + if ((unsigned)AtomicGet(Impl->JobCount) >= Impl->ModuleConfig.StarterMaxInFlight) { - return MESSAGE_BUSY; - } - - TOnMessageContext dummy; - Impl->OnMessageReceived(message.Release(), dummy); - - return MESSAGE_OK; -} - -void TModuleServerHandler::OnMessage(TOnMessageContext& msg) { + return MESSAGE_BUSY; + } + + TOnMessageContext dummy; + Impl->OnMessageReceived(message.Release(), dummy); + + return MESSAGE_OK; +} + +void TModuleServerHandler::OnMessage(TOnMessageContext& msg) { Module->OnMessageReceived(nullptr, msg); -} - -void TModuleClientHandler::OnReply(TAutoPtr<TBusMessage> req, TAutoPtr<TBusMessage> resp) { - TJobRunner* job = GetJob(req.Get()); +} + +void TModuleClientHandler::OnReply(TAutoPtr<TBusMessage> req, TAutoPtr<TBusMessage> resp) { + TJobRunner* job = GetJob(req.Get()); Y_ASSERT(job); Y_ASSERT(job->Job->Message != req.Get()); - job->EnqueueAndSchedule(TJobResponseMessage(req.Release(), resp.Release(), MESSAGE_OK)); - job->UnRef(); + job->EnqueueAndSchedule(TJobResponseMessage(req.Release(), resp.Release(), MESSAGE_OK)); + job->UnRef(); } -void TModuleClientHandler::OnMessageSentOneWay(TAutoPtr<TBusMessage> req) { - TJobRunner* job = GetJob(req.Get()); +void TModuleClientHandler::OnMessageSentOneWay(TAutoPtr<TBusMessage> req) { + TJobRunner* job = GetJob(req.Get()); Y_ASSERT(job); Y_ASSERT(job->Job->Message != req.Get()); job->EnqueueAndSchedule(TJobResponseMessage(req.Release(), nullptr, MESSAGE_OK)); - job->UnRef(); + job->UnRef(); } - -void TModuleClientHandler::OnError(TAutoPtr<TBusMessage> msg, EMessageStatus status) { - TJobRunner* job = GetJob(msg.Get()); - if (job) { + +void TModuleClientHandler::OnError(TAutoPtr<TBusMessage> msg, EMessageStatus status) { + TJobRunner* job = GetJob(msg.Get()); + if (job) { Y_ASSERT(job->Job->Message != msg.Get()); job->EnqueueAndSchedule(TJobResponseMessage(msg.Release(), nullptr, status)); - job->UnRef(); - } -} + job->UnRef(); + } +} void TModuleClientHandler::OnClientConnectionEvent(const TClientConnectionEvent& event) { Module->OnClientConnectionEvent(event); diff --git a/library/cpp/messagebus/oldmodule/module.h b/library/cpp/messagebus/oldmodule/module.h index 8d1c4a5d52..ead001480c 100644 --- a/library/cpp/messagebus/oldmodule/module.h +++ b/library/cpp/messagebus/oldmodule/module.h @@ -40,7 +40,7 @@ /// error (not MESSAGE_OK) #include "startsession.h" - + #include <library/cpp/messagebus/ybus.h> #include <util/generic/noncopyable.h> @@ -62,7 +62,7 @@ namespace NBus { protected: typedef TJobHandler (TBusModule::*TBusHandlerPtr)(TBusJob* job, TBusMessage* mess); TBusHandlerPtr MyPtr; - + public: template <class B> TJobHandler(TJobHandler (B::*fptr)(TBusJob* job, TBusMessage* mess)) { @@ -107,7 +107,7 @@ namespace NBus { TNetAddr Addr; bool UseAddr; bool OneWay; - + private: TJobState(TReplyHandler handler, EMessageStatus status, @@ -126,7 +126,7 @@ namespace NBus { Addr = *addr; } UseAddr = !!addr; - } + } public: TString GetStatus(unsigned flags); @@ -140,10 +140,10 @@ namespace NBus { /// Maintains internal state of document in computation class TBusJob { TObjectCounter<TBusJob> ObjectCounter; - + private: void CheckThreadCurrentJob(); - + public: /// given a module and starter message TBusJob(TBusModule* module, TBusMessage* message); @@ -154,9 +154,9 @@ namespace NBus { TBusMessage* GetMessage() const { return Message; } - + TNetAddr GetPeerAddrNetAddr() const; - + /// send message to any other session or application /// If addr is set then use it as destination. void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr); @@ -164,7 +164,7 @@ namespace NBus { void SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr); void SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session); - + /// send reply to the starter message virtual void SendReply(TBusMessageAutoPtr reply); @@ -186,7 +186,7 @@ namespace NBus { Y_ASSERT(stateVec); *stateVec = Pending; } - + /// helper function to find state of previously sent messages template <class MessageType> TJobState* GetState(int* startFrom = nullptr) { @@ -275,18 +275,18 @@ namespace NBus { void Sleep(int milliSeconds); void CallJobHandlerOnly(); - + private: bool CallJobHandler(); void DoCallReplyHandler(TJobState&); /// send out all Pending jobs, failed sends will be migrated to Finished bool SendPending(); bool AnyPendingToSend(); - + public: /// helper to call from OnReply() and OnError() int CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply); - + public: TJobHandler Handler; ///< job handler to be executed within next CallJobHandler() EMessageStatus Status; ///< set != MESSAGE_OK if job should terminate asap @@ -315,48 +315,48 @@ namespace NBus { //////////////////////////////////////////////////////////////////// /// \brief Classes to implement basic module functionality - + class IJobFactory { protected: virtual ~IJobFactory() { } - + public: /// job factory method, override to create custom jobs virtual TBusJob* CreateJobInstance(TBusMessage* message) = 0; - }; - + }; + struct TBusModuleConfig { unsigned StarterMaxInFlight; - + struct TSecret { TDuration SchedulePeriod; - + TSecret(); }; TSecret Secret; - + TBusModuleConfig(); }; - + namespace NPrivate { struct TBusModuleInternal: public TAtomicRefCount<TBusModuleInternal> { virtual TVector<TBusClientSessionPtr> GetClientSessionsInternal() = 0; virtual TVector<TBusServerSessionPtr> GetServerSessionsInternal() = 0; virtual TBusMessageQueue* GetQueue() = 0; - + virtual TString GetNameInternal() = 0; - + virtual TString GetStatusSingleLine() = 0; - + virtual ~TBusModuleInternal() { } }; } - + class TBusModule: public IJobFactory, TNonCopyable { friend class TBusJob; - + TObjectCounter<TBusModule> ObjectCounter; TIntrusivePtr<NPrivate::TBusModuleImpl> Impl; @@ -365,7 +365,7 @@ namespace NBus { /// Each module should have a name which is used as protocol service TBusModule(const char* name); ~TBusModule() override; - + const char* GetName() const; void SetConfig(const TBusModuleConfig& config); @@ -377,17 +377,17 @@ namespace NBus { virtual bool StartInput(); /// called when application is about to exit virtual bool Shutdown(); - + // this default implementation just creates TBusJob object TBusJob* CreateJobInstance(TBusMessage* message) override; EMessageStatus StartJob(TAutoPtr<TBusMessage> message); - + /// creates private sessions, calls CreateExtSession(), should be called before StartInput() bool CreatePrivateSessions(TBusMessageQueue* queue); - + virtual void OnClientConnectionEvent(const TClientConnectionEvent& event); - + public: /// entry point into module, first function to call virtual TJobHandler Start(TBusJob* job, TBusMessage* mess) = 0; diff --git a/library/cpp/messagebus/oldmodule/startsession.h b/library/cpp/messagebus/oldmodule/startsession.h index 5e26e7e1e5..864f82b316 100644 --- a/library/cpp/messagebus/oldmodule/startsession.h +++ b/library/cpp/messagebus/oldmodule/startsession.h @@ -15,7 +15,7 @@ namespace NBus { bool Exiting; TCondVar ExitSignal; TMutex ExitLock; - + static void* _starter(void* data); void Starter(); diff --git a/library/cpp/messagebus/oldmodule/ya.make b/library/cpp/messagebus/oldmodule/ya.make index ca5eae74f0..03ff8a46ac 100644 --- a/library/cpp/messagebus/oldmodule/ya.make +++ b/library/cpp/messagebus/oldmodule/ya.make @@ -1,15 +1,15 @@ -LIBRARY() - +LIBRARY() + OWNER(g:messagebus) - -PEERDIR( + +PEERDIR( library/cpp/messagebus library/cpp/messagebus/actor -) - -SRCS( - module.cpp - startsession.cpp -) - -END() +) + +SRCS( + module.cpp + startsession.cpp +) + +END() |