aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/oldmodule/module.cpp
diff options
context:
space:
mode:
authorvladimir <vladimir@yandex-team.ru>2022-02-10 16:50:29 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:29 +0300
commit4bac7bacd041dac72ece081598805d03d2e80a3e (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/messagebus/oldmodule/module.cpp
parent3e7ff6e4ee637c04455854159e84850e613ebc16 (diff)
downloadydb-4bac7bacd041dac72ece081598805d03d2e80a3e.tar.gz
Restoring authorship annotation for <vladimir@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/oldmodule/module.cpp')
-rw-r--r--library/cpp/messagebus/oldmodule/module.cpp56
1 files changed, 28 insertions, 28 deletions
diff --git a/library/cpp/messagebus/oldmodule/module.cpp b/library/cpp/messagebus/oldmodule/module.cpp
index 3abcbfc87b..24bd778799 100644
--- a/library/cpp/messagebus/oldmodule/module.cpp
+++ b/library/cpp/messagebus/oldmodule/module.cpp
@@ -1,5 +1,5 @@
#include "module.h"
-
+
#include <library/cpp/messagebus/scheduler_actor.h>
#include <library/cpp/messagebus/thread_extra.h>
#include <library/cpp/messagebus/actor/actor.h>
@@ -64,7 +64,7 @@ namespace NBus {
namespace NPrivate {
class TJobStorage {
};
-
+
struct TModuleClientHandler
: public IBusClientHandler {
TModuleClientHandler(TBusModuleImpl* module)
@@ -334,16 +334,16 @@ namespace NBus {
ClearAllMessageStates();
}
-
+
TNetAddr TBusJob::GetPeerAddrNetAddr() const {
Y_VERIFY(!!OnMessageContext);
return OnMessageContext.GetPeerAddrNetAddr();
}
-
+
void TBusJob::CheckThreadCurrentJob() {
Y_ASSERT(ThreadCurrentJob == this);
}
-
+
/////////////////////////////////////////////////////////
/// \brief Send messages in pending list
@@ -405,17 +405,17 @@ namespace NBus {
}
return Pending.size() > 0;
}
-
+
bool TBusJob::AnyPendingToSend() {
for (unsigned i = 0; i < Pending.size(); ++i) {
if (Pending[i].Status == MESSAGE_DONT_ASK) {
return true;
}
- }
+ }
return false;
- }
-
+ }
+
bool TBusJob::IsDone() {
bool r = (SleepUntil == 0 && Pending.size() == 0 && (Handler == nullptr || Status != MESSAGE_OK));
return r;
@@ -427,7 +427,7 @@ namespace NBus {
Handler = Handler(ModuleImpl->Module, this, Message);
}
-
+
bool TBusJob::CallJobHandler() {
/// go on as far as we can go without waiting
while (!IsDone()) {
@@ -465,9 +465,9 @@ namespace NBus {
TThreadCurrentJobGuard threadCurrentJobGuard(this);
(Module->*(call.Handler))(this, call.Status, call.Message, call.Reply);
- }
+ }
}
-
+
int TBusJob::CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply) {
/// find handler for given message and update it's status
size_t i = 0;
@@ -476,34 +476,34 @@ namespace NBus {
if (call.Message == mess) {
break;
}
- }
+ }
/// if not found, report error
if (i == Pending.size()) {
Y_FAIL("must not happen");
}
-
+
/// fill in response into job state
TJobState& call = Pending[i];
call.Status = status;
Y_ASSERT(call.Message == mess);
call.Reply = reply;
-
+
if ((status == MESSAGE_TIMEOUT || status == MESSAGE_DELIVERY_FAILED) && call.NumRetries < call.MaxRetries) {
call.NumRetries++;
call.Status = MESSAGE_DONT_ASK;
call.Message->Reset(); // generate new Id
DoCallReplyHandler(call);
return 0;
- }
+ }
/// call the handler if provided
DoCallReplyHandler(call);
-
+
/// move job state into the finished stack
Finished.push_back(Pending[i]);
Pending.erase(Pending.begin() + i);
-
+
return 0;
}
@@ -511,21 +511,21 @@ namespace NBus {
/// send message to any other session or application
void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries) {
CheckThreadCurrentJob();
-
+
SetJob(mess.Get(), Runner);
Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, nullptr, false));
}
void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr) {
CheckThreadCurrentJob();
-
+
SetJob(mess.Get(), Runner);
Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, &addr, false));
}
void TBusJob::SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr) {
CheckThreadCurrentJob();
-
+
SetJob(req.Get(), Runner);
Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, &addr, true));
}
@@ -559,7 +559,7 @@ namespace NBus {
Status = status;
}
-
+
void TBusJob::ClearState(TJobState& call) {
TJobStateVec::iterator it;
for (it = Finished.begin(); it != Finished.end(); ++it) {
@@ -569,10 +569,10 @@ namespace NBus {
Finished.erase(it);
return;
}
- }
+ }
Y_ASSERT(0);
- }
-
+ }
+
void TBusJob::ClearAllMessageStates() {
ClearJobStateVector(&Finished);
ClearJobStateVector(&Pending);
@@ -586,7 +586,7 @@ namespace NBus {
SleepUntil = Now() + milliSeconds;
}
-
+
TString TBusJob::GetStatus(unsigned flags) {
TString strReturn;
strReturn += Sprintf(" job=%016" PRIx64 " type=%d sent=%d pending=%d (%d) %s\n",
@@ -624,8 +624,8 @@ namespace NBus {
if (job) {
job->Cancel(status);
}
- }
-
+ }
+
TString TBusModuleImpl::GetStatus(unsigned flags) {
Y_UNUSED(flags);
TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for GetStatus");