aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/oldmodule
diff options
context:
space:
mode:
authorvladimir <vladimir@yandex-team.ru>2022-02-10 16:50:28 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:28 +0300
commit3e7ff6e4ee637c04455854159e84850e613ebc16 (patch)
tree1ea1786a47f104a0657e0f935ce63dcaeec3fd26 /library/cpp/messagebus/oldmodule
parentdad82c0e0157ebad6bfd7cf0e5fb3c15c42922b3 (diff)
downloadydb-3e7ff6e4ee637c04455854159e84850e613ebc16.tar.gz
Restoring authorship annotation for <vladimir@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/oldmodule')
-rw-r--r--library/cpp/messagebus/oldmodule/module.cpp56
-rw-r--r--library/cpp/messagebus/oldmodule/module.h102
-rw-r--r--library/cpp/messagebus/oldmodule/startsession.cpp40
-rw-r--r--library/cpp/messagebus/oldmodule/startsession.h14
4 files changed, 106 insertions, 106 deletions
diff --git a/library/cpp/messagebus/oldmodule/module.cpp b/library/cpp/messagebus/oldmodule/module.cpp
index 24bd778799..3abcbfc87b 100644
--- a/library/cpp/messagebus/oldmodule/module.cpp
+++ b/library/cpp/messagebus/oldmodule/module.cpp
@@ -1,5 +1,5 @@
#include "module.h"
-
+
#include <library/cpp/messagebus/scheduler_actor.h>
#include <library/cpp/messagebus/thread_extra.h>
#include <library/cpp/messagebus/actor/actor.h>
@@ -64,7 +64,7 @@ namespace NBus {
namespace NPrivate {
class TJobStorage {
};
-
+
struct TModuleClientHandler
: public IBusClientHandler {
TModuleClientHandler(TBusModuleImpl* module)
@@ -334,16 +334,16 @@ namespace NBus {
ClearAllMessageStates();
}
-
+
TNetAddr TBusJob::GetPeerAddrNetAddr() const {
Y_VERIFY(!!OnMessageContext);
return OnMessageContext.GetPeerAddrNetAddr();
}
-
+
void TBusJob::CheckThreadCurrentJob() {
Y_ASSERT(ThreadCurrentJob == this);
}
-
+
/////////////////////////////////////////////////////////
/// \brief Send messages in pending list
@@ -405,17 +405,17 @@ namespace NBus {
}
return Pending.size() > 0;
}
-
+
bool TBusJob::AnyPendingToSend() {
for (unsigned i = 0; i < Pending.size(); ++i) {
if (Pending[i].Status == MESSAGE_DONT_ASK) {
return true;
}
- }
+ }
return false;
- }
-
+ }
+
bool TBusJob::IsDone() {
bool r = (SleepUntil == 0 && Pending.size() == 0 && (Handler == nullptr || Status != MESSAGE_OK));
return r;
@@ -427,7 +427,7 @@ namespace NBus {
Handler = Handler(ModuleImpl->Module, this, Message);
}
-
+
bool TBusJob::CallJobHandler() {
/// go on as far as we can go without waiting
while (!IsDone()) {
@@ -465,9 +465,9 @@ namespace NBus {
TThreadCurrentJobGuard threadCurrentJobGuard(this);
(Module->*(call.Handler))(this, call.Status, call.Message, call.Reply);
- }
+ }
}
-
+
int TBusJob::CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply) {
/// find handler for given message and update it's status
size_t i = 0;
@@ -476,34 +476,34 @@ namespace NBus {
if (call.Message == mess) {
break;
}
- }
+ }
/// if not found, report error
if (i == Pending.size()) {
Y_FAIL("must not happen");
}
-
+
/// fill in response into job state
TJobState& call = Pending[i];
call.Status = status;
Y_ASSERT(call.Message == mess);
call.Reply = reply;
-
+
if ((status == MESSAGE_TIMEOUT || status == MESSAGE_DELIVERY_FAILED) && call.NumRetries < call.MaxRetries) {
call.NumRetries++;
call.Status = MESSAGE_DONT_ASK;
call.Message->Reset(); // generate new Id
DoCallReplyHandler(call);
return 0;
- }
+ }
/// call the handler if provided
DoCallReplyHandler(call);
-
+
/// move job state into the finished stack
Finished.push_back(Pending[i]);
Pending.erase(Pending.begin() + i);
-
+
return 0;
}
@@ -511,21 +511,21 @@ namespace NBus {
/// send message to any other session or application
void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries) {
CheckThreadCurrentJob();
-
+
SetJob(mess.Get(), Runner);
Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, nullptr, false));
}
void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr) {
CheckThreadCurrentJob();
-
+
SetJob(mess.Get(), Runner);
Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, &addr, false));
}
void TBusJob::SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr) {
CheckThreadCurrentJob();
-
+
SetJob(req.Get(), Runner);
Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, &addr, true));
}
@@ -559,7 +559,7 @@ namespace NBus {
Status = status;
}
-
+
void TBusJob::ClearState(TJobState& call) {
TJobStateVec::iterator it;
for (it = Finished.begin(); it != Finished.end(); ++it) {
@@ -569,10 +569,10 @@ namespace NBus {
Finished.erase(it);
return;
}
- }
+ }
Y_ASSERT(0);
- }
-
+ }
+
void TBusJob::ClearAllMessageStates() {
ClearJobStateVector(&Finished);
ClearJobStateVector(&Pending);
@@ -586,7 +586,7 @@ namespace NBus {
SleepUntil = Now() + milliSeconds;
}
-
+
TString TBusJob::GetStatus(unsigned flags) {
TString strReturn;
strReturn += Sprintf(" job=%016" PRIx64 " type=%d sent=%d pending=%d (%d) %s\n",
@@ -624,8 +624,8 @@ namespace NBus {
if (job) {
job->Cancel(status);
}
- }
-
+ }
+
TString TBusModuleImpl::GetStatus(unsigned flags) {
Y_UNUSED(flags);
TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for GetStatus");
diff --git a/library/cpp/messagebus/oldmodule/module.h b/library/cpp/messagebus/oldmodule/module.h
index 8d1c4a5d52..e263d1b618 100644
--- a/library/cpp/messagebus/oldmodule/module.h
+++ b/library/cpp/messagebus/oldmodule/module.h
@@ -1,55 +1,55 @@
#pragma once
-///////////////////////////////////////////////////////////////////////////
-/// \file
-/// \brief Application interface for modules
+///////////////////////////////////////////////////////////////////////////
+/// \file
+/// \brief Application interface for modules
-/// NBus::TBusModule provides foundation for implementation of asynchnous
-/// modules that communicate with multiple external or local sessions
+/// NBus::TBusModule provides foundation for implementation of asynchnous
+/// modules that communicate with multiple external or local sessions
/// NBus::TBusSession.
/// To implement the module some virtual functions needs to be overridden:
/// NBus::TBusModule::CreateExtSession() creates and registers an
-/// external session that receives incoming messages as input for module
+/// external session that receives incoming messages as input for module
/// processing.
/// When new incoming message arrives the new NBus::TBusJob is created.
-/// NBus::TBusJob is somewhat similar to a thread, it maintains all the state
+/// NBus::TBusJob is somewhat similar to a thread, it maintains all the state
/// during processing of one incoming message. Default implementation of
/// NBus::TBusJob will maintain all send and received messages during
/// lifetime of this job. Each message, status and reply can be found
-/// within NBus::TJobState using NBus::TBusJob::GetState(). If your module
-/// needs to maintain an additional information during lifetime of the job
+/// within NBus::TJobState using NBus::TBusJob::GetState(). If your module
+/// needs to maintain an additional information during lifetime of the job
/// you can derive your own class from NBus::TBusJob and override job
/// factory method NBus::IJobFactory::CreateJobInstance() to create your instances.
/// Processing of a given message starts with a call to NBus::TBusModule::Start()
/// handler that should be overridden in the module implementation. Within
-/// the callback handler module can perform any computation and access any
-/// datastore tables that it needs. The handler can also access any module
+/// the callback handler module can perform any computation and access any
+/// datastore tables that it needs. The handler can also access any module
/// variables. However, same handler can be called from multiple threads so,
/// it is recommended that handler only access read-only module level variables.
/// Handler should use NBus::TBusJob::Send() to send messages to other client
-/// sessions and it can use NBus::TBusJob::Reply() to send reply to the main
+/// sessions and it can use NBus::TBusJob::Reply() to send reply to the main
/// job message. When handler is done, it returns the pointer to the next handler to call
-/// when all pending messages have cleared. If handler
-/// returns pointer to itself the module will reschedule execution of this handler
-/// for a later time. This should be done in case NBus::TBusJob::Send() returns
-/// error (not MESSAGE_OK)
-
+/// when all pending messages have cleared. If handler
+/// returns pointer to itself the module will reschedule execution of this handler
+/// for a later time. This should be done in case NBus::TBusJob::Send() returns
+/// error (not MESSAGE_OK)
+
#include "startsession.h"
#include <library/cpp/messagebus/ybus.h>
-
+
#include <util/generic/noncopyable.h>
#include <util/generic/object_counter.h>
-namespace NBus {
+namespace NBus {
class TBusJob;
class TBusModule;
-
+
namespace NPrivate {
struct TCallJobHandlerWorkItem;
struct TBusModuleImpl;
@@ -57,7 +57,7 @@ namespace NBus {
struct TModuleClientHandler;
struct TJobRunner;
}
-
+
class TJobHandler {
protected:
typedef TJobHandler (TBusModule::*TBusHandlerPtr)(TBusJob* job, TBusMessage* mess);
@@ -86,12 +86,12 @@ namespace NBus {
return (b->*MyPtr)(job, mess);
}
};
-
+
typedef void (TBusModule::*TReplyHandler)(TBusJob* job, EMessageStatus status, TBusMessage* mess, TBusMessage* reply);
-
+
////////////////////////////////////////////////////
/// \brief Pending message state
-
+
struct TJobState {
friend class TBusJob;
friend class ::TCrawlerModule;
@@ -131,9 +131,9 @@ namespace NBus {
public:
TString GetStatus(unsigned flags);
};
-
+
using TJobStateVec = TVector<TJobState>;
-
+
/////////////////////////////////////////////////////////
/// \brief Execution item = thread
@@ -147,10 +147,10 @@ namespace NBus {
public:
/// given a module and starter message
TBusJob(TBusModule* module, TBusMessage* message);
-
+
/// destructor will free all the message that were send and received
virtual ~TBusJob();
-
+
TBusMessage* GetMessage() const {
return Message;
}
@@ -161,13 +161,13 @@ namespace NBus {
/// If addr is set then use it as destination.
void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr);
void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler = nullptr, size_t maxRetries = 0);
-
+
void SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr);
void SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session);
/// send reply to the starter message
virtual void SendReply(TBusMessageAutoPtr reply);
-
+
/// set the flag to terminate job at the earliest convenience
void Cancel(EMessageStatus status);
@@ -179,7 +179,7 @@ namespace NBus {
void PutState(const TJobState& state) {
Finished.push_back(state);
}
-
+
public:
/// retrieve all pending messages
void GetPending(TJobStateVec* stateVec) {
@@ -225,10 +225,10 @@ namespace NBus {
}
return static_cast<MessageType*>(call.Message);
}
- }
+ }
return nullptr;
- }
-
+ }
+
/// helper function to find status for previously sent message
template <class MessageType>
EMessageStatus GetStatus(int* startFrom = nullptr) {
@@ -240,10 +240,10 @@ namespace NBus {
}
return call.Status;
}
- }
+ }
return MESSAGE_UNKNOWN;
- }
-
+ }
+
/// helper function to clear state of previosly sent messages
template <class MessageType>
void Clear() {
@@ -256,24 +256,24 @@ namespace NBus {
} else {
++i;
}
- }
- }
-
+ }
+ }
+
/// helper function to clear state in order to try again
void ClearState(TJobState& state);
-
+
/// clears all message states
void ClearAllMessageStates();
/// returns true if job is done
bool IsDone();
-
+
/// return human reabable status of this job
virtual TString GetStatus(unsigned flags);
/// set sleep time for job
void Sleep(int milliSeconds);
-
+
void CallJobHandlerOnly();
private:
@@ -297,7 +297,7 @@ namespace NBus {
TOnMessageContext OnMessageContext; // starter
public:
bool ReplySent;
-
+
private:
friend class TBusModule;
friend struct NPrivate::TBusModuleImpl;
@@ -312,7 +312,7 @@ namespace NBus {
NPrivate::TBusModuleImpl* ModuleImpl; ///< module which created the job
TBusInstant SleepUntil; ///< time to wakeup, 0 if no sleep
};
-
+
////////////////////////////////////////////////////////////////////
/// \brief Classes to implement basic module functionality
@@ -358,18 +358,18 @@ namespace NBus {
friend class TBusJob;
TObjectCounter<TBusModule> ObjectCounter;
-
+
TIntrusivePtr<NPrivate::TBusModuleImpl> Impl;
-
+
public:
/// Each module should have a name which is used as protocol service
TBusModule(const char* name);
~TBusModule() override;
const char* GetName() const;
-
+
void SetConfig(const TBusModuleConfig& config);
-
+
/// get status of all jobs in flight
TString GetStatus(unsigned flags = 0);
@@ -380,7 +380,7 @@ namespace NBus {
// this default implementation just creates TBusJob object
TBusJob* CreateJobInstance(TBusMessage* message) override;
-
+
EMessageStatus StartJob(TAutoPtr<TBusMessage> message);
/// creates private sessions, calls CreateExtSession(), should be called before StartInput()
@@ -391,7 +391,7 @@ namespace NBus {
public:
/// entry point into module, first function to call
virtual TJobHandler Start(TBusJob* job, TBusMessage* mess) = 0;
-
+
protected:
/// override this function to create destination session
virtual TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) = 0;
diff --git a/library/cpp/messagebus/oldmodule/startsession.cpp b/library/cpp/messagebus/oldmodule/startsession.cpp
index 7c38801d62..8126e11530 100644
--- a/library/cpp/messagebus/oldmodule/startsession.cpp
+++ b/library/cpp/messagebus/oldmodule/startsession.cpp
@@ -1,19 +1,19 @@
-///////////////////////////////////////////////////////////
+///////////////////////////////////////////////////////////
/// \file
-/// \brief Starter session implementation
+/// \brief Starter session implementation
-/// Starter session will generate emtpy message to insert
-/// into local session that are registered under same protocol
+/// Starter session will generate emtpy message to insert
+/// into local session that are registered under same protocol
-/// Starter (will one day) automatically adjust number
-/// of message inflight to make sure that at least one of source
-/// sessions within message queue is at the limit (bottle neck)
-
-/// Maximum number of messages that starter will instert into
-/// the pipeline is configured by NBus::TBusSessionConfig::MaxInFlight
-
-#include "startsession.h"
+/// Starter (will one day) automatically adjust number
+/// of message inflight to make sure that at least one of source
+/// sessions within message queue is at the limit (bottle neck)
+/// Maximum number of messages that starter will instert into
+/// the pipeline is configured by NBus::TBusSessionConfig::MaxInFlight
+
+#include "startsession.h"
+
#include "module.h"
#include <library/cpp/messagebus/ybus.h>
@@ -24,7 +24,7 @@ namespace NBus {
pThis->Starter();
return nullptr;
}
-
+
TBusStarter::TBusStarter(TBusModule* module, const TBusSessionConfig& config)
: Module(module)
, Config(config)
@@ -33,11 +33,11 @@ namespace NBus {
{
StartThread.Start();
}
-
+
TBusStarter::~TBusStarter() {
Shutdown();
}
-
+
void TBusStarter::Shutdown() {
{
TGuard<TMutex> g(ExitLock);
@@ -46,20 +46,20 @@ namespace NBus {
}
StartThread.Join();
}
-
+
void TBusStarter::Starter() {
TGuard<TMutex> g(ExitLock);
while (!Exiting) {
TAutoPtr<TBusMessage> empty(new TBusMessage(0));
-
+
EMessageStatus status = Module->StartJob(empty);
-
+
if (Config.SendTimeout > 0) {
ExitSignal.WaitT(ExitLock, TDuration::MilliSeconds(Config.SendTimeout));
} else {
ExitSignal.WaitT(ExitLock, (status == MESSAGE_BUSY) ? TDuration::MilliSeconds(1) : TDuration::Zero());
}
- }
- }
+ }
+ }
}
diff --git a/library/cpp/messagebus/oldmodule/startsession.h b/library/cpp/messagebus/oldmodule/startsession.h
index 5e26e7e1e5..c6b407743d 100644
--- a/library/cpp/messagebus/oldmodule/startsession.h
+++ b/library/cpp/messagebus/oldmodule/startsession.h
@@ -1,12 +1,12 @@
#pragma once
-
+
#include <library/cpp/messagebus/ybus.h>
-#include <util/system/thread.h>
-
-namespace NBus {
+#include <util/system/thread.h>
+
+namespace NBus {
class TBusModule;
-
+
class TBusStarter {
private:
TBusModule* Module;
@@ -23,12 +23,12 @@ namespace NBus {
TString GetStatus(ui16 /*flags=YBUS_STATUS_CONNS*/) {
return "";
}
-
+
public:
TBusStarter(TBusModule* module, const TBusSessionConfig& config);
~TBusStarter();
void Shutdown();
};
-
+
}