diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:15 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:15 +0300 |
commit | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch) | |
tree | da2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/messagebus/session.h | |
parent | 778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff) | |
download | ydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/session.h')
-rw-r--r-- | library/cpp/messagebus/session.h | 354 |
1 files changed, 177 insertions, 177 deletions
diff --git a/library/cpp/messagebus/session.h b/library/cpp/messagebus/session.h index fb12ab7c22..857f58d7e5 100644 --- a/library/cpp/messagebus/session.h +++ b/library/cpp/messagebus/session.h @@ -15,211 +15,211 @@ #include <util/generic/ptr.h> namespace NBus { - template <typename TBusSessionSubclass> - class TBusSessionPtr; - using TBusClientSessionPtr = TBusSessionPtr<TBusClientSession>; - using TBusServerSessionPtr = TBusSessionPtr<TBusServerSession>; - - /////////////////////////////////////////////////////////////////// - /// \brief Interface of session object. - - /// Each client and server - /// should instantiate session object to be able to communicate via bus - /// client: sess = queue->CreateSource(protocol, handler); - /// server: sess = queue->CreateDestination(protocol, handler); - - class TBusSession: public TWeakRefCounted<TBusSession> { - public: - size_t GetInFlight(const TNetAddr& addr) const; - size_t GetConnectSyscallsNumForTest(const TNetAddr& addr) const; - - virtual void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0; - virtual void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0; - - virtual int GetInFlight() const noexcept = 0; - /// monitoring status of current session and it's connections - virtual TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) = 0; - virtual TConnectionStatusMonRecord GetStatusProtobuf() = 0; - virtual NPrivate::TSessionDumpStatus GetStatusRecordInternal() = 0; - virtual TString GetStatusSingleLine() = 0; - /// return session config - virtual const TBusSessionConfig* GetConfig() const noexcept = 0; - /// return session protocol - virtual const TBusProtocol* GetProto() const noexcept = 0; - virtual TBusMessageQueue* GetQueue() const noexcept = 0; - - /// registers external session on host:port with locator service - int RegisterService(const char* hostname, TBusKey start = YBUS_KEYMIN, TBusKey end = YBUS_KEYMAX, EIpVersion ipVersion = EIP_VERSION_4); - - protected: - TBusSession(); - - public: - virtual TString GetNameInternal() = 0; - - virtual void Shutdown() = 0; - - virtual ~TBusSession(); - }; - - struct TBusClientSession: public virtual TBusSession { - typedef ::NBus::NPrivate::TRemoteClientSession TImpl; - - static TBusClientSessionPtr Create( - TBusProtocol* proto, - IBusClientHandler* handler, + template <typename TBusSessionSubclass> + class TBusSessionPtr; + using TBusClientSessionPtr = TBusSessionPtr<TBusClientSession>; + using TBusServerSessionPtr = TBusSessionPtr<TBusServerSession>; + + /////////////////////////////////////////////////////////////////// + /// \brief Interface of session object. + + /// Each client and server + /// should instantiate session object to be able to communicate via bus + /// client: sess = queue->CreateSource(protocol, handler); + /// server: sess = queue->CreateDestination(protocol, handler); + + class TBusSession: public TWeakRefCounted<TBusSession> { + public: + size_t GetInFlight(const TNetAddr& addr) const; + size_t GetConnectSyscallsNumForTest(const TNetAddr& addr) const; + + virtual void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0; + virtual void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0; + + virtual int GetInFlight() const noexcept = 0; + /// monitoring status of current session and it's connections + virtual TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) = 0; + virtual TConnectionStatusMonRecord GetStatusProtobuf() = 0; + virtual NPrivate::TSessionDumpStatus GetStatusRecordInternal() = 0; + virtual TString GetStatusSingleLine() = 0; + /// return session config + virtual const TBusSessionConfig* GetConfig() const noexcept = 0; + /// return session protocol + virtual const TBusProtocol* GetProto() const noexcept = 0; + virtual TBusMessageQueue* GetQueue() const noexcept = 0; + + /// registers external session on host:port with locator service + int RegisterService(const char* hostname, TBusKey start = YBUS_KEYMIN, TBusKey end = YBUS_KEYMAX, EIpVersion ipVersion = EIP_VERSION_4); + + protected: + TBusSession(); + + public: + virtual TString GetNameInternal() = 0; + + virtual void Shutdown() = 0; + + virtual ~TBusSession(); + }; + + struct TBusClientSession: public virtual TBusSession { + typedef ::NBus::NPrivate::TRemoteClientSession TImpl; + + static TBusClientSessionPtr Create( + TBusProtocol* proto, + IBusClientHandler* handler, const TBusClientSessionConfig& config, TBusMessageQueuePtr queue); - virtual TBusClientConnectionPtr GetConnection(const TNetAddr&) = 0; + virtual TBusClientConnectionPtr GetConnection(const TNetAddr&) = 0; - /// if you want to open connection early - virtual void OpenConnection(const TNetAddr&) = 0; + /// if you want to open connection early + virtual void OpenConnection(const TNetAddr&) = 0; - /// Send message to the destination - /// If addr is set then use it as destination. - /// Takes ownership of addr (see ClearState method). - virtual EMessageStatus SendMessage(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0; + /// Send message to the destination + /// If addr is set then use it as destination. + /// Takes ownership of addr (see ClearState method). + virtual EMessageStatus SendMessage(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0; - virtual EMessageStatus SendMessageOneWay(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0; + virtual EMessageStatus SendMessageOneWay(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0; - /// Like SendMessage but cares about message - template <typename T /* <: TBusMessage */> - EMessageStatus SendMessageAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) { - EMessageStatus status = SendMessage(mes.Get(), addr, wait); - if (status == MESSAGE_OK) + /// Like SendMessage but cares about message + template <typename T /* <: TBusMessage */> + EMessageStatus SendMessageAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) { + EMessageStatus status = SendMessage(mes.Get(), addr, wait); + if (status == MESSAGE_OK) Y_UNUSED(mes.Release()); - return status; - } - - /// Like SendMessageOneWay but cares about message - template <typename T /* <: TBusMessage */> - EMessageStatus SendMessageOneWayAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) { - EMessageStatus status = SendMessageOneWay(mes.Get(), addr, wait); - if (status == MESSAGE_OK) + return status; + } + + /// Like SendMessageOneWay but cares about message + template <typename T /* <: TBusMessage */> + EMessageStatus SendMessageOneWayAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) { + EMessageStatus status = SendMessageOneWay(mes.Get(), addr, wait); + if (status == MESSAGE_OK) Y_UNUSED(mes.Release()); - return status; - } + return status; + } - EMessageStatus SendMessageMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) { - return SendMessageAutoPtr(message, addr, wait); - } + EMessageStatus SendMessageMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) { + return SendMessageAutoPtr(message, addr, wait); + } - EMessageStatus SendMessageOneWayMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) { - return SendMessageOneWayAutoPtr(message, addr, wait); - } + EMessageStatus SendMessageOneWayMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) { + return SendMessageOneWayAutoPtr(message, addr, wait); + } - // TODO: implement similar one-way methods - }; + // TODO: implement similar one-way methods + }; - struct TBusServerSession: public virtual TBusSession { - typedef ::NBus::NPrivate::TRemoteServerSession TImpl; + struct TBusServerSession: public virtual TBusSession { + typedef ::NBus::NPrivate::TRemoteServerSession TImpl; - static TBusServerSessionPtr Create( - TBusProtocol* proto, - IBusServerHandler* handler, + static TBusServerSessionPtr Create( + TBusProtocol* proto, + IBusServerHandler* handler, const TBusServerSessionConfig& config, TBusMessageQueuePtr queue); - static TBusServerSessionPtr Create( - TBusProtocol* proto, - IBusServerHandler* handler, + static TBusServerSessionPtr Create( + TBusProtocol* proto, + IBusServerHandler* handler, const TBusServerSessionConfig& config, TBusMessageQueuePtr queue, const TVector<TBindResult>& bindTo); - // TODO: make parameter non-const - virtual EMessageStatus SendReply(const TBusIdentity& ident, TBusMessage* pRep) = 0; + // TODO: make parameter non-const + virtual EMessageStatus SendReply(const TBusIdentity& ident, TBusMessage* pRep) = 0; - // TODO: make parameter non-const - virtual EMessageStatus ForgetRequest(const TBusIdentity& ident) = 0; + // TODO: make parameter non-const + virtual EMessageStatus ForgetRequest(const TBusIdentity& ident) = 0; - template <typename U /* <: TBusMessage */> - EMessageStatus SendReplyAutoPtr(TBusIdentity& ident, TAutoPtr<U>& resp) { - EMessageStatus status = SendReply(const_cast<const TBusIdentity&>(ident), resp.Get()); - if (status == MESSAGE_OK) { + template <typename U /* <: TBusMessage */> + EMessageStatus SendReplyAutoPtr(TBusIdentity& ident, TAutoPtr<U>& resp) { + EMessageStatus status = SendReply(const_cast<const TBusIdentity&>(ident), resp.Get()); + if (status == MESSAGE_OK) { Y_UNUSED(resp.Release()); - } - return status; + } + return status; } - EMessageStatus SendReplyMove(TBusIdentity& ident, TBusMessageAutoPtr resp) { - return SendReplyAutoPtr(ident, resp); - } - - /// Pause input from the network. - /// It is valid to call this method in parallel. - /// TODO: pull this method up to TBusSession. - virtual void PauseInput(bool pause) = 0; - virtual unsigned GetActualListenPort() = 0; - }; - - namespace NPrivate { - template <typename TBusSessionSubclass> - class TBusOwnerSessionPtr: public TAtomicRefCount<TBusOwnerSessionPtr<TBusSessionSubclass>> { - private: - TIntrusivePtr<TBusSessionSubclass> Ptr; - - public: - TBusOwnerSessionPtr(TBusSessionSubclass* session) - : Ptr(session) - { - Y_ASSERT(!!Ptr); - } - - ~TBusOwnerSessionPtr() { - Ptr->Shutdown(); - } - - TBusSessionSubclass* Get() const { - return reinterpret_cast<TBusSessionSubclass*>(Ptr.Get()); - } - }; + EMessageStatus SendReplyMove(TBusIdentity& ident, TBusMessageAutoPtr resp) { + return SendReplyAutoPtr(ident, resp); + } + + /// Pause input from the network. + /// It is valid to call this method in parallel. + /// TODO: pull this method up to TBusSession. + virtual void PauseInput(bool pause) = 0; + virtual unsigned GetActualListenPort() = 0; + }; + + namespace NPrivate { + template <typename TBusSessionSubclass> + class TBusOwnerSessionPtr: public TAtomicRefCount<TBusOwnerSessionPtr<TBusSessionSubclass>> { + private: + TIntrusivePtr<TBusSessionSubclass> Ptr; + + public: + TBusOwnerSessionPtr(TBusSessionSubclass* session) + : Ptr(session) + { + Y_ASSERT(!!Ptr); + } + + ~TBusOwnerSessionPtr() { + Ptr->Shutdown(); + } + + TBusSessionSubclass* Get() const { + return reinterpret_cast<TBusSessionSubclass*>(Ptr.Get()); + } + }; } - template <typename TBusSessionSubclass> - class TBusSessionPtr { - private: - TIntrusivePtr<NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>> SmartPtr; - TBusSessionSubclass* Ptr; - - public: - TBusSessionPtr() - : Ptr() - { - } - TBusSessionPtr(TBusSessionSubclass* session) - : SmartPtr(!!session ? new NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>(session) : nullptr) - , Ptr(session) - { - } - - TBusSessionSubclass* Get() const { - return Ptr; - } - operator TBusSessionSubclass*() { - return Get(); - } - TBusSessionSubclass& operator*() const { - return *Get(); - } - TBusSessionSubclass* operator->() const { - return Get(); - } - - bool operator!() const { - return !Ptr; - } - - void Swap(TBusSessionPtr& t) noexcept { - DoSwap(SmartPtr, t.SmartPtr); - DoSwap(Ptr, t.Ptr); - } - - void Drop() { - TBusSessionPtr().Swap(*this); - } - }; + template <typename TBusSessionSubclass> + class TBusSessionPtr { + private: + TIntrusivePtr<NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>> SmartPtr; + TBusSessionSubclass* Ptr; + + public: + TBusSessionPtr() + : Ptr() + { + } + TBusSessionPtr(TBusSessionSubclass* session) + : SmartPtr(!!session ? new NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>(session) : nullptr) + , Ptr(session) + { + } + + TBusSessionSubclass* Get() const { + return Ptr; + } + operator TBusSessionSubclass*() { + return Get(); + } + TBusSessionSubclass& operator*() const { + return *Get(); + } + TBusSessionSubclass* operator->() const { + return Get(); + } + + bool operator!() const { + return !Ptr; + } + + void Swap(TBusSessionPtr& t) noexcept { + DoSwap(SmartPtr, t.SmartPtr); + DoSwap(Ptr, t.Ptr); + } + + void Drop() { + TBusSessionPtr().Swap(*this); + } + }; } |