diff options
author | vladimir <vladimir@yandex-team.ru> | 2022-02-10 16:50:28 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:28 +0300 |
commit | 3e7ff6e4ee637c04455854159e84850e613ebc16 (patch) | |
tree | 1ea1786a47f104a0657e0f935ce63dcaeec3fd26 /library/cpp/messagebus/oldmodule/module.cpp | |
parent | dad82c0e0157ebad6bfd7cf0e5fb3c15c42922b3 (diff) | |
download | ydb-3e7ff6e4ee637c04455854159e84850e613ebc16.tar.gz |
Restoring authorship annotation for <vladimir@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/oldmodule/module.cpp')
-rw-r--r-- | library/cpp/messagebus/oldmodule/module.cpp | 56 |
1 files changed, 28 insertions, 28 deletions
diff --git a/library/cpp/messagebus/oldmodule/module.cpp b/library/cpp/messagebus/oldmodule/module.cpp index 24bd778799..3abcbfc87b 100644 --- a/library/cpp/messagebus/oldmodule/module.cpp +++ b/library/cpp/messagebus/oldmodule/module.cpp @@ -1,5 +1,5 @@ #include "module.h" - + #include <library/cpp/messagebus/scheduler_actor.h> #include <library/cpp/messagebus/thread_extra.h> #include <library/cpp/messagebus/actor/actor.h> @@ -64,7 +64,7 @@ namespace NBus { namespace NPrivate { class TJobStorage { }; - + struct TModuleClientHandler : public IBusClientHandler { TModuleClientHandler(TBusModuleImpl* module) @@ -334,16 +334,16 @@ namespace NBus { ClearAllMessageStates(); } - + TNetAddr TBusJob::GetPeerAddrNetAddr() const { Y_VERIFY(!!OnMessageContext); return OnMessageContext.GetPeerAddrNetAddr(); } - + void TBusJob::CheckThreadCurrentJob() { Y_ASSERT(ThreadCurrentJob == this); } - + ///////////////////////////////////////////////////////// /// \brief Send messages in pending list @@ -405,17 +405,17 @@ namespace NBus { } return Pending.size() > 0; } - + bool TBusJob::AnyPendingToSend() { for (unsigned i = 0; i < Pending.size(); ++i) { if (Pending[i].Status == MESSAGE_DONT_ASK) { return true; } - } + } return false; - } - + } + bool TBusJob::IsDone() { bool r = (SleepUntil == 0 && Pending.size() == 0 && (Handler == nullptr || Status != MESSAGE_OK)); return r; @@ -427,7 +427,7 @@ namespace NBus { Handler = Handler(ModuleImpl->Module, this, Message); } - + bool TBusJob::CallJobHandler() { /// go on as far as we can go without waiting while (!IsDone()) { @@ -465,9 +465,9 @@ namespace NBus { TThreadCurrentJobGuard threadCurrentJobGuard(this); (Module->*(call.Handler))(this, call.Status, call.Message, call.Reply); - } + } } - + int TBusJob::CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply) { /// find handler for given message and update it's status size_t i = 0; @@ -476,34 +476,34 @@ namespace NBus { if (call.Message == mess) { break; } - } + } /// if not found, report error if (i == Pending.size()) { Y_FAIL("must not happen"); } - + /// fill in response into job state TJobState& call = Pending[i]; call.Status = status; Y_ASSERT(call.Message == mess); call.Reply = reply; - + if ((status == MESSAGE_TIMEOUT || status == MESSAGE_DELIVERY_FAILED) && call.NumRetries < call.MaxRetries) { call.NumRetries++; call.Status = MESSAGE_DONT_ASK; call.Message->Reset(); // generate new Id DoCallReplyHandler(call); return 0; - } + } /// call the handler if provided DoCallReplyHandler(call); - + /// move job state into the finished stack Finished.push_back(Pending[i]); Pending.erase(Pending.begin() + i); - + return 0; } @@ -511,21 +511,21 @@ namespace NBus { /// send message to any other session or application void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries) { CheckThreadCurrentJob(); - + SetJob(mess.Get(), Runner); Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, nullptr, false)); } void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr) { CheckThreadCurrentJob(); - + SetJob(mess.Get(), Runner); Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, &addr, false)); } void TBusJob::SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr) { CheckThreadCurrentJob(); - + SetJob(req.Get(), Runner); Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, &addr, true)); } @@ -559,7 +559,7 @@ namespace NBus { Status = status; } - + void TBusJob::ClearState(TJobState& call) { TJobStateVec::iterator it; for (it = Finished.begin(); it != Finished.end(); ++it) { @@ -569,10 +569,10 @@ namespace NBus { Finished.erase(it); return; } - } + } Y_ASSERT(0); - } - + } + void TBusJob::ClearAllMessageStates() { ClearJobStateVector(&Finished); ClearJobStateVector(&Pending); @@ -586,7 +586,7 @@ namespace NBus { SleepUntil = Now() + milliSeconds; } - + TString TBusJob::GetStatus(unsigned flags) { TString strReturn; strReturn += Sprintf(" job=%016" PRIx64 " type=%d sent=%d pending=%d (%d) %s\n", @@ -624,8 +624,8 @@ namespace NBus { if (job) { job->Cancel(status); } - } - + } + TString TBusModuleImpl::GetStatus(unsigned flags) { Y_UNUSED(flags); TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for GetStatus"); |