diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:17 +0300 |
commit | d3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch) | |
tree | dd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/messagebus/oldmodule/module.cpp | |
parent | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff) | |
download | ydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/oldmodule/module.cpp')
-rw-r--r-- | library/cpp/messagebus/oldmodule/module.cpp | 1332 |
1 files changed, 666 insertions, 666 deletions
diff --git a/library/cpp/messagebus/oldmodule/module.cpp b/library/cpp/messagebus/oldmodule/module.cpp index f423214aa3..24bd778799 100644 --- a/library/cpp/messagebus/oldmodule/module.cpp +++ b/library/cpp/messagebus/oldmodule/module.cpp @@ -16,680 +16,680 @@ using namespace NBus; using namespace NBus::NPrivate; namespace { - Y_POD_STATIC_THREAD(TBusJob*) - ThreadCurrentJob; - - struct TThreadCurrentJobGuard { - TBusJob* Prev; - - TThreadCurrentJobGuard(TBusJob* job) - : Prev(ThreadCurrentJob) - { - Y_ASSERT(!ThreadCurrentJob || ThreadCurrentJob == job); - ThreadCurrentJob = job; - } - ~TThreadCurrentJobGuard() { - ThreadCurrentJob = Prev; - } - }; - - void ClearState(NBus::TJobState* state) { - /// skip sendbacks handlers - if (state->Message != state->Reply) { - if (state->Message) { - delete state->Message; - state->Message = nullptr; - } - - if (state->Reply) { - delete state->Reply; - state->Reply = nullptr; - } - } - } - - void ClearJobStateVector(NBus::TJobStateVec* vec) { - Y_ASSERT(vec); - - for (auto& call : *vec) { - ClearState(&call); + Y_POD_STATIC_THREAD(TBusJob*) + ThreadCurrentJob; + + struct TThreadCurrentJobGuard { + TBusJob* Prev; + + TThreadCurrentJobGuard(TBusJob* job) + : Prev(ThreadCurrentJob) + { + Y_ASSERT(!ThreadCurrentJob || ThreadCurrentJob == job); + ThreadCurrentJob = job; + } + ~TThreadCurrentJobGuard() { + ThreadCurrentJob = Prev; + } + }; + + void ClearState(NBus::TJobState* state) { + /// skip sendbacks handlers + if (state->Message != state->Reply) { + if (state->Message) { + delete state->Message; + state->Message = nullptr; + } + + if (state->Reply) { + delete state->Reply; + state->Reply = nullptr; + } + } + } + + void ClearJobStateVector(NBus::TJobStateVec* vec) { + Y_ASSERT(vec); + + for (auto& call : *vec) { + ClearState(&call); } - vec->clear(); + vec->clear(); } } namespace NBus { - namespace NPrivate { - class TJobStorage { - }; - - struct TModuleClientHandler - : public IBusClientHandler { - TModuleClientHandler(TBusModuleImpl* module) - : Module(module) - { - } - - void OnReply(TAutoPtr<TBusMessage> req, TAutoPtr<TBusMessage> reply) override; - void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage) override; - void OnError(TAutoPtr<TBusMessage> msg, EMessageStatus status) override; - void OnClientConnectionEvent(const TClientConnectionEvent& event) override; - - TBusModuleImpl* const Module; - }; - - struct TModuleServerHandler - : public IBusServerHandler { - TModuleServerHandler(TBusModuleImpl* module) - : Module(module) - { - } - - void OnMessage(TOnMessageContext& msg) override; - - TBusModuleImpl* const Module; - }; - - struct TBusModuleImpl: public TBusModuleInternal { - TBusModule* const Module; - - TBusMessageQueue* Queue; - - TScheduler Scheduler; - - const char* const Name; - - typedef TList<TJobRunner*> TBusJobList; - /// jobs currently in-flight on this module - TBusJobList Jobs; - /// module level mutex - TMutex Lock; - TCondVar ShutdownCondVar; - TAtomic JobCount; - - enum EState { - CREATED, - RUNNING, - STOPPED, - }; - - TAtomic State; - TBusModuleConfig ModuleConfig; - TBusServerSessionPtr ExternalSession; - /// protocol for local proxy session - THolder<IBusClientHandler> ModuleClientHandler; - THolder<IBusServerHandler> ModuleServerHandler; - TVector<TSimpleSharedPtr<TBusStarter>> Starters; - - // Sessions must be destroyed before - // ModuleClientHandler / ModuleServerHandler - TVector<TBusClientSessionPtr> ClientSessions; - TVector<TBusServerSessionPtr> ServerSessions; - - TBusModuleImpl(TBusModule* module, const char* name) - : Module(module) - , Queue() - , Name(name) - , JobCount(0) - , State(CREATED) - , ExternalSession(nullptr) - , ModuleClientHandler(new TModuleClientHandler(this)) - , ModuleServerHandler(new TModuleServerHandler(this)) - { - } - - ~TBusModuleImpl() override { - // Shutdown cannot be called from destructor, - // because module has virtual methods. - Y_VERIFY(State != RUNNING, "if running, must explicitly call Shutdown() before destructor"); - - Scheduler.Stop(); - - while (!Jobs.empty()) { - DestroyJob(Jobs.front()); - } - Y_VERIFY(JobCount == 0, "state check"); - } - - void OnMessageReceived(TAutoPtr<TBusMessage> msg, TOnMessageContext&); - - void AddJob(TJobRunner* jobRunner); - - void DestroyJob(TJobRunner* job); - - /// terminate job on this message - void CancelJob(TBusJob* job, EMessageStatus status); - /// prints statuses of jobs - TString GetStatus(unsigned flags); - - size_t Size() const { - return AtomicGet(JobCount); - } - - void Shutdown(); - - TVector<TBusClientSessionPtr> GetClientSessionsInternal() override { - return ClientSessions; - } - - TVector<TBusServerSessionPtr> GetServerSessionsInternal() override { - return ServerSessions; - } - - TBusMessageQueue* GetQueue() override { - return Queue; - } - - TString GetNameInternal() override { - return Name; - } - - TString GetStatusSingleLine() override { - TStringStream ss; - ss << "jobs: " << Size(); - return ss.Str(); - } - - void OnClientConnectionEvent(const TClientConnectionEvent& event) { - Module->OnClientConnectionEvent(event); - } - }; - - struct TJobResponseMessage { - TBusMessage* Request; - TBusMessage* Response; - EMessageStatus Status; - - TJobResponseMessage(TBusMessage* request, TBusMessage* response, EMessageStatus status) - : Request(request) - , Response(response) - , Status(status) - { - } - }; - - struct TJobRunner: public TAtomicRefCount<TJobRunner>, - public NActor::TActor<TJobRunner>, - public NActor::TQueueInActor<TJobRunner, TJobResponseMessage>, - public TScheduleActor<TJobRunner> { - THolder<TBusJob> Job; - - TList<TJobRunner*>::iterator JobStorageIterator; - - TJobRunner(TAutoPtr<TBusJob> job) - : NActor::TActor<TJobRunner>(job->ModuleImpl->Queue->GetExecutor()) - , TScheduleActor<TJobRunner>(&job->ModuleImpl->Scheduler) - , Job(job.Release()) - , JobStorageIterator() - { - Job->Runner = this; - } - - ~TJobRunner() override { - Y_ASSERT(JobStorageIterator == TList<TJobRunner*>::iterator()); - } - - void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, const TJobResponseMessage& message) { - Job->CallReplyHandler(message.Status, message.Request, message.Response); - } - - void Destroy() { - if (!!Job->OnMessageContext) { - if (!Job->ReplySent) { - Job->OnMessageContext.ForgetRequest(); - } - } - Job->ModuleImpl->DestroyJob(this); + namespace NPrivate { + class TJobStorage { + }; + + struct TModuleClientHandler + : public IBusClientHandler { + TModuleClientHandler(TBusModuleImpl* module) + : Module(module) + { } - void Act(NActor::TDefaultTag) { - if (JobStorageIterator == TList<TJobRunner*>::iterator()) { - return; - } - - if (Job->SleepUntil != 0) { - if (AtomicGet(Job->ModuleImpl->State) == TBusModuleImpl::STOPPED) { - Destroy(); - return; - } - } - - TThreadCurrentJobGuard g(Job.Get()); - - NActor::TQueueInActor<TJobRunner, TJobResponseMessage>::DequeueAll(); - - if (Alarm.FetchTask()) { - if (Job->AnyPendingToSend()) { - Y_ASSERT(Job->SleepUntil == 0); - Job->SendPending(); - if (Job->AnyPendingToSend()) { - } - } else { - // regular alarm - Y_ASSERT(Job->Pending.empty()); - Y_ASSERT(Job->SleepUntil != 0); - Job->SleepUntil = 0; - } + void OnReply(TAutoPtr<TBusMessage> req, TAutoPtr<TBusMessage> reply) override; + void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage) override; + void OnError(TAutoPtr<TBusMessage> msg, EMessageStatus status) override; + void OnClientConnectionEvent(const TClientConnectionEvent& event) override; + + TBusModuleImpl* const Module; + }; + + struct TModuleServerHandler + : public IBusServerHandler { + TModuleServerHandler(TBusModuleImpl* module) + : Module(module) + { + } + + void OnMessage(TOnMessageContext& msg) override; + + TBusModuleImpl* const Module; + }; + + struct TBusModuleImpl: public TBusModuleInternal { + TBusModule* const Module; + + TBusMessageQueue* Queue; + + TScheduler Scheduler; + + const char* const Name; + + typedef TList<TJobRunner*> TBusJobList; + /// jobs currently in-flight on this module + TBusJobList Jobs; + /// module level mutex + TMutex Lock; + TCondVar ShutdownCondVar; + TAtomic JobCount; + + enum EState { + CREATED, + RUNNING, + STOPPED, + }; + + TAtomic State; + TBusModuleConfig ModuleConfig; + TBusServerSessionPtr ExternalSession; + /// protocol for local proxy session + THolder<IBusClientHandler> ModuleClientHandler; + THolder<IBusServerHandler> ModuleServerHandler; + TVector<TSimpleSharedPtr<TBusStarter>> Starters; + + // Sessions must be destroyed before + // ModuleClientHandler / ModuleServerHandler + TVector<TBusClientSessionPtr> ClientSessions; + TVector<TBusServerSessionPtr> ServerSessions; + + TBusModuleImpl(TBusModule* module, const char* name) + : Module(module) + , Queue() + , Name(name) + , JobCount(0) + , State(CREATED) + , ExternalSession(nullptr) + , ModuleClientHandler(new TModuleClientHandler(this)) + , ModuleServerHandler(new TModuleServerHandler(this)) + { + } + + ~TBusModuleImpl() override { + // Shutdown cannot be called from destructor, + // because module has virtual methods. + Y_VERIFY(State != RUNNING, "if running, must explicitly call Shutdown() before destructor"); + + Scheduler.Stop(); + + while (!Jobs.empty()) { + DestroyJob(Jobs.front()); } + Y_VERIFY(JobCount == 0, "state check"); + } - for (;;) { - if (Job->Pending.empty() && !!Job->Handler && Job->Status == MESSAGE_OK) { - TWhatThreadDoesPushPop pp("do call job handler (do not confuse with reply handler)"); + void OnMessageReceived(TAutoPtr<TBusMessage> msg, TOnMessageContext&); - Job->Handler = Job->Handler(Job->Module, Job.Get(), Job->Message); - } + void AddJob(TJobRunner* jobRunner); - if (Job->SleepUntil != 0) { - ScheduleAt(TInstant::MilliSeconds(Job->SleepUntil)); - return; - } + void DestroyJob(TJobRunner* job); - Job->SendPending(); + /// terminate job on this message + void CancelJob(TBusJob* job, EMessageStatus status); + /// prints statuses of jobs + TString GetStatus(unsigned flags); - if (Job->AnyPendingToSend()) { - ScheduleAt(TInstant::Now() + TDuration::Seconds(1)); - return; - } + size_t Size() const { + return AtomicGet(JobCount); + } - if (!Job->Pending.empty()) { - // waiting replies - return; - } + void Shutdown(); - if (Job->IsDone()) { - Destroy(); - return; - } - } + TVector<TBusClientSessionPtr> GetClientSessionsInternal() override { + return ClientSessions; } - }; - - } - - static inline TJobRunner* GetJob(TBusMessage* message) { - return (TJobRunner*)message->Data; - } - - static inline void SetJob(TBusMessage* message, TJobRunner* job) { - message->Data = job; - } - - TBusJob::TBusJob(TBusModule* module, TBusMessage* message) - : Status(MESSAGE_OK) - , Runner() - , Message(message) - , ReplySent(false) - , Module(module) - , ModuleImpl(module->Impl.Get()) - , SleepUntil(0) - { - Handler = TJobHandler(&TBusModule::Start); - } - - TBusJob::~TBusJob() { - Y_ASSERT(Pending.size() == 0); - //Y_ASSERT(SleepUntil == 0); - - ClearAllMessageStates(); - } - - TNetAddr TBusJob::GetPeerAddrNetAddr() const { - Y_VERIFY(!!OnMessageContext); - return OnMessageContext.GetPeerAddrNetAddr(); - } - - void TBusJob::CheckThreadCurrentJob() { - Y_ASSERT(ThreadCurrentJob == this); - } - - ///////////////////////////////////////////////////////// - /// \brief Send messages in pending list - - /// If at least one message is gone return true - /// If message has not been send, move it to Finished with appropriate error code - bool TBusJob::SendPending() { - // Iterator type must be size_t, not vector::iterator, - // because `DoCallReplyHandler` may call `Send` that modifies `Pending` vector, - // that in turn invalidates iterator. - // Implementation assumes that `DoCallReplyHandler` only pushes back to `Pending` - // (not erases, and not inserts) so iteration by index is valid. - size_t it = 0; - while (it != Pending.size()) { - TJobState& call = Pending[it]; - - if (call.Status == MESSAGE_DONT_ASK) { - EMessageStatus getAddressStatus = MESSAGE_OK; - TNetAddr addr; - if (call.UseAddr) { - addr = call.Addr; - } else { - getAddressStatus = const_cast<TBusProtocol*>(call.Session->GetProto())->GetDestination(call.Session, call.Message, call.Session->GetQueue()->GetLocator(), &addr); - } - - if (getAddressStatus == MESSAGE_OK) { - // hold extra reference for each request in flight - Runner->Ref(); - - if (call.OneWay) { - call.Status = call.Session->SendMessageOneWay(call.Message, &addr); - } else { - call.Status = call.Session->SendMessage(call.Message, &addr); - } - - if (call.Status != MESSAGE_OK) { - Runner->UnRef(); - } + TVector<TBusServerSessionPtr> GetServerSessionsInternal() override { + return ServerSessions; + } + + TBusMessageQueue* GetQueue() override { + return Queue; + } + + TString GetNameInternal() override { + return Name; + } + + TString GetStatusSingleLine() override { + TStringStream ss; + ss << "jobs: " << Size(); + return ss.Str(); + } + + void OnClientConnectionEvent(const TClientConnectionEvent& event) { + Module->OnClientConnectionEvent(event); + } + }; + + struct TJobResponseMessage { + TBusMessage* Request; + TBusMessage* Response; + EMessageStatus Status; + + TJobResponseMessage(TBusMessage* request, TBusMessage* response, EMessageStatus status) + : Request(request) + , Response(response) + , Status(status) + { + } + }; + + struct TJobRunner: public TAtomicRefCount<TJobRunner>, + public NActor::TActor<TJobRunner>, + public NActor::TQueueInActor<TJobRunner, TJobResponseMessage>, + public TScheduleActor<TJobRunner> { + THolder<TBusJob> Job; + + TList<TJobRunner*>::iterator JobStorageIterator; + + TJobRunner(TAutoPtr<TBusJob> job) + : NActor::TActor<TJobRunner>(job->ModuleImpl->Queue->GetExecutor()) + , TScheduleActor<TJobRunner>(&job->ModuleImpl->Scheduler) + , Job(job.Release()) + , JobStorageIterator() + { + Job->Runner = this; + } + + ~TJobRunner() override { + Y_ASSERT(JobStorageIterator == TList<TJobRunner*>::iterator()); + } + + void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, const TJobResponseMessage& message) { + Job->CallReplyHandler(message.Status, message.Request, message.Response); + } + + void Destroy() { + if (!!Job->OnMessageContext) { + if (!Job->ReplySent) { + Job->OnMessageContext.ForgetRequest(); + } + } + Job->ModuleImpl->DestroyJob(this); + } + + void Act(NActor::TDefaultTag) { + if (JobStorageIterator == TList<TJobRunner*>::iterator()) { + return; + } + + if (Job->SleepUntil != 0) { + if (AtomicGet(Job->ModuleImpl->State) == TBusModuleImpl::STOPPED) { + Destroy(); + return; + } + } + + TThreadCurrentJobGuard g(Job.Get()); + + NActor::TQueueInActor<TJobRunner, TJobResponseMessage>::DequeueAll(); + + if (Alarm.FetchTask()) { + if (Job->AnyPendingToSend()) { + Y_ASSERT(Job->SleepUntil == 0); + Job->SendPending(); + if (Job->AnyPendingToSend()) { + } + } else { + // regular alarm + Y_ASSERT(Job->Pending.empty()); + Y_ASSERT(Job->SleepUntil != 0); + Job->SleepUntil = 0; + } + } + + for (;;) { + if (Job->Pending.empty() && !!Job->Handler && Job->Status == MESSAGE_OK) { + TWhatThreadDoesPushPop pp("do call job handler (do not confuse with reply handler)"); + + Job->Handler = Job->Handler(Job->Module, Job.Get(), Job->Message); + } + + if (Job->SleepUntil != 0) { + ScheduleAt(TInstant::MilliSeconds(Job->SleepUntil)); + return; + } + + Job->SendPending(); + + if (Job->AnyPendingToSend()) { + ScheduleAt(TInstant::Now() + TDuration::Seconds(1)); + return; + } + + if (!Job->Pending.empty()) { + // waiting replies + return; + } + + if (Job->IsDone()) { + Destroy(); + return; + } + } + } + }; + + } + + static inline TJobRunner* GetJob(TBusMessage* message) { + return (TJobRunner*)message->Data; + } + + static inline void SetJob(TBusMessage* message, TJobRunner* job) { + message->Data = job; + } + + TBusJob::TBusJob(TBusModule* module, TBusMessage* message) + : Status(MESSAGE_OK) + , Runner() + , Message(message) + , ReplySent(false) + , Module(module) + , ModuleImpl(module->Impl.Get()) + , SleepUntil(0) + { + Handler = TJobHandler(&TBusModule::Start); + } + + TBusJob::~TBusJob() { + Y_ASSERT(Pending.size() == 0); + //Y_ASSERT(SleepUntil == 0); + + ClearAllMessageStates(); + } + + TNetAddr TBusJob::GetPeerAddrNetAddr() const { + Y_VERIFY(!!OnMessageContext); + return OnMessageContext.GetPeerAddrNetAddr(); + } + + void TBusJob::CheckThreadCurrentJob() { + Y_ASSERT(ThreadCurrentJob == this); + } + + ///////////////////////////////////////////////////////// + /// \brief Send messages in pending list + + /// If at least one message is gone return true + /// If message has not been send, move it to Finished with appropriate error code + bool TBusJob::SendPending() { + // Iterator type must be size_t, not vector::iterator, + // because `DoCallReplyHandler` may call `Send` that modifies `Pending` vector, + // that in turn invalidates iterator. + // Implementation assumes that `DoCallReplyHandler` only pushes back to `Pending` + // (not erases, and not inserts) so iteration by index is valid. + size_t it = 0; + while (it != Pending.size()) { + TJobState& call = Pending[it]; + + if (call.Status == MESSAGE_DONT_ASK) { + EMessageStatus getAddressStatus = MESSAGE_OK; + TNetAddr addr; + if (call.UseAddr) { + addr = call.Addr; } else { - call.Status = getAddressStatus; + getAddressStatus = const_cast<TBusProtocol*>(call.Session->GetProto())->GetDestination(call.Session, call.Message, call.Session->GetQueue()->GetLocator(), &addr); } - } - - if (call.Status == MESSAGE_OK) { - ++it; // keep pending list until we get reply - } else if (call.Status == MESSAGE_BUSY) { - Y_FAIL("MESSAGE_BUSY is prohibited in modules. Please increase MaxInFlight"); - } else if (call.Status == MESSAGE_CONNECT_FAILED && call.NumRetries < call.MaxRetries) { - ++it; // try up to call.MaxRetries times to send message - call.NumRetries++; - DoCallReplyHandler(call); - call.Status = MESSAGE_DONT_ASK; - call.Message->Reset(); // generate new Id + + if (getAddressStatus == MESSAGE_OK) { + // hold extra reference for each request in flight + Runner->Ref(); + + if (call.OneWay) { + call.Status = call.Session->SendMessageOneWay(call.Message, &addr); + } else { + call.Status = call.Session->SendMessage(call.Message, &addr); + } + + if (call.Status != MESSAGE_OK) { + Runner->UnRef(); + } + + } else { + call.Status = getAddressStatus; + } + } + + if (call.Status == MESSAGE_OK) { + ++it; // keep pending list until we get reply + } else if (call.Status == MESSAGE_BUSY) { + Y_FAIL("MESSAGE_BUSY is prohibited in modules. Please increase MaxInFlight"); + } else if (call.Status == MESSAGE_CONNECT_FAILED && call.NumRetries < call.MaxRetries) { + ++it; // try up to call.MaxRetries times to send message + call.NumRetries++; + DoCallReplyHandler(call); + call.Status = MESSAGE_DONT_ASK; + call.Message->Reset(); // generate new Id } else { - Finished.push_back(call); - DoCallReplyHandler(call); - Pending.erase(Pending.begin() + it); + Finished.push_back(call); + DoCallReplyHandler(call); + Pending.erase(Pending.begin() + it); } } - return Pending.size() > 0; - } - - bool TBusJob::AnyPendingToSend() { - for (unsigned i = 0; i < Pending.size(); ++i) { - if (Pending[i].Status == MESSAGE_DONT_ASK) { - return true; - } + return Pending.size() > 0; + } + + bool TBusJob::AnyPendingToSend() { + for (unsigned i = 0; i < Pending.size(); ++i) { + if (Pending[i].Status == MESSAGE_DONT_ASK) { + return true; + } } - - return false; + + return false; } - bool TBusJob::IsDone() { - bool r = (SleepUntil == 0 && Pending.size() == 0 && (Handler == nullptr || Status != MESSAGE_OK)); - return r; + bool TBusJob::IsDone() { + bool r = (SleepUntil == 0 && Pending.size() == 0 && (Handler == nullptr || Status != MESSAGE_OK)); + return r; + } + + void TBusJob::CallJobHandlerOnly() { + TThreadCurrentJobGuard threadCurrentJobGuard(this); + TWhatThreadDoesPushPop pp("do call job handler (do not confuse with reply handler)"); + + Handler = Handler(ModuleImpl->Module, this, Message); } - void TBusJob::CallJobHandlerOnly() { - TThreadCurrentJobGuard threadCurrentJobGuard(this); - TWhatThreadDoesPushPop pp("do call job handler (do not confuse with reply handler)"); + bool TBusJob::CallJobHandler() { + /// go on as far as we can go without waiting + while (!IsDone()) { + /// call the handler + CallJobHandlerOnly(); - Handler = Handler(ModuleImpl->Module, this, Message); - } + /// quit if job is canceled + if (Status != MESSAGE_OK) { + break; + } - bool TBusJob::CallJobHandler() { - /// go on as far as we can go without waiting - while (!IsDone()) { - /// call the handler - CallJobHandlerOnly(); - - /// quit if job is canceled - if (Status != MESSAGE_OK) { - break; - } + /// there are messages to send and wait for reply + SendPending(); - /// there are messages to send and wait for reply - SendPending(); + if (!Pending.empty()) { + break; + } - if (!Pending.empty()) { - break; - } + /// asked to sleep + if (SleepUntil) { + break; + } + } - /// asked to sleep - if (SleepUntil) { - break; - } - } + Y_VERIFY(!(Pending.size() == 0 && Handler == nullptr && Status == MESSAGE_OK && !ReplySent), + "Handler returned NULL without Cancel() or SendReply() for message=%016" PRIx64 " type=%d", + Message->GetHeader()->Id, Message->GetHeader()->Type); - Y_VERIFY(!(Pending.size() == 0 && Handler == nullptr && Status == MESSAGE_OK && !ReplySent), - "Handler returned NULL without Cancel() or SendReply() for message=%016" PRIx64 " type=%d", - Message->GetHeader()->Id, Message->GetHeader()->Type); + return IsDone(); + } - return IsDone(); - } + void TBusJob::DoCallReplyHandler(TJobState& call) { + if (call.Handler) { + TWhatThreadDoesPushPop pp("do call reply handler (do not confuse with job handler)"); - void TBusJob::DoCallReplyHandler(TJobState& call) { - if (call.Handler) { - TWhatThreadDoesPushPop pp("do call reply handler (do not confuse with job handler)"); + TThreadCurrentJobGuard threadCurrentJobGuard(this); + (Module->*(call.Handler))(this, call.Status, call.Message, call.Reply); + } + } - TThreadCurrentJobGuard threadCurrentJobGuard(this); - (Module->*(call.Handler))(this, call.Status, call.Message, call.Reply); + int TBusJob::CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply) { + /// find handler for given message and update it's status + size_t i = 0; + for (; i < Pending.size(); ++i) { + TJobState& call = Pending[i]; + if (call.Message == mess) { + break; + } } - } - - int TBusJob::CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply) { - /// find handler for given message and update it's status - size_t i = 0; - for (; i < Pending.size(); ++i) { - TJobState& call = Pending[i]; - if (call.Message == mess) { - break; - } + + /// if not found, report error + if (i == Pending.size()) { + Y_FAIL("must not happen"); } - /// if not found, report error - if (i == Pending.size()) { - Y_FAIL("must not happen"); - } - - /// fill in response into job state - TJobState& call = Pending[i]; - call.Status = status; - Y_ASSERT(call.Message == mess); - call.Reply = reply; - - if ((status == MESSAGE_TIMEOUT || status == MESSAGE_DELIVERY_FAILED) && call.NumRetries < call.MaxRetries) { - call.NumRetries++; - call.Status = MESSAGE_DONT_ASK; - call.Message->Reset(); // generate new Id - DoCallReplyHandler(call); - return 0; + /// fill in response into job state + TJobState& call = Pending[i]; + call.Status = status; + Y_ASSERT(call.Message == mess); + call.Reply = reply; + + if ((status == MESSAGE_TIMEOUT || status == MESSAGE_DELIVERY_FAILED) && call.NumRetries < call.MaxRetries) { + call.NumRetries++; + call.Status = MESSAGE_DONT_ASK; + call.Message->Reset(); // generate new Id + DoCallReplyHandler(call); + return 0; } - /// call the handler if provided - DoCallReplyHandler(call); + /// call the handler if provided + DoCallReplyHandler(call); - /// move job state into the finished stack - Finished.push_back(Pending[i]); - Pending.erase(Pending.begin() + i); + /// move job state into the finished stack + Finished.push_back(Pending[i]); + Pending.erase(Pending.begin() + i); return 0; } - /////////////////////////////////////////////////////////////// - /// send message to any other session or application - void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries) { - CheckThreadCurrentJob(); + /////////////////////////////////////////////////////////////// + /// send message to any other session or application + void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries) { + CheckThreadCurrentJob(); - SetJob(mess.Get(), Runner); - Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, nullptr, false)); - } + SetJob(mess.Get(), Runner); + Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, nullptr, false)); + } - void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr) { - CheckThreadCurrentJob(); + void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr) { + CheckThreadCurrentJob(); - SetJob(mess.Get(), Runner); - Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, &addr, false)); - } + SetJob(mess.Get(), Runner); + Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, &addr, false)); + } - void TBusJob::SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr) { - CheckThreadCurrentJob(); + void TBusJob::SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr) { + CheckThreadCurrentJob(); - SetJob(req.Get(), Runner); - Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, &addr, true)); - } + SetJob(req.Get(), Runner); + Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, &addr, true)); + } - void TBusJob::SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session) { - CheckThreadCurrentJob(); + void TBusJob::SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session) { + CheckThreadCurrentJob(); - SetJob(req.Get(), Runner); - Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, nullptr, true)); - } + SetJob(req.Get(), Runner); + Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, nullptr, true)); + } - /////////////////////////////////////////////////////////////// - /// send reply to the starter message - void TBusJob::SendReply(TBusMessageAutoPtr reply) { - CheckThreadCurrentJob(); + /////////////////////////////////////////////////////////////// + /// send reply to the starter message + void TBusJob::SendReply(TBusMessageAutoPtr reply) { + CheckThreadCurrentJob(); - Y_VERIFY(!ReplySent, "cannot call SendReply twice"); - ReplySent = true; - if (!OnMessageContext) - return; + Y_VERIFY(!ReplySent, "cannot call SendReply twice"); + ReplySent = true; + if (!OnMessageContext) + return; - EMessageStatus ok = OnMessageContext.SendReplyMove(reply); - if (ok != MESSAGE_OK) { - // TODO: count errors - } - } + EMessageStatus ok = OnMessageContext.SendReplyMove(reply); + if (ok != MESSAGE_OK) { + // TODO: count errors + } + } - /// set the flag to terminate job at the earliest convenience - void TBusJob::Cancel(EMessageStatus status) { - CheckThreadCurrentJob(); + /// set the flag to terminate job at the earliest convenience + void TBusJob::Cancel(EMessageStatus status) { + CheckThreadCurrentJob(); - Status = status; + Status = status; } - void TBusJob::ClearState(TJobState& call) { - TJobStateVec::iterator it; - for (it = Finished.begin(); it != Finished.end(); ++it) { - TJobState& state = *it; - if (&call == &state) { - ::ClearState(&call); - Finished.erase(it); - return; - } + void TBusJob::ClearState(TJobState& call) { + TJobStateVec::iterator it; + for (it = Finished.begin(); it != Finished.end(); ++it) { + TJobState& state = *it; + if (&call == &state) { + ::ClearState(&call); + Finished.erase(it); + return; + } } - Y_ASSERT(0); + Y_ASSERT(0); } - void TBusJob::ClearAllMessageStates() { - ClearJobStateVector(&Finished); - ClearJobStateVector(&Pending); - } + void TBusJob::ClearAllMessageStates() { + ClearJobStateVector(&Finished); + ClearJobStateVector(&Pending); + } - void TBusJob::Sleep(int milliSeconds) { - CheckThreadCurrentJob(); + void TBusJob::Sleep(int milliSeconds) { + CheckThreadCurrentJob(); - Y_VERIFY(Pending.empty(), "sleep is not allowed when there are pending job"); - Y_VERIFY(SleepUntil == 0, "must not override sleep"); + Y_VERIFY(Pending.empty(), "sleep is not allowed when there are pending job"); + Y_VERIFY(SleepUntil == 0, "must not override sleep"); - SleepUntil = Now() + milliSeconds; - } + SleepUntil = Now() + milliSeconds; + } - TString TBusJob::GetStatus(unsigned flags) { - TString strReturn; - strReturn += Sprintf(" job=%016" PRIx64 " type=%d sent=%d pending=%d (%d) %s\n", - Message->GetHeader()->Id, - (int)Message->GetHeader()->Type, - (int)(Now() - Message->GetHeader()->SendTime) / 1000, - (int)Pending.size(), - (int)Finished.size(), + TString TBusJob::GetStatus(unsigned flags) { + TString strReturn; + strReturn += Sprintf(" job=%016" PRIx64 " type=%d sent=%d pending=%d (%d) %s\n", + Message->GetHeader()->Id, + (int)Message->GetHeader()->Type, + (int)(Now() - Message->GetHeader()->SendTime) / 1000, + (int)Pending.size(), + (int)Finished.size(), Status != MESSAGE_OK ? ToString(Status).data() : ""); - - TJobStateVec::iterator it; - for (it = Pending.begin(); it != Pending.end(); ++it) { - TJobState& call = *it; - strReturn += call.GetStatus(flags); - } - return strReturn; - } - - TString TJobState::GetStatus(unsigned flags) { - Y_UNUSED(flags); - TString strReturn; - strReturn += Sprintf(" pending=%016" PRIx64 " type=%d (%s) sent=%d %s\n", - Message->GetHeader()->Id, - (int)Message->GetHeader()->Type, - Session->GetProto()->GetService(), - (int)(Now() - Message->GetHeader()->SendTime) / 1000, + + TJobStateVec::iterator it; + for (it = Pending.begin(); it != Pending.end(); ++it) { + TJobState& call = *it; + strReturn += call.GetStatus(flags); + } + return strReturn; + } + + TString TJobState::GetStatus(unsigned flags) { + Y_UNUSED(flags); + TString strReturn; + strReturn += Sprintf(" pending=%016" PRIx64 " type=%d (%s) sent=%d %s\n", + Message->GetHeader()->Id, + (int)Message->GetHeader()->Type, + Session->GetProto()->GetService(), + (int)(Now() - Message->GetHeader()->SendTime) / 1000, ToString(Status).data()); - return strReturn; - } - - ////////////////////////////////////////////////////////////////////// - - void TBusModuleImpl::CancelJob(TBusJob* job, EMessageStatus status) { - TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for CancelJob"); - if (job) { - job->Cancel(status); - } - } - - TString TBusModuleImpl::GetStatus(unsigned flags) { - Y_UNUSED(flags); - TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for GetStatus"); - TString strReturn = Sprintf("JobsInFlight=%d\n", (int)Jobs.size()); - for (auto job : Jobs) { - //strReturn += job->Job->GetStatus(flags); - Y_UNUSED(job); - strReturn += "TODO\n"; - } - return strReturn; - } - - TBusModuleConfig::TBusModuleConfig() - : StarterMaxInFlight(1000) - { - } - - TBusModuleConfig::TSecret::TSecret() - : SchedulePeriod(TDuration::Seconds(1)) - { - } - - TBusModule::TBusModule(const char* name) - : Impl(new TBusModuleImpl(this, name)) - { - } - - TBusModule::~TBusModule() { - } - - const char* TBusModule::GetName() const { - return Impl->Name; - } - - void TBusModule::SetConfig(const TBusModuleConfig& config) { - Impl->ModuleConfig = config; - } - - bool TBusModule::StartInput() { - Y_VERIFY(Impl->State == TBusModuleImpl::CREATED, "state check"); - Y_VERIFY(!!Impl->Queue, "state check"); - Impl->State = TBusModuleImpl::RUNNING; - - Y_ASSERT(!Impl->ExternalSession); - TBusServerSessionPtr extSession = CreateExtSession(*Impl->Queue); - if (extSession != nullptr) { - Impl->ExternalSession = extSession; - } - - return true; - } - - bool TBusModule::Shutdown() { - Impl->Shutdown(); - - return true; - } - - TBusJob* TBusModule::CreateJobInstance(TBusMessage* message) { - TBusJob* job = new TBusJob(this, message); - return job; - } - - /** + return strReturn; + } + + ////////////////////////////////////////////////////////////////////// + + void TBusModuleImpl::CancelJob(TBusJob* job, EMessageStatus status) { + TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for CancelJob"); + if (job) { + job->Cancel(status); + } + } + + TString TBusModuleImpl::GetStatus(unsigned flags) { + Y_UNUSED(flags); + TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for GetStatus"); + TString strReturn = Sprintf("JobsInFlight=%d\n", (int)Jobs.size()); + for (auto job : Jobs) { + //strReturn += job->Job->GetStatus(flags); + Y_UNUSED(job); + strReturn += "TODO\n"; + } + return strReturn; + } + + TBusModuleConfig::TBusModuleConfig() + : StarterMaxInFlight(1000) + { + } + + TBusModuleConfig::TSecret::TSecret() + : SchedulePeriod(TDuration::Seconds(1)) + { + } + + TBusModule::TBusModule(const char* name) + : Impl(new TBusModuleImpl(this, name)) + { + } + + TBusModule::~TBusModule() { + } + + const char* TBusModule::GetName() const { + return Impl->Name; + } + + void TBusModule::SetConfig(const TBusModuleConfig& config) { + Impl->ModuleConfig = config; + } + + bool TBusModule::StartInput() { + Y_VERIFY(Impl->State == TBusModuleImpl::CREATED, "state check"); + Y_VERIFY(!!Impl->Queue, "state check"); + Impl->State = TBusModuleImpl::RUNNING; + + Y_ASSERT(!Impl->ExternalSession); + TBusServerSessionPtr extSession = CreateExtSession(*Impl->Queue); + if (extSession != nullptr) { + Impl->ExternalSession = extSession; + } + + return true; + } + + bool TBusModule::Shutdown() { + Impl->Shutdown(); + + return true; + } + + TBusJob* TBusModule::CreateJobInstance(TBusMessage* message) { + TBusJob* job = new TBusJob(this, message); + return job; + } + + /** Example for external session creation: TBusSession* TMyModule::CreateExtSession(TBusMessageQueue& queue) { @@ -698,77 +698,77 @@ TBusSession* TMyModule::CreateExtSession(TBusMessageQueue& queue) { return session; */ - bool TBusModule::CreatePrivateSessions(TBusMessageQueue* queue) { - Impl->Queue = queue; - return true; - } - - int TBusModule::GetModuleSessionInFlight() const { - return Impl->Size(); - } - - TIntrusivePtr<TBusModuleInternal> TBusModule::GetInternal() { - return Impl.Get(); - } - - TBusServerSessionPtr TBusModule::CreateDefaultDestination( - TBusMessageQueue& queue, TBusProtocol* proto, const TBusServerSessionConfig& config, const TString& name) { - TBusServerSessionConfig patchedConfig = config; - patchedConfig.ExecuteOnMessageInWorkerPool = false; - if (!patchedConfig.Name) { - patchedConfig.Name = name; - } - if (!patchedConfig.Name) { - patchedConfig.Name = Impl->Name; - } - TBusServerSessionPtr session = - TBusServerSession::Create(proto, Impl->ModuleServerHandler.Get(), patchedConfig, &queue); - Impl->ServerSessions.push_back(session); - return session; - } - - TBusClientSessionPtr TBusModule::CreateDefaultSource( - TBusMessageQueue& queue, TBusProtocol* proto, const TBusClientSessionConfig& config, const TString& name) { - TBusClientSessionConfig patchedConfig = config; - patchedConfig.ExecuteOnReplyInWorkerPool = false; - if (!patchedConfig.Name) { - patchedConfig.Name = name; - } - if (!patchedConfig.Name) { - patchedConfig.Name = Impl->Name; - } - TBusClientSessionPtr session = - TBusClientSession::Create(proto, Impl->ModuleClientHandler.Get(), patchedConfig, &queue); - Impl->ClientSessions.push_back(session); - return session; - } - - TBusStarter* TBusModule::CreateDefaultStarter(TBusMessageQueue&, const TBusSessionConfig& config) { - TBusStarter* session = new TBusStarter(this, config); - Impl->Starters.push_back(session); - return session; - } - - void TBusModule::OnClientConnectionEvent(const TClientConnectionEvent& event) { - Y_UNUSED(event); - } - - TString TBusModule::GetStatus(unsigned flags) { - TString strReturn = Sprintf("%s\n", Impl->Name); - strReturn += Impl->GetStatus(flags); - return strReturn; - } + bool TBusModule::CreatePrivateSessions(TBusMessageQueue* queue) { + Impl->Queue = queue; + return true; + } + + int TBusModule::GetModuleSessionInFlight() const { + return Impl->Size(); + } + + TIntrusivePtr<TBusModuleInternal> TBusModule::GetInternal() { + return Impl.Get(); + } + + TBusServerSessionPtr TBusModule::CreateDefaultDestination( + TBusMessageQueue& queue, TBusProtocol* proto, const TBusServerSessionConfig& config, const TString& name) { + TBusServerSessionConfig patchedConfig = config; + patchedConfig.ExecuteOnMessageInWorkerPool = false; + if (!patchedConfig.Name) { + patchedConfig.Name = name; + } + if (!patchedConfig.Name) { + patchedConfig.Name = Impl->Name; + } + TBusServerSessionPtr session = + TBusServerSession::Create(proto, Impl->ModuleServerHandler.Get(), patchedConfig, &queue); + Impl->ServerSessions.push_back(session); + return session; + } + + TBusClientSessionPtr TBusModule::CreateDefaultSource( + TBusMessageQueue& queue, TBusProtocol* proto, const TBusClientSessionConfig& config, const TString& name) { + TBusClientSessionConfig patchedConfig = config; + patchedConfig.ExecuteOnReplyInWorkerPool = false; + if (!patchedConfig.Name) { + patchedConfig.Name = name; + } + if (!patchedConfig.Name) { + patchedConfig.Name = Impl->Name; + } + TBusClientSessionPtr session = + TBusClientSession::Create(proto, Impl->ModuleClientHandler.Get(), patchedConfig, &queue); + Impl->ClientSessions.push_back(session); + return session; + } + + TBusStarter* TBusModule::CreateDefaultStarter(TBusMessageQueue&, const TBusSessionConfig& config) { + TBusStarter* session = new TBusStarter(this, config); + Impl->Starters.push_back(session); + return session; + } + + void TBusModule::OnClientConnectionEvent(const TClientConnectionEvent& event) { + Y_UNUSED(event); + } + + TString TBusModule::GetStatus(unsigned flags) { + TString strReturn = Sprintf("%s\n", Impl->Name); + strReturn += Impl->GetStatus(flags); + return strReturn; + } } -void TBusModuleImpl::AddJob(TJobRunner* jobRunner) { +void TBusModuleImpl::AddJob(TJobRunner* jobRunner) { TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for AddJob"); Jobs.push_back(jobRunner); jobRunner->JobStorageIterator = Jobs.end(); --jobRunner->JobStorageIterator; } -void TBusModuleImpl::DestroyJob(TJobRunner* job) { +void TBusModuleImpl::DestroyJob(TJobRunner* job) { Y_ASSERT(job->JobStorageIterator != TList<TJobRunner*>::iterator()); { @@ -837,7 +837,7 @@ EMessageStatus TBusModule::StartJob(TAutoPtr<TBusMessage> message) { Y_VERIFY(Impl->State == TBusModuleImpl::RUNNING); Y_VERIFY(!!Impl->Queue); - if ((unsigned)AtomicGet(Impl->JobCount) >= Impl->ModuleConfig.StarterMaxInFlight) { + if ((unsigned)AtomicGet(Impl->JobCount) >= Impl->ModuleConfig.StarterMaxInFlight) { return MESSAGE_BUSY; } |