#pragma once #include "connection.h" #include "defs.h" #include "handler.h" #include "message.h" #include "netaddr.h" #include "network.h" #include "session_config.h" #include "misc/weak_ptr.h" #include <library/cpp/messagebus/monitoring/mon_proto.pb.h> #include <util/generic/array_ref.h> #include <util/generic/ptr.h> namespace NBus { template <typename TBusSessionSubclass> class TBusSessionPtr; using TBusClientSessionPtr = TBusSessionPtr<TBusClientSession>; using TBusServerSessionPtr = TBusSessionPtr<TBusServerSession>; /////////////////////////////////////////////////////////////////// /// \brief Interface of session object. /// Each client and server /// should instantiate session object to be able to communicate via bus /// client: sess = queue->CreateSource(protocol, handler); /// server: sess = queue->CreateDestination(protocol, handler); class TBusSession: public TWeakRefCounted<TBusSession> { public: size_t GetInFlight(const TNetAddr& addr) const; size_t GetConnectSyscallsNumForTest(const TNetAddr& addr) const; virtual void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0; virtual void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0; virtual int GetInFlight() const noexcept = 0; /// monitoring status of current session and it's connections virtual TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) = 0; virtual TConnectionStatusMonRecord GetStatusProtobuf() = 0; virtual NPrivate::TSessionDumpStatus GetStatusRecordInternal() = 0; virtual TString GetStatusSingleLine() = 0; /// return session config virtual const TBusSessionConfig* GetConfig() const noexcept = 0; /// return session protocol virtual const TBusProtocol* GetProto() const noexcept = 0; virtual TBusMessageQueue* GetQueue() const noexcept = 0; /// registers external session on host:port with locator service int RegisterService(const char* hostname, TBusKey start = YBUS_KEYMIN, TBusKey end = YBUS_KEYMAX, EIpVersion ipVersion = EIP_VERSION_4); protected: TBusSession(); public: virtual TString GetNameInternal() = 0; virtual void Shutdown() = 0; virtual ~TBusSession(); }; struct TBusClientSession: public virtual TBusSession { typedef ::NBus::NPrivate::TRemoteClientSession TImpl; static TBusClientSessionPtr Create( TBusProtocol* proto, IBusClientHandler* handler, const TBusClientSessionConfig& config, TBusMessageQueuePtr queue); virtual TBusClientConnectionPtr GetConnection(const TNetAddr&) = 0; /// if you want to open connection early virtual void OpenConnection(const TNetAddr&) = 0; /// Send message to the destination /// If addr is set then use it as destination. /// Takes ownership of addr (see ClearState method). virtual EMessageStatus SendMessage(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0; virtual EMessageStatus SendMessageOneWay(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0; /// Like SendMessage but cares about message template <typename T /* <: TBusMessage */> EMessageStatus SendMessageAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) { EMessageStatus status = SendMessage(mes.Get(), addr, wait); if (status == MESSAGE_OK) Y_UNUSED(mes.Release()); return status; } /// Like SendMessageOneWay but cares about message template <typename T /* <: TBusMessage */> EMessageStatus SendMessageOneWayAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) { EMessageStatus status = SendMessageOneWay(mes.Get(), addr, wait); if (status == MESSAGE_OK) Y_UNUSED(mes.Release()); return status; } EMessageStatus SendMessageMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) { return SendMessageAutoPtr(message, addr, wait); } EMessageStatus SendMessageOneWayMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) { return SendMessageOneWayAutoPtr(message, addr, wait); } // TODO: implement similar one-way methods }; struct TBusServerSession: public virtual TBusSession { typedef ::NBus::NPrivate::TRemoteServerSession TImpl; static TBusServerSessionPtr Create( TBusProtocol* proto, IBusServerHandler* handler, const TBusServerSessionConfig& config, TBusMessageQueuePtr queue); static TBusServerSessionPtr Create( TBusProtocol* proto, IBusServerHandler* handler, const TBusServerSessionConfig& config, TBusMessageQueuePtr queue, const TVector<TBindResult>& bindTo); // TODO: make parameter non-const virtual EMessageStatus SendReply(const TBusIdentity& ident, TBusMessage* pRep) = 0; // TODO: make parameter non-const virtual EMessageStatus ForgetRequest(const TBusIdentity& ident) = 0; template <typename U /* <: TBusMessage */> EMessageStatus SendReplyAutoPtr(TBusIdentity& ident, TAutoPtr<U>& resp) { EMessageStatus status = SendReply(const_cast<const TBusIdentity&>(ident), resp.Get()); if (status == MESSAGE_OK) { Y_UNUSED(resp.Release()); } return status; } EMessageStatus SendReplyMove(TBusIdentity& ident, TBusMessageAutoPtr resp) { return SendReplyAutoPtr(ident, resp); } /// Pause input from the network. /// It is valid to call this method in parallel. /// TODO: pull this method up to TBusSession. virtual void PauseInput(bool pause) = 0; virtual unsigned GetActualListenPort() = 0; }; namespace NPrivate { template <typename TBusSessionSubclass> class TBusOwnerSessionPtr: public TAtomicRefCount<TBusOwnerSessionPtr<TBusSessionSubclass>> { private: TIntrusivePtr<TBusSessionSubclass> Ptr; public: TBusOwnerSessionPtr(TBusSessionSubclass* session) : Ptr(session) { Y_ASSERT(!!Ptr); } ~TBusOwnerSessionPtr() { Ptr->Shutdown(); } TBusSessionSubclass* Get() const { return reinterpret_cast<TBusSessionSubclass*>(Ptr.Get()); } }; } template <typename TBusSessionSubclass> class TBusSessionPtr { private: TIntrusivePtr<NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>> SmartPtr; TBusSessionSubclass* Ptr; public: TBusSessionPtr() : Ptr() { } TBusSessionPtr(TBusSessionSubclass* session) : SmartPtr(!!session ? new NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>(session) : nullptr) , Ptr(session) { } TBusSessionSubclass* Get() const { return Ptr; } operator TBusSessionSubclass*() { return Get(); } TBusSessionSubclass& operator*() const { return *Get(); } TBusSessionSubclass* operator->() const { return Get(); } bool operator!() const { return !Ptr; } void Swap(TBusSessionPtr& t) noexcept { DoSwap(SmartPtr, t.SmartPtr); DoSwap(Ptr, t.Ptr); } void Drop() { TBusSessionPtr().Swap(*this); } }; }