aboutsummaryrefslogblamecommitdiffstats
path: root/library/cpp/messagebus/session.h
blob: fb12ab7c22910263fecc1d8964fb3a5d30cabf54 (plain) (tree)
1
2
3
4
5
6
7
8
9
            
                       
                 
                    
                    
                    
                           
                          
 
                                                           
 

                                   
                


                                                                   
 
                                                                       
 


                                                                           
 


                                                                        
 
                                                                                                                            
 









                                                                           
 
                                                                                                                                                
 
                      
 
                                              
 
                                    
 
                               
 
                                                             
 

                                           

                                                  
                                                                           
 
                                                         
 


                                                                                                                     
 
                                                                                                                           
 



                                                                                                                      
                                        
                          
 



                                                                                                                            
                                        
                          
 

                                                                                                                       
 

                                                                                                                             
 
                                                  
 
                                                             
 

                                           

                                                  

                                           
                                                  
                                                
 
                                                                                           
 
                                                                            
 


                                                                                                  
                                         
                          
         

                                                                                    
 




                                                        
 



                                                                                                     
 




                                                             
 

                                    
 


                                                                         

     



                                                                                   
 








                                                                                                             
 










                                                 
 

                                
 


                                               
 


                                         
 
#pragma once

#include "connection.h"
#include "defs.h"
#include "handler.h"
#include "message.h"
#include "netaddr.h"
#include "network.h"
#include "session_config.h"
#include "misc/weak_ptr.h"

#include <library/cpp/messagebus/monitoring/mon_proto.pb.h>

#include <util/generic/array_ref.h>
#include <util/generic/ptr.h>

namespace NBus {
    // Forward declaration of the session handle template (defined at the end
    // of this header) and the two handle aliases returned by the Create()
    // factories of TBusClientSession and TBusServerSession.
    template <typename TBusSessionSubclass>
    class TBusSessionPtr;
    using TBusClientSessionPtr = TBusSessionPtr<TBusClientSession>;
    using TBusServerSessionPtr = TBusSessionPtr<TBusServerSession>;

    ///////////////////////////////////////////////////////////////////
    /// \brief Interface of session object.

    /// Each client and server
    /// should instantiate a session object to be able to communicate via the bus:
    /// client: sess = queue->CreateSource(protocol, handler);
    /// server: sess = queue->CreateDestination(protocol, handler);

    /// Abstract base of TBusClientSession and TBusServerSession.
    /// Inherits TWeakRefCounted<TBusSession> (see misc/weak_ptr.h); the
    /// constructor is protected, so only subclasses can create instances.
    class TBusSession: public TWeakRefCounted<TBusSession> {
    public:
        /// Single-address, non-virtual forms of the bulk queries below.
        /// NOTE(review): presumably implemented on top of the *Bulk virtuals;
        /// the definitions are not in this header -- confirm.
        size_t GetInFlight(const TNetAddr& addr) const;
        size_t GetConnectSyscallsNumForTest(const TNetAddr& addr) const;

        /// Bulk per-address queries: values are written into `results`.
        /// NOTE(review): presumably results[i] corresponds to addrs[i] and
        /// both ranges must have the same size -- confirm against implementations.
        virtual void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0;
        virtual void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0;

        /// Address-less overload of GetInFlight.
        /// NOTE(review): presumably the in-flight count for the whole session -- confirm.
        virtual int GetInFlight() const noexcept = 0;
        /// monitoring status of current session and its connections
        virtual TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) = 0;
        virtual TConnectionStatusMonRecord GetStatusProtobuf() = 0;
        virtual NPrivate::TSessionDumpStatus GetStatusRecordInternal() = 0;
        virtual TString GetStatusSingleLine() = 0;
        /// return session config
        virtual const TBusSessionConfig* GetConfig() const noexcept = 0;
        /// return session protocol
        virtual const TBusProtocol* GetProto() const noexcept = 0;
        /// return the message queue associated with this session
        virtual TBusMessageQueue* GetQueue() const noexcept = 0;

        /// registers external session on host:port with locator service
        int RegisterService(const char* hostname, TBusKey start = YBUS_KEYMIN, TBusKey end = YBUS_KEYMAX, EIpVersion ipVersion = EIP_VERSION_4);

    protected:
        // Protected: sessions are instantiated through subclasses only.
        TBusSession();

    public:
        virtual TString GetNameInternal() = 0;

        /// Invoked by NPrivate::TBusOwnerSessionPtr's destructor (see the end
        /// of this header) when the owner record is destroyed.
        virtual void Shutdown() = 0;

        virtual ~TBusSession();
    };

    /// Client-side session interface: hands out connections and sends messages.
    struct TBusClientSession: public virtual TBusSession {
        using TImpl = ::NBus::NPrivate::TRemoteClientSession;

        /// Factory returning a TBusClientSessionPtr handle.
        static TBusClientSessionPtr Create(
            TBusProtocol* proto,
            IBusClientHandler* handler,
            const TBusClientSessionConfig& config,
            TBusMessageQueuePtr queue);

        virtual TBusClientConnectionPtr GetConnection(const TNetAddr&) = 0;

        /// Lets the caller open a connection to the address ahead of time.
        virtual void OpenConnection(const TNetAddr&) = 0;

        /// Send message to the destination.
        /// If addr is set then it is used as the destination.
        /// Takes ownership of addr (see ClearState method).
        /// NOTE(review): the ownership remark is carried over from the original
        /// comment; addr is a const pointer here -- confirm what is meant.
        virtual EMessageStatus SendMessage(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0;

        /// One-way counterpart of SendMessage (same parameters).
        virtual EMessageStatus SendMessageOneWay(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0;

        /// SendMessage wrapper for a TAutoPtr-held message: the pointer is
        /// released from `mes` only when the send returns MESSAGE_OK;
        /// otherwise `mes` still holds the message.
        template <typename T /* <: TBusMessage */>
        EMessageStatus SendMessageAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) {
            const EMessageStatus sendResult = SendMessage(mes.Get(), addr, wait);
            if (sendResult != MESSAGE_OK) {
                return sendResult;
            }
            // Accepted: detach the message from the caller's TAutoPtr.
            Y_UNUSED(mes.Release());
            return sendResult;
        }

        /// SendMessageOneWay wrapper for a TAutoPtr-held message: the pointer
        /// is released from `mes` only when the send returns MESSAGE_OK.
        template <typename T /* <: TBusMessage */>
        EMessageStatus SendMessageOneWayAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) {
            const EMessageStatus sendResult = SendMessageOneWay(mes.Get(), addr, wait);
            if (sendResult != MESSAGE_OK) {
                return sendResult;
            }
            // Accepted: detach the message from the caller's TAutoPtr.
            Y_UNUSED(mes.Release());
            return sendResult;
        }

        /// By-value form: forwards `message` to SendMessageAutoPtr.
        EMessageStatus SendMessageMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) {
            const EMessageStatus sendResult = SendMessageAutoPtr(message, addr, wait);
            return sendResult;
        }

        /// By-value form: forwards `message` to SendMessageOneWayAutoPtr.
        EMessageStatus SendMessageOneWayMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) {
            const EMessageStatus sendResult = SendMessageOneWayAutoPtr(message, addr, wait);
            return sendResult;
        }

        // TODO: implement similar one-way methods
    };

    /// Server-side session interface: replies to requests and controls input.
    struct TBusServerSession: public virtual TBusSession {
        using TImpl = ::NBus::NPrivate::TRemoteServerSession;

        /// Factory returning a TBusServerSessionPtr handle.
        static TBusServerSessionPtr Create(
            TBusProtocol* proto,
            IBusServerHandler* handler,
            const TBusServerSessionConfig& config,
            TBusMessageQueuePtr queue);

        /// Factory overload that additionally takes a list of bind results.
        static TBusServerSessionPtr Create(
            TBusProtocol* proto,
            IBusServerHandler* handler,
            const TBusServerSessionConfig& config,
            TBusMessageQueuePtr queue,
            const TVector<TBindResult>& bindTo);

        // TODO: make parameter non-const
        virtual EMessageStatus SendReply(const TBusIdentity& ident, TBusMessage* pRep) = 0;

        // TODO: make parameter non-const
        virtual EMessageStatus ForgetRequest(const TBusIdentity& ident) = 0;

        /// SendReply wrapper for a TAutoPtr-held reply: the pointer is
        /// released from `resp` only when SendReply returns MESSAGE_OK;
        /// otherwise `resp` still holds the reply.
        template <typename U /* <: TBusMessage */>
        EMessageStatus SendReplyAutoPtr(TBusIdentity& ident, TAutoPtr<U>& resp) {
            // SendReply takes the identity by const reference.
            const TBusIdentity& constIdent = ident;
            const EMessageStatus replyResult = SendReply(constIdent, resp.Get());
            if (replyResult != MESSAGE_OK) {
                return replyResult;
            }
            Y_UNUSED(resp.Release());
            return replyResult;
        }

        /// By-value form: forwards `resp` to SendReplyAutoPtr.
        EMessageStatus SendReplyMove(TBusIdentity& ident, TBusMessageAutoPtr resp) {
            const EMessageStatus replyResult = SendReplyAutoPtr(ident, resp);
            return replyResult;
        }

        /// Pause input from the network.
        /// It is valid to call this method in parallel.
        /// TODO: pull this method up to TBusSession.
        virtual void PauseInput(bool pause) = 0;
        virtual unsigned GetActualListenPort() = 0;
    };

    namespace NPrivate {
        /// Owner record used by TBusSessionPtr: keeps a TIntrusivePtr to the
        /// session and calls Shutdown() on it when the record is destroyed.
        template <typename TBusSessionSubclass>
        class TBusOwnerSessionPtr: public TAtomicRefCount<TBusOwnerSessionPtr<TBusSessionSubclass>> {
        private:
            TIntrusivePtr<TBusSessionSubclass> Ptr;

        public:
            /// \param session session to own; expected to be non-null
            ///        (checked with Y_ASSERT only).
            TBusOwnerSessionPtr(TBusSessionSubclass* session)
                : Ptr(session)
            {
                Y_ASSERT(!!Ptr);
            }

            /// Shuts the owned session down.
            ~TBusOwnerSessionPtr() {
                Ptr->Shutdown();
            }

            /// \return the owned session as a raw, non-owning pointer.
            TBusSessionSubclass* Get() const {
                // Ptr is a TIntrusivePtr<TBusSessionSubclass>, so Get() already
                // yields TBusSessionSubclass*; no cast is required.
                return Ptr.Get();
            }
        };

    }

    /// Handle to a session.
    ///
    /// SmartPtr holds the NPrivate::TBusOwnerSessionPtr record for the session
    /// (that record's destructor calls Shutdown() on the session); Ptr keeps
    /// the raw session pointer that the accessors return. A default-constructed
    /// handle, or one built from a null pointer, is empty.
    template <typename TBusSessionSubclass>
    class TBusSessionPtr {
    private:
        TIntrusivePtr<NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>> SmartPtr;
        TBusSessionSubclass* Ptr;

    public:
        /// Creates an empty handle.
        TBusSessionPtr()
            : Ptr()
        {
        }
        /// Wraps `session`; an owner record is allocated only when it is non-null.
        TBusSessionPtr(TBusSessionSubclass* session)
            : SmartPtr(!!session ? new NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>(session) : nullptr)
            , Ptr(session)
        {
        }

        /// \return the raw session pointer, or null for an empty handle.
        TBusSessionSubclass* Get() const {
            return Ptr;
        }
        /// Implicit conversion to the raw session pointer.
        /// Const-qualified like the other accessors so that const handles
        /// can be converted and tested as well.
        operator TBusSessionSubclass*() const {
            return Get();
        }
        TBusSessionSubclass& operator*() const {
            return *Get();
        }
        TBusSessionSubclass* operator->() const {
            return Get();
        }

        /// \return true when the handle is empty.
        bool operator!() const {
            return !Ptr;
        }

        /// Exchanges both the owner record and the raw pointer with `t`.
        void Swap(TBusSessionPtr& t) noexcept {
            DoSwap(SmartPtr, t.SmartPtr);
            DoSwap(Ptr, t.Ptr);
        }

        /// Empties this handle by swapping with an empty temporary; the
        /// temporary then releases the previously held owner reference.
        void Drop() {
            TBusSessionPtr().Swap(*this);
        }
    };

}