diff options
author | nga <nga@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
commit | 1f553f46fb4f3c5eec631352cdd900a0709016af (patch) | |
tree | a231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/session.h | |
parent | c4de7efdedc25b49cbea74bd589eecb61b55b60a (diff) | |
download | ydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz |
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/session.h')
-rw-r--r-- | library/cpp/messagebus/session.h | 112 |
1 files changed, 56 insertions, 56 deletions
diff --git a/library/cpp/messagebus/session.h b/library/cpp/messagebus/session.h index fb12ab7c22..5a1a01d808 100644 --- a/library/cpp/messagebus/session.h +++ b/library/cpp/messagebus/session.h @@ -1,41 +1,41 @@ -#pragma once - +#pragma once + #include "connection.h" -#include "defs.h" +#include "defs.h" #include "handler.h" -#include "message.h" -#include "netaddr.h" +#include "message.h" +#include "netaddr.h" #include "network.h" -#include "session_config.h" +#include "session_config.h" #include "misc/weak_ptr.h" - + #include <library/cpp/messagebus/monitoring/mon_proto.pb.h> - + #include <util/generic/array_ref.h> #include <util/generic/ptr.h> -namespace NBus { +namespace NBus { template <typename TBusSessionSubclass> class TBusSessionPtr; using TBusClientSessionPtr = TBusSessionPtr<TBusClientSession>; using TBusServerSessionPtr = TBusSessionPtr<TBusServerSession>; - + /////////////////////////////////////////////////////////////////// /// \brief Interface of session object. - + /// Each client and server /// should instantiate session object to be able to communicate via bus /// client: sess = queue->CreateSource(protocol, handler); /// server: sess = queue->CreateDestination(protocol, handler); - + class TBusSession: public TWeakRefCounted<TBusSession> { public: size_t GetInFlight(const TNetAddr& addr) const; size_t GetConnectSyscallsNumForTest(const TNetAddr& addr) const; - + virtual void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0; virtual void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0; - + virtual int GetInFlight() const noexcept = 0; /// monitoring status of current session and it's connections virtual TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) = 0; @@ -47,42 +47,42 @@ namespace NBus { /// return session protocol virtual const TBusProtocol* GetProto() const noexcept = 0; virtual TBusMessageQueue* GetQueue() const noexcept = 0; - + /// registers external session on host:port with locator service int RegisterService(const char* hostname, TBusKey start = YBUS_KEYMIN, TBusKey end = YBUS_KEYMAX, EIpVersion ipVersion = EIP_VERSION_4); - + protected: TBusSession(); - + public: virtual TString GetNameInternal() = 0; - + virtual void Shutdown() = 0; - + virtual ~TBusSession(); }; - + struct TBusClientSession: public virtual TBusSession { typedef ::NBus::NPrivate::TRemoteClientSession TImpl; - + static TBusClientSessionPtr Create( TBusProtocol* proto, IBusClientHandler* handler, - const TBusClientSessionConfig& config, - TBusMessageQueuePtr queue); - + const TBusClientSessionConfig& config, + TBusMessageQueuePtr queue); + virtual TBusClientConnectionPtr GetConnection(const TNetAddr&) = 0; - + /// if you want to open connection early virtual void OpenConnection(const TNetAddr&) = 0; - + /// Send message to the destination /// If addr is set then use it as destination. /// Takes ownership of addr (see ClearState method). virtual EMessageStatus SendMessage(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0; - + virtual EMessageStatus SendMessageOneWay(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0; - + /// Like SendMessage but cares about message template <typename T /* <: TBusMessage */> EMessageStatus SendMessageAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) { @@ -91,7 +91,7 @@ namespace NBus { Y_UNUSED(mes.Release()); return status; } - + /// Like SendMessageOneWay but cares about message template <typename T /* <: TBusMessage */> EMessageStatus SendMessageOneWayAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) { @@ -100,27 +100,27 @@ namespace NBus { Y_UNUSED(mes.Release()); return status; } - + EMessageStatus SendMessageMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) { return SendMessageAutoPtr(message, addr, wait); } - + EMessageStatus SendMessageOneWayMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) { return SendMessageOneWayAutoPtr(message, addr, wait); } - + // TODO: implement similar one-way methods }; - + struct TBusServerSession: public virtual TBusSession { typedef ::NBus::NPrivate::TRemoteServerSession TImpl; - + static TBusServerSessionPtr Create( TBusProtocol* proto, IBusServerHandler* handler, - const TBusServerSessionConfig& config, - TBusMessageQueuePtr queue); - + const TBusServerSessionConfig& config, + TBusMessageQueuePtr queue); + static TBusServerSessionPtr Create( TBusProtocol* proto, IBusServerHandler* handler, @@ -130,10 +130,10 @@ namespace NBus { // TODO: make parameter non-const virtual EMessageStatus SendReply(const TBusIdentity& ident, TBusMessage* pRep) = 0; - + // TODO: make parameter non-const virtual EMessageStatus ForgetRequest(const TBusIdentity& ident) = 0; - + template <typename U /* <: TBusMessage */> EMessageStatus SendReplyAutoPtr(TBusIdentity& ident, TAutoPtr<U>& resp) { EMessageStatus status = SendReply(const_cast<const TBusIdentity&>(ident), resp.Get()); @@ -141,49 +141,49 @@ namespace NBus { Y_UNUSED(resp.Release()); } return status; - } - + } + EMessageStatus SendReplyMove(TBusIdentity& ident, TBusMessageAutoPtr resp) { return SendReplyAutoPtr(ident, resp); } - + /// Pause input from the network. /// It is valid to call this method in parallel. /// TODO: pull this method up to TBusSession. virtual void PauseInput(bool pause) = 0; virtual unsigned GetActualListenPort() = 0; }; - + namespace NPrivate { template <typename TBusSessionSubclass> class TBusOwnerSessionPtr: public TAtomicRefCount<TBusOwnerSessionPtr<TBusSessionSubclass>> { private: TIntrusivePtr<TBusSessionSubclass> Ptr; - + public: TBusOwnerSessionPtr(TBusSessionSubclass* session) : Ptr(session) { Y_ASSERT(!!Ptr); } - + ~TBusOwnerSessionPtr() { Ptr->Shutdown(); } - + TBusSessionSubclass* Get() const { return reinterpret_cast<TBusSessionSubclass*>(Ptr.Get()); } }; - - } - + + } + template <typename TBusSessionSubclass> class TBusSessionPtr { private: TIntrusivePtr<NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>> SmartPtr; TBusSessionSubclass* Ptr; - + public: TBusSessionPtr() : Ptr() @@ -194,7 +194,7 @@ namespace NBus { , Ptr(session) { } - + TBusSessionSubclass* Get() const { return Ptr; } @@ -207,19 +207,19 @@ namespace NBus { TBusSessionSubclass* operator->() const { return Get(); } - + bool operator!() const { return !Ptr; } - + void Swap(TBusSessionPtr& t) noexcept { DoSwap(SmartPtr, t.SmartPtr); DoSwap(Ptr, t.Ptr); } - + void Drop() { TBusSessionPtr().Swap(*this); } }; - -} + +} |