aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/oldmodule/module.h
diff options
context:
space:
mode:
authorvladimir <vladimir@yandex-team.ru>2022-02-10 16:50:28 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:28 +0300
commit3e7ff6e4ee637c04455854159e84850e613ebc16 (patch)
tree1ea1786a47f104a0657e0f935ce63dcaeec3fd26 /library/cpp/messagebus/oldmodule/module.h
parentdad82c0e0157ebad6bfd7cf0e5fb3c15c42922b3 (diff)
downloadydb-3e7ff6e4ee637c04455854159e84850e613ebc16.tar.gz
Restoring authorship annotation for <vladimir@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/oldmodule/module.h')
-rw-r--r--library/cpp/messagebus/oldmodule/module.h102
1 files changed, 51 insertions, 51 deletions
diff --git a/library/cpp/messagebus/oldmodule/module.h b/library/cpp/messagebus/oldmodule/module.h
index 8d1c4a5d52..e263d1b618 100644
--- a/library/cpp/messagebus/oldmodule/module.h
+++ b/library/cpp/messagebus/oldmodule/module.h
@@ -1,55 +1,55 @@
#pragma once
-///////////////////////////////////////////////////////////////////////////
-/// \file
-/// \brief Application interface for modules
+///////////////////////////////////////////////////////////////////////////
+/// \file
+/// \brief Application interface for modules
-/// NBus::TBusModule provides foundation for implementation of asynchnous
-/// modules that communicate with multiple external or local sessions
+/// NBus::TBusModule provides foundation for implementation of asynchnous
+/// modules that communicate with multiple external or local sessions
/// NBus::TBusSession.
/// To implement the module some virtual functions needs to be overridden:
/// NBus::TBusModule::CreateExtSession() creates and registers an
-/// external session that receives incoming messages as input for module
+/// external session that receives incoming messages as input for module
/// processing.
/// When new incoming message arrives the new NBus::TBusJob is created.
-/// NBus::TBusJob is somewhat similar to a thread, it maintains all the state
+/// NBus::TBusJob is somewhat similar to a thread, it maintains all the state
/// during processing of one incoming message. Default implementation of
/// NBus::TBusJob will maintain all send and received messages during
/// lifetime of this job. Each message, status and reply can be found
-/// within NBus::TJobState using NBus::TBusJob::GetState(). If your module
-/// needs to maintain an additional information during lifetime of the job
+/// within NBus::TJobState using NBus::TBusJob::GetState(). If your module
+/// needs to maintain an additional information during lifetime of the job
/// you can derive your own class from NBus::TBusJob and override job
/// factory method NBus::IJobFactory::CreateJobInstance() to create your instances.
/// Processing of a given message starts with a call to NBus::TBusModule::Start()
/// handler that should be overridden in the module implementation. Within
-/// the callback handler module can perform any computation and access any
-/// datastore tables that it needs. The handler can also access any module
+/// the callback handler module can perform any computation and access any
+/// datastore tables that it needs. The handler can also access any module
/// variables. However, same handler can be called from multiple threads so,
/// it is recommended that handler only access read-only module level variables.
/// Handler should use NBus::TBusJob::Send() to send messages to other client
-/// sessions and it can use NBus::TBusJob::Reply() to send reply to the main
+/// sessions and it can use NBus::TBusJob::Reply() to send reply to the main
/// job message. When handler is done, it returns the pointer to the next handler to call
-/// when all pending messages have cleared. If handler
-/// returns pointer to itself the module will reschedule execution of this handler
-/// for a later time. This should be done in case NBus::TBusJob::Send() returns
-/// error (not MESSAGE_OK)
-
+/// when all pending messages have cleared. If handler
+/// returns pointer to itself the module will reschedule execution of this handler
+/// for a later time. This should be done in case NBus::TBusJob::Send() returns
+/// error (not MESSAGE_OK)
+
#include "startsession.h"
#include <library/cpp/messagebus/ybus.h>
-
+
#include <util/generic/noncopyable.h>
#include <util/generic/object_counter.h>
-namespace NBus {
+namespace NBus {
class TBusJob;
class TBusModule;
-
+
namespace NPrivate {
struct TCallJobHandlerWorkItem;
struct TBusModuleImpl;
@@ -57,7 +57,7 @@ namespace NBus {
struct TModuleClientHandler;
struct TJobRunner;
}
-
+
class TJobHandler {
protected:
typedef TJobHandler (TBusModule::*TBusHandlerPtr)(TBusJob* job, TBusMessage* mess);
@@ -86,12 +86,12 @@ namespace NBus {
return (b->*MyPtr)(job, mess);
}
};
-
+
typedef void (TBusModule::*TReplyHandler)(TBusJob* job, EMessageStatus status, TBusMessage* mess, TBusMessage* reply);
-
+
////////////////////////////////////////////////////
/// \brief Pending message state
-
+
struct TJobState {
friend class TBusJob;
friend class ::TCrawlerModule;
@@ -131,9 +131,9 @@ namespace NBus {
public:
TString GetStatus(unsigned flags);
};
-
+
using TJobStateVec = TVector<TJobState>;
-
+
/////////////////////////////////////////////////////////
/// \brief Execution item = thread
@@ -147,10 +147,10 @@ namespace NBus {
public:
/// given a module and starter message
TBusJob(TBusModule* module, TBusMessage* message);
-
+
/// destructor will free all the message that were send and received
virtual ~TBusJob();
-
+
TBusMessage* GetMessage() const {
return Message;
}
@@ -161,13 +161,13 @@ namespace NBus {
/// If addr is set then use it as destination.
void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr);
void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler = nullptr, size_t maxRetries = 0);
-
+
void SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr);
void SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session);
/// send reply to the starter message
virtual void SendReply(TBusMessageAutoPtr reply);
-
+
/// set the flag to terminate job at the earliest convenience
void Cancel(EMessageStatus status);
@@ -179,7 +179,7 @@ namespace NBus {
void PutState(const TJobState& state) {
Finished.push_back(state);
}
-
+
public:
/// retrieve all pending messages
void GetPending(TJobStateVec* stateVec) {
@@ -225,10 +225,10 @@ namespace NBus {
}
return static_cast<MessageType*>(call.Message);
}
- }
+ }
return nullptr;
- }
-
+ }
+
/// helper function to find status for previously sent message
template <class MessageType>
EMessageStatus GetStatus(int* startFrom = nullptr) {
@@ -240,10 +240,10 @@ namespace NBus {
}
return call.Status;
}
- }
+ }
return MESSAGE_UNKNOWN;
- }
-
+ }
+
/// helper function to clear state of previosly sent messages
template <class MessageType>
void Clear() {
@@ -256,24 +256,24 @@ namespace NBus {
} else {
++i;
}
- }
- }
-
+ }
+ }
+
/// helper function to clear state in order to try again
void ClearState(TJobState& state);
-
+
/// clears all message states
void ClearAllMessageStates();
/// returns true if job is done
bool IsDone();
-
+
/// return human reabable status of this job
virtual TString GetStatus(unsigned flags);
/// set sleep time for job
void Sleep(int milliSeconds);
-
+
void CallJobHandlerOnly();
private:
@@ -297,7 +297,7 @@ namespace NBus {
TOnMessageContext OnMessageContext; // starter
public:
bool ReplySent;
-
+
private:
friend class TBusModule;
friend struct NPrivate::TBusModuleImpl;
@@ -312,7 +312,7 @@ namespace NBus {
NPrivate::TBusModuleImpl* ModuleImpl; ///< module which created the job
TBusInstant SleepUntil; ///< time to wakeup, 0 if no sleep
};
-
+
////////////////////////////////////////////////////////////////////
/// \brief Classes to implement basic module functionality
@@ -358,18 +358,18 @@ namespace NBus {
friend class TBusJob;
TObjectCounter<TBusModule> ObjectCounter;
-
+
TIntrusivePtr<NPrivate::TBusModuleImpl> Impl;
-
+
public:
/// Each module should have a name which is used as protocol service
TBusModule(const char* name);
~TBusModule() override;
const char* GetName() const;
-
+
void SetConfig(const TBusModuleConfig& config);
-
+
/// get status of all jobs in flight
TString GetStatus(unsigned flags = 0);
@@ -380,7 +380,7 @@ namespace NBus {
// this default implementation just creates TBusJob object
TBusJob* CreateJobInstance(TBusMessage* message) override;
-
+
EMessageStatus StartJob(TAutoPtr<TBusMessage> message);
/// creates private sessions, calls CreateExtSession(), should be called before StartInput()
@@ -391,7 +391,7 @@ namespace NBus {
public:
/// entry point into module, first function to call
virtual TJobHandler Start(TBusJob* job, TBusMessage* mess) = 0;
-
+
protected:
/// override this function to create destination session
virtual TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) = 0;