diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/session.h | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/session.h')
-rw-r--r-- | library/cpp/messagebus/session.h | 225 |
1 files changed, 225 insertions, 0 deletions
diff --git a/library/cpp/messagebus/session.h b/library/cpp/messagebus/session.h new file mode 100644 index 0000000000..fb12ab7c22 --- /dev/null +++ b/library/cpp/messagebus/session.h @@ -0,0 +1,225 @@ +#pragma once + +#include "connection.h" +#include "defs.h" +#include "handler.h" +#include "message.h" +#include "netaddr.h" +#include "network.h" +#include "session_config.h" +#include "misc/weak_ptr.h" + +#include <library/cpp/messagebus/monitoring/mon_proto.pb.h> + +#include <util/generic/array_ref.h> +#include <util/generic/ptr.h> + +namespace NBus { + template <typename TBusSessionSubclass> + class TBusSessionPtr; + using TBusClientSessionPtr = TBusSessionPtr<TBusClientSession>; + using TBusServerSessionPtr = TBusSessionPtr<TBusServerSession>; + + /////////////////////////////////////////////////////////////////// + /// \brief Interface of session object. + + /// Each client and server + /// should instantiate session object to be able to communicate via bus + /// client: sess = queue->CreateSource(protocol, handler); + /// server: sess = queue->CreateDestination(protocol, handler); + + class TBusSession: public TWeakRefCounted<TBusSession> { + public: + size_t GetInFlight(const TNetAddr& addr) const; + size_t GetConnectSyscallsNumForTest(const TNetAddr& addr) const; + + virtual void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0; + virtual void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0; + + virtual int GetInFlight() const noexcept = 0; + /// monitoring status of current session and it's connections + virtual TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) = 0; + virtual TConnectionStatusMonRecord GetStatusProtobuf() = 0; + virtual NPrivate::TSessionDumpStatus GetStatusRecordInternal() = 0; + virtual TString GetStatusSingleLine() = 0; + /// return session config + virtual const TBusSessionConfig* GetConfig() const noexcept = 0; + /// return session protocol + virtual const TBusProtocol* GetProto() const noexcept = 0; + virtual TBusMessageQueue* GetQueue() const noexcept = 0; + + /// registers external session on host:port with locator service + int RegisterService(const char* hostname, TBusKey start = YBUS_KEYMIN, TBusKey end = YBUS_KEYMAX, EIpVersion ipVersion = EIP_VERSION_4); + + protected: + TBusSession(); + + public: + virtual TString GetNameInternal() = 0; + + virtual void Shutdown() = 0; + + virtual ~TBusSession(); + }; + + struct TBusClientSession: public virtual TBusSession { + typedef ::NBus::NPrivate::TRemoteClientSession TImpl; + + static TBusClientSessionPtr Create( + TBusProtocol* proto, + IBusClientHandler* handler, + const TBusClientSessionConfig& config, + TBusMessageQueuePtr queue); + + virtual TBusClientConnectionPtr GetConnection(const TNetAddr&) = 0; + + /// if you want to open connection early + virtual void OpenConnection(const TNetAddr&) = 0; + + /// Send message to the destination + /// If addr is set then use it as destination. + /// Takes ownership of addr (see ClearState method). + virtual EMessageStatus SendMessage(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0; + + virtual EMessageStatus SendMessageOneWay(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0; + + /// Like SendMessage but cares about message + template <typename T /* <: TBusMessage */> + EMessageStatus SendMessageAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) { + EMessageStatus status = SendMessage(mes.Get(), addr, wait); + if (status == MESSAGE_OK) + Y_UNUSED(mes.Release()); + return status; + } + + /// Like SendMessageOneWay but cares about message + template <typename T /* <: TBusMessage */> + EMessageStatus SendMessageOneWayAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) { + EMessageStatus status = SendMessageOneWay(mes.Get(), addr, wait); + if (status == MESSAGE_OK) + Y_UNUSED(mes.Release()); + return status; + } + + EMessageStatus SendMessageMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) { + return SendMessageAutoPtr(message, addr, wait); + } + + EMessageStatus SendMessageOneWayMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) { + return SendMessageOneWayAutoPtr(message, addr, wait); + } + + // TODO: implement similar one-way methods + }; + + struct TBusServerSession: public virtual TBusSession { + typedef ::NBus::NPrivate::TRemoteServerSession TImpl; + + static TBusServerSessionPtr Create( + TBusProtocol* proto, + IBusServerHandler* handler, + const TBusServerSessionConfig& config, + TBusMessageQueuePtr queue); + + static TBusServerSessionPtr Create( + TBusProtocol* proto, + IBusServerHandler* handler, + const TBusServerSessionConfig& config, + TBusMessageQueuePtr queue, + const TVector<TBindResult>& bindTo); + + // TODO: make parameter non-const + virtual EMessageStatus SendReply(const TBusIdentity& ident, TBusMessage* pRep) = 0; + + // TODO: make parameter non-const + virtual EMessageStatus ForgetRequest(const TBusIdentity& ident) = 0; + + template <typename U /* <: TBusMessage */> + EMessageStatus SendReplyAutoPtr(TBusIdentity& ident, TAutoPtr<U>& resp) { + EMessageStatus status = SendReply(const_cast<const TBusIdentity&>(ident), resp.Get()); + if (status == MESSAGE_OK) { + Y_UNUSED(resp.Release()); + } + return status; + } + + EMessageStatus SendReplyMove(TBusIdentity& ident, TBusMessageAutoPtr resp) { + return SendReplyAutoPtr(ident, resp); + } + + /// Pause input from the network. + /// It is valid to call this method in parallel. + /// TODO: pull this method up to TBusSession. + virtual void PauseInput(bool pause) = 0; + virtual unsigned GetActualListenPort() = 0; + }; + + namespace NPrivate { + template <typename TBusSessionSubclass> + class TBusOwnerSessionPtr: public TAtomicRefCount<TBusOwnerSessionPtr<TBusSessionSubclass>> { + private: + TIntrusivePtr<TBusSessionSubclass> Ptr; + + public: + TBusOwnerSessionPtr(TBusSessionSubclass* session) + : Ptr(session) + { + Y_ASSERT(!!Ptr); + } + + ~TBusOwnerSessionPtr() { + Ptr->Shutdown(); + } + + TBusSessionSubclass* Get() const { + return reinterpret_cast<TBusSessionSubclass*>(Ptr.Get()); + } + }; + + } + + template <typename TBusSessionSubclass> + class TBusSessionPtr { + private: + TIntrusivePtr<NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>> SmartPtr; + TBusSessionSubclass* Ptr; + + public: + TBusSessionPtr() + : Ptr() + { + } + TBusSessionPtr(TBusSessionSubclass* session) + : SmartPtr(!!session ? new NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>(session) : nullptr) + , Ptr(session) + { + } + + TBusSessionSubclass* Get() const { + return Ptr; + } + operator TBusSessionSubclass*() { + return Get(); + } + TBusSessionSubclass& operator*() const { + return *Get(); + } + TBusSessionSubclass* operator->() const { + return Get(); + } + + bool operator!() const { + return !Ptr; + } + + void Swap(TBusSessionPtr& t) noexcept { + DoSwap(SmartPtr, t.SmartPtr); + DoSwap(Ptr, t.Ptr); + } + + void Drop() { + TBusSessionPtr().Swap(*this); + } + }; + +} |