aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/session.h
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:15 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:15 +0300
commit72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch)
treeda2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/messagebus/session.h
parent778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff)
downloadydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/session.h')
-rw-r--r--library/cpp/messagebus/session.h354
1 files changed, 177 insertions, 177 deletions
diff --git a/library/cpp/messagebus/session.h b/library/cpp/messagebus/session.h
index fb12ab7c22..857f58d7e5 100644
--- a/library/cpp/messagebus/session.h
+++ b/library/cpp/messagebus/session.h
@@ -15,211 +15,211 @@
#include <util/generic/ptr.h>
namespace NBus {
- template <typename TBusSessionSubclass>
- class TBusSessionPtr;
- using TBusClientSessionPtr = TBusSessionPtr<TBusClientSession>;
- using TBusServerSessionPtr = TBusSessionPtr<TBusServerSession>;
-
- ///////////////////////////////////////////////////////////////////
- /// \brief Interface of session object.
-
- /// Each client and server
- /// should instantiate session object to be able to communicate via bus
- /// client: sess = queue->CreateSource(protocol, handler);
- /// server: sess = queue->CreateDestination(protocol, handler);
-
- class TBusSession: public TWeakRefCounted<TBusSession> {
- public:
- size_t GetInFlight(const TNetAddr& addr) const;
- size_t GetConnectSyscallsNumForTest(const TNetAddr& addr) const;
-
- virtual void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0;
- virtual void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0;
-
- virtual int GetInFlight() const noexcept = 0;
- /// monitoring status of current session and it's connections
- virtual TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) = 0;
- virtual TConnectionStatusMonRecord GetStatusProtobuf() = 0;
- virtual NPrivate::TSessionDumpStatus GetStatusRecordInternal() = 0;
- virtual TString GetStatusSingleLine() = 0;
- /// return session config
- virtual const TBusSessionConfig* GetConfig() const noexcept = 0;
- /// return session protocol
- virtual const TBusProtocol* GetProto() const noexcept = 0;
- virtual TBusMessageQueue* GetQueue() const noexcept = 0;
-
- /// registers external session on host:port with locator service
- int RegisterService(const char* hostname, TBusKey start = YBUS_KEYMIN, TBusKey end = YBUS_KEYMAX, EIpVersion ipVersion = EIP_VERSION_4);
-
- protected:
- TBusSession();
-
- public:
- virtual TString GetNameInternal() = 0;
-
- virtual void Shutdown() = 0;
-
- virtual ~TBusSession();
- };
-
- struct TBusClientSession: public virtual TBusSession {
- typedef ::NBus::NPrivate::TRemoteClientSession TImpl;
-
- static TBusClientSessionPtr Create(
- TBusProtocol* proto,
- IBusClientHandler* handler,
+ template <typename TBusSessionSubclass>
+ class TBusSessionPtr;
+ using TBusClientSessionPtr = TBusSessionPtr<TBusClientSession>;
+ using TBusServerSessionPtr = TBusSessionPtr<TBusServerSession>;
+
+ ///////////////////////////////////////////////////////////////////
+ /// \brief Interface of session object.
+
+ /// Each client and server
+ /// should instantiate session object to be able to communicate via bus
+ /// client: sess = queue->CreateSource(protocol, handler);
+ /// server: sess = queue->CreateDestination(protocol, handler);
+
+ class TBusSession: public TWeakRefCounted<TBusSession> {
+ public:
+ size_t GetInFlight(const TNetAddr& addr) const;
+ size_t GetConnectSyscallsNumForTest(const TNetAddr& addr) const;
+
+ virtual void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0;
+ virtual void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0;
+
+ virtual int GetInFlight() const noexcept = 0;
+ /// monitoring status of current session and it's connections
+ virtual TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) = 0;
+ virtual TConnectionStatusMonRecord GetStatusProtobuf() = 0;
+ virtual NPrivate::TSessionDumpStatus GetStatusRecordInternal() = 0;
+ virtual TString GetStatusSingleLine() = 0;
+ /// return session config
+ virtual const TBusSessionConfig* GetConfig() const noexcept = 0;
+ /// return session protocol
+ virtual const TBusProtocol* GetProto() const noexcept = 0;
+ virtual TBusMessageQueue* GetQueue() const noexcept = 0;
+
+ /// registers external session on host:port with locator service
+ int RegisterService(const char* hostname, TBusKey start = YBUS_KEYMIN, TBusKey end = YBUS_KEYMAX, EIpVersion ipVersion = EIP_VERSION_4);
+
+ protected:
+ TBusSession();
+
+ public:
+ virtual TString GetNameInternal() = 0;
+
+ virtual void Shutdown() = 0;
+
+ virtual ~TBusSession();
+ };
+
+ struct TBusClientSession: public virtual TBusSession {
+ typedef ::NBus::NPrivate::TRemoteClientSession TImpl;
+
+ static TBusClientSessionPtr Create(
+ TBusProtocol* proto,
+ IBusClientHandler* handler,
const TBusClientSessionConfig& config,
TBusMessageQueuePtr queue);
- virtual TBusClientConnectionPtr GetConnection(const TNetAddr&) = 0;
+ virtual TBusClientConnectionPtr GetConnection(const TNetAddr&) = 0;
- /// if you want to open connection early
- virtual void OpenConnection(const TNetAddr&) = 0;
+ /// if you want to open connection early
+ virtual void OpenConnection(const TNetAddr&) = 0;
- /// Send message to the destination
- /// If addr is set then use it as destination.
- /// Takes ownership of addr (see ClearState method).
- virtual EMessageStatus SendMessage(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0;
+ /// Send message to the destination
+ /// If addr is set then use it as destination.
+ /// Takes ownership of addr (see ClearState method).
+ virtual EMessageStatus SendMessage(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0;
- virtual EMessageStatus SendMessageOneWay(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0;
+ virtual EMessageStatus SendMessageOneWay(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0;
- /// Like SendMessage but cares about message
- template <typename T /* <: TBusMessage */>
- EMessageStatus SendMessageAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) {
- EMessageStatus status = SendMessage(mes.Get(), addr, wait);
- if (status == MESSAGE_OK)
+ /// Like SendMessage but cares about message
+ template <typename T /* <: TBusMessage */>
+ EMessageStatus SendMessageAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) {
+ EMessageStatus status = SendMessage(mes.Get(), addr, wait);
+ if (status == MESSAGE_OK)
Y_UNUSED(mes.Release());
- return status;
- }
-
- /// Like SendMessageOneWay but cares about message
- template <typename T /* <: TBusMessage */>
- EMessageStatus SendMessageOneWayAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) {
- EMessageStatus status = SendMessageOneWay(mes.Get(), addr, wait);
- if (status == MESSAGE_OK)
+ return status;
+ }
+
+ /// Like SendMessageOneWay but cares about message
+ template <typename T /* <: TBusMessage */>
+ EMessageStatus SendMessageOneWayAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) {
+ EMessageStatus status = SendMessageOneWay(mes.Get(), addr, wait);
+ if (status == MESSAGE_OK)
Y_UNUSED(mes.Release());
- return status;
- }
+ return status;
+ }
- EMessageStatus SendMessageMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) {
- return SendMessageAutoPtr(message, addr, wait);
- }
+ EMessageStatus SendMessageMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) {
+ return SendMessageAutoPtr(message, addr, wait);
+ }
- EMessageStatus SendMessageOneWayMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) {
- return SendMessageOneWayAutoPtr(message, addr, wait);
- }
+ EMessageStatus SendMessageOneWayMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) {
+ return SendMessageOneWayAutoPtr(message, addr, wait);
+ }
- // TODO: implement similar one-way methods
- };
+ // TODO: implement similar one-way methods
+ };
- struct TBusServerSession: public virtual TBusSession {
- typedef ::NBus::NPrivate::TRemoteServerSession TImpl;
+ struct TBusServerSession: public virtual TBusSession {
+ typedef ::NBus::NPrivate::TRemoteServerSession TImpl;
- static TBusServerSessionPtr Create(
- TBusProtocol* proto,
- IBusServerHandler* handler,
+ static TBusServerSessionPtr Create(
+ TBusProtocol* proto,
+ IBusServerHandler* handler,
const TBusServerSessionConfig& config,
TBusMessageQueuePtr queue);
- static TBusServerSessionPtr Create(
- TBusProtocol* proto,
- IBusServerHandler* handler,
+ static TBusServerSessionPtr Create(
+ TBusProtocol* proto,
+ IBusServerHandler* handler,
const TBusServerSessionConfig& config,
TBusMessageQueuePtr queue,
const TVector<TBindResult>& bindTo);
- // TODO: make parameter non-const
- virtual EMessageStatus SendReply(const TBusIdentity& ident, TBusMessage* pRep) = 0;
+ // TODO: make parameter non-const
+ virtual EMessageStatus SendReply(const TBusIdentity& ident, TBusMessage* pRep) = 0;
- // TODO: make parameter non-const
- virtual EMessageStatus ForgetRequest(const TBusIdentity& ident) = 0;
+ // TODO: make parameter non-const
+ virtual EMessageStatus ForgetRequest(const TBusIdentity& ident) = 0;
- template <typename U /* <: TBusMessage */>
- EMessageStatus SendReplyAutoPtr(TBusIdentity& ident, TAutoPtr<U>& resp) {
- EMessageStatus status = SendReply(const_cast<const TBusIdentity&>(ident), resp.Get());
- if (status == MESSAGE_OK) {
+ template <typename U /* <: TBusMessage */>
+ EMessageStatus SendReplyAutoPtr(TBusIdentity& ident, TAutoPtr<U>& resp) {
+ EMessageStatus status = SendReply(const_cast<const TBusIdentity&>(ident), resp.Get());
+ if (status == MESSAGE_OK) {
Y_UNUSED(resp.Release());
- }
- return status;
+ }
+ return status;
}
- EMessageStatus SendReplyMove(TBusIdentity& ident, TBusMessageAutoPtr resp) {
- return SendReplyAutoPtr(ident, resp);
- }
-
- /// Pause input from the network.
- /// It is valid to call this method in parallel.
- /// TODO: pull this method up to TBusSession.
- virtual void PauseInput(bool pause) = 0;
- virtual unsigned GetActualListenPort() = 0;
- };
-
- namespace NPrivate {
- template <typename TBusSessionSubclass>
- class TBusOwnerSessionPtr: public TAtomicRefCount<TBusOwnerSessionPtr<TBusSessionSubclass>> {
- private:
- TIntrusivePtr<TBusSessionSubclass> Ptr;
-
- public:
- TBusOwnerSessionPtr(TBusSessionSubclass* session)
- : Ptr(session)
- {
- Y_ASSERT(!!Ptr);
- }
-
- ~TBusOwnerSessionPtr() {
- Ptr->Shutdown();
- }
-
- TBusSessionSubclass* Get() const {
- return reinterpret_cast<TBusSessionSubclass*>(Ptr.Get());
- }
- };
+ EMessageStatus SendReplyMove(TBusIdentity& ident, TBusMessageAutoPtr resp) {
+ return SendReplyAutoPtr(ident, resp);
+ }
+
+ /// Pause input from the network.
+ /// It is valid to call this method in parallel.
+ /// TODO: pull this method up to TBusSession.
+ virtual void PauseInput(bool pause) = 0;
+ virtual unsigned GetActualListenPort() = 0;
+ };
+
+ namespace NPrivate {
+ template <typename TBusSessionSubclass>
+ class TBusOwnerSessionPtr: public TAtomicRefCount<TBusOwnerSessionPtr<TBusSessionSubclass>> {
+ private:
+ TIntrusivePtr<TBusSessionSubclass> Ptr;
+
+ public:
+ TBusOwnerSessionPtr(TBusSessionSubclass* session)
+ : Ptr(session)
+ {
+ Y_ASSERT(!!Ptr);
+ }
+
+ ~TBusOwnerSessionPtr() {
+ Ptr->Shutdown();
+ }
+
+ TBusSessionSubclass* Get() const {
+ return reinterpret_cast<TBusSessionSubclass*>(Ptr.Get());
+ }
+ };
}
- template <typename TBusSessionSubclass>
- class TBusSessionPtr {
- private:
- TIntrusivePtr<NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>> SmartPtr;
- TBusSessionSubclass* Ptr;
-
- public:
- TBusSessionPtr()
- : Ptr()
- {
- }
- TBusSessionPtr(TBusSessionSubclass* session)
- : SmartPtr(!!session ? new NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>(session) : nullptr)
- , Ptr(session)
- {
- }
-
- TBusSessionSubclass* Get() const {
- return Ptr;
- }
- operator TBusSessionSubclass*() {
- return Get();
- }
- TBusSessionSubclass& operator*() const {
- return *Get();
- }
- TBusSessionSubclass* operator->() const {
- return Get();
- }
-
- bool operator!() const {
- return !Ptr;
- }
-
- void Swap(TBusSessionPtr& t) noexcept {
- DoSwap(SmartPtr, t.SmartPtr);
- DoSwap(Ptr, t.Ptr);
- }
-
- void Drop() {
- TBusSessionPtr().Swap(*this);
- }
- };
+ template <typename TBusSessionSubclass>
+ class TBusSessionPtr {
+ private:
+ TIntrusivePtr<NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>> SmartPtr;
+ TBusSessionSubclass* Ptr;
+
+ public:
+ TBusSessionPtr()
+ : Ptr()
+ {
+ }
+ TBusSessionPtr(TBusSessionSubclass* session)
+ : SmartPtr(!!session ? new NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>(session) : nullptr)
+ , Ptr(session)
+ {
+ }
+
+ TBusSessionSubclass* Get() const {
+ return Ptr;
+ }
+ operator TBusSessionSubclass*() {
+ return Get();
+ }
+ TBusSessionSubclass& operator*() const {
+ return *Get();
+ }
+ TBusSessionSubclass* operator->() const {
+ return Get();
+ }
+
+ bool operator!() const {
+ return !Ptr;
+ }
+
+ void Swap(TBusSessionPtr& t) noexcept {
+ DoSwap(SmartPtr, t.SmartPtr);
+ DoSwap(Ptr, t.Ptr);
+ }
+
+ void Drop() {
+ TBusSessionPtr().Swap(*this);
+ }
+ };
}