1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
|
#pragma once
#include "connection.h"
#include "defs.h"
#include "handler.h"
#include "message.h"
#include "netaddr.h"
#include "network.h"
#include "session_config.h"
#include "misc/weak_ptr.h"
#include <library/cpp/messagebus/monitoring/mon_proto.pb.h>
#include <util/generic/array_ref.h>
#include <util/generic/ptr.h>
namespace NBus {
template <typename TBusSessionSubclass>
class TBusSessionPtr;
using TBusClientSessionPtr = TBusSessionPtr<TBusClientSession>;
using TBusServerSessionPtr = TBusSessionPtr<TBusServerSession>;
///////////////////////////////////////////////////////////////////
/// \brief Interface of session object.
/// Each client and server
/// should instantiate session object to be able to communicate via bus
/// client: sess = queue->CreateSource(protocol, handler);
/// server: sess = queue->CreateDestination(protocol, handler);
class TBusSession: public TWeakRefCounted<TBusSession> {
public:
size_t GetInFlight(const TNetAddr& addr) const;
size_t GetConnectSyscallsNumForTest(const TNetAddr& addr) const;
virtual void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0;
virtual void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0;
virtual int GetInFlight() const noexcept = 0;
/// monitoring status of current session and it's connections
virtual TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) = 0;
virtual TConnectionStatusMonRecord GetStatusProtobuf() = 0;
virtual NPrivate::TSessionDumpStatus GetStatusRecordInternal() = 0;
virtual TString GetStatusSingleLine() = 0;
/// return session config
virtual const TBusSessionConfig* GetConfig() const noexcept = 0;
/// return session protocol
virtual const TBusProtocol* GetProto() const noexcept = 0;
virtual TBusMessageQueue* GetQueue() const noexcept = 0;
/// registers external session on host:port with locator service
int RegisterService(const char* hostname, TBusKey start = YBUS_KEYMIN, TBusKey end = YBUS_KEYMAX, EIpVersion ipVersion = EIP_VERSION_4);
protected:
TBusSession();
public:
virtual TString GetNameInternal() = 0;
virtual void Shutdown() = 0;
virtual ~TBusSession();
};
struct TBusClientSession: public virtual TBusSession {
typedef ::NBus::NPrivate::TRemoteClientSession TImpl;
static TBusClientSessionPtr Create(
TBusProtocol* proto,
IBusClientHandler* handler,
const TBusClientSessionConfig& config,
TBusMessageQueuePtr queue);
virtual TBusClientConnectionPtr GetConnection(const TNetAddr&) = 0;
/// if you want to open connection early
virtual void OpenConnection(const TNetAddr&) = 0;
/// Send message to the destination
/// If addr is set then use it as destination.
/// Takes ownership of addr (see ClearState method).
virtual EMessageStatus SendMessage(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0;
virtual EMessageStatus SendMessageOneWay(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0;
/// Like SendMessage but cares about message
template <typename T /* <: TBusMessage */>
EMessageStatus SendMessageAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) {
EMessageStatus status = SendMessage(mes.Get(), addr, wait);
if (status == MESSAGE_OK)
Y_UNUSED(mes.Release());
return status;
}
/// Like SendMessageOneWay but cares about message
template <typename T /* <: TBusMessage */>
EMessageStatus SendMessageOneWayAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) {
EMessageStatus status = SendMessageOneWay(mes.Get(), addr, wait);
if (status == MESSAGE_OK)
Y_UNUSED(mes.Release());
return status;
}
EMessageStatus SendMessageMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) {
return SendMessageAutoPtr(message, addr, wait);
}
EMessageStatus SendMessageOneWayMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) {
return SendMessageOneWayAutoPtr(message, addr, wait);
}
// TODO: implement similar one-way methods
};
struct TBusServerSession: public virtual TBusSession {
typedef ::NBus::NPrivate::TRemoteServerSession TImpl;
static TBusServerSessionPtr Create(
TBusProtocol* proto,
IBusServerHandler* handler,
const TBusServerSessionConfig& config,
TBusMessageQueuePtr queue);
static TBusServerSessionPtr Create(
TBusProtocol* proto,
IBusServerHandler* handler,
const TBusServerSessionConfig& config,
TBusMessageQueuePtr queue,
const TVector<TBindResult>& bindTo);
// TODO: make parameter non-const
virtual EMessageStatus SendReply(const TBusIdentity& ident, TBusMessage* pRep) = 0;
// TODO: make parameter non-const
virtual EMessageStatus ForgetRequest(const TBusIdentity& ident) = 0;
template <typename U /* <: TBusMessage */>
EMessageStatus SendReplyAutoPtr(TBusIdentity& ident, TAutoPtr<U>& resp) {
EMessageStatus status = SendReply(const_cast<const TBusIdentity&>(ident), resp.Get());
if (status == MESSAGE_OK) {
Y_UNUSED(resp.Release());
}
return status;
}
EMessageStatus SendReplyMove(TBusIdentity& ident, TBusMessageAutoPtr resp) {
return SendReplyAutoPtr(ident, resp);
}
/// Pause input from the network.
/// It is valid to call this method in parallel.
/// TODO: pull this method up to TBusSession.
virtual void PauseInput(bool pause) = 0;
virtual unsigned GetActualListenPort() = 0;
};
namespace NPrivate {
template <typename TBusSessionSubclass>
class TBusOwnerSessionPtr: public TAtomicRefCount<TBusOwnerSessionPtr<TBusSessionSubclass>> {
private:
TIntrusivePtr<TBusSessionSubclass> Ptr;
public:
TBusOwnerSessionPtr(TBusSessionSubclass* session)
: Ptr(session)
{
Y_ASSERT(!!Ptr);
}
~TBusOwnerSessionPtr() {
Ptr->Shutdown();
}
TBusSessionSubclass* Get() const {
return reinterpret_cast<TBusSessionSubclass*>(Ptr.Get());
}
};
}
template <typename TBusSessionSubclass>
class TBusSessionPtr {
private:
TIntrusivePtr<NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>> SmartPtr;
TBusSessionSubclass* Ptr;
public:
TBusSessionPtr()
: Ptr()
{
}
TBusSessionPtr(TBusSessionSubclass* session)
: SmartPtr(!!session ? new NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>(session) : nullptr)
, Ptr(session)
{
}
TBusSessionSubclass* Get() const {
return Ptr;
}
operator TBusSessionSubclass*() {
return Get();
}
TBusSessionSubclass& operator*() const {
return *Get();
}
TBusSessionSubclass* operator->() const {
return Get();
}
bool operator!() const {
return !Ptr;
}
void Swap(TBusSessionPtr& t) noexcept {
DoSwap(SmartPtr, t.SmartPtr);
DoSwap(Ptr, t.Ptr);
}
void Drop() {
TBusSessionPtr().Swap(*this);
}
};
}
|