diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:15 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:15 +0300 |
commit | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch) | |
tree | da2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/messagebus/oldmodule | |
parent | 778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff) | |
download | ydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/oldmodule')
-rw-r--r-- | library/cpp/messagebus/oldmodule/module.cpp | 1332 | ||||
-rw-r--r-- | library/cpp/messagebus/oldmodule/module.h | 668 | ||||
-rw-r--r-- | library/cpp/messagebus/oldmodule/startsession.cpp | 74 | ||||
-rw-r--r-- | library/cpp/messagebus/oldmodule/startsession.h | 54 |
4 files changed, 1064 insertions, 1064 deletions
diff --git a/library/cpp/messagebus/oldmodule/module.cpp b/library/cpp/messagebus/oldmodule/module.cpp index 24bd778799..f423214aa3 100644 --- a/library/cpp/messagebus/oldmodule/module.cpp +++ b/library/cpp/messagebus/oldmodule/module.cpp @@ -16,680 +16,680 @@ using namespace NBus; using namespace NBus::NPrivate; namespace { - Y_POD_STATIC_THREAD(TBusJob*) - ThreadCurrentJob; - - struct TThreadCurrentJobGuard { - TBusJob* Prev; - - TThreadCurrentJobGuard(TBusJob* job) - : Prev(ThreadCurrentJob) - { - Y_ASSERT(!ThreadCurrentJob || ThreadCurrentJob == job); - ThreadCurrentJob = job; - } - ~TThreadCurrentJobGuard() { - ThreadCurrentJob = Prev; - } - }; - - void ClearState(NBus::TJobState* state) { - /// skip sendbacks handlers - if (state->Message != state->Reply) { - if (state->Message) { - delete state->Message; - state->Message = nullptr; - } - - if (state->Reply) { - delete state->Reply; - state->Reply = nullptr; - } - } - } - - void ClearJobStateVector(NBus::TJobStateVec* vec) { - Y_ASSERT(vec); - - for (auto& call : *vec) { - ClearState(&call); + Y_POD_STATIC_THREAD(TBusJob*) + ThreadCurrentJob; + + struct TThreadCurrentJobGuard { + TBusJob* Prev; + + TThreadCurrentJobGuard(TBusJob* job) + : Prev(ThreadCurrentJob) + { + Y_ASSERT(!ThreadCurrentJob || ThreadCurrentJob == job); + ThreadCurrentJob = job; + } + ~TThreadCurrentJobGuard() { + ThreadCurrentJob = Prev; + } + }; + + void ClearState(NBus::TJobState* state) { + /// skip sendbacks handlers + if (state->Message != state->Reply) { + if (state->Message) { + delete state->Message; + state->Message = nullptr; + } + + if (state->Reply) { + delete state->Reply; + state->Reply = nullptr; + } + } + } + + void ClearJobStateVector(NBus::TJobStateVec* vec) { + Y_ASSERT(vec); + + for (auto& call : *vec) { + ClearState(&call); } - vec->clear(); + vec->clear(); } } namespace NBus { - namespace NPrivate { - class TJobStorage { - }; - - struct TModuleClientHandler - : public IBusClientHandler { - TModuleClientHandler(TBusModuleImpl* module) - : Module(module) - { + namespace NPrivate { + class TJobStorage { + }; + + struct TModuleClientHandler + : public IBusClientHandler { + TModuleClientHandler(TBusModuleImpl* module) + : Module(module) + { + } + + void OnReply(TAutoPtr<TBusMessage> req, TAutoPtr<TBusMessage> reply) override; + void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage) override; + void OnError(TAutoPtr<TBusMessage> msg, EMessageStatus status) override; + void OnClientConnectionEvent(const TClientConnectionEvent& event) override; + + TBusModuleImpl* const Module; + }; + + struct TModuleServerHandler + : public IBusServerHandler { + TModuleServerHandler(TBusModuleImpl* module) + : Module(module) + { + } + + void OnMessage(TOnMessageContext& msg) override; + + TBusModuleImpl* const Module; + }; + + struct TBusModuleImpl: public TBusModuleInternal { + TBusModule* const Module; + + TBusMessageQueue* Queue; + + TScheduler Scheduler; + + const char* const Name; + + typedef TList<TJobRunner*> TBusJobList; + /// jobs currently in-flight on this module + TBusJobList Jobs; + /// module level mutex + TMutex Lock; + TCondVar ShutdownCondVar; + TAtomic JobCount; + + enum EState { + CREATED, + RUNNING, + STOPPED, + }; + + TAtomic State; + TBusModuleConfig ModuleConfig; + TBusServerSessionPtr ExternalSession; + /// protocol for local proxy session + THolder<IBusClientHandler> ModuleClientHandler; + THolder<IBusServerHandler> ModuleServerHandler; + TVector<TSimpleSharedPtr<TBusStarter>> Starters; + + // Sessions must be destroyed before + // ModuleClientHandler / ModuleServerHandler + TVector<TBusClientSessionPtr> ClientSessions; + TVector<TBusServerSessionPtr> ServerSessions; + + TBusModuleImpl(TBusModule* module, const char* name) + : Module(module) + , Queue() + , Name(name) + , JobCount(0) + , State(CREATED) + , ExternalSession(nullptr) + , ModuleClientHandler(new TModuleClientHandler(this)) + , ModuleServerHandler(new TModuleServerHandler(this)) + { + } + + ~TBusModuleImpl() override { + // Shutdown cannot be called from destructor, + // because module has virtual methods. + Y_VERIFY(State != RUNNING, "if running, must explicitly call Shutdown() before destructor"); + + Scheduler.Stop(); + + while (!Jobs.empty()) { + DestroyJob(Jobs.front()); + } + Y_VERIFY(JobCount == 0, "state check"); + } + + void OnMessageReceived(TAutoPtr<TBusMessage> msg, TOnMessageContext&); + + void AddJob(TJobRunner* jobRunner); + + void DestroyJob(TJobRunner* job); + + /// terminate job on this message + void CancelJob(TBusJob* job, EMessageStatus status); + /// prints statuses of jobs + TString GetStatus(unsigned flags); + + size_t Size() const { + return AtomicGet(JobCount); + } + + void Shutdown(); + + TVector<TBusClientSessionPtr> GetClientSessionsInternal() override { + return ClientSessions; + } + + TVector<TBusServerSessionPtr> GetServerSessionsInternal() override { + return ServerSessions; + } + + TBusMessageQueue* GetQueue() override { + return Queue; + } + + TString GetNameInternal() override { + return Name; + } + + TString GetStatusSingleLine() override { + TStringStream ss; + ss << "jobs: " << Size(); + return ss.Str(); + } + + void OnClientConnectionEvent(const TClientConnectionEvent& event) { + Module->OnClientConnectionEvent(event); + } + }; + + struct TJobResponseMessage { + TBusMessage* Request; + TBusMessage* Response; + EMessageStatus Status; + + TJobResponseMessage(TBusMessage* request, TBusMessage* response, EMessageStatus status) + : Request(request) + , Response(response) + , Status(status) + { + } + }; + + struct TJobRunner: public TAtomicRefCount<TJobRunner>, + public NActor::TActor<TJobRunner>, + public NActor::TQueueInActor<TJobRunner, TJobResponseMessage>, + public TScheduleActor<TJobRunner> { + THolder<TBusJob> Job; + + TList<TJobRunner*>::iterator JobStorageIterator; + + TJobRunner(TAutoPtr<TBusJob> job) + : NActor::TActor<TJobRunner>(job->ModuleImpl->Queue->GetExecutor()) + , TScheduleActor<TJobRunner>(&job->ModuleImpl->Scheduler) + , Job(job.Release()) + , JobStorageIterator() + { + Job->Runner = this; + } + + ~TJobRunner() override { + Y_ASSERT(JobStorageIterator == TList<TJobRunner*>::iterator()); + } + + void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, const TJobResponseMessage& message) { + Job->CallReplyHandler(message.Status, message.Request, message.Response); + } + + void Destroy() { + if (!!Job->OnMessageContext) { + if (!Job->ReplySent) { + Job->OnMessageContext.ForgetRequest(); + } + } + Job->ModuleImpl->DestroyJob(this); } - void OnReply(TAutoPtr<TBusMessage> req, TAutoPtr<TBusMessage> reply) override; - void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage) override; - void OnError(TAutoPtr<TBusMessage> msg, EMessageStatus status) override; - void OnClientConnectionEvent(const TClientConnectionEvent& event) override; - - TBusModuleImpl* const Module; - }; - - struct TModuleServerHandler - : public IBusServerHandler { - TModuleServerHandler(TBusModuleImpl* module) - : Module(module) - { - } - - void OnMessage(TOnMessageContext& msg) override; - - TBusModuleImpl* const Module; - }; - - struct TBusModuleImpl: public TBusModuleInternal { - TBusModule* const Module; - - TBusMessageQueue* Queue; - - TScheduler Scheduler; - - const char* const Name; - - typedef TList<TJobRunner*> TBusJobList; - /// jobs currently in-flight on this module - TBusJobList Jobs; - /// module level mutex - TMutex Lock; - TCondVar ShutdownCondVar; - TAtomic JobCount; - - enum EState { - CREATED, - RUNNING, - STOPPED, - }; - - TAtomic State; - TBusModuleConfig ModuleConfig; - TBusServerSessionPtr ExternalSession; - /// protocol for local proxy session - THolder<IBusClientHandler> ModuleClientHandler; - THolder<IBusServerHandler> ModuleServerHandler; - TVector<TSimpleSharedPtr<TBusStarter>> Starters; - - // Sessions must be destroyed before - // ModuleClientHandler / ModuleServerHandler - TVector<TBusClientSessionPtr> ClientSessions; - TVector<TBusServerSessionPtr> ServerSessions; - - TBusModuleImpl(TBusModule* module, const char* name) - : Module(module) - , Queue() - , Name(name) - , JobCount(0) - , State(CREATED) - , ExternalSession(nullptr) - , ModuleClientHandler(new TModuleClientHandler(this)) - , ModuleServerHandler(new TModuleServerHandler(this)) - { - } - - ~TBusModuleImpl() override { - // Shutdown cannot be called from destructor, - // because module has virtual methods. - Y_VERIFY(State != RUNNING, "if running, must explicitly call Shutdown() before destructor"); - - Scheduler.Stop(); - - while (!Jobs.empty()) { - DestroyJob(Jobs.front()); + void Act(NActor::TDefaultTag) { + if (JobStorageIterator == TList<TJobRunner*>::iterator()) { + return; + } + + if (Job->SleepUntil != 0) { + if (AtomicGet(Job->ModuleImpl->State) == TBusModuleImpl::STOPPED) { + Destroy(); + return; + } + } + + TThreadCurrentJobGuard g(Job.Get()); + + NActor::TQueueInActor<TJobRunner, TJobResponseMessage>::DequeueAll(); + + if (Alarm.FetchTask()) { + if (Job->AnyPendingToSend()) { + Y_ASSERT(Job->SleepUntil == 0); + Job->SendPending(); + if (Job->AnyPendingToSend()) { + } + } else { + // regular alarm + Y_ASSERT(Job->Pending.empty()); + Y_ASSERT(Job->SleepUntil != 0); + Job->SleepUntil = 0; + } } - Y_VERIFY(JobCount == 0, "state check"); - } - void OnMessageReceived(TAutoPtr<TBusMessage> msg, TOnMessageContext&); + for (;;) { + if (Job->Pending.empty() && !!Job->Handler && Job->Status == MESSAGE_OK) { + TWhatThreadDoesPushPop pp("do call job handler (do not confuse with reply handler)"); - void AddJob(TJobRunner* jobRunner); + Job->Handler = Job->Handler(Job->Module, Job.Get(), Job->Message); + } - void DestroyJob(TJobRunner* job); + if (Job->SleepUntil != 0) { + ScheduleAt(TInstant::MilliSeconds(Job->SleepUntil)); + return; + } - /// terminate job on this message - void CancelJob(TBusJob* job, EMessageStatus status); - /// prints statuses of jobs - TString GetStatus(unsigned flags); + Job->SendPending(); - size_t Size() const { - return AtomicGet(JobCount); - } + if (Job->AnyPendingToSend()) { + ScheduleAt(TInstant::Now() + TDuration::Seconds(1)); + return; + } - void Shutdown(); + if (!Job->Pending.empty()) { + // waiting replies + return; + } - TVector<TBusClientSessionPtr> GetClientSessionsInternal() override { - return ClientSessions; + if (Job->IsDone()) { + Destroy(); + return; + } + } } + }; + + } + + static inline TJobRunner* GetJob(TBusMessage* message) { + return (TJobRunner*)message->Data; + } + + static inline void SetJob(TBusMessage* message, TJobRunner* job) { + message->Data = job; + } + + TBusJob::TBusJob(TBusModule* module, TBusMessage* message) + : Status(MESSAGE_OK) + , Runner() + , Message(message) + , ReplySent(false) + , Module(module) + , ModuleImpl(module->Impl.Get()) + , SleepUntil(0) + { + Handler = TJobHandler(&TBusModule::Start); + } + + TBusJob::~TBusJob() { + Y_ASSERT(Pending.size() == 0); + //Y_ASSERT(SleepUntil == 0); + + ClearAllMessageStates(); + } + + TNetAddr TBusJob::GetPeerAddrNetAddr() const { + Y_VERIFY(!!OnMessageContext); + return OnMessageContext.GetPeerAddrNetAddr(); + } + + void TBusJob::CheckThreadCurrentJob() { + Y_ASSERT(ThreadCurrentJob == this); + } + + ///////////////////////////////////////////////////////// + /// \brief Send messages in pending list + + /// If at least one message is gone return true + /// If message has not been send, move it to Finished with appropriate error code + bool TBusJob::SendPending() { + // Iterator type must be size_t, not vector::iterator, + // because `DoCallReplyHandler` may call `Send` that modifies `Pending` vector, + // that in turn invalidates iterator. + // Implementation assumes that `DoCallReplyHandler` only pushes back to `Pending` + // (not erases, and not inserts) so iteration by index is valid. + size_t it = 0; + while (it != Pending.size()) { + TJobState& call = Pending[it]; + + if (call.Status == MESSAGE_DONT_ASK) { + EMessageStatus getAddressStatus = MESSAGE_OK; + TNetAddr addr; + if (call.UseAddr) { + addr = call.Addr; + } else { + getAddressStatus = const_cast<TBusProtocol*>(call.Session->GetProto())->GetDestination(call.Session, call.Message, call.Session->GetQueue()->GetLocator(), &addr); + } + + if (getAddressStatus == MESSAGE_OK) { + // hold extra reference for each request in flight + Runner->Ref(); + + if (call.OneWay) { + call.Status = call.Session->SendMessageOneWay(call.Message, &addr); + } else { + call.Status = call.Session->SendMessage(call.Message, &addr); + } + + if (call.Status != MESSAGE_OK) { + Runner->UnRef(); + } - TVector<TBusServerSessionPtr> GetServerSessionsInternal() override { - return ServerSessions; - } - - TBusMessageQueue* GetQueue() override { - return Queue; - } - - TString GetNameInternal() override { - return Name; - } - - TString GetStatusSingleLine() override { - TStringStream ss; - ss << "jobs: " << Size(); - return ss.Str(); - } - - void OnClientConnectionEvent(const TClientConnectionEvent& event) { - Module->OnClientConnectionEvent(event); - } - }; - - struct TJobResponseMessage { - TBusMessage* Request; - TBusMessage* Response; - EMessageStatus Status; - - TJobResponseMessage(TBusMessage* request, TBusMessage* response, EMessageStatus status) - : Request(request) - , Response(response) - , Status(status) - { - } - }; - - struct TJobRunner: public TAtomicRefCount<TJobRunner>, - public NActor::TActor<TJobRunner>, - public NActor::TQueueInActor<TJobRunner, TJobResponseMessage>, - public TScheduleActor<TJobRunner> { - THolder<TBusJob> Job; - - TList<TJobRunner*>::iterator JobStorageIterator; - - TJobRunner(TAutoPtr<TBusJob> job) - : NActor::TActor<TJobRunner>(job->ModuleImpl->Queue->GetExecutor()) - , TScheduleActor<TJobRunner>(&job->ModuleImpl->Scheduler) - , Job(job.Release()) - , JobStorageIterator() - { - Job->Runner = this; - } - - ~TJobRunner() override { - Y_ASSERT(JobStorageIterator == TList<TJobRunner*>::iterator()); - } - - void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, const TJobResponseMessage& message) { - Job->CallReplyHandler(message.Status, message.Request, message.Response); - } - - void Destroy() { - if (!!Job->OnMessageContext) { - if (!Job->ReplySent) { - Job->OnMessageContext.ForgetRequest(); - } - } - Job->ModuleImpl->DestroyJob(this); - } - - void Act(NActor::TDefaultTag) { - if (JobStorageIterator == TList<TJobRunner*>::iterator()) { - return; - } - - if (Job->SleepUntil != 0) { - if (AtomicGet(Job->ModuleImpl->State) == TBusModuleImpl::STOPPED) { - Destroy(); - return; - } - } - - TThreadCurrentJobGuard g(Job.Get()); - - NActor::TQueueInActor<TJobRunner, TJobResponseMessage>::DequeueAll(); - - if (Alarm.FetchTask()) { - if (Job->AnyPendingToSend()) { - Y_ASSERT(Job->SleepUntil == 0); - Job->SendPending(); - if (Job->AnyPendingToSend()) { - } - } else { - // regular alarm - Y_ASSERT(Job->Pending.empty()); - Y_ASSERT(Job->SleepUntil != 0); - Job->SleepUntil = 0; - } - } - - for (;;) { - if (Job->Pending.empty() && !!Job->Handler && Job->Status == MESSAGE_OK) { - TWhatThreadDoesPushPop pp("do call job handler (do not confuse with reply handler)"); - - Job->Handler = Job->Handler(Job->Module, Job.Get(), Job->Message); - } - - if (Job->SleepUntil != 0) { - ScheduleAt(TInstant::MilliSeconds(Job->SleepUntil)); - return; - } - - Job->SendPending(); - - if (Job->AnyPendingToSend()) { - ScheduleAt(TInstant::Now() + TDuration::Seconds(1)); - return; - } - - if (!Job->Pending.empty()) { - // waiting replies - return; - } - - if (Job->IsDone()) { - Destroy(); - return; - } - } - } - }; - - } - - static inline TJobRunner* GetJob(TBusMessage* message) { - return (TJobRunner*)message->Data; - } - - static inline void SetJob(TBusMessage* message, TJobRunner* job) { - message->Data = job; - } - - TBusJob::TBusJob(TBusModule* module, TBusMessage* message) - : Status(MESSAGE_OK) - , Runner() - , Message(message) - , ReplySent(false) - , Module(module) - , ModuleImpl(module->Impl.Get()) - , SleepUntil(0) - { - Handler = TJobHandler(&TBusModule::Start); - } - - TBusJob::~TBusJob() { - Y_ASSERT(Pending.size() == 0); - //Y_ASSERT(SleepUntil == 0); - - ClearAllMessageStates(); - } - - TNetAddr TBusJob::GetPeerAddrNetAddr() const { - Y_VERIFY(!!OnMessageContext); - return OnMessageContext.GetPeerAddrNetAddr(); - } - - void TBusJob::CheckThreadCurrentJob() { - Y_ASSERT(ThreadCurrentJob == this); - } - - ///////////////////////////////////////////////////////// - /// \brief Send messages in pending list - - /// If at least one message is gone return true - /// If message has not been send, move it to Finished with appropriate error code - bool TBusJob::SendPending() { - // Iterator type must be size_t, not vector::iterator, - // because `DoCallReplyHandler` may call `Send` that modifies `Pending` vector, - // that in turn invalidates iterator. - // Implementation assumes that `DoCallReplyHandler` only pushes back to `Pending` - // (not erases, and not inserts) so iteration by index is valid. - size_t it = 0; - while (it != Pending.size()) { - TJobState& call = Pending[it]; - - if (call.Status == MESSAGE_DONT_ASK) { - EMessageStatus getAddressStatus = MESSAGE_OK; - TNetAddr addr; - if (call.UseAddr) { - addr = call.Addr; } else { - getAddressStatus = const_cast<TBusProtocol*>(call.Session->GetProto())->GetDestination(call.Session, call.Message, call.Session->GetQueue()->GetLocator(), &addr); + call.Status = getAddressStatus; } - - if (getAddressStatus == MESSAGE_OK) { - // hold extra reference for each request in flight - Runner->Ref(); - - if (call.OneWay) { - call.Status = call.Session->SendMessageOneWay(call.Message, &addr); - } else { - call.Status = call.Session->SendMessage(call.Message, &addr); - } - - if (call.Status != MESSAGE_OK) { - Runner->UnRef(); - } - - } else { - call.Status = getAddressStatus; - } - } - - if (call.Status == MESSAGE_OK) { - ++it; // keep pending list until we get reply - } else if (call.Status == MESSAGE_BUSY) { - Y_FAIL("MESSAGE_BUSY is prohibited in modules. Please increase MaxInFlight"); - } else if (call.Status == MESSAGE_CONNECT_FAILED && call.NumRetries < call.MaxRetries) { - ++it; // try up to call.MaxRetries times to send message - call.NumRetries++; - DoCallReplyHandler(call); - call.Status = MESSAGE_DONT_ASK; - call.Message->Reset(); // generate new Id + } + + if (call.Status == MESSAGE_OK) { + ++it; // keep pending list until we get reply + } else if (call.Status == MESSAGE_BUSY) { + Y_FAIL("MESSAGE_BUSY is prohibited in modules. Please increase MaxInFlight"); + } else if (call.Status == MESSAGE_CONNECT_FAILED && call.NumRetries < call.MaxRetries) { + ++it; // try up to call.MaxRetries times to send message + call.NumRetries++; + DoCallReplyHandler(call); + call.Status = MESSAGE_DONT_ASK; + call.Message->Reset(); // generate new Id } else { - Finished.push_back(call); - DoCallReplyHandler(call); - Pending.erase(Pending.begin() + it); + Finished.push_back(call); + DoCallReplyHandler(call); + Pending.erase(Pending.begin() + it); } } - return Pending.size() > 0; - } - - bool TBusJob::AnyPendingToSend() { - for (unsigned i = 0; i < Pending.size(); ++i) { - if (Pending[i].Status == MESSAGE_DONT_ASK) { - return true; - } + return Pending.size() > 0; + } + + bool TBusJob::AnyPendingToSend() { + for (unsigned i = 0; i < Pending.size(); ++i) { + if (Pending[i].Status == MESSAGE_DONT_ASK) { + return true; + } } - - return false; + + return false; } - bool TBusJob::IsDone() { - bool r = (SleepUntil == 0 && Pending.size() == 0 && (Handler == nullptr || Status != MESSAGE_OK)); - return r; - } - - void TBusJob::CallJobHandlerOnly() { - TThreadCurrentJobGuard threadCurrentJobGuard(this); - TWhatThreadDoesPushPop pp("do call job handler (do not confuse with reply handler)"); - - Handler = Handler(ModuleImpl->Module, this, Message); + bool TBusJob::IsDone() { + bool r = (SleepUntil == 0 && Pending.size() == 0 && (Handler == nullptr || Status != MESSAGE_OK)); + return r; } - bool TBusJob::CallJobHandler() { - /// go on as far as we can go without waiting - while (!IsDone()) { - /// call the handler - CallJobHandlerOnly(); + void TBusJob::CallJobHandlerOnly() { + TThreadCurrentJobGuard threadCurrentJobGuard(this); + TWhatThreadDoesPushPop pp("do call job handler (do not confuse with reply handler)"); - /// quit if job is canceled - if (Status != MESSAGE_OK) { - break; - } + Handler = Handler(ModuleImpl->Module, this, Message); + } - /// there are messages to send and wait for reply - SendPending(); + bool TBusJob::CallJobHandler() { + /// go on as far as we can go without waiting + while (!IsDone()) { + /// call the handler + CallJobHandlerOnly(); + + /// quit if job is canceled + if (Status != MESSAGE_OK) { + break; + } - if (!Pending.empty()) { - break; - } + /// there are messages to send and wait for reply + SendPending(); - /// asked to sleep - if (SleepUntil) { - break; - } - } + if (!Pending.empty()) { + break; + } - Y_VERIFY(!(Pending.size() == 0 && Handler == nullptr && Status == MESSAGE_OK && !ReplySent), - "Handler returned NULL without Cancel() or SendReply() for message=%016" PRIx64 " type=%d", - Message->GetHeader()->Id, Message->GetHeader()->Type); + /// asked to sleep + if (SleepUntil) { + break; + } + } - return IsDone(); - } + Y_VERIFY(!(Pending.size() == 0 && Handler == nullptr && Status == MESSAGE_OK && !ReplySent), + "Handler returned NULL without Cancel() or SendReply() for message=%016" PRIx64 " type=%d", + Message->GetHeader()->Id, Message->GetHeader()->Type); - void TBusJob::DoCallReplyHandler(TJobState& call) { - if (call.Handler) { - TWhatThreadDoesPushPop pp("do call reply handler (do not confuse with job handler)"); + return IsDone(); + } - TThreadCurrentJobGuard threadCurrentJobGuard(this); - (Module->*(call.Handler))(this, call.Status, call.Message, call.Reply); - } - } + void TBusJob::DoCallReplyHandler(TJobState& call) { + if (call.Handler) { + TWhatThreadDoesPushPop pp("do call reply handler (do not confuse with job handler)"); - int TBusJob::CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply) { - /// find handler for given message and update it's status - size_t i = 0; - for (; i < Pending.size(); ++i) { - TJobState& call = Pending[i]; - if (call.Message == mess) { - break; - } + TThreadCurrentJobGuard threadCurrentJobGuard(this); + (Module->*(call.Handler))(this, call.Status, call.Message, call.Reply); } - - /// if not found, report error - if (i == Pending.size()) { - Y_FAIL("must not happen"); + } + + int TBusJob::CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply) { + /// find handler for given message and update it's status + size_t i = 0; + for (; i < Pending.size(); ++i) { + TJobState& call = Pending[i]; + if (call.Message == mess) { + break; + } } - /// fill in response into job state - TJobState& call = Pending[i]; - call.Status = status; - Y_ASSERT(call.Message == mess); - call.Reply = reply; - - if ((status == MESSAGE_TIMEOUT || status == MESSAGE_DELIVERY_FAILED) && call.NumRetries < call.MaxRetries) { - call.NumRetries++; - call.Status = MESSAGE_DONT_ASK; - call.Message->Reset(); // generate new Id - DoCallReplyHandler(call); - return 0; + /// if not found, report error + if (i == Pending.size()) { + Y_FAIL("must not happen"); + } + + /// fill in response into job state + TJobState& call = Pending[i]; + call.Status = status; + Y_ASSERT(call.Message == mess); + call.Reply = reply; + + if ((status == MESSAGE_TIMEOUT || status == MESSAGE_DELIVERY_FAILED) && call.NumRetries < call.MaxRetries) { + call.NumRetries++; + call.Status = MESSAGE_DONT_ASK; + call.Message->Reset(); // generate new Id + DoCallReplyHandler(call); + return 0; } - /// call the handler if provided - DoCallReplyHandler(call); + /// call the handler if provided + DoCallReplyHandler(call); - /// move job state into the finished stack - Finished.push_back(Pending[i]); - Pending.erase(Pending.begin() + i); + /// move job state into the finished stack + Finished.push_back(Pending[i]); + Pending.erase(Pending.begin() + i); return 0; } - /////////////////////////////////////////////////////////////// - /// send message to any other session or application - void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries) { - CheckThreadCurrentJob(); + /////////////////////////////////////////////////////////////// + /// send message to any other session or application + void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries) { + CheckThreadCurrentJob(); - SetJob(mess.Get(), Runner); - Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, nullptr, false)); - } + SetJob(mess.Get(), Runner); + Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, nullptr, false)); + } - void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr) { - CheckThreadCurrentJob(); + void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr) { + CheckThreadCurrentJob(); - SetJob(mess.Get(), Runner); - Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, &addr, false)); - } + SetJob(mess.Get(), Runner); + Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, &addr, false)); + } - void TBusJob::SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr) { - CheckThreadCurrentJob(); + void TBusJob::SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr) { + CheckThreadCurrentJob(); - SetJob(req.Get(), Runner); - Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, &addr, true)); - } + SetJob(req.Get(), Runner); + Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, &addr, true)); + } - void TBusJob::SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session) { - CheckThreadCurrentJob(); + void TBusJob::SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session) { + CheckThreadCurrentJob(); - SetJob(req.Get(), Runner); - Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, nullptr, true)); - } + SetJob(req.Get(), Runner); + Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, nullptr, true)); + } - /////////////////////////////////////////////////////////////// - /// send reply to the starter message - void TBusJob::SendReply(TBusMessageAutoPtr reply) { - CheckThreadCurrentJob(); + /////////////////////////////////////////////////////////////// + /// send reply to the starter message + void TBusJob::SendReply(TBusMessageAutoPtr reply) { + CheckThreadCurrentJob(); - Y_VERIFY(!ReplySent, "cannot call SendReply twice"); - ReplySent = true; - if (!OnMessageContext) - return; + Y_VERIFY(!ReplySent, "cannot call SendReply twice"); + ReplySent = true; + if (!OnMessageContext) + return; - EMessageStatus ok = OnMessageContext.SendReplyMove(reply); - if (ok != MESSAGE_OK) { - // TODO: count errors - } - } + EMessageStatus ok = OnMessageContext.SendReplyMove(reply); + if (ok != MESSAGE_OK) { + // TODO: count errors + } + } - /// set the flag to terminate job at the earliest convenience - void TBusJob::Cancel(EMessageStatus status) { - CheckThreadCurrentJob(); + /// set the flag to terminate job at the earliest convenience + void TBusJob::Cancel(EMessageStatus status) { + CheckThreadCurrentJob(); - Status = status; + Status = status; } - void TBusJob::ClearState(TJobState& call) { - TJobStateVec::iterator it; - for (it = Finished.begin(); it != Finished.end(); ++it) { - TJobState& state = *it; - if (&call == &state) { - ::ClearState(&call); - Finished.erase(it); - return; - } + void TBusJob::ClearState(TJobState& call) { + TJobStateVec::iterator it; + for (it = Finished.begin(); it != Finished.end(); ++it) { + TJobState& state = *it; + if (&call == &state) { + ::ClearState(&call); + Finished.erase(it); + return; + } } - Y_ASSERT(0); + Y_ASSERT(0); } - void TBusJob::ClearAllMessageStates() { - ClearJobStateVector(&Finished); - ClearJobStateVector(&Pending); - } + void TBusJob::ClearAllMessageStates() { + ClearJobStateVector(&Finished); + ClearJobStateVector(&Pending); + } - void TBusJob::Sleep(int milliSeconds) { - CheckThreadCurrentJob(); + void TBusJob::Sleep(int milliSeconds) { + CheckThreadCurrentJob(); - Y_VERIFY(Pending.empty(), "sleep is not allowed when there are pending job"); - Y_VERIFY(SleepUntil == 0, "must not override sleep"); + Y_VERIFY(Pending.empty(), "sleep is not allowed when there are pending job"); + Y_VERIFY(SleepUntil == 0, "must not override sleep"); - SleepUntil = Now() + milliSeconds; - } + SleepUntil = Now() + milliSeconds; + } - TString TBusJob::GetStatus(unsigned flags) { - TString strReturn; - strReturn += Sprintf(" job=%016" PRIx64 " type=%d sent=%d pending=%d (%d) %s\n", - Message->GetHeader()->Id, - (int)Message->GetHeader()->Type, - (int)(Now() - Message->GetHeader()->SendTime) / 1000, - (int)Pending.size(), - (int)Finished.size(), + TString TBusJob::GetStatus(unsigned flags) { + TString strReturn; + strReturn += Sprintf(" job=%016" PRIx64 " type=%d sent=%d pending=%d (%d) %s\n", + Message->GetHeader()->Id, + (int)Message->GetHeader()->Type, + (int)(Now() - Message->GetHeader()->SendTime) / 1000, + (int)Pending.size(), + (int)Finished.size(), Status != MESSAGE_OK ? ToString(Status).data() : ""); - - TJobStateVec::iterator it; - for (it = Pending.begin(); it != Pending.end(); ++it) { - TJobState& call = *it; - strReturn += call.GetStatus(flags); - } - return strReturn; - } - - TString TJobState::GetStatus(unsigned flags) { - Y_UNUSED(flags); - TString strReturn; - strReturn += Sprintf(" pending=%016" PRIx64 " type=%d (%s) sent=%d %s\n", - Message->GetHeader()->Id, - (int)Message->GetHeader()->Type, - Session->GetProto()->GetService(), - (int)(Now() - Message->GetHeader()->SendTime) / 1000, + + TJobStateVec::iterator it; + for (it = Pending.begin(); it != Pending.end(); ++it) { + TJobState& call = *it; + strReturn += call.GetStatus(flags); + } + return strReturn; + } + + TString TJobState::GetStatus(unsigned flags) { + Y_UNUSED(flags); + TString strReturn; + strReturn += Sprintf(" pending=%016" PRIx64 " type=%d (%s) sent=%d %s\n", + Message->GetHeader()->Id, + (int)Message->GetHeader()->Type, + Session->GetProto()->GetService(), + (int)(Now() - Message->GetHeader()->SendTime) / 1000, ToString(Status).data()); - return strReturn; - } - - ////////////////////////////////////////////////////////////////////// - - void TBusModuleImpl::CancelJob(TBusJob* job, EMessageStatus status) { - TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for CancelJob"); - if (job) { - job->Cancel(status); - } - } - - TString TBusModuleImpl::GetStatus(unsigned flags) { - Y_UNUSED(flags); - TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for GetStatus"); - TString strReturn = Sprintf("JobsInFlight=%d\n", (int)Jobs.size()); - for (auto job : Jobs) { - //strReturn += job->Job->GetStatus(flags); - Y_UNUSED(job); - strReturn += "TODO\n"; - } - return strReturn; - } - - TBusModuleConfig::TBusModuleConfig() - : StarterMaxInFlight(1000) - { - } - - TBusModuleConfig::TSecret::TSecret() - : SchedulePeriod(TDuration::Seconds(1)) - { - } - - TBusModule::TBusModule(const char* name) - : Impl(new TBusModuleImpl(this, name)) - { - } - - TBusModule::~TBusModule() { - } - - const char* TBusModule::GetName() const { - return Impl->Name; - } - - void TBusModule::SetConfig(const TBusModuleConfig& config) { - Impl->ModuleConfig = config; - } - - bool TBusModule::StartInput() { - Y_VERIFY(Impl->State == TBusModuleImpl::CREATED, "state check"); - Y_VERIFY(!!Impl->Queue, "state check"); - Impl->State = TBusModuleImpl::RUNNING; - - Y_ASSERT(!Impl->ExternalSession); - TBusServerSessionPtr extSession = CreateExtSession(*Impl->Queue); - if (extSession != nullptr) { - Impl->ExternalSession = extSession; - } - - return true; - } - - bool TBusModule::Shutdown() { - Impl->Shutdown(); - - return true; - } - - TBusJob* TBusModule::CreateJobInstance(TBusMessage* message) { - TBusJob* job = new TBusJob(this, message); - return job; - } - - /** + return strReturn; + } + + ////////////////////////////////////////////////////////////////////// + + void TBusModuleImpl::CancelJob(TBusJob* job, EMessageStatus status) { + TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for CancelJob"); + if (job) { + job->Cancel(status); + } + } + + TString TBusModuleImpl::GetStatus(unsigned flags) { + Y_UNUSED(flags); + TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for GetStatus"); + TString strReturn = Sprintf("JobsInFlight=%d\n", (int)Jobs.size()); + for (auto job : Jobs) { + //strReturn += job->Job->GetStatus(flags); + Y_UNUSED(job); + strReturn += "TODO\n"; + } + return strReturn; + } + + TBusModuleConfig::TBusModuleConfig() + : StarterMaxInFlight(1000) + { + } + + TBusModuleConfig::TSecret::TSecret() + : SchedulePeriod(TDuration::Seconds(1)) + { + } + + TBusModule::TBusModule(const char* name) + : Impl(new TBusModuleImpl(this, name)) + { + } + + TBusModule::~TBusModule() { + } + + const char* TBusModule::GetName() const { + return Impl->Name; + } + + void TBusModule::SetConfig(const TBusModuleConfig& config) { + Impl->ModuleConfig = config; + } + + bool TBusModule::StartInput() { + Y_VERIFY(Impl->State == TBusModuleImpl::CREATED, "state check"); + Y_VERIFY(!!Impl->Queue, "state check"); + Impl->State = TBusModuleImpl::RUNNING; + + Y_ASSERT(!Impl->ExternalSession); + TBusServerSessionPtr extSession = CreateExtSession(*Impl->Queue); + if (extSession != nullptr) { + Impl->ExternalSession = extSession; + } + + return true; + } + + bool TBusModule::Shutdown() { + Impl->Shutdown(); + + return true; + } + + TBusJob* TBusModule::CreateJobInstance(TBusMessage* message) { + TBusJob* job = new TBusJob(this, message); + return job; + } + + /** Example for external session creation: TBusSession* TMyModule::CreateExtSession(TBusMessageQueue& queue) { @@ -698,77 +698,77 @@ TBusSession* TMyModule::CreateExtSession(TBusMessageQueue& queue) { return session; */ - bool TBusModule::CreatePrivateSessions(TBusMessageQueue* queue) { - Impl->Queue = queue; - return true; - } - - int TBusModule::GetModuleSessionInFlight() const { - return Impl->Size(); - } - - TIntrusivePtr<TBusModuleInternal> TBusModule::GetInternal() { - return Impl.Get(); - } - - TBusServerSessionPtr TBusModule::CreateDefaultDestination( - TBusMessageQueue& queue, TBusProtocol* proto, const TBusServerSessionConfig& config, const TString& name) { - TBusServerSessionConfig patchedConfig = config; - patchedConfig.ExecuteOnMessageInWorkerPool = false; - if (!patchedConfig.Name) { - patchedConfig.Name = name; - } - if (!patchedConfig.Name) { - patchedConfig.Name = Impl->Name; - } - TBusServerSessionPtr session = - TBusServerSession::Create(proto, Impl->ModuleServerHandler.Get(), patchedConfig, &queue); - Impl->ServerSessions.push_back(session); - return session; - } - - TBusClientSessionPtr TBusModule::CreateDefaultSource( - TBusMessageQueue& queue, TBusProtocol* proto, const TBusClientSessionConfig& config, const TString& name) { - TBusClientSessionConfig patchedConfig = config; - patchedConfig.ExecuteOnReplyInWorkerPool = false; - if (!patchedConfig.Name) { - patchedConfig.Name = name; - } - if (!patchedConfig.Name) { - patchedConfig.Name = Impl->Name; - } - TBusClientSessionPtr session = - TBusClientSession::Create(proto, Impl->ModuleClientHandler.Get(), patchedConfig, &queue); - Impl->ClientSessions.push_back(session); - return session; - } - - TBusStarter* TBusModule::CreateDefaultStarter(TBusMessageQueue&, const TBusSessionConfig& config) { - TBusStarter* session = new TBusStarter(this, config); - Impl->Starters.push_back(session); - return session; - } - - void TBusModule::OnClientConnectionEvent(const TClientConnectionEvent& event) { - Y_UNUSED(event); - } - - TString TBusModule::GetStatus(unsigned flags) { - TString strReturn = Sprintf("%s\n", Impl->Name); - strReturn += Impl->GetStatus(flags); - return strReturn; - } + bool TBusModule::CreatePrivateSessions(TBusMessageQueue* queue) { + Impl->Queue = queue; + return true; + } + + int TBusModule::GetModuleSessionInFlight() const { + return Impl->Size(); + } + + TIntrusivePtr<TBusModuleInternal> TBusModule::GetInternal() { + return Impl.Get(); + } + + TBusServerSessionPtr TBusModule::CreateDefaultDestination( + TBusMessageQueue& queue, TBusProtocol* proto, const TBusServerSessionConfig& config, const TString& name) { + TBusServerSessionConfig patchedConfig = config; + patchedConfig.ExecuteOnMessageInWorkerPool = false; + if (!patchedConfig.Name) { + patchedConfig.Name = name; + } + if (!patchedConfig.Name) { + patchedConfig.Name = Impl->Name; + } + TBusServerSessionPtr session = + TBusServerSession::Create(proto, Impl->ModuleServerHandler.Get(), patchedConfig, &queue); + Impl->ServerSessions.push_back(session); + return session; + } + + TBusClientSessionPtr TBusModule::CreateDefaultSource( + TBusMessageQueue& queue, TBusProtocol* proto, const TBusClientSessionConfig& config, const TString& name) { + TBusClientSessionConfig patchedConfig = config; + patchedConfig.ExecuteOnReplyInWorkerPool = false; + if (!patchedConfig.Name) { + patchedConfig.Name = name; + } + if (!patchedConfig.Name) { + patchedConfig.Name = Impl->Name; + } + TBusClientSessionPtr session = + TBusClientSession::Create(proto, Impl->ModuleClientHandler.Get(), patchedConfig, &queue); + Impl->ClientSessions.push_back(session); + return session; + } + + TBusStarter* TBusModule::CreateDefaultStarter(TBusMessageQueue&, const TBusSessionConfig& config) { + TBusStarter* session = new TBusStarter(this, config); + Impl->Starters.push_back(session); + return session; + } + + void TBusModule::OnClientConnectionEvent(const TClientConnectionEvent& event) { + Y_UNUSED(event); + } + + TString TBusModule::GetStatus(unsigned flags) { + TString strReturn = Sprintf("%s\n", Impl->Name); + strReturn += Impl->GetStatus(flags); + return strReturn; + } } -void TBusModuleImpl::AddJob(TJobRunner* jobRunner) { +void TBusModuleImpl::AddJob(TJobRunner* jobRunner) { TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for AddJob"); Jobs.push_back(jobRunner); jobRunner->JobStorageIterator = Jobs.end(); --jobRunner->JobStorageIterator; } -void TBusModuleImpl::DestroyJob(TJobRunner* job) { +void TBusModuleImpl::DestroyJob(TJobRunner* job) { Y_ASSERT(job->JobStorageIterator != TList<TJobRunner*>::iterator()); { @@ -837,7 +837,7 @@ EMessageStatus TBusModule::StartJob(TAutoPtr<TBusMessage> message) { Y_VERIFY(Impl->State == TBusModuleImpl::RUNNING); Y_VERIFY(!!Impl->Queue); - if ((unsigned)AtomicGet(Impl->JobCount) >= Impl->ModuleConfig.StarterMaxInFlight) { + if ((unsigned)AtomicGet(Impl->JobCount) >= Impl->ModuleConfig.StarterMaxInFlight) { return MESSAGE_BUSY; } diff --git a/library/cpp/messagebus/oldmodule/module.h b/library/cpp/messagebus/oldmodule/module.h index 8d1c4a5d52..c917f9cae7 100644 --- a/library/cpp/messagebus/oldmodule/module.h +++ b/library/cpp/messagebus/oldmodule/module.h @@ -1,37 +1,37 @@ -#pragma once - +#pragma once + /////////////////////////////////////////////////////////////////////////// /// \file /// \brief Application interface for modules - + /// NBus::TBusModule provides foundation for implementation of asynchnous /// modules that communicate with multiple external or local sessions -/// NBus::TBusSession. - +/// NBus::TBusSession. + /// To implement the module some virtual functions needs to be overridden: -/// NBus::TBusModule::CreateExtSession() creates and registers an +/// NBus::TBusModule::CreateExtSession() creates and registers an /// external session that receives incoming messages as input for module /// processing. - -/// When new incoming message arrives the new NBus::TBusJob is created. + +/// When new incoming message arrives the new NBus::TBusJob is created. /// NBus::TBusJob is somewhat similar to a thread, it maintains all the state -/// during processing of one incoming message. Default implementation of -/// NBus::TBusJob will maintain all send and received messages during +/// during processing of one incoming message. Default implementation of +/// NBus::TBusJob will maintain all send and received messages during /// lifetime of this job. Each message, status and reply can be found /// within NBus::TJobState using NBus::TBusJob::GetState(). If your module /// needs to maintain an additional information during lifetime of the job -/// you can derive your own class from NBus::TBusJob and override job +/// you can derive your own class from NBus::TBusJob and override job /// factory method NBus::IJobFactory::CreateJobInstance() to create your instances. - + /// Processing of a given message starts with a call to NBus::TBusModule::Start() /// handler that should be overridden in the module implementation. Within /// the callback handler module can perform any computation and access any /// datastore tables that it needs. The handler can also access any module -/// variables. However, same handler can be called from multiple threads so, +/// variables. However, same handler can be called from multiple threads so, /// it is recommended that handler only access read-only module level variables. - -/// Handler should use NBus::TBusJob::Send() to send messages to other client + +/// Handler should use NBus::TBusJob::Send() to send messages to other client /// sessions and it can use NBus::TBusJob::Reply() to send reply to the main /// job message. When handler is done, it returns the pointer to the next handler to call /// when all pending messages have cleared. If handler @@ -47,364 +47,364 @@ #include <util/generic/object_counter.h> namespace NBus { - class TBusJob; - class TBusModule; - - namespace NPrivate { - struct TCallJobHandlerWorkItem; - struct TBusModuleImpl; - struct TModuleServerHandler; - struct TModuleClientHandler; - struct TJobRunner; - } - - class TJobHandler { - protected: - typedef TJobHandler (TBusModule::*TBusHandlerPtr)(TBusJob* job, TBusMessage* mess); - TBusHandlerPtr MyPtr; - - public: - template <class B> - TJobHandler(TJobHandler (B::*fptr)(TBusJob* job, TBusMessage* mess)) { - MyPtr = static_cast<TBusHandlerPtr>(fptr); - } - TJobHandler(TBusHandlerPtr fptr = nullptr) { - MyPtr = fptr; - } + class TBusJob; + class TBusModule; + + namespace NPrivate { + struct TCallJobHandlerWorkItem; + struct TBusModuleImpl; + struct TModuleServerHandler; + struct TModuleClientHandler; + struct TJobRunner; + } + + class TJobHandler { + protected: + typedef TJobHandler (TBusModule::*TBusHandlerPtr)(TBusJob* job, TBusMessage* mess); + TBusHandlerPtr MyPtr; + + public: + template <class B> + TJobHandler(TJobHandler (B::*fptr)(TBusJob* job, TBusMessage* mess)) { + MyPtr = static_cast<TBusHandlerPtr>(fptr); + } + TJobHandler(TBusHandlerPtr fptr = nullptr) { + MyPtr = fptr; + } TJobHandler(const TJobHandler&) = default; TJobHandler& operator =(const TJobHandler&) = default; bool operator==(TJobHandler h) const { - return MyPtr == h.MyPtr; - } + return MyPtr == h.MyPtr; + } bool operator!=(TJobHandler h) const { - return MyPtr != h.MyPtr; - } - bool operator!() const { - return !MyPtr; - } - TJobHandler operator()(TBusModule* b, TBusJob* job, TBusMessage* mess) { - return (b->*MyPtr)(job, mess); - } - }; - - typedef void (TBusModule::*TReplyHandler)(TBusJob* job, EMessageStatus status, TBusMessage* mess, TBusMessage* reply); - - //////////////////////////////////////////////////// - /// \brief Pending message state - - struct TJobState { - friend class TBusJob; - friend class ::TCrawlerModule; - - TReplyHandler Handler; - EMessageStatus Status; - TBusMessage* Message; - TBusMessage* Reply; - TBusClientSession* Session; - size_t NumRetries; - size_t MaxRetries; - // If != NULL then use it as destination. - TNetAddr Addr; - bool UseAddr; - bool OneWay; - - private: - TJobState(TReplyHandler handler, - EMessageStatus status, - TBusMessage* mess, TBusClientSession* session, TBusMessage* reply, size_t maxRetries = 0, - const TNetAddr* addr = nullptr, bool oneWay = false) - : Handler(handler) - , Status(status) - , Message(mess) - , Reply(reply) - , Session(session) - , NumRetries(0) - , MaxRetries(maxRetries) - , OneWay(oneWay) - { - if (!!addr) { - Addr = *addr; - } - UseAddr = !!addr; - } - - public: - TString GetStatus(unsigned flags); - }; - - using TJobStateVec = TVector<TJobState>; - - ///////////////////////////////////////////////////////// - /// \brief Execution item = thread - - /// Maintains internal state of document in computation - class TBusJob { - TObjectCounter<TBusJob> ObjectCounter; - - private: - void CheckThreadCurrentJob(); - - public: - /// given a module and starter message - TBusJob(TBusModule* module, TBusMessage* message); - - /// destructor will free all the message that were send and received - virtual ~TBusJob(); - - TBusMessage* GetMessage() const { - return Message; - } - - TNetAddr GetPeerAddrNetAddr() const; - - /// send message to any other session or application - /// If addr is set then use it as destination. - void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr); - void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler = nullptr, size_t maxRetries = 0); - - void SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr); - void SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session); - - /// send reply to the starter message - virtual void SendReply(TBusMessageAutoPtr reply); - - /// set the flag to terminate job at the earliest convenience - void Cancel(EMessageStatus status); - - /// helper to put item on finished list of states - /// It should not be a part of public API, - /// so prohibit it for all except current users. - private: - friend class ::TCrawlerModule; - void PutState(const TJobState& state) { - Finished.push_back(state); + return MyPtr != h.MyPtr; + } + bool operator!() const { + return !MyPtr; + } + TJobHandler operator()(TBusModule* b, TBusJob* job, TBusMessage* mess) { + return (b->*MyPtr)(job, mess); + } + }; + + typedef void (TBusModule::*TReplyHandler)(TBusJob* job, EMessageStatus status, TBusMessage* mess, TBusMessage* reply); + + //////////////////////////////////////////////////// + /// \brief Pending message state + + struct TJobState { + friend class TBusJob; + friend class ::TCrawlerModule; + + TReplyHandler Handler; + EMessageStatus Status; + TBusMessage* Message; + TBusMessage* Reply; + TBusClientSession* Session; + size_t NumRetries; + size_t MaxRetries; + // If != NULL then use it as destination. + TNetAddr Addr; + bool UseAddr; + bool OneWay; + + private: + TJobState(TReplyHandler handler, + EMessageStatus status, + TBusMessage* mess, TBusClientSession* session, TBusMessage* reply, size_t maxRetries = 0, + const TNetAddr* addr = nullptr, bool oneWay = false) + : Handler(handler) + , Status(status) + , Message(mess) + , Reply(reply) + , Session(session) + , NumRetries(0) + , MaxRetries(maxRetries) + , OneWay(oneWay) + { + if (!!addr) { + Addr = *addr; + } + UseAddr = !!addr; } - public: - /// retrieve all pending messages - void GetPending(TJobStateVec* stateVec) { - Y_ASSERT(stateVec); - *stateVec = Pending; - } - - /// helper function to find state of previously sent messages - template <class MessageType> - TJobState* GetState(int* startFrom = nullptr) { - for (int i = startFrom ? *startFrom : 0; i < int(Finished.size()); i++) { - TJobState* call = &Finished[i]; - if (call->Reply != nullptr && dynamic_cast<MessageType*>(call->Reply)) { - if (startFrom) { - *startFrom = i; - } - return call; - } - if (call->Message != nullptr && dynamic_cast<MessageType*>(call->Message)) { - if (startFrom) { - *startFrom = i; - } - return call; - } + public: + TString GetStatus(unsigned flags); + }; + + using TJobStateVec = TVector<TJobState>; + + ///////////////////////////////////////////////////////// + /// \brief Execution item = thread + + /// Maintains internal state of document in computation + class TBusJob { + TObjectCounter<TBusJob> ObjectCounter; + + private: + void CheckThreadCurrentJob(); + + public: + /// given a module and starter message + TBusJob(TBusModule* module, TBusMessage* message); + + /// destructor will free all the message that were send and received + virtual ~TBusJob(); + + TBusMessage* GetMessage() const { + return Message; + } + + TNetAddr GetPeerAddrNetAddr() const; + + /// send message to any other session or application + /// If addr is set then use it as destination. + void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr); + void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler = nullptr, size_t maxRetries = 0); + + void SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr); + void SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session); + + /// send reply to the starter message + virtual void SendReply(TBusMessageAutoPtr reply); + + /// set the flag to terminate job at the earliest convenience + void Cancel(EMessageStatus status); + + /// helper to put item on finished list of states + /// It should not be a part of public API, + /// so prohibit it for all except current users. + private: + friend class ::TCrawlerModule; + void PutState(const TJobState& state) { + Finished.push_back(state); + } + + public: + /// retrieve all pending messages + void GetPending(TJobStateVec* stateVec) { + Y_ASSERT(stateVec); + *stateVec = Pending; + } + + /// helper function to find state of previously sent messages + template <class MessageType> + TJobState* GetState(int* startFrom = nullptr) { + for (int i = startFrom ? *startFrom : 0; i < int(Finished.size()); i++) { + TJobState* call = &Finished[i]; + if (call->Reply != nullptr && dynamic_cast<MessageType*>(call->Reply)) { + if (startFrom) { + *startFrom = i; + } + return call; + } + if (call->Message != nullptr && dynamic_cast<MessageType*>(call->Message)) { + if (startFrom) { + *startFrom = i; + } + return call; + } } - return nullptr; + return nullptr; } - /// helper function to find response for previously sent messages - template <class MessageType> - MessageType* Get(int* startFrom = nullptr) { - for (int i = startFrom ? *startFrom : 0; i < int(Finished.size()); i++) { - TJobState& call = Finished[i]; - if (call.Reply != nullptr && dynamic_cast<MessageType*>(call.Reply)) { - if (startFrom) { - *startFrom = i; - } - return static_cast<MessageType*>(call.Reply); - } - if (call.Message != nullptr && dynamic_cast<MessageType*>(call.Message)) { - if (startFrom) { - *startFrom = i; - } - return static_cast<MessageType*>(call.Message); - } + /// helper function to find response for previously sent messages + template <class MessageType> + MessageType* Get(int* startFrom = nullptr) { + for (int i = startFrom ? *startFrom : 0; i < int(Finished.size()); i++) { + TJobState& call = Finished[i]; + if (call.Reply != nullptr && dynamic_cast<MessageType*>(call.Reply)) { + if (startFrom) { + *startFrom = i; + } + return static_cast<MessageType*>(call.Reply); + } + if (call.Message != nullptr && dynamic_cast<MessageType*>(call.Message)) { + if (startFrom) { + *startFrom = i; + } + return static_cast<MessageType*>(call.Message); + } } - return nullptr; + return nullptr; } - /// helper function to find status for previously sent message - template <class MessageType> - EMessageStatus GetStatus(int* startFrom = nullptr) { - for (int i = startFrom ? *startFrom : 0; i < int(Finished.size()); i++) { - TJobState& call = Finished[i]; - if (call.Message != nullptr && dynamic_cast<MessageType*>(call.Message)) { - if (startFrom) { - *startFrom = i; - } - return call.Status; - } + /// helper function to find status for previously sent message + template <class MessageType> + EMessageStatus GetStatus(int* startFrom = nullptr) { + for (int i = startFrom ? *startFrom : 0; i < int(Finished.size()); i++) { + TJobState& call = Finished[i]; + if (call.Message != nullptr && dynamic_cast<MessageType*>(call.Message)) { + if (startFrom) { + *startFrom = i; + } + return call.Status; + } } - return MESSAGE_UNKNOWN; + return MESSAGE_UNKNOWN; } - /// helper function to clear state of previosly sent messages - template <class MessageType> - void Clear() { - for (size_t i = 0; i < Finished.size();) { - // `Finished.size() - i` decreases with each iteration - // we either increment i, or remove element from Finished. - TJobState& call = Finished[i]; - if (call.Message != nullptr && dynamic_cast<MessageType*>(call.Message)) { - ClearState(call); - } else { - ++i; - } + /// helper function to clear state of previosly sent messages + template <class MessageType> + void Clear() { + for (size_t i = 0; i < Finished.size();) { + // `Finished.size() - i` decreases with each iteration + // we either increment i, or remove element from Finished. + TJobState& call = Finished[i]; + if (call.Message != nullptr && dynamic_cast<MessageType*>(call.Message)) { + ClearState(call); + } else { + ++i; + } } } - /// helper function to clear state in order to try again - void ClearState(TJobState& state); - - /// clears all message states - void ClearAllMessageStates(); - - /// returns true if job is done - bool IsDone(); - - /// return human reabable status of this job - virtual TString GetStatus(unsigned flags); - - /// set sleep time for job - void Sleep(int milliSeconds); - - void CallJobHandlerOnly(); - - private: - bool CallJobHandler(); - void DoCallReplyHandler(TJobState&); - /// send out all Pending jobs, failed sends will be migrated to Finished - bool SendPending(); - bool AnyPendingToSend(); - - public: - /// helper to call from OnReply() and OnError() - int CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply); - - public: - TJobHandler Handler; ///< job handler to be executed within next CallJobHandler() - EMessageStatus Status; ///< set != MESSAGE_OK if job should terminate asap - private: - NPrivate::TJobRunner* Runner; - TBusMessage* Message; - THolder<TBusMessage> MessageHolder; - TOnMessageContext OnMessageContext; // starter - public: - bool ReplySent; - - private: - friend class TBusModule; - friend struct NPrivate::TBusModuleImpl; - friend struct NPrivate::TCallJobHandlerWorkItem; - friend struct NPrivate::TModuleServerHandler; - friend struct NPrivate::TModuleClientHandler; - friend struct NPrivate::TJobRunner; - - TJobStateVec Pending; ///< messages currently outstanding via Send() - TJobStateVec Finished; ///< messages that were replied to - TBusModule* Module; - NPrivate::TBusModuleImpl* ModuleImpl; ///< module which created the job - TBusInstant SleepUntil; ///< time to wakeup, 0 if no sleep + /// helper function to clear state in order to try again + void ClearState(TJobState& state); + + /// clears all message states + void ClearAllMessageStates(); + + /// returns true if job is done + bool IsDone(); + + /// return human reabable status of this job + virtual TString GetStatus(unsigned flags); + + /// set sleep time for job + void Sleep(int milliSeconds); + + void CallJobHandlerOnly(); + + private: + bool CallJobHandler(); + void DoCallReplyHandler(TJobState&); + /// send out all Pending jobs, failed sends will be migrated to Finished + bool SendPending(); + bool AnyPendingToSend(); + + public: + /// helper to call from OnReply() and OnError() + int CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply); + + public: + TJobHandler Handler; ///< job handler to be executed within next CallJobHandler() + EMessageStatus Status; ///< set != MESSAGE_OK if job should terminate asap + private: + NPrivate::TJobRunner* Runner; + TBusMessage* Message; + THolder<TBusMessage> MessageHolder; + TOnMessageContext OnMessageContext; // starter + public: + bool ReplySent; + + private: + friend class TBusModule; + friend struct NPrivate::TBusModuleImpl; + friend struct NPrivate::TCallJobHandlerWorkItem; + friend struct NPrivate::TModuleServerHandler; + friend struct NPrivate::TModuleClientHandler; + friend struct NPrivate::TJobRunner; + + TJobStateVec Pending; ///< messages currently outstanding via Send() + TJobStateVec Finished; ///< messages that were replied to + TBusModule* Module; + NPrivate::TBusModuleImpl* ModuleImpl; ///< module which created the job + TBusInstant SleepUntil; ///< time to wakeup, 0 if no sleep + }; + + //////////////////////////////////////////////////////////////////// + /// \brief Classes to implement basic module functionality + + class IJobFactory { + protected: + virtual ~IJobFactory() { + } + + public: + /// job factory method, override to create custom jobs + virtual TBusJob* CreateJobInstance(TBusMessage* message) = 0; }; - //////////////////////////////////////////////////////////////////// - /// \brief Classes to implement basic module functionality + struct TBusModuleConfig { + unsigned StarterMaxInFlight; - class IJobFactory { - protected: - virtual ~IJobFactory() { - } + struct TSecret { + TDuration SchedulePeriod; - public: - /// job factory method, override to create custom jobs - virtual TBusJob* CreateJobInstance(TBusMessage* message) = 0; - }; - - struct TBusModuleConfig { - unsigned StarterMaxInFlight; - - struct TSecret { - TDuration SchedulePeriod; + TSecret(); + }; + TSecret Secret; - TSecret(); - }; - TSecret Secret; + TBusModuleConfig(); + }; - TBusModuleConfig(); - }; - - namespace NPrivate { - struct TBusModuleInternal: public TAtomicRefCount<TBusModuleInternal> { - virtual TVector<TBusClientSessionPtr> GetClientSessionsInternal() = 0; - virtual TVector<TBusServerSessionPtr> GetServerSessionsInternal() = 0; - virtual TBusMessageQueue* GetQueue() = 0; + namespace NPrivate { + struct TBusModuleInternal: public TAtomicRefCount<TBusModuleInternal> { + virtual TVector<TBusClientSessionPtr> GetClientSessionsInternal() = 0; + virtual TVector<TBusServerSessionPtr> GetServerSessionsInternal() = 0; + virtual TBusMessageQueue* GetQueue() = 0; - virtual TString GetNameInternal() = 0; + virtual TString GetNameInternal() = 0; - virtual TString GetStatusSingleLine() = 0; + virtual TString GetStatusSingleLine() = 0; - virtual ~TBusModuleInternal() { - } - }; - } + virtual ~TBusModuleInternal() { + } + }; + } - class TBusModule: public IJobFactory, TNonCopyable { - friend class TBusJob; + class TBusModule: public IJobFactory, TNonCopyable { + friend class TBusJob; - TObjectCounter<TBusModule> ObjectCounter; + TObjectCounter<TBusModule> ObjectCounter; - TIntrusivePtr<NPrivate::TBusModuleImpl> Impl; + TIntrusivePtr<NPrivate::TBusModuleImpl> Impl; - public: - /// Each module should have a name which is used as protocol service - TBusModule(const char* name); - ~TBusModule() override; + public: + /// Each module should have a name which is used as protocol service + TBusModule(const char* name); + ~TBusModule() override; - const char* GetName() const; + const char* GetName() const; - void SetConfig(const TBusModuleConfig& config); + void SetConfig(const TBusModuleConfig& config); - /// get status of all jobs in flight - TString GetStatus(unsigned flags = 0); + /// get status of all jobs in flight + TString GetStatus(unsigned flags = 0); - /// called when application is about to start - virtual bool StartInput(); - /// called when application is about to exit - virtual bool Shutdown(); + /// called when application is about to start + virtual bool StartInput(); + /// called when application is about to exit + virtual bool Shutdown(); - // this default implementation just creates TBusJob object - TBusJob* CreateJobInstance(TBusMessage* message) override; + // this default implementation just creates TBusJob object + TBusJob* CreateJobInstance(TBusMessage* message) override; - EMessageStatus StartJob(TAutoPtr<TBusMessage> message); + EMessageStatus StartJob(TAutoPtr<TBusMessage> message); - /// creates private sessions, calls CreateExtSession(), should be called before StartInput() - bool CreatePrivateSessions(TBusMessageQueue* queue); + /// creates private sessions, calls CreateExtSession(), should be called before StartInput() + bool CreatePrivateSessions(TBusMessageQueue* queue); - virtual void OnClientConnectionEvent(const TClientConnectionEvent& event); + virtual void OnClientConnectionEvent(const TClientConnectionEvent& event); - public: - /// entry point into module, first function to call - virtual TJobHandler Start(TBusJob* job, TBusMessage* mess) = 0; - - protected: - /// override this function to create destination session - virtual TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) = 0; - - public: - int GetModuleSessionInFlight() const; - - TIntrusivePtr<NPrivate::TBusModuleInternal> GetInternal(); - - protected: - TBusServerSessionPtr CreateDefaultDestination(TBusMessageQueue& queue, TBusProtocol* proto, const TBusServerSessionConfig& config, const TString& name = TString()); - TBusClientSessionPtr CreateDefaultSource(TBusMessageQueue& queue, TBusProtocol* proto, const TBusClientSessionConfig& config, const TString& name = TString()); - TBusStarter* CreateDefaultStarter(TBusMessageQueue& unused, const TBusSessionConfig& config); - }; + public: + /// entry point into module, first function to call + virtual TJobHandler Start(TBusJob* job, TBusMessage* mess) = 0; + protected: + /// override this function to create destination session + virtual TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) = 0; + + public: + int GetModuleSessionInFlight() const; + + TIntrusivePtr<NPrivate::TBusModuleInternal> GetInternal(); + + protected: + TBusServerSessionPtr CreateDefaultDestination(TBusMessageQueue& queue, TBusProtocol* proto, const TBusServerSessionConfig& config, const TString& name = TString()); + TBusClientSessionPtr CreateDefaultSource(TBusMessageQueue& queue, TBusProtocol* proto, const TBusClientSessionConfig& config, const TString& name = TString()); + TBusStarter* CreateDefaultStarter(TBusMessageQueue& unused, const TBusSessionConfig& config); + }; + } diff --git a/library/cpp/messagebus/oldmodule/startsession.cpp b/library/cpp/messagebus/oldmodule/startsession.cpp index 7c38801d62..76171ba1d5 100644 --- a/library/cpp/messagebus/oldmodule/startsession.cpp +++ b/library/cpp/messagebus/oldmodule/startsession.cpp @@ -1,14 +1,14 @@ /////////////////////////////////////////////////////////// -/// \file +/// \file /// \brief Starter session implementation - + /// Starter session will generate emtpy message to insert /// into local session that are registered under same protocol - + /// Starter (will one day) automatically adjust number /// of message inflight to make sure that at least one of source /// sessions within message queue is at the limit (bottle neck) - + /// Maximum number of messages that starter will instert into /// the pipeline is configured by NBus::TBusSessionConfig::MaxInFlight @@ -19,46 +19,46 @@ #include <library/cpp/messagebus/ybus.h> namespace NBus { - void* TBusStarter::_starter(void* data) { - TBusStarter* pThis = static_cast<TBusStarter*>(data); - pThis->Starter(); - return nullptr; - } + void* TBusStarter::_starter(void* data) { + TBusStarter* pThis = static_cast<TBusStarter*>(data); + pThis->Starter(); + return nullptr; + } - TBusStarter::TBusStarter(TBusModule* module, const TBusSessionConfig& config) - : Module(module) - , Config(config) - , StartThread(_starter, this) - , Exiting(false) - { - StartThread.Start(); - } + TBusStarter::TBusStarter(TBusModule* module, const TBusSessionConfig& config) + : Module(module) + , Config(config) + , StartThread(_starter, this) + , Exiting(false) + { + StartThread.Start(); + } - TBusStarter::~TBusStarter() { - Shutdown(); - } + TBusStarter::~TBusStarter() { + Shutdown(); + } - void TBusStarter::Shutdown() { - { - TGuard<TMutex> g(ExitLock); - Exiting = true; - ExitSignal.Signal(); - } - StartThread.Join(); - } + void TBusStarter::Shutdown() { + { + TGuard<TMutex> g(ExitLock); + Exiting = true; + ExitSignal.Signal(); + } + StartThread.Join(); + } - void TBusStarter::Starter() { + void TBusStarter::Starter() { TGuard<TMutex> g(ExitLock); - while (!Exiting) { - TAutoPtr<TBusMessage> empty(new TBusMessage(0)); + while (!Exiting) { + TAutoPtr<TBusMessage> empty(new TBusMessage(0)); - EMessageStatus status = Module->StartJob(empty); + EMessageStatus status = Module->StartJob(empty); - if (Config.SendTimeout > 0) { - ExitSignal.WaitT(ExitLock, TDuration::MilliSeconds(Config.SendTimeout)); - } else { - ExitSignal.WaitT(ExitLock, (status == MESSAGE_BUSY) ? TDuration::MilliSeconds(1) : TDuration::Zero()); - } + if (Config.SendTimeout > 0) { + ExitSignal.WaitT(ExitLock, TDuration::MilliSeconds(Config.SendTimeout)); + } else { + ExitSignal.WaitT(ExitLock, (status == MESSAGE_BUSY) ? TDuration::MilliSeconds(1) : TDuration::Zero()); + } } } diff --git a/library/cpp/messagebus/oldmodule/startsession.h b/library/cpp/messagebus/oldmodule/startsession.h index 5e26e7e1e5..afe25ac809 100644 --- a/library/cpp/messagebus/oldmodule/startsession.h +++ b/library/cpp/messagebus/oldmodule/startsession.h @@ -5,30 +5,30 @@ #include <util/system/thread.h> namespace NBus { - class TBusModule; - - class TBusStarter { - private: - TBusModule* Module; - TBusSessionConfig Config; - TThread StartThread; - bool Exiting; - TCondVar ExitSignal; - TMutex ExitLock; - - static void* _starter(void* data); - - void Starter(); - - TString GetStatus(ui16 /*flags=YBUS_STATUS_CONNS*/) { - return ""; - } - - public: - TBusStarter(TBusModule* module, const TBusSessionConfig& config); - ~TBusStarter(); - - void Shutdown(); - }; - -} + class TBusModule; + + class TBusStarter { + private: + TBusModule* Module; + TBusSessionConfig Config; + TThread StartThread; + bool Exiting; + TCondVar ExitSignal; + TMutex ExitLock; + + static void* _starter(void* data); + + void Starter(); + + TString GetStatus(ui16 /*flags=YBUS_STATUS_CONNS*/) { + return ""; + } + + public: + TBusStarter(TBusModule* module, const TBusSessionConfig& config); + ~TBusStarter(); + + void Shutdown(); + }; + +} |