aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/oldmodule/module.h
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/oldmodule/module.h
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/oldmodule/module.h')
-rw-r--r--library/cpp/messagebus/oldmodule/module.h410
1 files changed, 410 insertions, 0 deletions
diff --git a/library/cpp/messagebus/oldmodule/module.h b/library/cpp/messagebus/oldmodule/module.h
new file mode 100644
index 0000000000..8d1c4a5d52
--- /dev/null
+++ b/library/cpp/messagebus/oldmodule/module.h
@@ -0,0 +1,410 @@
+#pragma once
+
+///////////////////////////////////////////////////////////////////////////
+/// \file
+/// \brief Application interface for modules
+
+/// NBus::TBusModule provides foundation for implementation of asynchnous
+/// modules that communicate with multiple external or local sessions
+/// NBus::TBusSession.
+
+/// To implement the module some virtual functions needs to be overridden:
+
+/// NBus::TBusModule::CreateExtSession() creates and registers an
+/// external session that receives incoming messages as input for module
+/// processing.
+
+/// When new incoming message arrives the new NBus::TBusJob is created.
+/// NBus::TBusJob is somewhat similar to a thread, it maintains all the state
+/// during processing of one incoming message. Default implementation of
+/// NBus::TBusJob will maintain all send and received messages during
+/// lifetime of this job. Each message, status and reply can be found
+/// within NBus::TJobState using NBus::TBusJob::GetState(). If your module
+/// needs to maintain an additional information during lifetime of the job
+/// you can derive your own class from NBus::TBusJob and override job
+/// factory method NBus::IJobFactory::CreateJobInstance() to create your instances.
+
+/// Processing of a given message starts with a call to NBus::TBusModule::Start()
+/// handler that should be overridden in the module implementation. Within
+/// the callback handler module can perform any computation and access any
+/// datastore tables that it needs. The handler can also access any module
+/// variables. However, same handler can be called from multiple threads so,
+/// it is recommended that handler only access read-only module level variables.
+
+/// Handler should use NBus::TBusJob::Send() to send messages to other client
+/// sessions and it can use NBus::TBusJob::Reply() to send reply to the main
+/// job message. When handler is done, it returns the pointer to the next handler to call
+/// when all pending messages have cleared. If handler
+/// returns pointer to itself the module will reschedule execution of this handler
+/// for a later time. This should be done in case NBus::TBusJob::Send() returns
+/// error (not MESSAGE_OK)
+
+#include "startsession.h"
+
+#include <library/cpp/messagebus/ybus.h>
+
+#include <util/generic/noncopyable.h>
+#include <util/generic/object_counter.h>
+
+namespace NBus {
+ class TBusJob;
+ class TBusModule;
+
+ namespace NPrivate {
+ struct TCallJobHandlerWorkItem;
+ struct TBusModuleImpl;
+ struct TModuleServerHandler;
+ struct TModuleClientHandler;
+ struct TJobRunner;
+ }
+
+ class TJobHandler {
+ protected:
+ typedef TJobHandler (TBusModule::*TBusHandlerPtr)(TBusJob* job, TBusMessage* mess);
+ TBusHandlerPtr MyPtr;
+
+ public:
+ template <class B>
+ TJobHandler(TJobHandler (B::*fptr)(TBusJob* job, TBusMessage* mess)) {
+ MyPtr = static_cast<TBusHandlerPtr>(fptr);
+ }
+ TJobHandler(TBusHandlerPtr fptr = nullptr) {
+ MyPtr = fptr;
+ }
+ TJobHandler(const TJobHandler&) = default;
+ TJobHandler& operator =(const TJobHandler&) = default;
+ bool operator==(TJobHandler h) const {
+ return MyPtr == h.MyPtr;
+ }
+ bool operator!=(TJobHandler h) const {
+ return MyPtr != h.MyPtr;
+ }
+ bool operator!() const {
+ return !MyPtr;
+ }
+ TJobHandler operator()(TBusModule* b, TBusJob* job, TBusMessage* mess) {
+ return (b->*MyPtr)(job, mess);
+ }
+ };
+
+ typedef void (TBusModule::*TReplyHandler)(TBusJob* job, EMessageStatus status, TBusMessage* mess, TBusMessage* reply);
+
+ ////////////////////////////////////////////////////
+ /// \brief Pending message state
+
+ struct TJobState {
+ friend class TBusJob;
+ friend class ::TCrawlerModule;
+
+ TReplyHandler Handler;
+ EMessageStatus Status;
+ TBusMessage* Message;
+ TBusMessage* Reply;
+ TBusClientSession* Session;
+ size_t NumRetries;
+ size_t MaxRetries;
+ // If != NULL then use it as destination.
+ TNetAddr Addr;
+ bool UseAddr;
+ bool OneWay;
+
+ private:
+ TJobState(TReplyHandler handler,
+ EMessageStatus status,
+ TBusMessage* mess, TBusClientSession* session, TBusMessage* reply, size_t maxRetries = 0,
+ const TNetAddr* addr = nullptr, bool oneWay = false)
+ : Handler(handler)
+ , Status(status)
+ , Message(mess)
+ , Reply(reply)
+ , Session(session)
+ , NumRetries(0)
+ , MaxRetries(maxRetries)
+ , OneWay(oneWay)
+ {
+ if (!!addr) {
+ Addr = *addr;
+ }
+ UseAddr = !!addr;
+ }
+
+ public:
+ TString GetStatus(unsigned flags);
+ };
+
+ using TJobStateVec = TVector<TJobState>;
+
+ /////////////////////////////////////////////////////////
+ /// \brief Execution item = thread
+
+ /// Maintains internal state of document in computation
+ class TBusJob {
+ TObjectCounter<TBusJob> ObjectCounter;
+
+ private:
+ void CheckThreadCurrentJob();
+
+ public:
+ /// given a module and starter message
+ TBusJob(TBusModule* module, TBusMessage* message);
+
+ /// destructor will free all the message that were send and received
+ virtual ~TBusJob();
+
+ TBusMessage* GetMessage() const {
+ return Message;
+ }
+
+ TNetAddr GetPeerAddrNetAddr() const;
+
+ /// send message to any other session or application
+ /// If addr is set then use it as destination.
+ void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr);
+ void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler = nullptr, size_t maxRetries = 0);
+
+ void SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr);
+ void SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session);
+
+ /// send reply to the starter message
+ virtual void SendReply(TBusMessageAutoPtr reply);
+
+ /// set the flag to terminate job at the earliest convenience
+ void Cancel(EMessageStatus status);
+
+ /// helper to put item on finished list of states
+ /// It should not be a part of public API,
+ /// so prohibit it for all except current users.
+ private:
+ friend class ::TCrawlerModule;
+ void PutState(const TJobState& state) {
+ Finished.push_back(state);
+ }
+
+ public:
+ /// retrieve all pending messages
+ void GetPending(TJobStateVec* stateVec) {
+ Y_ASSERT(stateVec);
+ *stateVec = Pending;
+ }
+
+ /// helper function to find state of previously sent messages
+ template <class MessageType>
+ TJobState* GetState(int* startFrom = nullptr) {
+ for (int i = startFrom ? *startFrom : 0; i < int(Finished.size()); i++) {
+ TJobState* call = &Finished[i];
+ if (call->Reply != nullptr && dynamic_cast<MessageType*>(call->Reply)) {
+ if (startFrom) {
+ *startFrom = i;
+ }
+ return call;
+ }
+ if (call->Message != nullptr && dynamic_cast<MessageType*>(call->Message)) {
+ if (startFrom) {
+ *startFrom = i;
+ }
+ return call;
+ }
+ }
+ return nullptr;
+ }
+
+ /// helper function to find response for previously sent messages
+ template <class MessageType>
+ MessageType* Get(int* startFrom = nullptr) {
+ for (int i = startFrom ? *startFrom : 0; i < int(Finished.size()); i++) {
+ TJobState& call = Finished[i];
+ if (call.Reply != nullptr && dynamic_cast<MessageType*>(call.Reply)) {
+ if (startFrom) {
+ *startFrom = i;
+ }
+ return static_cast<MessageType*>(call.Reply);
+ }
+ if (call.Message != nullptr && dynamic_cast<MessageType*>(call.Message)) {
+ if (startFrom) {
+ *startFrom = i;
+ }
+ return static_cast<MessageType*>(call.Message);
+ }
+ }
+ return nullptr;
+ }
+
+ /// helper function to find status for previously sent message
+ template <class MessageType>
+ EMessageStatus GetStatus(int* startFrom = nullptr) {
+ for (int i = startFrom ? *startFrom : 0; i < int(Finished.size()); i++) {
+ TJobState& call = Finished[i];
+ if (call.Message != nullptr && dynamic_cast<MessageType*>(call.Message)) {
+ if (startFrom) {
+ *startFrom = i;
+ }
+ return call.Status;
+ }
+ }
+ return MESSAGE_UNKNOWN;
+ }
+
+ /// helper function to clear state of previosly sent messages
+ template <class MessageType>
+ void Clear() {
+ for (size_t i = 0; i < Finished.size();) {
+ // `Finished.size() - i` decreases with each iteration
+ // we either increment i, or remove element from Finished.
+ TJobState& call = Finished[i];
+ if (call.Message != nullptr && dynamic_cast<MessageType*>(call.Message)) {
+ ClearState(call);
+ } else {
+ ++i;
+ }
+ }
+ }
+
+ /// helper function to clear state in order to try again
+ void ClearState(TJobState& state);
+
+ /// clears all message states
+ void ClearAllMessageStates();
+
+ /// returns true if job is done
+ bool IsDone();
+
+ /// return human reabable status of this job
+ virtual TString GetStatus(unsigned flags);
+
+ /// set sleep time for job
+ void Sleep(int milliSeconds);
+
+ void CallJobHandlerOnly();
+
+ private:
+ bool CallJobHandler();
+ void DoCallReplyHandler(TJobState&);
+ /// send out all Pending jobs, failed sends will be migrated to Finished
+ bool SendPending();
+ bool AnyPendingToSend();
+
+ public:
+ /// helper to call from OnReply() and OnError()
+ int CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply);
+
+ public:
+ TJobHandler Handler; ///< job handler to be executed within next CallJobHandler()
+ EMessageStatus Status; ///< set != MESSAGE_OK if job should terminate asap
+ private:
+ NPrivate::TJobRunner* Runner;
+ TBusMessage* Message;
+ THolder<TBusMessage> MessageHolder;
+ TOnMessageContext OnMessageContext; // starter
+ public:
+ bool ReplySent;
+
+ private:
+ friend class TBusModule;
+ friend struct NPrivate::TBusModuleImpl;
+ friend struct NPrivate::TCallJobHandlerWorkItem;
+ friend struct NPrivate::TModuleServerHandler;
+ friend struct NPrivate::TModuleClientHandler;
+ friend struct NPrivate::TJobRunner;
+
+ TJobStateVec Pending; ///< messages currently outstanding via Send()
+ TJobStateVec Finished; ///< messages that were replied to
+ TBusModule* Module;
+ NPrivate::TBusModuleImpl* ModuleImpl; ///< module which created the job
+ TBusInstant SleepUntil; ///< time to wakeup, 0 if no sleep
+ };
+
+ ////////////////////////////////////////////////////////////////////
+ /// \brief Classes to implement basic module functionality
+
+ class IJobFactory {
+ protected:
+ virtual ~IJobFactory() {
+ }
+
+ public:
+ /// job factory method, override to create custom jobs
+ virtual TBusJob* CreateJobInstance(TBusMessage* message) = 0;
+ };
+
+ struct TBusModuleConfig {
+ unsigned StarterMaxInFlight;
+
+ struct TSecret {
+ TDuration SchedulePeriod;
+
+ TSecret();
+ };
+ TSecret Secret;
+
+ TBusModuleConfig();
+ };
+
+ namespace NPrivate {
+ struct TBusModuleInternal: public TAtomicRefCount<TBusModuleInternal> {
+ virtual TVector<TBusClientSessionPtr> GetClientSessionsInternal() = 0;
+ virtual TVector<TBusServerSessionPtr> GetServerSessionsInternal() = 0;
+ virtual TBusMessageQueue* GetQueue() = 0;
+
+ virtual TString GetNameInternal() = 0;
+
+ virtual TString GetStatusSingleLine() = 0;
+
+ virtual ~TBusModuleInternal() {
+ }
+ };
+ }
+
+ class TBusModule: public IJobFactory, TNonCopyable {
+ friend class TBusJob;
+
+ TObjectCounter<TBusModule> ObjectCounter;
+
+ TIntrusivePtr<NPrivate::TBusModuleImpl> Impl;
+
+ public:
+ /// Each module should have a name which is used as protocol service
+ TBusModule(const char* name);
+ ~TBusModule() override;
+
+ const char* GetName() const;
+
+ void SetConfig(const TBusModuleConfig& config);
+
+ /// get status of all jobs in flight
+ TString GetStatus(unsigned flags = 0);
+
+ /// called when application is about to start
+ virtual bool StartInput();
+ /// called when application is about to exit
+ virtual bool Shutdown();
+
+ // this default implementation just creates TBusJob object
+ TBusJob* CreateJobInstance(TBusMessage* message) override;
+
+ EMessageStatus StartJob(TAutoPtr<TBusMessage> message);
+
+ /// creates private sessions, calls CreateExtSession(), should be called before StartInput()
+ bool CreatePrivateSessions(TBusMessageQueue* queue);
+
+ virtual void OnClientConnectionEvent(const TClientConnectionEvent& event);
+
+ public:
+ /// entry point into module, first function to call
+ virtual TJobHandler Start(TBusJob* job, TBusMessage* mess) = 0;
+
+ protected:
+ /// override this function to create destination session
+ virtual TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) = 0;
+
+ public:
+ int GetModuleSessionInFlight() const;
+
+ TIntrusivePtr<NPrivate::TBusModuleInternal> GetInternal();
+
+ protected:
+ TBusServerSessionPtr CreateDefaultDestination(TBusMessageQueue& queue, TBusProtocol* proto, const TBusServerSessionConfig& config, const TString& name = TString());
+ TBusClientSessionPtr CreateDefaultSource(TBusMessageQueue& queue, TBusProtocol* proto, const TBusClientSessionConfig& config, const TString& name = TString());
+ TBusStarter* CreateDefaultStarter(TBusMessageQueue& unused, const TBusSessionConfig& config);
+ };
+
+}