1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
#include "remote_server_connection.h"
#include "mb_lwtrace.h"
#include "remote_server_session.h"
#include <util/generic/cast.h>
LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER)
using namespace NBus;
using namespace NBus::NPrivate;
// Builds a server-side connection by forwarding the raw session pointer,
// the connection id and the peer address to the TRemoteConnection base.
// No other work happens here; socket setup is performed by Init().
TRemoteServerConnection::TRemoteServerConnection(
    TRemoteServerSessionPtr session,
    ui64 id,
    TNetAddr addr)
    : TRemoteConnection(session.Get(), id, addr)
{
}
// Attaches an already-open socket to this connection.
//
// socket - socket handle; the process aborts if it is INVALID_SOCKET.
// now    - timestamp stored as the connect time in the writer status.
void TRemoteServerConnection::Init(SOCKET socket, TInstant now) {
    // Record connection state in the writer status first.
    WriterData.Status.ConnectTime = now;
    WriterData.Status.Connected = true;

    Y_ABORT_UNLESS(socket != INVALID_SOCKET, "must be a valid socket");

    // Two TSocket objects are made from the same handle: one is handed to
    // the reader, the other is registered for write events.
    TSocket readerSide(socket);
    TSocket writerSide(readerSide);

    // Registration is deliberately kept out of the constructor: if the
    // event loop is stopped, this object is deleted.
    WriterData.SetChannel(Session->WriteEventLoop.Register(writerSide, this, WriteCookie));

    // Socket version is set to 1 here and passed to the reader together
    // with the socket.
    WriterData.SocketVersion = 1;
    ReaderGetSocketQueue()->EnqueueAndSchedule(
        TWriterToReaderSocketMessage(readerSide, WriterData.SocketVersion));
}
// Returns the session held by this connection, downcast to
// TRemoteServerSession through CheckedCast.
TRemoteServerSession* TRemoteServerConnection::GetSession() {
    auto* const baseSession = Session.Get();
    return CheckedCast<TRemoteServerSession*>(baseSession);
}
// Event callback: dispatches on the cookie the event was registered with.
//
// socket - unused.
// cookie - expected to be either ReadCookie or WriteCookie (asserted).
void TRemoteServerConnection::HandleEvent(SOCKET socket, void* cookie) {
    Y_UNUSED(socket);
    Y_ASSERT(cookie == ReadCookie || cookie == WriteCookie);

    // Anything that is not a read event is handled as a write event.
    if (cookie != ReadCookie) {
        ScheduleWrite();
        return;
    }

    // Read event: Wait() on the session's ServerOwnedMessages is called
    // before the read is scheduled.
    GetSession()->ServerOwnedMessages.Wait();
    ScheduleRead();
}
// Returns true when TryWait() on the session's ServerOwnedMessages fails,
// and false when it succeeds.
bool TRemoteServerConnection::NeedInterruptRead() {
    if (GetSession()->ServerOwnedMessages.TryWait()) {
        return false;
    }
    return true;
}
// Called with a batch of messages that have been sent.
//
// Releases the in-work responses on the session, then for every message
// hands ownership of the message to the server handler's OnSent() and
// records the time elapsed between the message's RecvTime and the moment
// this method was entered.
void TRemoteServerConnection::MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) {
    // A single timestamp is taken up front and reused for the whole batch.
    const TInstant sentAt = TInstant::Now();

    TRemoteServerSession* const session = GetSession();
    session->ReleaseInWorkResponses(messages);

    for (auto& entry : messages) {
        // RecvTime must be read before Release(): afterwards the pointer
        // no longer belongs to the entry.
        const TInstant receivedAt = entry.MessagePtr->RecvTime;

        GetSession()->ServerHandler->OnSent(entry.MessagePtr.Release());

        const TDuration elapsed = sentAt - receivedAt;
        WriterData.Status.DurationCounter.AddDuration(elapsed);
        WriterData.Status.Incremental.ProcessDurationHistogram.AddTime(elapsed);
    }
}
// Handles incoming data whose version is unknown: constructs a header from
// the raw bytes, fires the ServerUnknownVersion probe, queues the header in
// WrongVersionRequests and schedules the writer actor.
//
// dataRef - raw bytes the TBusHeader is constructed from.
void TRemoteServerConnection::ReaderProcessMessageUnknownVersion(TArrayRef<const char> dataRef) {
    TBusHeader parsedHeader(dataRef);

    // TODO: full version hex
    LWPROBE(ServerUnknownVersion, ToString(PeerAddr), parsedHeader.GetVersionInternal());

    WrongVersionRequests.Enqueue(parsedHeader);
    GetWriterActor()->Schedule();
}
|