1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
#pragma once
#include "defs.h"
#include "message.h"
#include "message_status.h"
#include "use_after_free_checker.h"
#include "use_count_checker.h"
#include <util/generic/noncopyable.h>
namespace NBus {
/////////////////////////////////////////////////////////////////
/// \brief Interface to message bus handler
struct IBusErrorHandler {
friend struct ::NBus::NPrivate::TBusSessionImpl;
private:
TUseAfterFreeChecker UseAfterFreeChecker;
TUseCountChecker UseCountChecker;
public:
/// called when message or reply can't be delivered
virtual void OnError(TAutoPtr<TBusMessage> pMessage, EMessageStatus status);
virtual ~IBusErrorHandler() {
}
};
class TClientConnectionEvent {
public:
enum EType {
CONNECTED,
DISCONNECTED,
};
private:
EType Type;
ui64 Id;
TNetAddr Addr;
public:
TClientConnectionEvent(EType type, ui64 id, TNetAddr addr)
: Type(type)
, Id(id)
, Addr(addr)
{
}
EType GetType() const {
return Type;
}
ui64 GetId() const {
return Id;
}
TNetAddr GetAddr() const {
return Addr;
}
};
class TOnMessageContext : TNonCopyable {
private:
THolder<TBusMessage> Message;
TBusIdentity Ident;
// TODO: we don't need to store session, we have connection in ident
TBusServerSession* Session;
public:
TOnMessageContext()
: Session()
{
}
TOnMessageContext(TAutoPtr<TBusMessage> message, TBusIdentity& ident, TBusServerSession* session)
: Message(message)
, Session(session)
{
Ident.Swap(ident);
}
bool IsInWork() const {
return Ident.IsInWork();
}
bool operator!() const {
return !IsInWork();
}
TBusMessage* GetMessage() {
return Message.Get();
}
TBusMessage* ReleaseMessage() {
return Message.Release();
}
TBusServerSession* GetSession() {
return Session;
}
template <typename U /* <: TBusMessage */>
EMessageStatus SendReplyAutoPtr(TAutoPtr<U>& rep);
EMessageStatus SendReplyMove(TBusMessageAutoPtr response);
void AckMessage(TBusIdentity& ident);
void ForgetRequest();
void Swap(TOnMessageContext& that) {
DoSwap(Message, that.Message);
Ident.Swap(that.Ident);
DoSwap(Session, that.Session);
}
TNetAddr GetPeerAddrNetAddr() const;
bool IsConnectionAlive() const;
};
struct IBusServerHandler: public IBusErrorHandler {
virtual void OnMessage(TOnMessageContext& onMessage) = 0;
/// called when reply has been sent from destination
virtual void OnSent(TAutoPtr<TBusMessage> pMessage);
};
struct IBusClientHandler: public IBusErrorHandler {
/// called on source when reply arrives from destination
virtual void OnReply(TAutoPtr<TBusMessage> pMessage, TAutoPtr<TBusMessage> pReply) = 0;
/// called when client side message has gone into wire, place to call AckMessage()
virtual void OnMessageSent(TBusMessage* pMessage);
virtual void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage);
virtual void OnClientConnectionEvent(const TClientConnectionEvent&);
};
}
|