aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/session.h
diff options
context:
space:
mode:
authornga <nga@yandex-team.ru>2022-02-10 16:48:09 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:09 +0300
commit1f553f46fb4f3c5eec631352cdd900a0709016af (patch)
treea231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/session.h
parentc4de7efdedc25b49cbea74bd589eecb61b55b60a (diff)
downloadydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/session.h')
-rw-r--r--library/cpp/messagebus/session.h112
1 files changed, 56 insertions, 56 deletions
diff --git a/library/cpp/messagebus/session.h b/library/cpp/messagebus/session.h
index fb12ab7c22..5a1a01d808 100644
--- a/library/cpp/messagebus/session.h
+++ b/library/cpp/messagebus/session.h
@@ -1,41 +1,41 @@
-#pragma once
-
+#pragma once
+
#include "connection.h"
-#include "defs.h"
+#include "defs.h"
#include "handler.h"
-#include "message.h"
-#include "netaddr.h"
+#include "message.h"
+#include "netaddr.h"
#include "network.h"
-#include "session_config.h"
+#include "session_config.h"
#include "misc/weak_ptr.h"
-
+
#include <library/cpp/messagebus/monitoring/mon_proto.pb.h>
-
+
#include <util/generic/array_ref.h>
#include <util/generic/ptr.h>
-namespace NBus {
+namespace NBus {
template <typename TBusSessionSubclass>
class TBusSessionPtr;
using TBusClientSessionPtr = TBusSessionPtr<TBusClientSession>;
using TBusServerSessionPtr = TBusSessionPtr<TBusServerSession>;
-
+
///////////////////////////////////////////////////////////////////
/// \brief Interface of session object.
-
+
/// Each client and server
/// should instantiate session object to be able to communicate via bus
/// client: sess = queue->CreateSource(protocol, handler);
/// server: sess = queue->CreateDestination(protocol, handler);
-
+
class TBusSession: public TWeakRefCounted<TBusSession> {
public:
size_t GetInFlight(const TNetAddr& addr) const;
size_t GetConnectSyscallsNumForTest(const TNetAddr& addr) const;
-
+
virtual void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0;
virtual void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0;
-
+
virtual int GetInFlight() const noexcept = 0;
/// monitoring status of current session and it's connections
virtual TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) = 0;
@@ -47,42 +47,42 @@ namespace NBus {
/// return session protocol
virtual const TBusProtocol* GetProto() const noexcept = 0;
virtual TBusMessageQueue* GetQueue() const noexcept = 0;
-
+
/// registers external session on host:port with locator service
int RegisterService(const char* hostname, TBusKey start = YBUS_KEYMIN, TBusKey end = YBUS_KEYMAX, EIpVersion ipVersion = EIP_VERSION_4);
-
+
protected:
TBusSession();
-
+
public:
virtual TString GetNameInternal() = 0;
-
+
virtual void Shutdown() = 0;
-
+
virtual ~TBusSession();
};
-
+
struct TBusClientSession: public virtual TBusSession {
typedef ::NBus::NPrivate::TRemoteClientSession TImpl;
-
+
static TBusClientSessionPtr Create(
TBusProtocol* proto,
IBusClientHandler* handler,
- const TBusClientSessionConfig& config,
- TBusMessageQueuePtr queue);
-
+ const TBusClientSessionConfig& config,
+ TBusMessageQueuePtr queue);
+
virtual TBusClientConnectionPtr GetConnection(const TNetAddr&) = 0;
-
+
/// if you want to open connection early
virtual void OpenConnection(const TNetAddr&) = 0;
-
+
/// Send message to the destination
/// If addr is set then use it as destination.
/// Takes ownership of addr (see ClearState method).
virtual EMessageStatus SendMessage(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0;
-
+
virtual EMessageStatus SendMessageOneWay(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0;
-
+
/// Like SendMessage but cares about message
template <typename T /* <: TBusMessage */>
EMessageStatus SendMessageAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) {
@@ -91,7 +91,7 @@ namespace NBus {
Y_UNUSED(mes.Release());
return status;
}
-
+
/// Like SendMessageOneWay but cares about message
template <typename T /* <: TBusMessage */>
EMessageStatus SendMessageOneWayAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) {
@@ -100,27 +100,27 @@ namespace NBus {
Y_UNUSED(mes.Release());
return status;
}
-
+
EMessageStatus SendMessageMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) {
return SendMessageAutoPtr(message, addr, wait);
}
-
+
EMessageStatus SendMessageOneWayMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) {
return SendMessageOneWayAutoPtr(message, addr, wait);
}
-
+
// TODO: implement similar one-way methods
};
-
+
struct TBusServerSession: public virtual TBusSession {
typedef ::NBus::NPrivate::TRemoteServerSession TImpl;
-
+
static TBusServerSessionPtr Create(
TBusProtocol* proto,
IBusServerHandler* handler,
- const TBusServerSessionConfig& config,
- TBusMessageQueuePtr queue);
-
+ const TBusServerSessionConfig& config,
+ TBusMessageQueuePtr queue);
+
static TBusServerSessionPtr Create(
TBusProtocol* proto,
IBusServerHandler* handler,
@@ -130,10 +130,10 @@ namespace NBus {
// TODO: make parameter non-const
virtual EMessageStatus SendReply(const TBusIdentity& ident, TBusMessage* pRep) = 0;
-
+
// TODO: make parameter non-const
virtual EMessageStatus ForgetRequest(const TBusIdentity& ident) = 0;
-
+
template <typename U /* <: TBusMessage */>
EMessageStatus SendReplyAutoPtr(TBusIdentity& ident, TAutoPtr<U>& resp) {
EMessageStatus status = SendReply(const_cast<const TBusIdentity&>(ident), resp.Get());
@@ -141,49 +141,49 @@ namespace NBus {
Y_UNUSED(resp.Release());
}
return status;
- }
-
+ }
+
EMessageStatus SendReplyMove(TBusIdentity& ident, TBusMessageAutoPtr resp) {
return SendReplyAutoPtr(ident, resp);
}
-
+
/// Pause input from the network.
/// It is valid to call this method in parallel.
/// TODO: pull this method up to TBusSession.
virtual void PauseInput(bool pause) = 0;
virtual unsigned GetActualListenPort() = 0;
};
-
+
namespace NPrivate {
template <typename TBusSessionSubclass>
class TBusOwnerSessionPtr: public TAtomicRefCount<TBusOwnerSessionPtr<TBusSessionSubclass>> {
private:
TIntrusivePtr<TBusSessionSubclass> Ptr;
-
+
public:
TBusOwnerSessionPtr(TBusSessionSubclass* session)
: Ptr(session)
{
Y_ASSERT(!!Ptr);
}
-
+
~TBusOwnerSessionPtr() {
Ptr->Shutdown();
}
-
+
TBusSessionSubclass* Get() const {
return reinterpret_cast<TBusSessionSubclass*>(Ptr.Get());
}
};
-
- }
-
+
+ }
+
template <typename TBusSessionSubclass>
class TBusSessionPtr {
private:
TIntrusivePtr<NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>> SmartPtr;
TBusSessionSubclass* Ptr;
-
+
public:
TBusSessionPtr()
: Ptr()
@@ -194,7 +194,7 @@ namespace NBus {
, Ptr(session)
{
}
-
+
TBusSessionSubclass* Get() const {
return Ptr;
}
@@ -207,19 +207,19 @@ namespace NBus {
TBusSessionSubclass* operator->() const {
return Get();
}
-
+
bool operator!() const {
return !Ptr;
}
-
+
void Swap(TBusSessionPtr& t) noexcept {
DoSwap(SmartPtr, t.SmartPtr);
DoSwap(Ptr, t.Ptr);
}
-
+
void Drop() {
TBusSessionPtr().Swap(*this);
}
};
-
-}
+
+}