diff options
| author | Anton Samokhvalov <[email protected]> | 2022-02-10 16:45:17 +0300 | 
|---|---|---|
| committer | Daniil Cherednik <[email protected]> | 2022-02-10 16:45:17 +0300 | 
| commit | d3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch) | |
| tree | dd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/messagebus/session.h | |
| parent | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff) | |
Restoring authorship annotation for Anton Samokhvalov <[email protected]>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/session.h')
| -rw-r--r-- | library/cpp/messagebus/session.h | 354 | 
1 files changed, 177 insertions, 177 deletions
diff --git a/library/cpp/messagebus/session.h b/library/cpp/messagebus/session.h index 857f58d7e59..fb12ab7c229 100644 --- a/library/cpp/messagebus/session.h +++ b/library/cpp/messagebus/session.h @@ -15,211 +15,211 @@  #include <util/generic/ptr.h>  namespace NBus { -    template <typename TBusSessionSubclass>  -    class TBusSessionPtr;  -    using TBusClientSessionPtr = TBusSessionPtr<TBusClientSession>;  -    using TBusServerSessionPtr = TBusSessionPtr<TBusServerSession>;  - -    ///////////////////////////////////////////////////////////////////  -    /// \brief Interface of session object.  - -    /// Each client and server  -    /// should instantiate session object to be able to communicate via bus  -    /// client: sess = queue->CreateSource(protocol, handler);  -    /// server: sess = queue->CreateDestination(protocol, handler);  - -    class TBusSession: public TWeakRefCounted<TBusSession> {  -    public:  -        size_t GetInFlight(const TNetAddr& addr) const;  -        size_t GetConnectSyscallsNumForTest(const TNetAddr& addr) const;  - -        virtual void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0;  -        virtual void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0;  - -        virtual int GetInFlight() const noexcept = 0;  -        /// monitoring status of current session and it's connections  -        virtual TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) = 0;  -        virtual TConnectionStatusMonRecord GetStatusProtobuf() = 0;  -        virtual NPrivate::TSessionDumpStatus GetStatusRecordInternal() = 0;  -        virtual TString GetStatusSingleLine() = 0;  -        /// return session config  -        virtual const TBusSessionConfig* GetConfig() const noexcept = 0;  -        /// return session protocol  -        virtual const TBusProtocol* GetProto() const noexcept = 0;  -        virtual TBusMessageQueue* GetQueue() const noexcept = 0;  - -        /// registers external session on host:port with locator service  -        int RegisterService(const char* hostname, TBusKey start = YBUS_KEYMIN, TBusKey end = YBUS_KEYMAX, EIpVersion ipVersion = EIP_VERSION_4);  - -    protected:  -        TBusSession();  - -    public:  -        virtual TString GetNameInternal() = 0;  - -        virtual void Shutdown() = 0;  - -        virtual ~TBusSession();  -    };  - -    struct TBusClientSession: public virtual TBusSession {  -        typedef ::NBus::NPrivate::TRemoteClientSession TImpl;  - -        static TBusClientSessionPtr Create(  -            TBusProtocol* proto,  -            IBusClientHandler* handler,  +    template <typename TBusSessionSubclass> +    class TBusSessionPtr; +    using TBusClientSessionPtr = TBusSessionPtr<TBusClientSession>; +    using TBusServerSessionPtr = TBusSessionPtr<TBusServerSession>; + +    /////////////////////////////////////////////////////////////////// +    /// \brief Interface of session object. + +    /// Each client and server +    /// should instantiate session object to be able to communicate via bus +    /// client: sess = queue->CreateSource(protocol, handler); +    /// server: sess = queue->CreateDestination(protocol, handler); + +    class TBusSession: public TWeakRefCounted<TBusSession> { +    public: +        size_t GetInFlight(const TNetAddr& addr) const; +        size_t GetConnectSyscallsNumForTest(const TNetAddr& addr) const; + +        virtual void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0; +        virtual void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0; + +        virtual int GetInFlight() const noexcept = 0; +        /// monitoring status of current session and it's connections +        virtual TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) = 0; +        virtual TConnectionStatusMonRecord GetStatusProtobuf() = 0; +        virtual NPrivate::TSessionDumpStatus GetStatusRecordInternal() = 0; +        virtual TString GetStatusSingleLine() = 0; +        /// return session config +        virtual const TBusSessionConfig* GetConfig() const noexcept = 0; +        /// return session protocol +        virtual const TBusProtocol* GetProto() const noexcept = 0; +        virtual TBusMessageQueue* GetQueue() const noexcept = 0; + +        /// registers external session on host:port with locator service +        int RegisterService(const char* hostname, TBusKey start = YBUS_KEYMIN, TBusKey end = YBUS_KEYMAX, EIpVersion ipVersion = EIP_VERSION_4); + +    protected: +        TBusSession(); + +    public: +        virtual TString GetNameInternal() = 0; + +        virtual void Shutdown() = 0; + +        virtual ~TBusSession(); +    }; + +    struct TBusClientSession: public virtual TBusSession { +        typedef ::NBus::NPrivate::TRemoteClientSession TImpl; + +        static TBusClientSessionPtr Create( +            TBusProtocol* proto, +            IBusClientHandler* handler,              const TBusClientSessionConfig& config,              TBusMessageQueuePtr queue); -        virtual TBusClientConnectionPtr GetConnection(const TNetAddr&) = 0;  +        virtual TBusClientConnectionPtr GetConnection(const TNetAddr&) = 0; -        /// if you want to open connection early  -        virtual void OpenConnection(const TNetAddr&) = 0;  +        /// if you want to open connection early +        virtual void OpenConnection(const TNetAddr&) = 0; -        /// Send message to the destination  -        /// If addr is set then use it as destination.  -        /// Takes ownership of addr (see ClearState method).  -        virtual EMessageStatus SendMessage(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0;  +        /// Send message to the destination +        /// If addr is set then use it as destination. +        /// Takes ownership of addr (see ClearState method). +        virtual EMessageStatus SendMessage(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0; -        virtual EMessageStatus SendMessageOneWay(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0;  +        virtual EMessageStatus SendMessageOneWay(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0; -        /// Like SendMessage but cares about message  -        template <typename T /* <: TBusMessage */>  -        EMessageStatus SendMessageAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) {  -            EMessageStatus status = SendMessage(mes.Get(), addr, wait);  -            if (status == MESSAGE_OK)  +        /// Like SendMessage but cares about message +        template <typename T /* <: TBusMessage */> +        EMessageStatus SendMessageAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) { +            EMessageStatus status = SendMessage(mes.Get(), addr, wait); +            if (status == MESSAGE_OK)                  Y_UNUSED(mes.Release()); -            return status;  -        }  - -        /// Like SendMessageOneWay but cares about message  -        template <typename T /* <: TBusMessage */>  -        EMessageStatus SendMessageOneWayAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) {  -            EMessageStatus status = SendMessageOneWay(mes.Get(), addr, wait);  -            if (status == MESSAGE_OK)  +            return status; +        } + +        /// Like SendMessageOneWay but cares about message +        template <typename T /* <: TBusMessage */> +        EMessageStatus SendMessageOneWayAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) { +            EMessageStatus status = SendMessageOneWay(mes.Get(), addr, wait); +            if (status == MESSAGE_OK)                  Y_UNUSED(mes.Release()); -            return status;  -        }  +            return status; +        } -        EMessageStatus SendMessageMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) {  -            return SendMessageAutoPtr(message, addr, wait);  -        }  +        EMessageStatus SendMessageMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) { +            return SendMessageAutoPtr(message, addr, wait); +        } -        EMessageStatus SendMessageOneWayMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) {  -            return SendMessageOneWayAutoPtr(message, addr, wait);  -        }  +        EMessageStatus SendMessageOneWayMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) { +            return SendMessageOneWayAutoPtr(message, addr, wait); +        } -        // TODO: implement similar one-way methods  -    };  +        // TODO: implement similar one-way methods +    }; -    struct TBusServerSession: public virtual TBusSession {  -        typedef ::NBus::NPrivate::TRemoteServerSession TImpl;  +    struct TBusServerSession: public virtual TBusSession { +        typedef ::NBus::NPrivate::TRemoteServerSession TImpl; -        static TBusServerSessionPtr Create(  -            TBusProtocol* proto,  -            IBusServerHandler* handler,  +        static TBusServerSessionPtr Create( +            TBusProtocol* proto, +            IBusServerHandler* handler,              const TBusServerSessionConfig& config,              TBusMessageQueuePtr queue); -        static TBusServerSessionPtr Create(  -            TBusProtocol* proto,  -            IBusServerHandler* handler,  +        static TBusServerSessionPtr Create( +            TBusProtocol* proto, +            IBusServerHandler* handler,              const TBusServerSessionConfig& config,              TBusMessageQueuePtr queue,              const TVector<TBindResult>& bindTo); -        // TODO: make parameter non-const  -        virtual EMessageStatus SendReply(const TBusIdentity& ident, TBusMessage* pRep) = 0;  +        // TODO: make parameter non-const +        virtual EMessageStatus SendReply(const TBusIdentity& ident, TBusMessage* pRep) = 0; -        // TODO: make parameter non-const  -        virtual EMessageStatus ForgetRequest(const TBusIdentity& ident) = 0;  +        // TODO: make parameter non-const +        virtual EMessageStatus ForgetRequest(const TBusIdentity& ident) = 0; -        template <typename U /* <: TBusMessage */>  -        EMessageStatus SendReplyAutoPtr(TBusIdentity& ident, TAutoPtr<U>& resp) {  -            EMessageStatus status = SendReply(const_cast<const TBusIdentity&>(ident), resp.Get());  -            if (status == MESSAGE_OK) {  +        template <typename U /* <: TBusMessage */> +        EMessageStatus SendReplyAutoPtr(TBusIdentity& ident, TAutoPtr<U>& resp) { +            EMessageStatus status = SendReply(const_cast<const TBusIdentity&>(ident), resp.Get()); +            if (status == MESSAGE_OK) {                  Y_UNUSED(resp.Release()); -            }  -            return status;  +            } +            return status;          } -        EMessageStatus SendReplyMove(TBusIdentity& ident, TBusMessageAutoPtr resp) {  -            return SendReplyAutoPtr(ident, resp);  -        }  - -        /// Pause input from the network.  -        /// It is valid to call this method in parallel.  -        /// TODO: pull this method up to TBusSession.  -        virtual void PauseInput(bool pause) = 0;  -        virtual unsigned GetActualListenPort() = 0;  -    };  - -    namespace NPrivate {  -        template <typename TBusSessionSubclass>  -        class TBusOwnerSessionPtr: public TAtomicRefCount<TBusOwnerSessionPtr<TBusSessionSubclass>> {  -        private:  -            TIntrusivePtr<TBusSessionSubclass> Ptr;  - -        public:  -            TBusOwnerSessionPtr(TBusSessionSubclass* session)  -                : Ptr(session)  -            {  -                Y_ASSERT(!!Ptr);  -            }  - -            ~TBusOwnerSessionPtr() {  -                Ptr->Shutdown();  -            }  - -            TBusSessionSubclass* Get() const {  -                return reinterpret_cast<TBusSessionSubclass*>(Ptr.Get());  -            }  -        };  +        EMessageStatus SendReplyMove(TBusIdentity& ident, TBusMessageAutoPtr resp) { +            return SendReplyAutoPtr(ident, resp); +        } + +        /// Pause input from the network. +        /// It is valid to call this method in parallel. +        /// TODO: pull this method up to TBusSession. +        virtual void PauseInput(bool pause) = 0; +        virtual unsigned GetActualListenPort() = 0; +    }; + +    namespace NPrivate { +        template <typename TBusSessionSubclass> +        class TBusOwnerSessionPtr: public TAtomicRefCount<TBusOwnerSessionPtr<TBusSessionSubclass>> { +        private: +            TIntrusivePtr<TBusSessionSubclass> Ptr; + +        public: +            TBusOwnerSessionPtr(TBusSessionSubclass* session) +                : Ptr(session) +            { +                Y_ASSERT(!!Ptr); +            } + +            ~TBusOwnerSessionPtr() { +                Ptr->Shutdown(); +            } + +            TBusSessionSubclass* Get() const { +                return reinterpret_cast<TBusSessionSubclass*>(Ptr.Get()); +            } +        };      } -    template <typename TBusSessionSubclass>  -    class TBusSessionPtr {  -    private:  -        TIntrusivePtr<NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>> SmartPtr;  -        TBusSessionSubclass* Ptr;  - -    public:  -        TBusSessionPtr()  -            : Ptr()  -        {  -        }  -        TBusSessionPtr(TBusSessionSubclass* session)  -            : SmartPtr(!!session ? new NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>(session) : nullptr)  -            , Ptr(session)  -        {  -        }  - -        TBusSessionSubclass* Get() const {  -            return Ptr;  -        }  -        operator TBusSessionSubclass*() {  -            return Get();  -        }  -        TBusSessionSubclass& operator*() const {  -            return *Get();  -        }  -        TBusSessionSubclass* operator->() const {  -            return Get();  -        }  - -        bool operator!() const {  -            return !Ptr;  -        }  - -        void Swap(TBusSessionPtr& t) noexcept {  -            DoSwap(SmartPtr, t.SmartPtr);  -            DoSwap(Ptr, t.Ptr);  -        }  - -        void Drop() {  -            TBusSessionPtr().Swap(*this);  -        }  -    };  +    template <typename TBusSessionSubclass> +    class TBusSessionPtr { +    private: +        TIntrusivePtr<NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>> SmartPtr; +        TBusSessionSubclass* Ptr; + +    public: +        TBusSessionPtr() +            : Ptr() +        { +        } +        TBusSessionPtr(TBusSessionSubclass* session) +            : SmartPtr(!!session ? new NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>(session) : nullptr) +            , Ptr(session) +        { +        } + +        TBusSessionSubclass* Get() const { +            return Ptr; +        } +        operator TBusSessionSubclass*() { +            return Get(); +        } +        TBusSessionSubclass& operator*() const { +            return *Get(); +        } +        TBusSessionSubclass* operator->() const { +            return Get(); +        } + +        bool operator!() const { +            return !Ptr; +        } + +        void Swap(TBusSessionPtr& t) noexcept { +            DoSwap(SmartPtr, t.SmartPtr); +            DoSwap(Ptr, t.Ptr); +        } + +        void Drop() { +            TBusSessionPtr().Swap(*this); +        } +    };  }  | 
