aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/oldmodule
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:15 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:15 +0300
commit72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch)
treeda2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/messagebus/oldmodule
parent778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff)
downloadydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/oldmodule')
-rw-r--r--library/cpp/messagebus/oldmodule/module.cpp1332
-rw-r--r--library/cpp/messagebus/oldmodule/module.h668
-rw-r--r--library/cpp/messagebus/oldmodule/startsession.cpp74
-rw-r--r--library/cpp/messagebus/oldmodule/startsession.h54
4 files changed, 1064 insertions, 1064 deletions
diff --git a/library/cpp/messagebus/oldmodule/module.cpp b/library/cpp/messagebus/oldmodule/module.cpp
index 24bd778799..f423214aa3 100644
--- a/library/cpp/messagebus/oldmodule/module.cpp
+++ b/library/cpp/messagebus/oldmodule/module.cpp
@@ -16,680 +16,680 @@ using namespace NBus;
using namespace NBus::NPrivate;
namespace {
- Y_POD_STATIC_THREAD(TBusJob*)
- ThreadCurrentJob;
-
- struct TThreadCurrentJobGuard {
- TBusJob* Prev;
-
- TThreadCurrentJobGuard(TBusJob* job)
- : Prev(ThreadCurrentJob)
- {
- Y_ASSERT(!ThreadCurrentJob || ThreadCurrentJob == job);
- ThreadCurrentJob = job;
- }
- ~TThreadCurrentJobGuard() {
- ThreadCurrentJob = Prev;
- }
- };
-
- void ClearState(NBus::TJobState* state) {
- /// skip sendbacks handlers
- if (state->Message != state->Reply) {
- if (state->Message) {
- delete state->Message;
- state->Message = nullptr;
- }
-
- if (state->Reply) {
- delete state->Reply;
- state->Reply = nullptr;
- }
- }
- }
-
- void ClearJobStateVector(NBus::TJobStateVec* vec) {
- Y_ASSERT(vec);
-
- for (auto& call : *vec) {
- ClearState(&call);
+ Y_POD_STATIC_THREAD(TBusJob*)
+ ThreadCurrentJob;
+
+ struct TThreadCurrentJobGuard {
+ TBusJob* Prev;
+
+ TThreadCurrentJobGuard(TBusJob* job)
+ : Prev(ThreadCurrentJob)
+ {
+ Y_ASSERT(!ThreadCurrentJob || ThreadCurrentJob == job);
+ ThreadCurrentJob = job;
+ }
+ ~TThreadCurrentJobGuard() {
+ ThreadCurrentJob = Prev;
+ }
+ };
+
+ void ClearState(NBus::TJobState* state) {
+ /// skip sendbacks handlers
+ if (state->Message != state->Reply) {
+ if (state->Message) {
+ delete state->Message;
+ state->Message = nullptr;
+ }
+
+ if (state->Reply) {
+ delete state->Reply;
+ state->Reply = nullptr;
+ }
+ }
+ }
+
+ void ClearJobStateVector(NBus::TJobStateVec* vec) {
+ Y_ASSERT(vec);
+
+ for (auto& call : *vec) {
+ ClearState(&call);
}
- vec->clear();
+ vec->clear();
}
}
namespace NBus {
- namespace NPrivate {
- class TJobStorage {
- };
-
- struct TModuleClientHandler
- : public IBusClientHandler {
- TModuleClientHandler(TBusModuleImpl* module)
- : Module(module)
- {
+ namespace NPrivate {
+ class TJobStorage {
+ };
+
+ struct TModuleClientHandler
+ : public IBusClientHandler {
+ TModuleClientHandler(TBusModuleImpl* module)
+ : Module(module)
+ {
+ }
+
+ void OnReply(TAutoPtr<TBusMessage> req, TAutoPtr<TBusMessage> reply) override;
+ void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage) override;
+ void OnError(TAutoPtr<TBusMessage> msg, EMessageStatus status) override;
+ void OnClientConnectionEvent(const TClientConnectionEvent& event) override;
+
+ TBusModuleImpl* const Module;
+ };
+
+ struct TModuleServerHandler
+ : public IBusServerHandler {
+ TModuleServerHandler(TBusModuleImpl* module)
+ : Module(module)
+ {
+ }
+
+ void OnMessage(TOnMessageContext& msg) override;
+
+ TBusModuleImpl* const Module;
+ };
+
+ struct TBusModuleImpl: public TBusModuleInternal {
+ TBusModule* const Module;
+
+ TBusMessageQueue* Queue;
+
+ TScheduler Scheduler;
+
+ const char* const Name;
+
+ typedef TList<TJobRunner*> TBusJobList;
+ /// jobs currently in-flight on this module
+ TBusJobList Jobs;
+ /// module level mutex
+ TMutex Lock;
+ TCondVar ShutdownCondVar;
+ TAtomic JobCount;
+
+ enum EState {
+ CREATED,
+ RUNNING,
+ STOPPED,
+ };
+
+ TAtomic State;
+ TBusModuleConfig ModuleConfig;
+ TBusServerSessionPtr ExternalSession;
+ /// protocol for local proxy session
+ THolder<IBusClientHandler> ModuleClientHandler;
+ THolder<IBusServerHandler> ModuleServerHandler;
+ TVector<TSimpleSharedPtr<TBusStarter>> Starters;
+
+ // Sessions must be destroyed before
+ // ModuleClientHandler / ModuleServerHandler
+ TVector<TBusClientSessionPtr> ClientSessions;
+ TVector<TBusServerSessionPtr> ServerSessions;
+
+ TBusModuleImpl(TBusModule* module, const char* name)
+ : Module(module)
+ , Queue()
+ , Name(name)
+ , JobCount(0)
+ , State(CREATED)
+ , ExternalSession(nullptr)
+ , ModuleClientHandler(new TModuleClientHandler(this))
+ , ModuleServerHandler(new TModuleServerHandler(this))
+ {
+ }
+
+ ~TBusModuleImpl() override {
+ // Shutdown cannot be called from destructor,
+ // because module has virtual methods.
+ Y_VERIFY(State != RUNNING, "if running, must explicitly call Shutdown() before destructor");
+
+ Scheduler.Stop();
+
+ while (!Jobs.empty()) {
+ DestroyJob(Jobs.front());
+ }
+ Y_VERIFY(JobCount == 0, "state check");
+ }
+
+ void OnMessageReceived(TAutoPtr<TBusMessage> msg, TOnMessageContext&);
+
+ void AddJob(TJobRunner* jobRunner);
+
+ void DestroyJob(TJobRunner* job);
+
+ /// terminate job on this message
+ void CancelJob(TBusJob* job, EMessageStatus status);
+ /// prints statuses of jobs
+ TString GetStatus(unsigned flags);
+
+ size_t Size() const {
+ return AtomicGet(JobCount);
+ }
+
+ void Shutdown();
+
+ TVector<TBusClientSessionPtr> GetClientSessionsInternal() override {
+ return ClientSessions;
+ }
+
+ TVector<TBusServerSessionPtr> GetServerSessionsInternal() override {
+ return ServerSessions;
+ }
+
+ TBusMessageQueue* GetQueue() override {
+ return Queue;
+ }
+
+ TString GetNameInternal() override {
+ return Name;
+ }
+
+ TString GetStatusSingleLine() override {
+ TStringStream ss;
+ ss << "jobs: " << Size();
+ return ss.Str();
+ }
+
+ void OnClientConnectionEvent(const TClientConnectionEvent& event) {
+ Module->OnClientConnectionEvent(event);
+ }
+ };
+
+ struct TJobResponseMessage {
+ TBusMessage* Request;
+ TBusMessage* Response;
+ EMessageStatus Status;
+
+ TJobResponseMessage(TBusMessage* request, TBusMessage* response, EMessageStatus status)
+ : Request(request)
+ , Response(response)
+ , Status(status)
+ {
+ }
+ };
+
+ struct TJobRunner: public TAtomicRefCount<TJobRunner>,
+ public NActor::TActor<TJobRunner>,
+ public NActor::TQueueInActor<TJobRunner, TJobResponseMessage>,
+ public TScheduleActor<TJobRunner> {
+ THolder<TBusJob> Job;
+
+ TList<TJobRunner*>::iterator JobStorageIterator;
+
+ TJobRunner(TAutoPtr<TBusJob> job)
+ : NActor::TActor<TJobRunner>(job->ModuleImpl->Queue->GetExecutor())
+ , TScheduleActor<TJobRunner>(&job->ModuleImpl->Scheduler)
+ , Job(job.Release())
+ , JobStorageIterator()
+ {
+ Job->Runner = this;
+ }
+
+ ~TJobRunner() override {
+ Y_ASSERT(JobStorageIterator == TList<TJobRunner*>::iterator());
+ }
+
+ void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, const TJobResponseMessage& message) {
+ Job->CallReplyHandler(message.Status, message.Request, message.Response);
+ }
+
+ void Destroy() {
+ if (!!Job->OnMessageContext) {
+ if (!Job->ReplySent) {
+ Job->OnMessageContext.ForgetRequest();
+ }
+ }
+ Job->ModuleImpl->DestroyJob(this);
}
- void OnReply(TAutoPtr<TBusMessage> req, TAutoPtr<TBusMessage> reply) override;
- void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage) override;
- void OnError(TAutoPtr<TBusMessage> msg, EMessageStatus status) override;
- void OnClientConnectionEvent(const TClientConnectionEvent& event) override;
-
- TBusModuleImpl* const Module;
- };
-
- struct TModuleServerHandler
- : public IBusServerHandler {
- TModuleServerHandler(TBusModuleImpl* module)
- : Module(module)
- {
- }
-
- void OnMessage(TOnMessageContext& msg) override;
-
- TBusModuleImpl* const Module;
- };
-
- struct TBusModuleImpl: public TBusModuleInternal {
- TBusModule* const Module;
-
- TBusMessageQueue* Queue;
-
- TScheduler Scheduler;
-
- const char* const Name;
-
- typedef TList<TJobRunner*> TBusJobList;
- /// jobs currently in-flight on this module
- TBusJobList Jobs;
- /// module level mutex
- TMutex Lock;
- TCondVar ShutdownCondVar;
- TAtomic JobCount;
-
- enum EState {
- CREATED,
- RUNNING,
- STOPPED,
- };
-
- TAtomic State;
- TBusModuleConfig ModuleConfig;
- TBusServerSessionPtr ExternalSession;
- /// protocol for local proxy session
- THolder<IBusClientHandler> ModuleClientHandler;
- THolder<IBusServerHandler> ModuleServerHandler;
- TVector<TSimpleSharedPtr<TBusStarter>> Starters;
-
- // Sessions must be destroyed before
- // ModuleClientHandler / ModuleServerHandler
- TVector<TBusClientSessionPtr> ClientSessions;
- TVector<TBusServerSessionPtr> ServerSessions;
-
- TBusModuleImpl(TBusModule* module, const char* name)
- : Module(module)
- , Queue()
- , Name(name)
- , JobCount(0)
- , State(CREATED)
- , ExternalSession(nullptr)
- , ModuleClientHandler(new TModuleClientHandler(this))
- , ModuleServerHandler(new TModuleServerHandler(this))
- {
- }
-
- ~TBusModuleImpl() override {
- // Shutdown cannot be called from destructor,
- // because module has virtual methods.
- Y_VERIFY(State != RUNNING, "if running, must explicitly call Shutdown() before destructor");
-
- Scheduler.Stop();
-
- while (!Jobs.empty()) {
- DestroyJob(Jobs.front());
+ void Act(NActor::TDefaultTag) {
+ if (JobStorageIterator == TList<TJobRunner*>::iterator()) {
+ return;
+ }
+
+ if (Job->SleepUntil != 0) {
+ if (AtomicGet(Job->ModuleImpl->State) == TBusModuleImpl::STOPPED) {
+ Destroy();
+ return;
+ }
+ }
+
+ TThreadCurrentJobGuard g(Job.Get());
+
+ NActor::TQueueInActor<TJobRunner, TJobResponseMessage>::DequeueAll();
+
+ if (Alarm.FetchTask()) {
+ if (Job->AnyPendingToSend()) {
+ Y_ASSERT(Job->SleepUntil == 0);
+ Job->SendPending();
+ if (Job->AnyPendingToSend()) {
+ }
+ } else {
+ // regular alarm
+ Y_ASSERT(Job->Pending.empty());
+ Y_ASSERT(Job->SleepUntil != 0);
+ Job->SleepUntil = 0;
+ }
}
- Y_VERIFY(JobCount == 0, "state check");
- }
- void OnMessageReceived(TAutoPtr<TBusMessage> msg, TOnMessageContext&);
+ for (;;) {
+ if (Job->Pending.empty() && !!Job->Handler && Job->Status == MESSAGE_OK) {
+ TWhatThreadDoesPushPop pp("do call job handler (do not confuse with reply handler)");
- void AddJob(TJobRunner* jobRunner);
+ Job->Handler = Job->Handler(Job->Module, Job.Get(), Job->Message);
+ }
- void DestroyJob(TJobRunner* job);
+ if (Job->SleepUntil != 0) {
+ ScheduleAt(TInstant::MilliSeconds(Job->SleepUntil));
+ return;
+ }
- /// terminate job on this message
- void CancelJob(TBusJob* job, EMessageStatus status);
- /// prints statuses of jobs
- TString GetStatus(unsigned flags);
+ Job->SendPending();
- size_t Size() const {
- return AtomicGet(JobCount);
- }
+ if (Job->AnyPendingToSend()) {
+ ScheduleAt(TInstant::Now() + TDuration::Seconds(1));
+ return;
+ }
- void Shutdown();
+ if (!Job->Pending.empty()) {
+ // waiting replies
+ return;
+ }
- TVector<TBusClientSessionPtr> GetClientSessionsInternal() override {
- return ClientSessions;
+ if (Job->IsDone()) {
+ Destroy();
+ return;
+ }
+ }
}
+ };
+
+ }
+
+ static inline TJobRunner* GetJob(TBusMessage* message) {
+ return (TJobRunner*)message->Data;
+ }
+
+ static inline void SetJob(TBusMessage* message, TJobRunner* job) {
+ message->Data = job;
+ }
+
+ TBusJob::TBusJob(TBusModule* module, TBusMessage* message)
+ : Status(MESSAGE_OK)
+ , Runner()
+ , Message(message)
+ , ReplySent(false)
+ , Module(module)
+ , ModuleImpl(module->Impl.Get())
+ , SleepUntil(0)
+ {
+ Handler = TJobHandler(&TBusModule::Start);
+ }
+
+ TBusJob::~TBusJob() {
+ Y_ASSERT(Pending.size() == 0);
+ //Y_ASSERT(SleepUntil == 0);
+
+ ClearAllMessageStates();
+ }
+
+ TNetAddr TBusJob::GetPeerAddrNetAddr() const {
+ Y_VERIFY(!!OnMessageContext);
+ return OnMessageContext.GetPeerAddrNetAddr();
+ }
+
+ void TBusJob::CheckThreadCurrentJob() {
+ Y_ASSERT(ThreadCurrentJob == this);
+ }
+
+ /////////////////////////////////////////////////////////
+ /// \brief Send messages in pending list
+
+ /// If at least one message is gone return true
+ /// If message has not been send, move it to Finished with appropriate error code
+ bool TBusJob::SendPending() {
+ // Iterator type must be size_t, not vector::iterator,
+ // because `DoCallReplyHandler` may call `Send` that modifies `Pending` vector,
+ // that in turn invalidates iterator.
+ // Implementation assumes that `DoCallReplyHandler` only pushes back to `Pending`
+ // (not erases, and not inserts) so iteration by index is valid.
+ size_t it = 0;
+ while (it != Pending.size()) {
+ TJobState& call = Pending[it];
+
+ if (call.Status == MESSAGE_DONT_ASK) {
+ EMessageStatus getAddressStatus = MESSAGE_OK;
+ TNetAddr addr;
+ if (call.UseAddr) {
+ addr = call.Addr;
+ } else {
+ getAddressStatus = const_cast<TBusProtocol*>(call.Session->GetProto())->GetDestination(call.Session, call.Message, call.Session->GetQueue()->GetLocator(), &addr);
+ }
+
+ if (getAddressStatus == MESSAGE_OK) {
+ // hold extra reference for each request in flight
+ Runner->Ref();
+
+ if (call.OneWay) {
+ call.Status = call.Session->SendMessageOneWay(call.Message, &addr);
+ } else {
+ call.Status = call.Session->SendMessage(call.Message, &addr);
+ }
+
+ if (call.Status != MESSAGE_OK) {
+ Runner->UnRef();
+ }
- TVector<TBusServerSessionPtr> GetServerSessionsInternal() override {
- return ServerSessions;
- }
-
- TBusMessageQueue* GetQueue() override {
- return Queue;
- }
-
- TString GetNameInternal() override {
- return Name;
- }
-
- TString GetStatusSingleLine() override {
- TStringStream ss;
- ss << "jobs: " << Size();
- return ss.Str();
- }
-
- void OnClientConnectionEvent(const TClientConnectionEvent& event) {
- Module->OnClientConnectionEvent(event);
- }
- };
-
- struct TJobResponseMessage {
- TBusMessage* Request;
- TBusMessage* Response;
- EMessageStatus Status;
-
- TJobResponseMessage(TBusMessage* request, TBusMessage* response, EMessageStatus status)
- : Request(request)
- , Response(response)
- , Status(status)
- {
- }
- };
-
- struct TJobRunner: public TAtomicRefCount<TJobRunner>,
- public NActor::TActor<TJobRunner>,
- public NActor::TQueueInActor<TJobRunner, TJobResponseMessage>,
- public TScheduleActor<TJobRunner> {
- THolder<TBusJob> Job;
-
- TList<TJobRunner*>::iterator JobStorageIterator;
-
- TJobRunner(TAutoPtr<TBusJob> job)
- : NActor::TActor<TJobRunner>(job->ModuleImpl->Queue->GetExecutor())
- , TScheduleActor<TJobRunner>(&job->ModuleImpl->Scheduler)
- , Job(job.Release())
- , JobStorageIterator()
- {
- Job->Runner = this;
- }
-
- ~TJobRunner() override {
- Y_ASSERT(JobStorageIterator == TList<TJobRunner*>::iterator());
- }
-
- void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, const TJobResponseMessage& message) {
- Job->CallReplyHandler(message.Status, message.Request, message.Response);
- }
-
- void Destroy() {
- if (!!Job->OnMessageContext) {
- if (!Job->ReplySent) {
- Job->OnMessageContext.ForgetRequest();
- }
- }
- Job->ModuleImpl->DestroyJob(this);
- }
-
- void Act(NActor::TDefaultTag) {
- if (JobStorageIterator == TList<TJobRunner*>::iterator()) {
- return;
- }
-
- if (Job->SleepUntil != 0) {
- if (AtomicGet(Job->ModuleImpl->State) == TBusModuleImpl::STOPPED) {
- Destroy();
- return;
- }
- }
-
- TThreadCurrentJobGuard g(Job.Get());
-
- NActor::TQueueInActor<TJobRunner, TJobResponseMessage>::DequeueAll();
-
- if (Alarm.FetchTask()) {
- if (Job->AnyPendingToSend()) {
- Y_ASSERT(Job->SleepUntil == 0);
- Job->SendPending();
- if (Job->AnyPendingToSend()) {
- }
- } else {
- // regular alarm
- Y_ASSERT(Job->Pending.empty());
- Y_ASSERT(Job->SleepUntil != 0);
- Job->SleepUntil = 0;
- }
- }
-
- for (;;) {
- if (Job->Pending.empty() && !!Job->Handler && Job->Status == MESSAGE_OK) {
- TWhatThreadDoesPushPop pp("do call job handler (do not confuse with reply handler)");
-
- Job->Handler = Job->Handler(Job->Module, Job.Get(), Job->Message);
- }
-
- if (Job->SleepUntil != 0) {
- ScheduleAt(TInstant::MilliSeconds(Job->SleepUntil));
- return;
- }
-
- Job->SendPending();
-
- if (Job->AnyPendingToSend()) {
- ScheduleAt(TInstant::Now() + TDuration::Seconds(1));
- return;
- }
-
- if (!Job->Pending.empty()) {
- // waiting replies
- return;
- }
-
- if (Job->IsDone()) {
- Destroy();
- return;
- }
- }
- }
- };
-
- }
-
- static inline TJobRunner* GetJob(TBusMessage* message) {
- return (TJobRunner*)message->Data;
- }
-
- static inline void SetJob(TBusMessage* message, TJobRunner* job) {
- message->Data = job;
- }
-
- TBusJob::TBusJob(TBusModule* module, TBusMessage* message)
- : Status(MESSAGE_OK)
- , Runner()
- , Message(message)
- , ReplySent(false)
- , Module(module)
- , ModuleImpl(module->Impl.Get())
- , SleepUntil(0)
- {
- Handler = TJobHandler(&TBusModule::Start);
- }
-
- TBusJob::~TBusJob() {
- Y_ASSERT(Pending.size() == 0);
- //Y_ASSERT(SleepUntil == 0);
-
- ClearAllMessageStates();
- }
-
- TNetAddr TBusJob::GetPeerAddrNetAddr() const {
- Y_VERIFY(!!OnMessageContext);
- return OnMessageContext.GetPeerAddrNetAddr();
- }
-
- void TBusJob::CheckThreadCurrentJob() {
- Y_ASSERT(ThreadCurrentJob == this);
- }
-
- /////////////////////////////////////////////////////////
- /// \brief Send messages in pending list
-
- /// If at least one message is gone return true
- /// If message has not been send, move it to Finished with appropriate error code
- bool TBusJob::SendPending() {
- // Iterator type must be size_t, not vector::iterator,
- // because `DoCallReplyHandler` may call `Send` that modifies `Pending` vector,
- // that in turn invalidates iterator.
- // Implementation assumes that `DoCallReplyHandler` only pushes back to `Pending`
- // (not erases, and not inserts) so iteration by index is valid.
- size_t it = 0;
- while (it != Pending.size()) {
- TJobState& call = Pending[it];
-
- if (call.Status == MESSAGE_DONT_ASK) {
- EMessageStatus getAddressStatus = MESSAGE_OK;
- TNetAddr addr;
- if (call.UseAddr) {
- addr = call.Addr;
} else {
- getAddressStatus = const_cast<TBusProtocol*>(call.Session->GetProto())->GetDestination(call.Session, call.Message, call.Session->GetQueue()->GetLocator(), &addr);
+ call.Status = getAddressStatus;
}
-
- if (getAddressStatus == MESSAGE_OK) {
- // hold extra reference for each request in flight
- Runner->Ref();
-
- if (call.OneWay) {
- call.Status = call.Session->SendMessageOneWay(call.Message, &addr);
- } else {
- call.Status = call.Session->SendMessage(call.Message, &addr);
- }
-
- if (call.Status != MESSAGE_OK) {
- Runner->UnRef();
- }
-
- } else {
- call.Status = getAddressStatus;
- }
- }
-
- if (call.Status == MESSAGE_OK) {
- ++it; // keep pending list until we get reply
- } else if (call.Status == MESSAGE_BUSY) {
- Y_FAIL("MESSAGE_BUSY is prohibited in modules. Please increase MaxInFlight");
- } else if (call.Status == MESSAGE_CONNECT_FAILED && call.NumRetries < call.MaxRetries) {
- ++it; // try up to call.MaxRetries times to send message
- call.NumRetries++;
- DoCallReplyHandler(call);
- call.Status = MESSAGE_DONT_ASK;
- call.Message->Reset(); // generate new Id
+ }
+
+ if (call.Status == MESSAGE_OK) {
+ ++it; // keep pending list until we get reply
+ } else if (call.Status == MESSAGE_BUSY) {
+ Y_FAIL("MESSAGE_BUSY is prohibited in modules. Please increase MaxInFlight");
+ } else if (call.Status == MESSAGE_CONNECT_FAILED && call.NumRetries < call.MaxRetries) {
+ ++it; // try up to call.MaxRetries times to send message
+ call.NumRetries++;
+ DoCallReplyHandler(call);
+ call.Status = MESSAGE_DONT_ASK;
+ call.Message->Reset(); // generate new Id
} else {
- Finished.push_back(call);
- DoCallReplyHandler(call);
- Pending.erase(Pending.begin() + it);
+ Finished.push_back(call);
+ DoCallReplyHandler(call);
+ Pending.erase(Pending.begin() + it);
}
}
- return Pending.size() > 0;
- }
-
- bool TBusJob::AnyPendingToSend() {
- for (unsigned i = 0; i < Pending.size(); ++i) {
- if (Pending[i].Status == MESSAGE_DONT_ASK) {
- return true;
- }
+ return Pending.size() > 0;
+ }
+
+ bool TBusJob::AnyPendingToSend() {
+ for (unsigned i = 0; i < Pending.size(); ++i) {
+ if (Pending[i].Status == MESSAGE_DONT_ASK) {
+ return true;
+ }
}
-
- return false;
+
+ return false;
}
- bool TBusJob::IsDone() {
- bool r = (SleepUntil == 0 && Pending.size() == 0 && (Handler == nullptr || Status != MESSAGE_OK));
- return r;
- }
-
- void TBusJob::CallJobHandlerOnly() {
- TThreadCurrentJobGuard threadCurrentJobGuard(this);
- TWhatThreadDoesPushPop pp("do call job handler (do not confuse with reply handler)");
-
- Handler = Handler(ModuleImpl->Module, this, Message);
+ bool TBusJob::IsDone() {
+ bool r = (SleepUntil == 0 && Pending.size() == 0 && (Handler == nullptr || Status != MESSAGE_OK));
+ return r;
}
- bool TBusJob::CallJobHandler() {
- /// go on as far as we can go without waiting
- while (!IsDone()) {
- /// call the handler
- CallJobHandlerOnly();
+ void TBusJob::CallJobHandlerOnly() {
+ TThreadCurrentJobGuard threadCurrentJobGuard(this);
+ TWhatThreadDoesPushPop pp("do call job handler (do not confuse with reply handler)");
- /// quit if job is canceled
- if (Status != MESSAGE_OK) {
- break;
- }
+ Handler = Handler(ModuleImpl->Module, this, Message);
+ }
- /// there are messages to send and wait for reply
- SendPending();
+ bool TBusJob::CallJobHandler() {
+ /// go on as far as we can go without waiting
+ while (!IsDone()) {
+ /// call the handler
+ CallJobHandlerOnly();
+
+ /// quit if job is canceled
+ if (Status != MESSAGE_OK) {
+ break;
+ }
- if (!Pending.empty()) {
- break;
- }
+ /// there are messages to send and wait for reply
+ SendPending();
- /// asked to sleep
- if (SleepUntil) {
- break;
- }
- }
+ if (!Pending.empty()) {
+ break;
+ }
- Y_VERIFY(!(Pending.size() == 0 && Handler == nullptr && Status == MESSAGE_OK && !ReplySent),
- "Handler returned NULL without Cancel() or SendReply() for message=%016" PRIx64 " type=%d",
- Message->GetHeader()->Id, Message->GetHeader()->Type);
+ /// asked to sleep
+ if (SleepUntil) {
+ break;
+ }
+ }
- return IsDone();
- }
+ Y_VERIFY(!(Pending.size() == 0 && Handler == nullptr && Status == MESSAGE_OK && !ReplySent),
+ "Handler returned NULL without Cancel() or SendReply() for message=%016" PRIx64 " type=%d",
+ Message->GetHeader()->Id, Message->GetHeader()->Type);
- void TBusJob::DoCallReplyHandler(TJobState& call) {
- if (call.Handler) {
- TWhatThreadDoesPushPop pp("do call reply handler (do not confuse with job handler)");
+ return IsDone();
+ }
- TThreadCurrentJobGuard threadCurrentJobGuard(this);
- (Module->*(call.Handler))(this, call.Status, call.Message, call.Reply);
- }
- }
+ void TBusJob::DoCallReplyHandler(TJobState& call) {
+ if (call.Handler) {
+ TWhatThreadDoesPushPop pp("do call reply handler (do not confuse with job handler)");
- int TBusJob::CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply) {
- /// find handler for given message and update it's status
- size_t i = 0;
- for (; i < Pending.size(); ++i) {
- TJobState& call = Pending[i];
- if (call.Message == mess) {
- break;
- }
+ TThreadCurrentJobGuard threadCurrentJobGuard(this);
+ (Module->*(call.Handler))(this, call.Status, call.Message, call.Reply);
}
-
- /// if not found, report error
- if (i == Pending.size()) {
- Y_FAIL("must not happen");
+ }
+
+ int TBusJob::CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply) {
+ /// find handler for given message and update it's status
+ size_t i = 0;
+ for (; i < Pending.size(); ++i) {
+ TJobState& call = Pending[i];
+ if (call.Message == mess) {
+ break;
+ }
}
- /// fill in response into job state
- TJobState& call = Pending[i];
- call.Status = status;
- Y_ASSERT(call.Message == mess);
- call.Reply = reply;
-
- if ((status == MESSAGE_TIMEOUT || status == MESSAGE_DELIVERY_FAILED) && call.NumRetries < call.MaxRetries) {
- call.NumRetries++;
- call.Status = MESSAGE_DONT_ASK;
- call.Message->Reset(); // generate new Id
- DoCallReplyHandler(call);
- return 0;
+ /// if not found, report error
+ if (i == Pending.size()) {
+ Y_FAIL("must not happen");
+ }
+
+ /// fill in response into job state
+ TJobState& call = Pending[i];
+ call.Status = status;
+ Y_ASSERT(call.Message == mess);
+ call.Reply = reply;
+
+ if ((status == MESSAGE_TIMEOUT || status == MESSAGE_DELIVERY_FAILED) && call.NumRetries < call.MaxRetries) {
+ call.NumRetries++;
+ call.Status = MESSAGE_DONT_ASK;
+ call.Message->Reset(); // generate new Id
+ DoCallReplyHandler(call);
+ return 0;
}
- /// call the handler if provided
- DoCallReplyHandler(call);
+ /// call the handler if provided
+ DoCallReplyHandler(call);
- /// move job state into the finished stack
- Finished.push_back(Pending[i]);
- Pending.erase(Pending.begin() + i);
+ /// move job state into the finished stack
+ Finished.push_back(Pending[i]);
+ Pending.erase(Pending.begin() + i);
return 0;
}
- ///////////////////////////////////////////////////////////////
- /// send message to any other session or application
- void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries) {
- CheckThreadCurrentJob();
+ ///////////////////////////////////////////////////////////////
+ /// send message to any other session or application
+ void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries) {
+ CheckThreadCurrentJob();
- SetJob(mess.Get(), Runner);
- Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, nullptr, false));
- }
+ SetJob(mess.Get(), Runner);
+ Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, nullptr, false));
+ }
- void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr) {
- CheckThreadCurrentJob();
+ void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr) {
+ CheckThreadCurrentJob();
- SetJob(mess.Get(), Runner);
- Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, &addr, false));
- }
+ SetJob(mess.Get(), Runner);
+ Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, &addr, false));
+ }
- void TBusJob::SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr) {
- CheckThreadCurrentJob();
+ void TBusJob::SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr) {
+ CheckThreadCurrentJob();
- SetJob(req.Get(), Runner);
- Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, &addr, true));
- }
+ SetJob(req.Get(), Runner);
+ Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, &addr, true));
+ }
- void TBusJob::SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session) {
- CheckThreadCurrentJob();
+ void TBusJob::SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session) {
+ CheckThreadCurrentJob();
- SetJob(req.Get(), Runner);
- Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, nullptr, true));
- }
+ SetJob(req.Get(), Runner);
+ Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, nullptr, true));
+ }
- ///////////////////////////////////////////////////////////////
- /// send reply to the starter message
- void TBusJob::SendReply(TBusMessageAutoPtr reply) {
- CheckThreadCurrentJob();
+ ///////////////////////////////////////////////////////////////
+ /// send reply to the starter message
+ void TBusJob::SendReply(TBusMessageAutoPtr reply) {
+ CheckThreadCurrentJob();
- Y_VERIFY(!ReplySent, "cannot call SendReply twice");
- ReplySent = true;
- if (!OnMessageContext)
- return;
+ Y_VERIFY(!ReplySent, "cannot call SendReply twice");
+ ReplySent = true;
+ if (!OnMessageContext)
+ return;
- EMessageStatus ok = OnMessageContext.SendReplyMove(reply);
- if (ok != MESSAGE_OK) {
- // TODO: count errors
- }
- }
+ EMessageStatus ok = OnMessageContext.SendReplyMove(reply);
+ if (ok != MESSAGE_OK) {
+ // TODO: count errors
+ }
+ }
- /// set the flag to terminate job at the earliest convenience
- void TBusJob::Cancel(EMessageStatus status) {
- CheckThreadCurrentJob();
+ /// set the flag to terminate job at the earliest convenience
+ void TBusJob::Cancel(EMessageStatus status) {
+ CheckThreadCurrentJob();
- Status = status;
+ Status = status;
}
- void TBusJob::ClearState(TJobState& call) {
- TJobStateVec::iterator it;
- for (it = Finished.begin(); it != Finished.end(); ++it) {
- TJobState& state = *it;
- if (&call == &state) {
- ::ClearState(&call);
- Finished.erase(it);
- return;
- }
+ void TBusJob::ClearState(TJobState& call) {
+ TJobStateVec::iterator it;
+ for (it = Finished.begin(); it != Finished.end(); ++it) {
+ TJobState& state = *it;
+ if (&call == &state) {
+ ::ClearState(&call);
+ Finished.erase(it);
+ return;
+ }
}
- Y_ASSERT(0);
+ Y_ASSERT(0);
}
- void TBusJob::ClearAllMessageStates() {
- ClearJobStateVector(&Finished);
- ClearJobStateVector(&Pending);
- }
+ void TBusJob::ClearAllMessageStates() {
+ ClearJobStateVector(&Finished);
+ ClearJobStateVector(&Pending);
+ }
- void TBusJob::Sleep(int milliSeconds) {
- CheckThreadCurrentJob();
+ void TBusJob::Sleep(int milliSeconds) {
+ CheckThreadCurrentJob();
- Y_VERIFY(Pending.empty(), "sleep is not allowed when there are pending job");
- Y_VERIFY(SleepUntil == 0, "must not override sleep");
+ Y_VERIFY(Pending.empty(), "sleep is not allowed when there are pending job");
+ Y_VERIFY(SleepUntil == 0, "must not override sleep");
- SleepUntil = Now() + milliSeconds;
- }
+ SleepUntil = Now() + milliSeconds;
+ }
- TString TBusJob::GetStatus(unsigned flags) {
- TString strReturn;
- strReturn += Sprintf(" job=%016" PRIx64 " type=%d sent=%d pending=%d (%d) %s\n",
- Message->GetHeader()->Id,
- (int)Message->GetHeader()->Type,
- (int)(Now() - Message->GetHeader()->SendTime) / 1000,
- (int)Pending.size(),
- (int)Finished.size(),
+ TString TBusJob::GetStatus(unsigned flags) {
+ TString strReturn;
+ strReturn += Sprintf(" job=%016" PRIx64 " type=%d sent=%d pending=%d (%d) %s\n",
+ Message->GetHeader()->Id,
+ (int)Message->GetHeader()->Type,
+ (int)(Now() - Message->GetHeader()->SendTime) / 1000,
+ (int)Pending.size(),
+ (int)Finished.size(),
Status != MESSAGE_OK ? ToString(Status).data() : "");
-
- TJobStateVec::iterator it;
- for (it = Pending.begin(); it != Pending.end(); ++it) {
- TJobState& call = *it;
- strReturn += call.GetStatus(flags);
- }
- return strReturn;
- }
-
- TString TJobState::GetStatus(unsigned flags) {
- Y_UNUSED(flags);
- TString strReturn;
- strReturn += Sprintf(" pending=%016" PRIx64 " type=%d (%s) sent=%d %s\n",
- Message->GetHeader()->Id,
- (int)Message->GetHeader()->Type,
- Session->GetProto()->GetService(),
- (int)(Now() - Message->GetHeader()->SendTime) / 1000,
+
+ TJobStateVec::iterator it;
+ for (it = Pending.begin(); it != Pending.end(); ++it) {
+ TJobState& call = *it;
+ strReturn += call.GetStatus(flags);
+ }
+ return strReturn;
+ }
+
+ TString TJobState::GetStatus(unsigned flags) {
+ Y_UNUSED(flags);
+ TString strReturn;
+ strReturn += Sprintf(" pending=%016" PRIx64 " type=%d (%s) sent=%d %s\n",
+ Message->GetHeader()->Id,
+ (int)Message->GetHeader()->Type,
+ Session->GetProto()->GetService(),
+ (int)(Now() - Message->GetHeader()->SendTime) / 1000,
ToString(Status).data());
- return strReturn;
- }
-
- //////////////////////////////////////////////////////////////////////
-
- void TBusModuleImpl::CancelJob(TBusJob* job, EMessageStatus status) {
- TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for CancelJob");
- if (job) {
- job->Cancel(status);
- }
- }
-
- TString TBusModuleImpl::GetStatus(unsigned flags) {
- Y_UNUSED(flags);
- TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for GetStatus");
- TString strReturn = Sprintf("JobsInFlight=%d\n", (int)Jobs.size());
- for (auto job : Jobs) {
- //strReturn += job->Job->GetStatus(flags);
- Y_UNUSED(job);
- strReturn += "TODO\n";
- }
- return strReturn;
- }
-
- TBusModuleConfig::TBusModuleConfig()
- : StarterMaxInFlight(1000)
- {
- }
-
- TBusModuleConfig::TSecret::TSecret()
- : SchedulePeriod(TDuration::Seconds(1))
- {
- }
-
- TBusModule::TBusModule(const char* name)
- : Impl(new TBusModuleImpl(this, name))
- {
- }
-
- TBusModule::~TBusModule() {
- }
-
- const char* TBusModule::GetName() const {
- return Impl->Name;
- }
-
- void TBusModule::SetConfig(const TBusModuleConfig& config) {
- Impl->ModuleConfig = config;
- }
-
- bool TBusModule::StartInput() {
- Y_VERIFY(Impl->State == TBusModuleImpl::CREATED, "state check");
- Y_VERIFY(!!Impl->Queue, "state check");
- Impl->State = TBusModuleImpl::RUNNING;
-
- Y_ASSERT(!Impl->ExternalSession);
- TBusServerSessionPtr extSession = CreateExtSession(*Impl->Queue);
- if (extSession != nullptr) {
- Impl->ExternalSession = extSession;
- }
-
- return true;
- }
-
- bool TBusModule::Shutdown() {
- Impl->Shutdown();
-
- return true;
- }
-
- TBusJob* TBusModule::CreateJobInstance(TBusMessage* message) {
- TBusJob* job = new TBusJob(this, message);
- return job;
- }
-
- /**
+ return strReturn;
+ }
+
+ //////////////////////////////////////////////////////////////////////
+
+ void TBusModuleImpl::CancelJob(TBusJob* job, EMessageStatus status) {
+ TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for CancelJob");
+ if (job) {
+ job->Cancel(status);
+ }
+ }
+
+ TString TBusModuleImpl::GetStatus(unsigned flags) {
+ Y_UNUSED(flags);
+ TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for GetStatus");
+ TString strReturn = Sprintf("JobsInFlight=%d\n", (int)Jobs.size());
+ for (auto job : Jobs) {
+ //strReturn += job->Job->GetStatus(flags);
+ Y_UNUSED(job);
+ strReturn += "TODO\n";
+ }
+ return strReturn;
+ }
+
+ TBusModuleConfig::TBusModuleConfig()
+ : StarterMaxInFlight(1000)
+ {
+ }
+
+ TBusModuleConfig::TSecret::TSecret()
+ : SchedulePeriod(TDuration::Seconds(1))
+ {
+ }
+
+ TBusModule::TBusModule(const char* name)
+ : Impl(new TBusModuleImpl(this, name))
+ {
+ }
+
+ TBusModule::~TBusModule() {
+ }
+
+ const char* TBusModule::GetName() const {
+ return Impl->Name;
+ }
+
+ void TBusModule::SetConfig(const TBusModuleConfig& config) {
+ Impl->ModuleConfig = config;
+ }
+
+ bool TBusModule::StartInput() {
+ Y_VERIFY(Impl->State == TBusModuleImpl::CREATED, "state check");
+ Y_VERIFY(!!Impl->Queue, "state check");
+ Impl->State = TBusModuleImpl::RUNNING;
+
+ Y_ASSERT(!Impl->ExternalSession);
+ TBusServerSessionPtr extSession = CreateExtSession(*Impl->Queue);
+ if (extSession != nullptr) {
+ Impl->ExternalSession = extSession;
+ }
+
+ return true;
+ }
+
+ bool TBusModule::Shutdown() {
+ Impl->Shutdown();
+
+ return true;
+ }
+
+ TBusJob* TBusModule::CreateJobInstance(TBusMessage* message) {
+ TBusJob* job = new TBusJob(this, message);
+ return job;
+ }
+
+ /**
Example for external session creation:
TBusSession* TMyModule::CreateExtSession(TBusMessageQueue& queue) {
@@ -698,77 +698,77 @@ TBusSession* TMyModule::CreateExtSession(TBusMessageQueue& queue) {
return session;
*/
- bool TBusModule::CreatePrivateSessions(TBusMessageQueue* queue) {
- Impl->Queue = queue;
- return true;
- }
-
- int TBusModule::GetModuleSessionInFlight() const {
- return Impl->Size();
- }
-
- TIntrusivePtr<TBusModuleInternal> TBusModule::GetInternal() {
- return Impl.Get();
- }
-
- TBusServerSessionPtr TBusModule::CreateDefaultDestination(
- TBusMessageQueue& queue, TBusProtocol* proto, const TBusServerSessionConfig& config, const TString& name) {
- TBusServerSessionConfig patchedConfig = config;
- patchedConfig.ExecuteOnMessageInWorkerPool = false;
- if (!patchedConfig.Name) {
- patchedConfig.Name = name;
- }
- if (!patchedConfig.Name) {
- patchedConfig.Name = Impl->Name;
- }
- TBusServerSessionPtr session =
- TBusServerSession::Create(proto, Impl->ModuleServerHandler.Get(), patchedConfig, &queue);
- Impl->ServerSessions.push_back(session);
- return session;
- }
-
- TBusClientSessionPtr TBusModule::CreateDefaultSource(
- TBusMessageQueue& queue, TBusProtocol* proto, const TBusClientSessionConfig& config, const TString& name) {
- TBusClientSessionConfig patchedConfig = config;
- patchedConfig.ExecuteOnReplyInWorkerPool = false;
- if (!patchedConfig.Name) {
- patchedConfig.Name = name;
- }
- if (!patchedConfig.Name) {
- patchedConfig.Name = Impl->Name;
- }
- TBusClientSessionPtr session =
- TBusClientSession::Create(proto, Impl->ModuleClientHandler.Get(), patchedConfig, &queue);
- Impl->ClientSessions.push_back(session);
- return session;
- }
-
- TBusStarter* TBusModule::CreateDefaultStarter(TBusMessageQueue&, const TBusSessionConfig& config) {
- TBusStarter* session = new TBusStarter(this, config);
- Impl->Starters.push_back(session);
- return session;
- }
-
- void TBusModule::OnClientConnectionEvent(const TClientConnectionEvent& event) {
- Y_UNUSED(event);
- }
-
- TString TBusModule::GetStatus(unsigned flags) {
- TString strReturn = Sprintf("%s\n", Impl->Name);
- strReturn += Impl->GetStatus(flags);
- return strReturn;
- }
+ bool TBusModule::CreatePrivateSessions(TBusMessageQueue* queue) {
+ Impl->Queue = queue;
+ return true;
+ }
+
+ int TBusModule::GetModuleSessionInFlight() const {
+ return Impl->Size();
+ }
+
+ TIntrusivePtr<TBusModuleInternal> TBusModule::GetInternal() {
+ return Impl.Get();
+ }
+
+ TBusServerSessionPtr TBusModule::CreateDefaultDestination(
+ TBusMessageQueue& queue, TBusProtocol* proto, const TBusServerSessionConfig& config, const TString& name) {
+ TBusServerSessionConfig patchedConfig = config;
+ patchedConfig.ExecuteOnMessageInWorkerPool = false;
+ if (!patchedConfig.Name) {
+ patchedConfig.Name = name;
+ }
+ if (!patchedConfig.Name) {
+ patchedConfig.Name = Impl->Name;
+ }
+ TBusServerSessionPtr session =
+ TBusServerSession::Create(proto, Impl->ModuleServerHandler.Get(), patchedConfig, &queue);
+ Impl->ServerSessions.push_back(session);
+ return session;
+ }
+
+ TBusClientSessionPtr TBusModule::CreateDefaultSource(
+ TBusMessageQueue& queue, TBusProtocol* proto, const TBusClientSessionConfig& config, const TString& name) {
+ TBusClientSessionConfig patchedConfig = config;
+ patchedConfig.ExecuteOnReplyInWorkerPool = false;
+ if (!patchedConfig.Name) {
+ patchedConfig.Name = name;
+ }
+ if (!patchedConfig.Name) {
+ patchedConfig.Name = Impl->Name;
+ }
+ TBusClientSessionPtr session =
+ TBusClientSession::Create(proto, Impl->ModuleClientHandler.Get(), patchedConfig, &queue);
+ Impl->ClientSessions.push_back(session);
+ return session;
+ }
+
+ TBusStarter* TBusModule::CreateDefaultStarter(TBusMessageQueue&, const TBusSessionConfig& config) {
+ TBusStarter* session = new TBusStarter(this, config);
+ Impl->Starters.push_back(session);
+ return session;
+ }
+
+ void TBusModule::OnClientConnectionEvent(const TClientConnectionEvent& event) {
+ Y_UNUSED(event);
+ }
+
+ TString TBusModule::GetStatus(unsigned flags) {
+ TString strReturn = Sprintf("%s\n", Impl->Name);
+ strReturn += Impl->GetStatus(flags);
+ return strReturn;
+ }
}
-void TBusModuleImpl::AddJob(TJobRunner* jobRunner) {
+void TBusModuleImpl::AddJob(TJobRunner* jobRunner) {
TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for AddJob");
Jobs.push_back(jobRunner);
jobRunner->JobStorageIterator = Jobs.end();
--jobRunner->JobStorageIterator;
}
-void TBusModuleImpl::DestroyJob(TJobRunner* job) {
+void TBusModuleImpl::DestroyJob(TJobRunner* job) {
Y_ASSERT(job->JobStorageIterator != TList<TJobRunner*>::iterator());
{
@@ -837,7 +837,7 @@ EMessageStatus TBusModule::StartJob(TAutoPtr<TBusMessage> message) {
Y_VERIFY(Impl->State == TBusModuleImpl::RUNNING);
Y_VERIFY(!!Impl->Queue);
- if ((unsigned)AtomicGet(Impl->JobCount) >= Impl->ModuleConfig.StarterMaxInFlight) {
+ if ((unsigned)AtomicGet(Impl->JobCount) >= Impl->ModuleConfig.StarterMaxInFlight) {
return MESSAGE_BUSY;
}
diff --git a/library/cpp/messagebus/oldmodule/module.h b/library/cpp/messagebus/oldmodule/module.h
index 8d1c4a5d52..c917f9cae7 100644
--- a/library/cpp/messagebus/oldmodule/module.h
+++ b/library/cpp/messagebus/oldmodule/module.h
@@ -1,37 +1,37 @@
-#pragma once
-
+#pragma once
+
///////////////////////////////////////////////////////////////////////////
/// \file
/// \brief Application interface for modules
-
+
/// NBus::TBusModule provides foundation for implementation of asynchnous
/// modules that communicate with multiple external or local sessions
-/// NBus::TBusSession.
-
+/// NBus::TBusSession.
+
/// To implement the module some virtual functions needs to be overridden:
-/// NBus::TBusModule::CreateExtSession() creates and registers an
+/// NBus::TBusModule::CreateExtSession() creates and registers an
/// external session that receives incoming messages as input for module
/// processing.
-
-/// When new incoming message arrives the new NBus::TBusJob is created.
+
+/// When new incoming message arrives the new NBus::TBusJob is created.
/// NBus::TBusJob is somewhat similar to a thread, it maintains all the state
-/// during processing of one incoming message. Default implementation of
-/// NBus::TBusJob will maintain all send and received messages during
+/// during processing of one incoming message. Default implementation of
+/// NBus::TBusJob will maintain all send and received messages during
/// lifetime of this job. Each message, status and reply can be found
/// within NBus::TJobState using NBus::TBusJob::GetState(). If your module
/// needs to maintain an additional information during lifetime of the job
-/// you can derive your own class from NBus::TBusJob and override job
+/// you can derive your own class from NBus::TBusJob and override job
/// factory method NBus::IJobFactory::CreateJobInstance() to create your instances.
-
+
/// Processing of a given message starts with a call to NBus::TBusModule::Start()
/// handler that should be overridden in the module implementation. Within
/// the callback handler module can perform any computation and access any
/// datastore tables that it needs. The handler can also access any module
-/// variables. However, same handler can be called from multiple threads so,
+/// variables. However, same handler can be called from multiple threads so,
/// it is recommended that handler only access read-only module level variables.
-
-/// Handler should use NBus::TBusJob::Send() to send messages to other client
+
+/// Handler should use NBus::TBusJob::Send() to send messages to other client
/// sessions and it can use NBus::TBusJob::Reply() to send reply to the main
/// job message. When handler is done, it returns the pointer to the next handler to call
/// when all pending messages have cleared. If handler
@@ -47,364 +47,364 @@
#include <util/generic/object_counter.h>
namespace NBus {
- class TBusJob;
- class TBusModule;
-
- namespace NPrivate {
- struct TCallJobHandlerWorkItem;
- struct TBusModuleImpl;
- struct TModuleServerHandler;
- struct TModuleClientHandler;
- struct TJobRunner;
- }
-
- class TJobHandler {
- protected:
- typedef TJobHandler (TBusModule::*TBusHandlerPtr)(TBusJob* job, TBusMessage* mess);
- TBusHandlerPtr MyPtr;
-
- public:
- template <class B>
- TJobHandler(TJobHandler (B::*fptr)(TBusJob* job, TBusMessage* mess)) {
- MyPtr = static_cast<TBusHandlerPtr>(fptr);
- }
- TJobHandler(TBusHandlerPtr fptr = nullptr) {
- MyPtr = fptr;
- }
+ class TBusJob;
+ class TBusModule;
+
+ namespace NPrivate {
+ struct TCallJobHandlerWorkItem;
+ struct TBusModuleImpl;
+ struct TModuleServerHandler;
+ struct TModuleClientHandler;
+ struct TJobRunner;
+ }
+
+ class TJobHandler {
+ protected:
+ typedef TJobHandler (TBusModule::*TBusHandlerPtr)(TBusJob* job, TBusMessage* mess);
+ TBusHandlerPtr MyPtr;
+
+ public:
+ template <class B>
+ TJobHandler(TJobHandler (B::*fptr)(TBusJob* job, TBusMessage* mess)) {
+ MyPtr = static_cast<TBusHandlerPtr>(fptr);
+ }
+ TJobHandler(TBusHandlerPtr fptr = nullptr) {
+ MyPtr = fptr;
+ }
TJobHandler(const TJobHandler&) = default;
TJobHandler& operator =(const TJobHandler&) = default;
bool operator==(TJobHandler h) const {
- return MyPtr == h.MyPtr;
- }
+ return MyPtr == h.MyPtr;
+ }
bool operator!=(TJobHandler h) const {
- return MyPtr != h.MyPtr;
- }
- bool operator!() const {
- return !MyPtr;
- }
- TJobHandler operator()(TBusModule* b, TBusJob* job, TBusMessage* mess) {
- return (b->*MyPtr)(job, mess);
- }
- };
-
- typedef void (TBusModule::*TReplyHandler)(TBusJob* job, EMessageStatus status, TBusMessage* mess, TBusMessage* reply);
-
- ////////////////////////////////////////////////////
- /// \brief Pending message state
-
- struct TJobState {
- friend class TBusJob;
- friend class ::TCrawlerModule;
-
- TReplyHandler Handler;
- EMessageStatus Status;
- TBusMessage* Message;
- TBusMessage* Reply;
- TBusClientSession* Session;
- size_t NumRetries;
- size_t MaxRetries;
- // If != NULL then use it as destination.
- TNetAddr Addr;
- bool UseAddr;
- bool OneWay;
-
- private:
- TJobState(TReplyHandler handler,
- EMessageStatus status,
- TBusMessage* mess, TBusClientSession* session, TBusMessage* reply, size_t maxRetries = 0,
- const TNetAddr* addr = nullptr, bool oneWay = false)
- : Handler(handler)
- , Status(status)
- , Message(mess)
- , Reply(reply)
- , Session(session)
- , NumRetries(0)
- , MaxRetries(maxRetries)
- , OneWay(oneWay)
- {
- if (!!addr) {
- Addr = *addr;
- }
- UseAddr = !!addr;
- }
-
- public:
- TString GetStatus(unsigned flags);
- };
-
- using TJobStateVec = TVector<TJobState>;
-
- /////////////////////////////////////////////////////////
- /// \brief Execution item = thread
-
- /// Maintains internal state of document in computation
- class TBusJob {
- TObjectCounter<TBusJob> ObjectCounter;
-
- private:
- void CheckThreadCurrentJob();
-
- public:
- /// given a module and starter message
- TBusJob(TBusModule* module, TBusMessage* message);
-
- /// destructor will free all the message that were send and received
- virtual ~TBusJob();
-
- TBusMessage* GetMessage() const {
- return Message;
- }
-
- TNetAddr GetPeerAddrNetAddr() const;
-
- /// send message to any other session or application
- /// If addr is set then use it as destination.
- void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr);
- void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler = nullptr, size_t maxRetries = 0);
-
- void SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr);
- void SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session);
-
- /// send reply to the starter message
- virtual void SendReply(TBusMessageAutoPtr reply);
-
- /// set the flag to terminate job at the earliest convenience
- void Cancel(EMessageStatus status);
-
- /// helper to put item on finished list of states
- /// It should not be a part of public API,
- /// so prohibit it for all except current users.
- private:
- friend class ::TCrawlerModule;
- void PutState(const TJobState& state) {
- Finished.push_back(state);
+ return MyPtr != h.MyPtr;
+ }
+ bool operator!() const {
+ return !MyPtr;
+ }
+ TJobHandler operator()(TBusModule* b, TBusJob* job, TBusMessage* mess) {
+ return (b->*MyPtr)(job, mess);
+ }
+ };
+
+ typedef void (TBusModule::*TReplyHandler)(TBusJob* job, EMessageStatus status, TBusMessage* mess, TBusMessage* reply);
+
+ ////////////////////////////////////////////////////
+ /// \brief Pending message state
+
+ struct TJobState {
+ friend class TBusJob;
+ friend class ::TCrawlerModule;
+
+ TReplyHandler Handler;
+ EMessageStatus Status;
+ TBusMessage* Message;
+ TBusMessage* Reply;
+ TBusClientSession* Session;
+ size_t NumRetries;
+ size_t MaxRetries;
+ // If != NULL then use it as destination.
+ TNetAddr Addr;
+ bool UseAddr;
+ bool OneWay;
+
+ private:
+ TJobState(TReplyHandler handler,
+ EMessageStatus status,
+ TBusMessage* mess, TBusClientSession* session, TBusMessage* reply, size_t maxRetries = 0,
+ const TNetAddr* addr = nullptr, bool oneWay = false)
+ : Handler(handler)
+ , Status(status)
+ , Message(mess)
+ , Reply(reply)
+ , Session(session)
+ , NumRetries(0)
+ , MaxRetries(maxRetries)
+ , OneWay(oneWay)
+ {
+ if (!!addr) {
+ Addr = *addr;
+ }
+ UseAddr = !!addr;
}
- public:
- /// retrieve all pending messages
- void GetPending(TJobStateVec* stateVec) {
- Y_ASSERT(stateVec);
- *stateVec = Pending;
- }
-
- /// helper function to find state of previously sent messages
- template <class MessageType>
- TJobState* GetState(int* startFrom = nullptr) {
- for (int i = startFrom ? *startFrom : 0; i < int(Finished.size()); i++) {
- TJobState* call = &Finished[i];
- if (call->Reply != nullptr && dynamic_cast<MessageType*>(call->Reply)) {
- if (startFrom) {
- *startFrom = i;
- }
- return call;
- }
- if (call->Message != nullptr && dynamic_cast<MessageType*>(call->Message)) {
- if (startFrom) {
- *startFrom = i;
- }
- return call;
- }
+ public:
+ TString GetStatus(unsigned flags);
+ };
+
+ using TJobStateVec = TVector<TJobState>;
+
+ /////////////////////////////////////////////////////////
+ /// \brief Execution item = thread
+
+ /// Maintains internal state of document in computation
+ class TBusJob {
+ TObjectCounter<TBusJob> ObjectCounter;
+
+ private:
+ void CheckThreadCurrentJob();
+
+ public:
+ /// given a module and starter message
+ TBusJob(TBusModule* module, TBusMessage* message);
+
+ /// destructor will free all the message that were send and received
+ virtual ~TBusJob();
+
+ TBusMessage* GetMessage() const {
+ return Message;
+ }
+
+ TNetAddr GetPeerAddrNetAddr() const;
+
+ /// send message to any other session or application
+ /// If addr is set then use it as destination.
+ void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr);
+ void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler = nullptr, size_t maxRetries = 0);
+
+ void SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr);
+ void SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session);
+
+ /// send reply to the starter message
+ virtual void SendReply(TBusMessageAutoPtr reply);
+
+ /// set the flag to terminate job at the earliest convenience
+ void Cancel(EMessageStatus status);
+
+ /// helper to put item on finished list of states
+ /// It should not be a part of public API,
+ /// so prohibit it for all except current users.
+ private:
+ friend class ::TCrawlerModule;
+ void PutState(const TJobState& state) {
+ Finished.push_back(state);
+ }
+
+ public:
+ /// retrieve all pending messages
+ void GetPending(TJobStateVec* stateVec) {
+ Y_ASSERT(stateVec);
+ *stateVec = Pending;
+ }
+
+ /// helper function to find state of previously sent messages
+ template <class MessageType>
+ TJobState* GetState(int* startFrom = nullptr) {
+ for (int i = startFrom ? *startFrom : 0; i < int(Finished.size()); i++) {
+ TJobState* call = &Finished[i];
+ if (call->Reply != nullptr && dynamic_cast<MessageType*>(call->Reply)) {
+ if (startFrom) {
+ *startFrom = i;
+ }
+ return call;
+ }
+ if (call->Message != nullptr && dynamic_cast<MessageType*>(call->Message)) {
+ if (startFrom) {
+ *startFrom = i;
+ }
+ return call;
+ }
}
- return nullptr;
+ return nullptr;
}
- /// helper function to find response for previously sent messages
- template <class MessageType>
- MessageType* Get(int* startFrom = nullptr) {
- for (int i = startFrom ? *startFrom : 0; i < int(Finished.size()); i++) {
- TJobState& call = Finished[i];
- if (call.Reply != nullptr && dynamic_cast<MessageType*>(call.Reply)) {
- if (startFrom) {
- *startFrom = i;
- }
- return static_cast<MessageType*>(call.Reply);
- }
- if (call.Message != nullptr && dynamic_cast<MessageType*>(call.Message)) {
- if (startFrom) {
- *startFrom = i;
- }
- return static_cast<MessageType*>(call.Message);
- }
+ /// helper function to find response for previously sent messages
+ template <class MessageType>
+ MessageType* Get(int* startFrom = nullptr) {
+ for (int i = startFrom ? *startFrom : 0; i < int(Finished.size()); i++) {
+ TJobState& call = Finished[i];
+ if (call.Reply != nullptr && dynamic_cast<MessageType*>(call.Reply)) {
+ if (startFrom) {
+ *startFrom = i;
+ }
+ return static_cast<MessageType*>(call.Reply);
+ }
+ if (call.Message != nullptr && dynamic_cast<MessageType*>(call.Message)) {
+ if (startFrom) {
+ *startFrom = i;
+ }
+ return static_cast<MessageType*>(call.Message);
+ }
}
- return nullptr;
+ return nullptr;
}
- /// helper function to find status for previously sent message
- template <class MessageType>
- EMessageStatus GetStatus(int* startFrom = nullptr) {
- for (int i = startFrom ? *startFrom : 0; i < int(Finished.size()); i++) {
- TJobState& call = Finished[i];
- if (call.Message != nullptr && dynamic_cast<MessageType*>(call.Message)) {
- if (startFrom) {
- *startFrom = i;
- }
- return call.Status;
- }
+ /// helper function to find status for previously sent message
+ template <class MessageType>
+ EMessageStatus GetStatus(int* startFrom = nullptr) {
+ for (int i = startFrom ? *startFrom : 0; i < int(Finished.size()); i++) {
+ TJobState& call = Finished[i];
+ if (call.Message != nullptr && dynamic_cast<MessageType*>(call.Message)) {
+ if (startFrom) {
+ *startFrom = i;
+ }
+ return call.Status;
+ }
}
- return MESSAGE_UNKNOWN;
+ return MESSAGE_UNKNOWN;
}
- /// helper function to clear state of previosly sent messages
- template <class MessageType>
- void Clear() {
- for (size_t i = 0; i < Finished.size();) {
- // `Finished.size() - i` decreases with each iteration
- // we either increment i, or remove element from Finished.
- TJobState& call = Finished[i];
- if (call.Message != nullptr && dynamic_cast<MessageType*>(call.Message)) {
- ClearState(call);
- } else {
- ++i;
- }
+ /// helper function to clear state of previosly sent messages
+ template <class MessageType>
+ void Clear() {
+ for (size_t i = 0; i < Finished.size();) {
+ // `Finished.size() - i` decreases with each iteration
+ // we either increment i, or remove element from Finished.
+ TJobState& call = Finished[i];
+ if (call.Message != nullptr && dynamic_cast<MessageType*>(call.Message)) {
+ ClearState(call);
+ } else {
+ ++i;
+ }
}
}
- /// helper function to clear state in order to try again
- void ClearState(TJobState& state);
-
- /// clears all message states
- void ClearAllMessageStates();
-
- /// returns true if job is done
- bool IsDone();
-
- /// return human reabable status of this job
- virtual TString GetStatus(unsigned flags);
-
- /// set sleep time for job
- void Sleep(int milliSeconds);
-
- void CallJobHandlerOnly();
-
- private:
- bool CallJobHandler();
- void DoCallReplyHandler(TJobState&);
- /// send out all Pending jobs, failed sends will be migrated to Finished
- bool SendPending();
- bool AnyPendingToSend();
-
- public:
- /// helper to call from OnReply() and OnError()
- int CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply);
-
- public:
- TJobHandler Handler; ///< job handler to be executed within next CallJobHandler()
- EMessageStatus Status; ///< set != MESSAGE_OK if job should terminate asap
- private:
- NPrivate::TJobRunner* Runner;
- TBusMessage* Message;
- THolder<TBusMessage> MessageHolder;
- TOnMessageContext OnMessageContext; // starter
- public:
- bool ReplySent;
-
- private:
- friend class TBusModule;
- friend struct NPrivate::TBusModuleImpl;
- friend struct NPrivate::TCallJobHandlerWorkItem;
- friend struct NPrivate::TModuleServerHandler;
- friend struct NPrivate::TModuleClientHandler;
- friend struct NPrivate::TJobRunner;
-
- TJobStateVec Pending; ///< messages currently outstanding via Send()
- TJobStateVec Finished; ///< messages that were replied to
- TBusModule* Module;
- NPrivate::TBusModuleImpl* ModuleImpl; ///< module which created the job
- TBusInstant SleepUntil; ///< time to wakeup, 0 if no sleep
+ /// helper function to clear state in order to try again
+ void ClearState(TJobState& state);
+
+ /// clears all message states
+ void ClearAllMessageStates();
+
+ /// returns true if job is done
+ bool IsDone();
+
+ /// return human reabable status of this job
+ virtual TString GetStatus(unsigned flags);
+
+ /// set sleep time for job
+ void Sleep(int milliSeconds);
+
+ void CallJobHandlerOnly();
+
+ private:
+ bool CallJobHandler();
+ void DoCallReplyHandler(TJobState&);
+ /// send out all Pending jobs, failed sends will be migrated to Finished
+ bool SendPending();
+ bool AnyPendingToSend();
+
+ public:
+ /// helper to call from OnReply() and OnError()
+ int CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply);
+
+ public:
+ TJobHandler Handler; ///< job handler to be executed within next CallJobHandler()
+ EMessageStatus Status; ///< set != MESSAGE_OK if job should terminate asap
+ private:
+ NPrivate::TJobRunner* Runner;
+ TBusMessage* Message;
+ THolder<TBusMessage> MessageHolder;
+ TOnMessageContext OnMessageContext; // starter
+ public:
+ bool ReplySent;
+
+ private:
+ friend class TBusModule;
+ friend struct NPrivate::TBusModuleImpl;
+ friend struct NPrivate::TCallJobHandlerWorkItem;
+ friend struct NPrivate::TModuleServerHandler;
+ friend struct NPrivate::TModuleClientHandler;
+ friend struct NPrivate::TJobRunner;
+
+ TJobStateVec Pending; ///< messages currently outstanding via Send()
+ TJobStateVec Finished; ///< messages that were replied to
+ TBusModule* Module;
+ NPrivate::TBusModuleImpl* ModuleImpl; ///< module which created the job
+ TBusInstant SleepUntil; ///< time to wakeup, 0 if no sleep
+ };
+
+ ////////////////////////////////////////////////////////////////////
+ /// \brief Classes to implement basic module functionality
+
+ class IJobFactory {
+ protected:
+ virtual ~IJobFactory() {
+ }
+
+ public:
+ /// job factory method, override to create custom jobs
+ virtual TBusJob* CreateJobInstance(TBusMessage* message) = 0;
};
- ////////////////////////////////////////////////////////////////////
- /// \brief Classes to implement basic module functionality
+ struct TBusModuleConfig {
+ unsigned StarterMaxInFlight;
- class IJobFactory {
- protected:
- virtual ~IJobFactory() {
- }
+ struct TSecret {
+ TDuration SchedulePeriod;
- public:
- /// job factory method, override to create custom jobs
- virtual TBusJob* CreateJobInstance(TBusMessage* message) = 0;
- };
-
- struct TBusModuleConfig {
- unsigned StarterMaxInFlight;
-
- struct TSecret {
- TDuration SchedulePeriod;
+ TSecret();
+ };
+ TSecret Secret;
- TSecret();
- };
- TSecret Secret;
+ TBusModuleConfig();
+ };
- TBusModuleConfig();
- };
-
- namespace NPrivate {
- struct TBusModuleInternal: public TAtomicRefCount<TBusModuleInternal> {
- virtual TVector<TBusClientSessionPtr> GetClientSessionsInternal() = 0;
- virtual TVector<TBusServerSessionPtr> GetServerSessionsInternal() = 0;
- virtual TBusMessageQueue* GetQueue() = 0;
+ namespace NPrivate {
+ struct TBusModuleInternal: public TAtomicRefCount<TBusModuleInternal> {
+ virtual TVector<TBusClientSessionPtr> GetClientSessionsInternal() = 0;
+ virtual TVector<TBusServerSessionPtr> GetServerSessionsInternal() = 0;
+ virtual TBusMessageQueue* GetQueue() = 0;
- virtual TString GetNameInternal() = 0;
+ virtual TString GetNameInternal() = 0;
- virtual TString GetStatusSingleLine() = 0;
+ virtual TString GetStatusSingleLine() = 0;
- virtual ~TBusModuleInternal() {
- }
- };
- }
+ virtual ~TBusModuleInternal() {
+ }
+ };
+ }
- class TBusModule: public IJobFactory, TNonCopyable {
- friend class TBusJob;
+ class TBusModule: public IJobFactory, TNonCopyable {
+ friend class TBusJob;
- TObjectCounter<TBusModule> ObjectCounter;
+ TObjectCounter<TBusModule> ObjectCounter;
- TIntrusivePtr<NPrivate::TBusModuleImpl> Impl;
+ TIntrusivePtr<NPrivate::TBusModuleImpl> Impl;
- public:
- /// Each module should have a name which is used as protocol service
- TBusModule(const char* name);
- ~TBusModule() override;
+ public:
+ /// Each module should have a name which is used as protocol service
+ TBusModule(const char* name);
+ ~TBusModule() override;
- const char* GetName() const;
+ const char* GetName() const;
- void SetConfig(const TBusModuleConfig& config);
+ void SetConfig(const TBusModuleConfig& config);
- /// get status of all jobs in flight
- TString GetStatus(unsigned flags = 0);
+ /// get status of all jobs in flight
+ TString GetStatus(unsigned flags = 0);
- /// called when application is about to start
- virtual bool StartInput();
- /// called when application is about to exit
- virtual bool Shutdown();
+ /// called when application is about to start
+ virtual bool StartInput();
+ /// called when application is about to exit
+ virtual bool Shutdown();
- // this default implementation just creates TBusJob object
- TBusJob* CreateJobInstance(TBusMessage* message) override;
+ // this default implementation just creates TBusJob object
+ TBusJob* CreateJobInstance(TBusMessage* message) override;
- EMessageStatus StartJob(TAutoPtr<TBusMessage> message);
+ EMessageStatus StartJob(TAutoPtr<TBusMessage> message);
- /// creates private sessions, calls CreateExtSession(), should be called before StartInput()
- bool CreatePrivateSessions(TBusMessageQueue* queue);
+ /// creates private sessions, calls CreateExtSession(), should be called before StartInput()
+ bool CreatePrivateSessions(TBusMessageQueue* queue);
- virtual void OnClientConnectionEvent(const TClientConnectionEvent& event);
+ virtual void OnClientConnectionEvent(const TClientConnectionEvent& event);
- public:
- /// entry point into module, first function to call
- virtual TJobHandler Start(TBusJob* job, TBusMessage* mess) = 0;
-
- protected:
- /// override this function to create destination session
- virtual TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) = 0;
-
- public:
- int GetModuleSessionInFlight() const;
-
- TIntrusivePtr<NPrivate::TBusModuleInternal> GetInternal();
-
- protected:
- TBusServerSessionPtr CreateDefaultDestination(TBusMessageQueue& queue, TBusProtocol* proto, const TBusServerSessionConfig& config, const TString& name = TString());
- TBusClientSessionPtr CreateDefaultSource(TBusMessageQueue& queue, TBusProtocol* proto, const TBusClientSessionConfig& config, const TString& name = TString());
- TBusStarter* CreateDefaultStarter(TBusMessageQueue& unused, const TBusSessionConfig& config);
- };
+ public:
+ /// entry point into module, first function to call
+ virtual TJobHandler Start(TBusJob* job, TBusMessage* mess) = 0;
+ protected:
+ /// override this function to create destination session
+ virtual TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) = 0;
+
+ public:
+ int GetModuleSessionInFlight() const;
+
+ TIntrusivePtr<NPrivate::TBusModuleInternal> GetInternal();
+
+ protected:
+ TBusServerSessionPtr CreateDefaultDestination(TBusMessageQueue& queue, TBusProtocol* proto, const TBusServerSessionConfig& config, const TString& name = TString());
+ TBusClientSessionPtr CreateDefaultSource(TBusMessageQueue& queue, TBusProtocol* proto, const TBusClientSessionConfig& config, const TString& name = TString());
+ TBusStarter* CreateDefaultStarter(TBusMessageQueue& unused, const TBusSessionConfig& config);
+ };
+
}
diff --git a/library/cpp/messagebus/oldmodule/startsession.cpp b/library/cpp/messagebus/oldmodule/startsession.cpp
index 7c38801d62..76171ba1d5 100644
--- a/library/cpp/messagebus/oldmodule/startsession.cpp
+++ b/library/cpp/messagebus/oldmodule/startsession.cpp
@@ -1,14 +1,14 @@
///////////////////////////////////////////////////////////
-/// \file
+/// \file
/// \brief Starter session implementation
-
+
/// Starter session will generate emtpy message to insert
/// into local session that are registered under same protocol
-
+
/// Starter (will one day) automatically adjust number
/// of message inflight to make sure that at least one of source
/// sessions within message queue is at the limit (bottle neck)
-
+
/// Maximum number of messages that starter will instert into
/// the pipeline is configured by NBus::TBusSessionConfig::MaxInFlight
@@ -19,46 +19,46 @@
#include <library/cpp/messagebus/ybus.h>
namespace NBus {
- void* TBusStarter::_starter(void* data) {
- TBusStarter* pThis = static_cast<TBusStarter*>(data);
- pThis->Starter();
- return nullptr;
- }
+ void* TBusStarter::_starter(void* data) {
+ TBusStarter* pThis = static_cast<TBusStarter*>(data);
+ pThis->Starter();
+ return nullptr;
+ }
- TBusStarter::TBusStarter(TBusModule* module, const TBusSessionConfig& config)
- : Module(module)
- , Config(config)
- , StartThread(_starter, this)
- , Exiting(false)
- {
- StartThread.Start();
- }
+ TBusStarter::TBusStarter(TBusModule* module, const TBusSessionConfig& config)
+ : Module(module)
+ , Config(config)
+ , StartThread(_starter, this)
+ , Exiting(false)
+ {
+ StartThread.Start();
+ }
- TBusStarter::~TBusStarter() {
- Shutdown();
- }
+ TBusStarter::~TBusStarter() {
+ Shutdown();
+ }
- void TBusStarter::Shutdown() {
- {
- TGuard<TMutex> g(ExitLock);
- Exiting = true;
- ExitSignal.Signal();
- }
- StartThread.Join();
- }
+ void TBusStarter::Shutdown() {
+ {
+ TGuard<TMutex> g(ExitLock);
+ Exiting = true;
+ ExitSignal.Signal();
+ }
+ StartThread.Join();
+ }
- void TBusStarter::Starter() {
+ void TBusStarter::Starter() {
TGuard<TMutex> g(ExitLock);
- while (!Exiting) {
- TAutoPtr<TBusMessage> empty(new TBusMessage(0));
+ while (!Exiting) {
+ TAutoPtr<TBusMessage> empty(new TBusMessage(0));
- EMessageStatus status = Module->StartJob(empty);
+ EMessageStatus status = Module->StartJob(empty);
- if (Config.SendTimeout > 0) {
- ExitSignal.WaitT(ExitLock, TDuration::MilliSeconds(Config.SendTimeout));
- } else {
- ExitSignal.WaitT(ExitLock, (status == MESSAGE_BUSY) ? TDuration::MilliSeconds(1) : TDuration::Zero());
- }
+ if (Config.SendTimeout > 0) {
+ ExitSignal.WaitT(ExitLock, TDuration::MilliSeconds(Config.SendTimeout));
+ } else {
+ ExitSignal.WaitT(ExitLock, (status == MESSAGE_BUSY) ? TDuration::MilliSeconds(1) : TDuration::Zero());
+ }
}
}
diff --git a/library/cpp/messagebus/oldmodule/startsession.h b/library/cpp/messagebus/oldmodule/startsession.h
index 5e26e7e1e5..afe25ac809 100644
--- a/library/cpp/messagebus/oldmodule/startsession.h
+++ b/library/cpp/messagebus/oldmodule/startsession.h
@@ -5,30 +5,30 @@
#include <util/system/thread.h>
namespace NBus {
- class TBusModule;
-
- class TBusStarter {
- private:
- TBusModule* Module;
- TBusSessionConfig Config;
- TThread StartThread;
- bool Exiting;
- TCondVar ExitSignal;
- TMutex ExitLock;
-
- static void* _starter(void* data);
-
- void Starter();
-
- TString GetStatus(ui16 /*flags=YBUS_STATUS_CONNS*/) {
- return "";
- }
-
- public:
- TBusStarter(TBusModule* module, const TBusSessionConfig& config);
- ~TBusStarter();
-
- void Shutdown();
- };
-
-}
+ class TBusModule;
+
+ class TBusStarter {
+ private:
+ TBusModule* Module;
+ TBusSessionConfig Config;
+ TThread StartThread;
+ bool Exiting;
+ TCondVar ExitSignal;
+ TMutex ExitLock;
+
+ static void* _starter(void* data);
+
+ void Starter();
+
+ TString GetStatus(ui16 /*flags=YBUS_STATUS_CONNS*/) {
+ return "";
+ }
+
+ public:
+ TBusStarter(TBusModule* module, const TBusSessionConfig& config);
+ ~TBusStarter();
+
+ void Shutdown();
+ };
+
+}