aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/handler.h
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/handler.h
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/handler.h')
-rw-r--r--library/cpp/messagebus/handler.h135
1 files changed, 135 insertions, 0 deletions
diff --git a/library/cpp/messagebus/handler.h b/library/cpp/messagebus/handler.h
new file mode 100644
index 0000000000..60002c68a6
--- /dev/null
+++ b/library/cpp/messagebus/handler.h
@@ -0,0 +1,135 @@
+#pragma once
+
+#include "defs.h"
+#include "message.h"
+#include "message_status.h"
+#include "use_after_free_checker.h"
+#include "use_count_checker.h"
+
+#include <util/generic/noncopyable.h>
+
+namespace NBus {
+ /////////////////////////////////////////////////////////////////
+ /// \brief Interface to message bus handler
+
+ struct IBusErrorHandler {
+ friend struct ::NBus::NPrivate::TBusSessionImpl;
+
+ private:
+ TUseAfterFreeChecker UseAfterFreeChecker;
+ TUseCountChecker UseCountChecker;
+
+ public:
+ /// called when message or reply can't be delivered
+ virtual void OnError(TAutoPtr<TBusMessage> pMessage, EMessageStatus status);
+
+ virtual ~IBusErrorHandler() {
+ }
+ };
+
+ class TClientConnectionEvent {
+ public:
+ enum EType {
+ CONNECTED,
+ DISCONNECTED,
+ };
+
+ private:
+ EType Type;
+ ui64 Id;
+ TNetAddr Addr;
+
+ public:
+ TClientConnectionEvent(EType type, ui64 id, TNetAddr addr)
+ : Type(type)
+ , Id(id)
+ , Addr(addr)
+ {
+ }
+
+ EType GetType() const {
+ return Type;
+ }
+ ui64 GetId() const {
+ return Id;
+ }
+ TNetAddr GetAddr() const {
+ return Addr;
+ }
+ };
+
+ class TOnMessageContext : TNonCopyable {
+ private:
+ THolder<TBusMessage> Message;
+ TBusIdentity Ident;
+ // TODO: we don't need to store session, we have connection in ident
+ TBusServerSession* Session;
+
+ public:
+ TOnMessageContext()
+ : Session()
+ {
+ }
+ TOnMessageContext(TAutoPtr<TBusMessage> message, TBusIdentity& ident, TBusServerSession* session)
+ : Message(message)
+ , Session(session)
+ {
+ Ident.Swap(ident);
+ }
+
+ bool IsInWork() const {
+ return Ident.IsInWork();
+ }
+
+ bool operator!() const {
+ return !IsInWork();
+ }
+
+ TBusMessage* GetMessage() {
+ return Message.Get();
+ }
+
+ TBusMessage* ReleaseMessage() {
+ return Message.Release();
+ }
+
+ TBusServerSession* GetSession() {
+ return Session;
+ }
+
+ template <typename U /* <: TBusMessage */>
+ EMessageStatus SendReplyAutoPtr(TAutoPtr<U>& rep);
+
+ EMessageStatus SendReplyMove(TBusMessageAutoPtr response);
+
+ void AckMessage(TBusIdentity& ident);
+
+ void ForgetRequest();
+
+ void Swap(TOnMessageContext& that) {
+ DoSwap(Message, that.Message);
+ Ident.Swap(that.Ident);
+ DoSwap(Session, that.Session);
+ }
+
+ TNetAddr GetPeerAddrNetAddr() const;
+
+ bool IsConnectionAlive() const;
+ };
+
+ struct IBusServerHandler: public IBusErrorHandler {
+ virtual void OnMessage(TOnMessageContext& onMessage) = 0;
+ /// called when reply has been sent from destination
+ virtual void OnSent(TAutoPtr<TBusMessage> pMessage);
+ };
+
+ struct IBusClientHandler: public IBusErrorHandler {
+ /// called on source when reply arrives from destination
+ virtual void OnReply(TAutoPtr<TBusMessage> pMessage, TAutoPtr<TBusMessage> pReply) = 0;
+ /// called when client side message has gone into wire, place to call AckMessage()
+ virtual void OnMessageSent(TBusMessage* pMessage);
+ virtual void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage);
+ virtual void OnClientConnectionEvent(const TClientConnectionEvent&);
+ };
+
+}