aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/session.h
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/session.h
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/session.h')
-rw-r--r--library/cpp/messagebus/session.h225
1 files changed, 225 insertions, 0 deletions
diff --git a/library/cpp/messagebus/session.h b/library/cpp/messagebus/session.h
new file mode 100644
index 0000000000..fb12ab7c22
--- /dev/null
+++ b/library/cpp/messagebus/session.h
@@ -0,0 +1,225 @@
+#pragma once
+
+#include "connection.h"
+#include "defs.h"
+#include "handler.h"
+#include "message.h"
+#include "netaddr.h"
+#include "network.h"
+#include "session_config.h"
+#include "misc/weak_ptr.h"
+
+#include <library/cpp/messagebus/monitoring/mon_proto.pb.h>
+
+#include <util/generic/array_ref.h>
+#include <util/generic/ptr.h>
+
+namespace NBus {
+ template <typename TBusSessionSubclass>
+ class TBusSessionPtr;
+ using TBusClientSessionPtr = TBusSessionPtr<TBusClientSession>;
+ using TBusServerSessionPtr = TBusSessionPtr<TBusServerSession>;
+
+ ///////////////////////////////////////////////////////////////////
+ /// \brief Interface of session object.
+
+ /// Each client and server
+ /// should instantiate session object to be able to communicate via bus
+ /// client: sess = queue->CreateSource(protocol, handler);
+ /// server: sess = queue->CreateDestination(protocol, handler);
+
+ class TBusSession: public TWeakRefCounted<TBusSession> {
+ public:
+ size_t GetInFlight(const TNetAddr& addr) const;
+ size_t GetConnectSyscallsNumForTest(const TNetAddr& addr) const;
+
+ virtual void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0;
+ virtual void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0;
+
+ virtual int GetInFlight() const noexcept = 0;
+ /// monitoring status of current session and it's connections
+ virtual TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) = 0;
+ virtual TConnectionStatusMonRecord GetStatusProtobuf() = 0;
+ virtual NPrivate::TSessionDumpStatus GetStatusRecordInternal() = 0;
+ virtual TString GetStatusSingleLine() = 0;
+ /// return session config
+ virtual const TBusSessionConfig* GetConfig() const noexcept = 0;
+ /// return session protocol
+ virtual const TBusProtocol* GetProto() const noexcept = 0;
+ virtual TBusMessageQueue* GetQueue() const noexcept = 0;
+
+ /// registers external session on host:port with locator service
+ int RegisterService(const char* hostname, TBusKey start = YBUS_KEYMIN, TBusKey end = YBUS_KEYMAX, EIpVersion ipVersion = EIP_VERSION_4);
+
+ protected:
+ TBusSession();
+
+ public:
+ virtual TString GetNameInternal() = 0;
+
+ virtual void Shutdown() = 0;
+
+ virtual ~TBusSession();
+ };
+
+ struct TBusClientSession: public virtual TBusSession {
+ typedef ::NBus::NPrivate::TRemoteClientSession TImpl;
+
+ static TBusClientSessionPtr Create(
+ TBusProtocol* proto,
+ IBusClientHandler* handler,
+ const TBusClientSessionConfig& config,
+ TBusMessageQueuePtr queue);
+
+ virtual TBusClientConnectionPtr GetConnection(const TNetAddr&) = 0;
+
+ /// if you want to open connection early
+ virtual void OpenConnection(const TNetAddr&) = 0;
+
+ /// Send message to the destination
+ /// If addr is set then use it as destination.
+ /// Takes ownership of addr (see ClearState method).
+ virtual EMessageStatus SendMessage(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0;
+
+ virtual EMessageStatus SendMessageOneWay(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0;
+
+ /// Like SendMessage but cares about message
+ template <typename T /* <: TBusMessage */>
+ EMessageStatus SendMessageAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) {
+ EMessageStatus status = SendMessage(mes.Get(), addr, wait);
+ if (status == MESSAGE_OK)
+ Y_UNUSED(mes.Release());
+ return status;
+ }
+
+ /// Like SendMessageOneWay but cares about message
+ template <typename T /* <: TBusMessage */>
+ EMessageStatus SendMessageOneWayAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) {
+ EMessageStatus status = SendMessageOneWay(mes.Get(), addr, wait);
+ if (status == MESSAGE_OK)
+ Y_UNUSED(mes.Release());
+ return status;
+ }
+
+ EMessageStatus SendMessageMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) {
+ return SendMessageAutoPtr(message, addr, wait);
+ }
+
+ EMessageStatus SendMessageOneWayMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) {
+ return SendMessageOneWayAutoPtr(message, addr, wait);
+ }
+
+ // TODO: implement similar one-way methods
+ };
+
+ struct TBusServerSession: public virtual TBusSession {
+ typedef ::NBus::NPrivate::TRemoteServerSession TImpl;
+
+ static TBusServerSessionPtr Create(
+ TBusProtocol* proto,
+ IBusServerHandler* handler,
+ const TBusServerSessionConfig& config,
+ TBusMessageQueuePtr queue);
+
+ static TBusServerSessionPtr Create(
+ TBusProtocol* proto,
+ IBusServerHandler* handler,
+ const TBusServerSessionConfig& config,
+ TBusMessageQueuePtr queue,
+ const TVector<TBindResult>& bindTo);
+
+ // TODO: make parameter non-const
+ virtual EMessageStatus SendReply(const TBusIdentity& ident, TBusMessage* pRep) = 0;
+
+ // TODO: make parameter non-const
+ virtual EMessageStatus ForgetRequest(const TBusIdentity& ident) = 0;
+
+ template <typename U /* <: TBusMessage */>
+ EMessageStatus SendReplyAutoPtr(TBusIdentity& ident, TAutoPtr<U>& resp) {
+ EMessageStatus status = SendReply(const_cast<const TBusIdentity&>(ident), resp.Get());
+ if (status == MESSAGE_OK) {
+ Y_UNUSED(resp.Release());
+ }
+ return status;
+ }
+
+ EMessageStatus SendReplyMove(TBusIdentity& ident, TBusMessageAutoPtr resp) {
+ return SendReplyAutoPtr(ident, resp);
+ }
+
+ /// Pause input from the network.
+ /// It is valid to call this method in parallel.
+ /// TODO: pull this method up to TBusSession.
+ virtual void PauseInput(bool pause) = 0;
+ virtual unsigned GetActualListenPort() = 0;
+ };
+
+ namespace NPrivate {
+ template <typename TBusSessionSubclass>
+ class TBusOwnerSessionPtr: public TAtomicRefCount<TBusOwnerSessionPtr<TBusSessionSubclass>> {
+ private:
+ TIntrusivePtr<TBusSessionSubclass> Ptr;
+
+ public:
+ TBusOwnerSessionPtr(TBusSessionSubclass* session)
+ : Ptr(session)
+ {
+ Y_ASSERT(!!Ptr);
+ }
+
+ ~TBusOwnerSessionPtr() {
+ Ptr->Shutdown();
+ }
+
+ TBusSessionSubclass* Get() const {
+ return reinterpret_cast<TBusSessionSubclass*>(Ptr.Get());
+ }
+ };
+
+ }
+
+ template <typename TBusSessionSubclass>
+ class TBusSessionPtr {
+ private:
+ TIntrusivePtr<NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>> SmartPtr;
+ TBusSessionSubclass* Ptr;
+
+ public:
+ TBusSessionPtr()
+ : Ptr()
+ {
+ }
+ TBusSessionPtr(TBusSessionSubclass* session)
+ : SmartPtr(!!session ? new NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>(session) : nullptr)
+ , Ptr(session)
+ {
+ }
+
+ TBusSessionSubclass* Get() const {
+ return Ptr;
+ }
+ operator TBusSessionSubclass*() {
+ return Get();
+ }
+ TBusSessionSubclass& operator*() const {
+ return *Get();
+ }
+ TBusSessionSubclass* operator->() const {
+ return Get();
+ }
+
+ bool operator!() const {
+ return !Ptr;
+ }
+
+ void Swap(TBusSessionPtr& t) noexcept {
+ DoSwap(SmartPtr, t.SmartPtr);
+ DoSwap(Ptr, t.Ptr);
+ }
+
+ void Drop() {
+ TBusSessionPtr().Swap(*this);
+ }
+ };
+
+}