aboutsummaryrefslogblamecommitdiffstats
path: root/library/cpp/messagebus/oldmodule/module.h
blob: 8d1c4a5d52b7ae1a876d82f432f58168a8407521 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
            

                                                                           
 
                                                                         
                      
                                                                          
 
                                                                 
                                                                        
               
                                                                       
                                                                             
                                                                        
                                                                     
                                                                          
                                                                     
                                                                                   
 
                                                                                 
                                                                          
                                                                          
                                                                            
                                                                                
                                                                             
                                                                            
                                                                                         



                                                                                  
                         
 
                                        
 

                                        
                
                     
 





                                       
 


                                                                                           
 






                                                                              
                                                              
                                              
                                    
                                              







                                                                                
 
                                                                                                                          
 
                                                        
 













                                                 
 
















                                                                                                           
         
 

                                          
 
                                            
 




                                                             
 
                                     
 

                                                          
 
                                                                            
 

                                         
 
                                            
 


                                                                                                                                        
 
                                                                                                    
 
                                                         
 








                                                                     
         
 




                                                 
 















                                                                                            
             
                           
         















                                                                                          
             
                           
         









                                                                                          
             
                                   
         










                                                                                          

             
                                                                
 
                                     
 
                                       
 
                                                    
 
                                     
 
                                  
 




                                                                                
 

                                                                                           
 








                                                                                           
 











                                                                                
      
 
                                                                        
 


                                
 

                                                                     
      
                                    
 
                                     
 

                       
 
                           
 



                                                                                  
 
                                                  
 
                                                      
 


                                           
 
                                                        
 
                                                 
 
                                                     
 


                                                                            
 
                                    
 
                                                       
 
                                              
 


                                                     
 
                                                                  
 
                                                               
 
                                                                                                    
 
                                                                                  
 

                                                                       
 












                                                                                                                                                                            
 
 
#pragma once

///////////////////////////////////////////////////////////////////////////
/// \file
/// \brief Application interface for modules

/// NBus::TBusModule provides foundation for implementation of asynchnous
/// modules that communicate with multiple external or local sessions
/// NBus::TBusSession.

/// To implement the module some virtual functions needs to be overridden:

/// NBus::TBusModule::CreateExtSession() creates and registers an
/// external session that receives incoming messages as input for module
/// processing.

/// When new incoming message arrives the new NBus::TBusJob is created.
/// NBus::TBusJob is somewhat similar to a thread, it maintains all the state
/// during processing of one incoming message. Default implementation of
/// NBus::TBusJob will maintain all send and received messages during
/// lifetime of this job. Each message, status and reply can be found
/// within NBus::TJobState using NBus::TBusJob::GetState(). If your module
/// needs to maintain an additional information during lifetime of the job
/// you can derive your own class from NBus::TBusJob and override job
/// factory method NBus::IJobFactory::CreateJobInstance() to create your instances.

/// Processing of a given message starts with a call to NBus::TBusModule::Start()
/// handler that should be overridden in the module implementation. Within
/// the callback handler module can perform any computation and access any
/// datastore tables that it needs. The handler can also access any module
/// variables. However, same handler can be called from multiple threads so,
/// it is recommended that handler only access read-only module level variables.

/// Handler should use NBus::TBusJob::Send() to send messages to other client
/// sessions and it can use NBus::TBusJob::Reply() to send reply to the main
/// job message. When handler is done, it returns the pointer to the next handler to call
/// when all pending messages have cleared. If handler
/// returns pointer to itself the module will reschedule execution of this handler
/// for a later time. This should be done in case NBus::TBusJob::Send() returns
/// error (not MESSAGE_OK)

#include "startsession.h"

#include <library/cpp/messagebus/ybus.h>

#include <util/generic/noncopyable.h>
#include <util/generic/object_counter.h>

namespace NBus {
    class TBusJob;
    class TBusModule;

    namespace NPrivate {
        struct TCallJobHandlerWorkItem;
        struct TBusModuleImpl;
        struct TModuleServerHandler;
        struct TModuleClientHandler;
        struct TJobRunner;
    }

    class TJobHandler {
    protected:
        typedef TJobHandler (TBusModule::*TBusHandlerPtr)(TBusJob* job, TBusMessage* mess);
        TBusHandlerPtr MyPtr;

    public:
        template <class B>
        TJobHandler(TJobHandler (B::*fptr)(TBusJob* job, TBusMessage* mess)) {
            MyPtr = static_cast<TBusHandlerPtr>(fptr);
        }
        TJobHandler(TBusHandlerPtr fptr = nullptr) {
            MyPtr = fptr;
        }
        TJobHandler(const TJobHandler&) = default;
        TJobHandler& operator =(const TJobHandler&) = default;
        bool operator==(TJobHandler h) const {
            return MyPtr == h.MyPtr;
        }
        bool operator!=(TJobHandler h) const {
            return MyPtr != h.MyPtr;
        }
        bool operator!() const {
            return !MyPtr;
        }
        TJobHandler operator()(TBusModule* b, TBusJob* job, TBusMessage* mess) {
            return (b->*MyPtr)(job, mess);
        }
    };

    typedef void (TBusModule::*TReplyHandler)(TBusJob* job, EMessageStatus status, TBusMessage* mess, TBusMessage* reply);

    ////////////////////////////////////////////////////
    /// \brief Pending message state

    struct TJobState {
        friend class TBusJob;
        friend class ::TCrawlerModule;

        TReplyHandler Handler;
        EMessageStatus Status;
        TBusMessage* Message;
        TBusMessage* Reply;
        TBusClientSession* Session;
        size_t NumRetries;
        size_t MaxRetries;
        // If != NULL then use it as destination.
        TNetAddr Addr;
        bool UseAddr;
        bool OneWay;

    private:
        TJobState(TReplyHandler handler,
                  EMessageStatus status,
                  TBusMessage* mess, TBusClientSession* session, TBusMessage* reply, size_t maxRetries = 0,
                  const TNetAddr* addr = nullptr, bool oneWay = false)
            : Handler(handler)
            , Status(status)
            , Message(mess)
            , Reply(reply)
            , Session(session)
            , NumRetries(0)
            , MaxRetries(maxRetries)
            , OneWay(oneWay)
        {
            if (!!addr) {
                Addr = *addr;
            }
            UseAddr = !!addr;
        }

    public:
        TString GetStatus(unsigned flags);
    };

    using TJobStateVec = TVector<TJobState>;

    /////////////////////////////////////////////////////////
    /// \brief Execution item = thread

    /// Maintains internal state of document in computation
    class TBusJob {
        TObjectCounter<TBusJob> ObjectCounter;

    private:
        void CheckThreadCurrentJob();

    public:
        /// given a module and starter message
        TBusJob(TBusModule* module, TBusMessage* message);

        /// destructor will free all the message that were send and received
        virtual ~TBusJob();

        TBusMessage* GetMessage() const {
            return Message;
        }

        TNetAddr GetPeerAddrNetAddr() const;

        /// send message to any other session or application
        /// If addr is set then use it as destination.
        void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr);
        void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler = nullptr, size_t maxRetries = 0);

        void SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr);
        void SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session);

        /// send reply to the starter message
        virtual void SendReply(TBusMessageAutoPtr reply);

        /// set the flag to terminate job at the earliest convenience
        void Cancel(EMessageStatus status);

        /// helper to put item on finished list of states
        /// It should not be a part of public API,
        /// so prohibit it for all except current users.
    private:
        friend class ::TCrawlerModule;
        void PutState(const TJobState& state) {
            Finished.push_back(state);
        }

    public:
        /// retrieve all pending messages
        void GetPending(TJobStateVec* stateVec) {
            Y_ASSERT(stateVec);
            *stateVec = Pending;
        }

        /// helper function to find state of previously sent messages
        template <class MessageType>
        TJobState* GetState(int* startFrom = nullptr) {
            for (int i = startFrom ? *startFrom : 0; i < int(Finished.size()); i++) {
                TJobState* call = &Finished[i];
                if (call->Reply != nullptr && dynamic_cast<MessageType*>(call->Reply)) {
                    if (startFrom) {
                        *startFrom = i;
                    }
                    return call;
                }
                if (call->Message != nullptr && dynamic_cast<MessageType*>(call->Message)) {
                    if (startFrom) {
                        *startFrom = i;
                    }
                    return call;
                }
            }
            return nullptr;
        }

        /// helper function to find response for previously sent messages
        template <class MessageType>
        MessageType* Get(int* startFrom = nullptr) {
            for (int i = startFrom ? *startFrom : 0; i < int(Finished.size()); i++) {
                TJobState& call = Finished[i];
                if (call.Reply != nullptr && dynamic_cast<MessageType*>(call.Reply)) {
                    if (startFrom) {
                        *startFrom = i;
                    }
                    return static_cast<MessageType*>(call.Reply);
                }
                if (call.Message != nullptr && dynamic_cast<MessageType*>(call.Message)) {
                    if (startFrom) {
                        *startFrom = i;
                    }
                    return static_cast<MessageType*>(call.Message);
                }
            }
            return nullptr;
        }

        /// helper function to find status for previously sent message
        template <class MessageType>
        EMessageStatus GetStatus(int* startFrom = nullptr) {
            for (int i = startFrom ? *startFrom : 0; i < int(Finished.size()); i++) {
                TJobState& call = Finished[i];
                if (call.Message != nullptr && dynamic_cast<MessageType*>(call.Message)) {
                    if (startFrom) {
                        *startFrom = i;
                    }
                    return call.Status;
                }
            }
            return MESSAGE_UNKNOWN;
        }

        /// helper function to clear state of previosly sent messages
        template <class MessageType>
        void Clear() {
            for (size_t i = 0; i < Finished.size();) {
                // `Finished.size() - i` decreases with each iteration
                //  we either increment i, or remove element from Finished.
                TJobState& call = Finished[i];
                if (call.Message != nullptr && dynamic_cast<MessageType*>(call.Message)) {
                    ClearState(call);
                } else {
                    ++i;
                }
            }
        }

        /// helper function to clear state in order to try again
        void ClearState(TJobState& state);

        /// clears all message states
        void ClearAllMessageStates();

        /// returns true if job is done
        bool IsDone();

        /// return human reabable status of this job
        virtual TString GetStatus(unsigned flags);

        /// set sleep time for job
        void Sleep(int milliSeconds);

        void CallJobHandlerOnly();

    private:
        bool CallJobHandler();
        void DoCallReplyHandler(TJobState&);
        /// send out all Pending jobs, failed sends will be migrated to Finished
        bool SendPending();
        bool AnyPendingToSend();

    public:
        /// helper to call from OnReply() and OnError()
        int CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply);

    public:
        TJobHandler Handler;   ///< job handler to be executed within next CallJobHandler()
        EMessageStatus Status; ///< set != MESSAGE_OK if job should terminate asap
    private:
        NPrivate::TJobRunner* Runner;
        TBusMessage* Message;
        THolder<TBusMessage> MessageHolder;
        TOnMessageContext OnMessageContext; // starter
    public:
        bool ReplySent;

    private:
        friend class TBusModule;
        friend struct NPrivate::TBusModuleImpl;
        friend struct NPrivate::TCallJobHandlerWorkItem;
        friend struct NPrivate::TModuleServerHandler;
        friend struct NPrivate::TModuleClientHandler;
        friend struct NPrivate::TJobRunner;

        TJobStateVec Pending;  ///< messages currently outstanding via Send()
        TJobStateVec Finished; ///< messages that were replied to
        TBusModule* Module;
        NPrivate::TBusModuleImpl* ModuleImpl; ///< module which created the job
        TBusInstant SleepUntil;               ///< time to wakeup, 0 if no sleep
    };

    ////////////////////////////////////////////////////////////////////
    /// \brief Classes to implement basic module functionality

    class IJobFactory {
    protected:
        virtual ~IJobFactory() {
        }

    public:
        /// job factory method, override to create custom jobs
        virtual TBusJob* CreateJobInstance(TBusMessage* message) = 0;
    };

    struct TBusModuleConfig {
        unsigned StarterMaxInFlight;

        struct TSecret {
            TDuration SchedulePeriod;

            TSecret();
        };
        TSecret Secret;

        TBusModuleConfig();
    };

    namespace NPrivate {
        struct TBusModuleInternal: public TAtomicRefCount<TBusModuleInternal> {
            virtual TVector<TBusClientSessionPtr> GetClientSessionsInternal() = 0;
            virtual TVector<TBusServerSessionPtr> GetServerSessionsInternal() = 0;
            virtual TBusMessageQueue* GetQueue() = 0;

            virtual TString GetNameInternal() = 0;

            virtual TString GetStatusSingleLine() = 0;

            virtual ~TBusModuleInternal() {
            }
        };
    }

    class TBusModule: public IJobFactory, TNonCopyable {
        friend class TBusJob;

        TObjectCounter<TBusModule> ObjectCounter;

        TIntrusivePtr<NPrivate::TBusModuleImpl> Impl;

    public:
        /// Each module should have a name which is used as protocol service
        TBusModule(const char* name);
        ~TBusModule() override;

        const char* GetName() const;

        void SetConfig(const TBusModuleConfig& config);

        /// get status of all jobs in flight
        TString GetStatus(unsigned flags = 0);

        /// called when application is about to start
        virtual bool StartInput();
        /// called when application is about to exit
        virtual bool Shutdown();

        // this default implementation just creates TBusJob object
        TBusJob* CreateJobInstance(TBusMessage* message) override;

        EMessageStatus StartJob(TAutoPtr<TBusMessage> message);

        /// creates private sessions, calls CreateExtSession(), should be called before StartInput()
        bool CreatePrivateSessions(TBusMessageQueue* queue);

        virtual void OnClientConnectionEvent(const TClientConnectionEvent& event);

    public:
        /// entry point into module, first function to call
        virtual TJobHandler Start(TBusJob* job, TBusMessage* mess) = 0;

    protected:
        /// override this function to create destination session
        virtual TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) = 0;

    public:
        int GetModuleSessionInFlight() const;

        TIntrusivePtr<NPrivate::TBusModuleInternal> GetInternal();

    protected:
        TBusServerSessionPtr CreateDefaultDestination(TBusMessageQueue& queue, TBusProtocol* proto, const TBusServerSessionConfig& config, const TString& name = TString());
        TBusClientSessionPtr CreateDefaultSource(TBusMessageQueue& queue, TBusProtocol* proto, const TBusClientSessionConfig& config, const TString& name = TString());
        TBusStarter* CreateDefaultStarter(TBusMessageQueue& unused, const TBusSessionConfig& config);
    };

}