aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/oldmodule
diff options
context:
space:
mode:
authornga <nga@yandex-team.ru>2022-02-10 16:48:09 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:09 +0300
commit1f553f46fb4f3c5eec631352cdd900a0709016af (patch)
treea231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/oldmodule
parentc4de7efdedc25b49cbea74bd589eecb61b55b60a (diff)
downloadydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/oldmodule')
-rw-r--r--library/cpp/messagebus/oldmodule/module.cpp380
-rw-r--r--library/cpp/messagebus/oldmodule/module.h62
-rw-r--r--library/cpp/messagebus/oldmodule/startsession.h2
-rw-r--r--library/cpp/messagebus/oldmodule/ya.make24
4 files changed, 234 insertions, 234 deletions
diff --git a/library/cpp/messagebus/oldmodule/module.cpp b/library/cpp/messagebus/oldmodule/module.cpp
index 24bd778799..ccebcfb7cc 100644
--- a/library/cpp/messagebus/oldmodule/module.cpp
+++ b/library/cpp/messagebus/oldmodule/module.cpp
@@ -1,4 +1,4 @@
-#include "module.h"
+#include "module.h"
#include <library/cpp/messagebus/scheduler_actor.h>
#include <library/cpp/messagebus/thread_extra.h>
@@ -9,19 +9,19 @@
#include <util/generic/singleton.h>
#include <util/string/printf.h>
-#include <util/system/event.h>
-
-using namespace NActor;
-using namespace NBus;
-using namespace NBus::NPrivate;
+#include <util/system/event.h>
+using namespace NActor;
+using namespace NBus;
+using namespace NBus::NPrivate;
+
namespace {
Y_POD_STATIC_THREAD(TBusJob*)
ThreadCurrentJob;
struct TThreadCurrentJobGuard {
TBusJob* Prev;
-
+
TThreadCurrentJobGuard(TBusJob* job)
: Prev(ThreadCurrentJob)
{
@@ -32,7 +32,7 @@ namespace {
ThreadCurrentJob = Prev;
}
};
-
+
void ClearState(NBus::TJobState* state) {
/// skip sendbacks handlers
if (state->Message != state->Reply) {
@@ -46,11 +46,11 @@ namespace {
state->Reply = nullptr;
}
}
- }
-
+ }
+
void ClearJobStateVector(NBus::TJobStateVec* vec) {
Y_ASSERT(vec);
-
+
for (auto& call : *vec) {
ClearState(&call);
}
@@ -71,12 +71,12 @@ namespace NBus {
: Module(module)
{
}
-
+
void OnReply(TAutoPtr<TBusMessage> req, TAutoPtr<TBusMessage> reply) override;
void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage) override;
void OnError(TAutoPtr<TBusMessage> msg, EMessageStatus status) override;
void OnClientConnectionEvent(const TClientConnectionEvent& event) override;
-
+
TBusModuleImpl* const Module;
};
@@ -94,13 +94,13 @@ namespace NBus {
struct TBusModuleImpl: public TBusModuleInternal {
TBusModule* const Module;
-
+
TBusMessageQueue* Queue;
-
+
TScheduler Scheduler;
-
+
const char* const Name;
-
+
typedef TList<TJobRunner*> TBusJobList;
/// jobs currently in-flight on this module
TBusJobList Jobs;
@@ -108,13 +108,13 @@ namespace NBus {
TMutex Lock;
TCondVar ShutdownCondVar;
TAtomic JobCount;
-
+
enum EState {
CREATED,
RUNNING,
STOPPED,
};
-
+
TAtomic State;
TBusModuleConfig ModuleConfig;
TBusServerSessionPtr ExternalSession;
@@ -122,12 +122,12 @@ namespace NBus {
THolder<IBusClientHandler> ModuleClientHandler;
THolder<IBusServerHandler> ModuleServerHandler;
TVector<TSimpleSharedPtr<TBusStarter>> Starters;
-
+
// Sessions must be destroyed before
// ModuleClientHandler / ModuleServerHandler
TVector<TBusClientSessionPtr> ClientSessions;
TVector<TBusServerSessionPtr> ServerSessions;
-
+
TBusModuleImpl(TBusModule* module, const char* name)
: Module(module)
, Queue()
@@ -139,12 +139,12 @@ namespace NBus {
, ModuleServerHandler(new TModuleServerHandler(this))
{
}
-
+
~TBusModuleImpl() override {
// Shutdown cannot be called from destructor,
// because module has virtual methods.
Y_VERIFY(State != RUNNING, "if running, must explicitly call Shutdown() before destructor");
-
+
Scheduler.Stop();
while (!Jobs.empty()) {
@@ -152,56 +152,56 @@ namespace NBus {
}
Y_VERIFY(JobCount == 0, "state check");
}
-
+
void OnMessageReceived(TAutoPtr<TBusMessage> msg, TOnMessageContext&);
-
+
void AddJob(TJobRunner* jobRunner);
-
+
void DestroyJob(TJobRunner* job);
-
+
/// terminate job on this message
void CancelJob(TBusJob* job, EMessageStatus status);
/// prints statuses of jobs
TString GetStatus(unsigned flags);
-
+
size_t Size() const {
return AtomicGet(JobCount);
}
-
+
void Shutdown();
-
+
TVector<TBusClientSessionPtr> GetClientSessionsInternal() override {
return ClientSessions;
}
-
+
TVector<TBusServerSessionPtr> GetServerSessionsInternal() override {
return ServerSessions;
}
-
+
TBusMessageQueue* GetQueue() override {
return Queue;
}
-
+
TString GetNameInternal() override {
return Name;
}
-
+
TString GetStatusSingleLine() override {
TStringStream ss;
ss << "jobs: " << Size();
return ss.Str();
}
-
+
void OnClientConnectionEvent(const TClientConnectionEvent& event) {
Module->OnClientConnectionEvent(event);
}
};
-
+
struct TJobResponseMessage {
TBusMessage* Request;
TBusMessage* Response;
EMessageStatus Status;
-
+
TJobResponseMessage(TBusMessage* request, TBusMessage* response, EMessageStatus status)
: Request(request)
, Response(response)
@@ -215,9 +215,9 @@ namespace NBus {
public NActor::TQueueInActor<TJobRunner, TJobResponseMessage>,
public TScheduleActor<TJobRunner> {
THolder<TBusJob> Job;
-
+
TList<TJobRunner*>::iterator JobStorageIterator;
-
+
TJobRunner(TAutoPtr<TBusJob> job)
: NActor::TActor<TJobRunner>(job->ModuleImpl->Queue->GetExecutor())
, TScheduleActor<TJobRunner>(&job->ModuleImpl->Scheduler)
@@ -226,15 +226,15 @@ namespace NBus {
{
Job->Runner = this;
}
-
+
~TJobRunner() override {
Y_ASSERT(JobStorageIterator == TList<TJobRunner*>::iterator());
}
-
+
void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, const TJobResponseMessage& message) {
Job->CallReplyHandler(message.Status, message.Request, message.Response);
}
-
+
void Destroy() {
if (!!Job->OnMessageContext) {
if (!Job->ReplySent) {
@@ -242,24 +242,24 @@ namespace NBus {
}
}
Job->ModuleImpl->DestroyJob(this);
- }
-
+ }
+
void Act(NActor::TDefaultTag) {
if (JobStorageIterator == TList<TJobRunner*>::iterator()) {
return;
}
-
+
if (Job->SleepUntil != 0) {
if (AtomicGet(Job->ModuleImpl->State) == TBusModuleImpl::STOPPED) {
Destroy();
return;
}
}
-
+
TThreadCurrentJobGuard g(Job.Get());
-
+
NActor::TQueueInActor<TJobRunner, TJobResponseMessage>::DequeueAll();
-
+
if (Alarm.FetchTask()) {
if (Job->AnyPendingToSend()) {
Y_ASSERT(Job->SleepUntil == 0);
@@ -272,50 +272,50 @@ namespace NBus {
Y_ASSERT(Job->SleepUntil != 0);
Job->SleepUntil = 0;
}
- }
-
+ }
+
for (;;) {
if (Job->Pending.empty() && !!Job->Handler && Job->Status == MESSAGE_OK) {
TWhatThreadDoesPushPop pp("do call job handler (do not confuse with reply handler)");
-
+
Job->Handler = Job->Handler(Job->Module, Job.Get(), Job->Message);
}
-
+
if (Job->SleepUntil != 0) {
ScheduleAt(TInstant::MilliSeconds(Job->SleepUntil));
return;
}
-
+
Job->SendPending();
-
+
if (Job->AnyPendingToSend()) {
ScheduleAt(TInstant::Now() + TDuration::Seconds(1));
return;
}
-
+
if (!Job->Pending.empty()) {
// waiting replies
return;
}
-
+
if (Job->IsDone()) {
Destroy();
return;
}
}
- }
+ }
};
-
+
}
-
+
static inline TJobRunner* GetJob(TBusMessage* message) {
return (TJobRunner*)message->Data;
- }
-
+ }
+
static inline void SetJob(TBusMessage* message, TJobRunner* job) {
message->Data = job;
}
-
+
TBusJob::TBusJob(TBusModule* module, TBusMessage* message)
: Status(MESSAGE_OK)
, Runner()
@@ -346,7 +346,7 @@ namespace NBus {
/////////////////////////////////////////////////////////
/// \brief Send messages in pending list
-
+
/// If at least one message is gone return true
/// If message has not been send, move it to Finished with appropriate error code
bool TBusJob::SendPending() {
@@ -358,7 +358,7 @@ namespace NBus {
size_t it = 0;
while (it != Pending.size()) {
TJobState& call = Pending[it];
-
+
if (call.Status == MESSAGE_DONT_ASK) {
EMessageStatus getAddressStatus = MESSAGE_OK;
TNetAddr addr;
@@ -371,22 +371,22 @@ namespace NBus {
if (getAddressStatus == MESSAGE_OK) {
// hold extra reference for each request in flight
Runner->Ref();
-
+
if (call.OneWay) {
call.Status = call.Session->SendMessageOneWay(call.Message, &addr);
} else {
call.Status = call.Session->SendMessage(call.Message, &addr);
}
-
+
if (call.Status != MESSAGE_OK) {
Runner->UnRef();
}
-
- } else {
+
+ } else {
call.Status = getAddressStatus;
- }
+ }
}
-
+
if (call.Status == MESSAGE_OK) {
++it; // keep pending list until we get reply
} else if (call.Status == MESSAGE_BUSY) {
@@ -397,11 +397,11 @@ namespace NBus {
DoCallReplyHandler(call);
call.Status = MESSAGE_DONT_ASK;
call.Message->Reset(); // generate new Id
- } else {
+ } else {
Finished.push_back(call);
DoCallReplyHandler(call);
Pending.erase(Pending.begin() + it);
- }
+ }
}
return Pending.size() > 0;
}
@@ -419,12 +419,12 @@ namespace NBus {
bool TBusJob::IsDone() {
bool r = (SleepUntil == 0 && Pending.size() == 0 && (Handler == nullptr || Status != MESSAGE_OK));
return r;
- }
-
+ }
+
void TBusJob::CallJobHandlerOnly() {
TThreadCurrentJobGuard threadCurrentJobGuard(this);
TWhatThreadDoesPushPop pp("do call job handler (do not confuse with reply handler)");
-
+
Handler = Handler(ModuleImpl->Module, this, Message);
}
@@ -438,31 +438,31 @@ namespace NBus {
if (Status != MESSAGE_OK) {
break;
}
-
+
/// there are messages to send and wait for reply
SendPending();
-
+
if (!Pending.empty()) {
break;
}
-
+
/// asked to sleep
if (SleepUntil) {
break;
}
}
-
+
Y_VERIFY(!(Pending.size() == 0 && Handler == nullptr && Status == MESSAGE_OK && !ReplySent),
"Handler returned NULL without Cancel() or SendReply() for message=%016" PRIx64 " type=%d",
Message->GetHeader()->Id, Message->GetHeader()->Type);
-
+
return IsDone();
}
-
+
void TBusJob::DoCallReplyHandler(TJobState& call) {
if (call.Handler) {
TWhatThreadDoesPushPop pp("do call reply handler (do not confuse with job handler)");
-
+
TThreadCurrentJobGuard threadCurrentJobGuard(this);
(Module->*(call.Handler))(this, call.Status, call.Message, call.Reply);
}
@@ -477,7 +477,7 @@ namespace NBus {
break;
}
}
-
+
/// if not found, report error
if (i == Pending.size()) {
Y_FAIL("must not happen");
@@ -496,7 +496,7 @@ namespace NBus {
DoCallReplyHandler(call);
return 0;
}
-
+
/// call the handler if provided
DoCallReplyHandler(call);
@@ -515,50 +515,50 @@ namespace NBus {
SetJob(mess.Get(), Runner);
Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, nullptr, false));
}
-
+
void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr) {
CheckThreadCurrentJob();
SetJob(mess.Get(), Runner);
Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, &addr, false));
}
-
+
void TBusJob::SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr) {
CheckThreadCurrentJob();
SetJob(req.Get(), Runner);
Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, &addr, true));
}
-
+
void TBusJob::SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session) {
CheckThreadCurrentJob();
-
+
SetJob(req.Get(), Runner);
Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, nullptr, true));
}
-
+
///////////////////////////////////////////////////////////////
/// send reply to the starter message
void TBusJob::SendReply(TBusMessageAutoPtr reply) {
CheckThreadCurrentJob();
-
+
Y_VERIFY(!ReplySent, "cannot call SendReply twice");
ReplySent = true;
if (!OnMessageContext)
return;
-
+
EMessageStatus ok = OnMessageContext.SendReplyMove(reply);
if (ok != MESSAGE_OK) {
// TODO: count errors
}
}
-
+
/// set the flag to terminate job at the earliest convenience
void TBusJob::Cancel(EMessageStatus status) {
CheckThreadCurrentJob();
-
+
Status = status;
- }
+ }
void TBusJob::ClearState(TJobState& call) {
TJobStateVec::iterator it;
@@ -580,10 +580,10 @@ namespace NBus {
void TBusJob::Sleep(int milliSeconds) {
CheckThreadCurrentJob();
-
+
Y_VERIFY(Pending.empty(), "sleep is not allowed when there are pending job");
Y_VERIFY(SleepUntil == 0, "must not override sleep");
-
+
SleepUntil = Now() + milliSeconds;
}
@@ -642,12 +642,12 @@ namespace NBus {
: StarterMaxInFlight(1000)
{
}
-
+
TBusModuleConfig::TSecret::TSecret()
: SchedulePeriod(TDuration::Seconds(1))
{
}
-
+
TBusModule::TBusModule(const char* name)
: Impl(new TBusModuleImpl(this, name))
{
@@ -659,16 +659,16 @@ namespace NBus {
const char* TBusModule::GetName() const {
return Impl->Name;
}
-
+
void TBusModule::SetConfig(const TBusModuleConfig& config) {
Impl->ModuleConfig = config;
}
-
+
bool TBusModule::StartInput() {
Y_VERIFY(Impl->State == TBusModuleImpl::CREATED, "state check");
Y_VERIFY(!!Impl->Queue, "state check");
Impl->State = TBusModuleImpl::RUNNING;
-
+
Y_ASSERT(!Impl->ExternalSession);
TBusServerSessionPtr extSession = CreateExtSession(*Impl->Queue);
if (extSession != nullptr) {
@@ -676,14 +676,14 @@ namespace NBus {
}
return true;
- }
+ }
bool TBusModule::Shutdown() {
Impl->Shutdown();
return true;
}
-
+
TBusJob* TBusModule::CreateJobInstance(TBusMessage* message) {
TBusJob* job = new TBusJob(this, message);
return job;
@@ -706,11 +706,11 @@ TBusSession* TMyModule::CreateExtSession(TBusMessageQueue& queue) {
int TBusModule::GetModuleSessionInFlight() const {
return Impl->Size();
}
-
+
TIntrusivePtr<TBusModuleInternal> TBusModule::GetInternal() {
return Impl.Get();
}
-
+
TBusServerSessionPtr TBusModule::CreateDefaultDestination(
TBusMessageQueue& queue, TBusProtocol* proto, const TBusServerSessionConfig& config, const TString& name) {
TBusServerSessionConfig patchedConfig = config;
@@ -725,7 +725,7 @@ TBusSession* TMyModule::CreateExtSession(TBusMessageQueue& queue) {
TBusServerSession::Create(proto, Impl->ModuleServerHandler.Get(), patchedConfig, &queue);
Impl->ServerSessions.push_back(session);
return session;
- }
+ }
TBusClientSessionPtr TBusModule::CreateDefaultSource(
TBusMessageQueue& queue, TBusProtocol* proto, const TBusClientSessionConfig& config, const TString& name) {
@@ -741,140 +741,140 @@ TBusSession* TMyModule::CreateExtSession(TBusMessageQueue& queue) {
TBusClientSession::Create(proto, Impl->ModuleClientHandler.Get(), patchedConfig, &queue);
Impl->ClientSessions.push_back(session);
return session;
- }
-
+ }
+
TBusStarter* TBusModule::CreateDefaultStarter(TBusMessageQueue&, const TBusSessionConfig& config) {
TBusStarter* session = new TBusStarter(this, config);
Impl->Starters.push_back(session);
return session;
- }
+ }
void TBusModule::OnClientConnectionEvent(const TClientConnectionEvent& event) {
Y_UNUSED(event);
- }
-
+ }
+
TString TBusModule::GetStatus(unsigned flags) {
TString strReturn = Sprintf("%s\n", Impl->Name);
strReturn += Impl->GetStatus(flags);
return strReturn;
}
-
+
}
void TBusModuleImpl::AddJob(TJobRunner* jobRunner) {
- TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for AddJob");
- Jobs.push_back(jobRunner);
- jobRunner->JobStorageIterator = Jobs.end();
- --jobRunner->JobStorageIterator;
-}
-
+ TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for AddJob");
+ Jobs.push_back(jobRunner);
+ jobRunner->JobStorageIterator = Jobs.end();
+ --jobRunner->JobStorageIterator;
+}
+
void TBusModuleImpl::DestroyJob(TJobRunner* job) {
Y_ASSERT(job->JobStorageIterator != TList<TJobRunner*>::iterator());
-
- {
- TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for DestroyJob");
- int jobCount = AtomicDecrement(JobCount);
+
+ {
+ TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for DestroyJob");
+ int jobCount = AtomicDecrement(JobCount);
Y_VERIFY(jobCount >= 0, "decremented too much");
- Jobs.erase(job->JobStorageIterator);
-
+ Jobs.erase(job->JobStorageIterator);
+
if (AtomicGet(State) == STOPPED) {
- if (jobCount == 0) {
- ShutdownCondVar.BroadCast();
- }
+ if (jobCount == 0) {
+ ShutdownCondVar.BroadCast();
+ }
}
}
-
+
job->JobStorageIterator = TList<TJobRunner*>::iterator();
}
-void TBusModuleImpl::OnMessageReceived(TAutoPtr<TBusMessage> msg0, TOnMessageContext& context) {
- TBusMessage* msg = !!msg0 ? msg0.Get() : context.GetMessage();
+void TBusModuleImpl::OnMessageReceived(TAutoPtr<TBusMessage> msg0, TOnMessageContext& context) {
+ TBusMessage* msg = !!msg0 ? msg0.Get() : context.GetMessage();
Y_VERIFY(!!msg);
-
- THolder<TJobRunner> jobRunner(new TJobRunner(Module->CreateJobInstance(msg)));
- jobRunner->Job->MessageHolder.Reset(msg0.Release());
- jobRunner->Job->OnMessageContext.Swap(context);
- SetJob(jobRunner->Job->Message, jobRunner.Get());
-
- AtomicIncrement(JobCount);
-
- AddJob(jobRunner.Get());
-
- jobRunner.Release()->Schedule();
-}
-
-void TBusModuleImpl::Shutdown() {
+
+ THolder<TJobRunner> jobRunner(new TJobRunner(Module->CreateJobInstance(msg)));
+ jobRunner->Job->MessageHolder.Reset(msg0.Release());
+ jobRunner->Job->OnMessageContext.Swap(context);
+ SetJob(jobRunner->Job->Message, jobRunner.Get());
+
+ AtomicIncrement(JobCount);
+
+ AddJob(jobRunner.Get());
+
+ jobRunner.Release()->Schedule();
+}
+
+void TBusModuleImpl::Shutdown() {
if (AtomicGet(State) != TBusModuleImpl::RUNNING) {
AtomicSet(State, TBusModuleImpl::STOPPED);
- return;
- }
+ return;
+ }
AtomicSet(State, TBusModuleImpl::STOPPED);
-
+
for (auto& clientSession : ClientSessions) {
clientSession->Shutdown();
- }
+ }
for (auto& serverSession : ServerSessions) {
serverSession->Shutdown();
- }
-
- for (size_t starter = 0; starter < Starters.size(); ++starter) {
- Starters[starter]->Shutdown();
- }
-
- {
- TWhatThreadDoesAcquireGuard<TMutex> guard(Lock, "modules: acquiring lock for Shutdown");
+ }
+
+ for (size_t starter = 0; starter < Starters.size(); ++starter) {
+ Starters[starter]->Shutdown();
+ }
+
+ {
+ TWhatThreadDoesAcquireGuard<TMutex> guard(Lock, "modules: acquiring lock for Shutdown");
for (auto& Job : Jobs) {
Job->Schedule();
- }
-
- while (!Jobs.empty()) {
- ShutdownCondVar.WaitI(Lock);
- }
- }
+ }
+
+ while (!Jobs.empty()) {
+ ShutdownCondVar.WaitI(Lock);
+ }
+ }
}
-EMessageStatus TBusModule::StartJob(TAutoPtr<TBusMessage> message) {
+EMessageStatus TBusModule::StartJob(TAutoPtr<TBusMessage> message) {
Y_VERIFY(Impl->State == TBusModuleImpl::RUNNING);
Y_VERIFY(!!Impl->Queue);
-
+
if ((unsigned)AtomicGet(Impl->JobCount) >= Impl->ModuleConfig.StarterMaxInFlight) {
- return MESSAGE_BUSY;
- }
-
- TOnMessageContext dummy;
- Impl->OnMessageReceived(message.Release(), dummy);
-
- return MESSAGE_OK;
-}
-
-void TModuleServerHandler::OnMessage(TOnMessageContext& msg) {
+ return MESSAGE_BUSY;
+ }
+
+ TOnMessageContext dummy;
+ Impl->OnMessageReceived(message.Release(), dummy);
+
+ return MESSAGE_OK;
+}
+
+void TModuleServerHandler::OnMessage(TOnMessageContext& msg) {
Module->OnMessageReceived(nullptr, msg);
-}
-
-void TModuleClientHandler::OnReply(TAutoPtr<TBusMessage> req, TAutoPtr<TBusMessage> resp) {
- TJobRunner* job = GetJob(req.Get());
+}
+
+void TModuleClientHandler::OnReply(TAutoPtr<TBusMessage> req, TAutoPtr<TBusMessage> resp) {
+ TJobRunner* job = GetJob(req.Get());
Y_ASSERT(job);
Y_ASSERT(job->Job->Message != req.Get());
- job->EnqueueAndSchedule(TJobResponseMessage(req.Release(), resp.Release(), MESSAGE_OK));
- job->UnRef();
+ job->EnqueueAndSchedule(TJobResponseMessage(req.Release(), resp.Release(), MESSAGE_OK));
+ job->UnRef();
}
-void TModuleClientHandler::OnMessageSentOneWay(TAutoPtr<TBusMessage> req) {
- TJobRunner* job = GetJob(req.Get());
+void TModuleClientHandler::OnMessageSentOneWay(TAutoPtr<TBusMessage> req) {
+ TJobRunner* job = GetJob(req.Get());
Y_ASSERT(job);
Y_ASSERT(job->Job->Message != req.Get());
job->EnqueueAndSchedule(TJobResponseMessage(req.Release(), nullptr, MESSAGE_OK));
- job->UnRef();
+ job->UnRef();
}
-
-void TModuleClientHandler::OnError(TAutoPtr<TBusMessage> msg, EMessageStatus status) {
- TJobRunner* job = GetJob(msg.Get());
- if (job) {
+
+void TModuleClientHandler::OnError(TAutoPtr<TBusMessage> msg, EMessageStatus status) {
+ TJobRunner* job = GetJob(msg.Get());
+ if (job) {
Y_ASSERT(job->Job->Message != msg.Get());
job->EnqueueAndSchedule(TJobResponseMessage(msg.Release(), nullptr, status));
- job->UnRef();
- }
-}
+ job->UnRef();
+ }
+}
void TModuleClientHandler::OnClientConnectionEvent(const TClientConnectionEvent& event) {
Module->OnClientConnectionEvent(event);
diff --git a/library/cpp/messagebus/oldmodule/module.h b/library/cpp/messagebus/oldmodule/module.h
index 8d1c4a5d52..ead001480c 100644
--- a/library/cpp/messagebus/oldmodule/module.h
+++ b/library/cpp/messagebus/oldmodule/module.h
@@ -40,7 +40,7 @@
/// error (not MESSAGE_OK)
#include "startsession.h"
-
+
#include <library/cpp/messagebus/ybus.h>
#include <util/generic/noncopyable.h>
@@ -62,7 +62,7 @@ namespace NBus {
protected:
typedef TJobHandler (TBusModule::*TBusHandlerPtr)(TBusJob* job, TBusMessage* mess);
TBusHandlerPtr MyPtr;
-
+
public:
template <class B>
TJobHandler(TJobHandler (B::*fptr)(TBusJob* job, TBusMessage* mess)) {
@@ -107,7 +107,7 @@ namespace NBus {
TNetAddr Addr;
bool UseAddr;
bool OneWay;
-
+
private:
TJobState(TReplyHandler handler,
EMessageStatus status,
@@ -126,7 +126,7 @@ namespace NBus {
Addr = *addr;
}
UseAddr = !!addr;
- }
+ }
public:
TString GetStatus(unsigned flags);
@@ -140,10 +140,10 @@ namespace NBus {
/// Maintains internal state of document in computation
class TBusJob {
TObjectCounter<TBusJob> ObjectCounter;
-
+
private:
void CheckThreadCurrentJob();
-
+
public:
/// given a module and starter message
TBusJob(TBusModule* module, TBusMessage* message);
@@ -154,9 +154,9 @@ namespace NBus {
TBusMessage* GetMessage() const {
return Message;
}
-
+
TNetAddr GetPeerAddrNetAddr() const;
-
+
/// send message to any other session or application
/// If addr is set then use it as destination.
void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr);
@@ -164,7 +164,7 @@ namespace NBus {
void SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr);
void SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session);
-
+
/// send reply to the starter message
virtual void SendReply(TBusMessageAutoPtr reply);
@@ -186,7 +186,7 @@ namespace NBus {
Y_ASSERT(stateVec);
*stateVec = Pending;
}
-
+
/// helper function to find state of previously sent messages
template <class MessageType>
TJobState* GetState(int* startFrom = nullptr) {
@@ -275,18 +275,18 @@ namespace NBus {
void Sleep(int milliSeconds);
void CallJobHandlerOnly();
-
+
private:
bool CallJobHandler();
void DoCallReplyHandler(TJobState&);
/// send out all Pending jobs, failed sends will be migrated to Finished
bool SendPending();
bool AnyPendingToSend();
-
+
public:
/// helper to call from OnReply() and OnError()
int CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply);
-
+
public:
TJobHandler Handler; ///< job handler to be executed within next CallJobHandler()
EMessageStatus Status; ///< set != MESSAGE_OK if job should terminate asap
@@ -315,48 +315,48 @@ namespace NBus {
////////////////////////////////////////////////////////////////////
/// \brief Classes to implement basic module functionality
-
+
class IJobFactory {
protected:
virtual ~IJobFactory() {
}
-
+
public:
/// job factory method, override to create custom jobs
virtual TBusJob* CreateJobInstance(TBusMessage* message) = 0;
- };
-
+ };
+
struct TBusModuleConfig {
unsigned StarterMaxInFlight;
-
+
struct TSecret {
TDuration SchedulePeriod;
-
+
TSecret();
};
TSecret Secret;
-
+
TBusModuleConfig();
};
-
+
namespace NPrivate {
struct TBusModuleInternal: public TAtomicRefCount<TBusModuleInternal> {
virtual TVector<TBusClientSessionPtr> GetClientSessionsInternal() = 0;
virtual TVector<TBusServerSessionPtr> GetServerSessionsInternal() = 0;
virtual TBusMessageQueue* GetQueue() = 0;
-
+
virtual TString GetNameInternal() = 0;
-
+
virtual TString GetStatusSingleLine() = 0;
-
+
virtual ~TBusModuleInternal() {
}
};
}
-
+
class TBusModule: public IJobFactory, TNonCopyable {
friend class TBusJob;
-
+
TObjectCounter<TBusModule> ObjectCounter;
TIntrusivePtr<NPrivate::TBusModuleImpl> Impl;
@@ -365,7 +365,7 @@ namespace NBus {
/// Each module should have a name which is used as protocol service
TBusModule(const char* name);
~TBusModule() override;
-
+
const char* GetName() const;
void SetConfig(const TBusModuleConfig& config);
@@ -377,17 +377,17 @@ namespace NBus {
virtual bool StartInput();
/// called when application is about to exit
virtual bool Shutdown();
-
+
// this default implementation just creates TBusJob object
TBusJob* CreateJobInstance(TBusMessage* message) override;
EMessageStatus StartJob(TAutoPtr<TBusMessage> message);
-
+
/// creates private sessions, calls CreateExtSession(), should be called before StartInput()
bool CreatePrivateSessions(TBusMessageQueue* queue);
-
+
virtual void OnClientConnectionEvent(const TClientConnectionEvent& event);
-
+
public:
/// entry point into module, first function to call
virtual TJobHandler Start(TBusJob* job, TBusMessage* mess) = 0;
diff --git a/library/cpp/messagebus/oldmodule/startsession.h b/library/cpp/messagebus/oldmodule/startsession.h
index 5e26e7e1e5..864f82b316 100644
--- a/library/cpp/messagebus/oldmodule/startsession.h
+++ b/library/cpp/messagebus/oldmodule/startsession.h
@@ -15,7 +15,7 @@ namespace NBus {
bool Exiting;
TCondVar ExitSignal;
TMutex ExitLock;
-
+
static void* _starter(void* data);
void Starter();
diff --git a/library/cpp/messagebus/oldmodule/ya.make b/library/cpp/messagebus/oldmodule/ya.make
index ca5eae74f0..03ff8a46ac 100644
--- a/library/cpp/messagebus/oldmodule/ya.make
+++ b/library/cpp/messagebus/oldmodule/ya.make
@@ -1,15 +1,15 @@
-LIBRARY()
-
+LIBRARY()
+
OWNER(g:messagebus)
-
-PEERDIR(
+
+PEERDIR(
library/cpp/messagebus
library/cpp/messagebus/actor
-)
-
-SRCS(
- module.cpp
- startsession.cpp
-)
-
-END()
+)
+
+SRCS(
+ module.cpp
+ startsession.cpp
+)
+
+END()