1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
  | 
#include "remote_server_connection.h"
#include "mb_lwtrace.h"
#include "remote_server_session.h"
#include <util/generic/cast.h>
LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER)
using namespace NBus;
using namespace NBus::NPrivate;
TRemoteServerConnection::TRemoteServerConnection(TRemoteServerSessionPtr session, ui64 id, TNetAddr addr)
    : TRemoteConnection(session.Get(), id, addr)
{
}
void TRemoteServerConnection::Init(SOCKET socket, TInstant now) {
    WriterData.Status.ConnectTime = now;
    WriterData.Status.Connected = true;
    Y_VERIFY(socket != INVALID_SOCKET, "must be a valid socket");
    TSocket readSocket(socket);
    TSocket writeSocket = readSocket;
    // this must not be done in constructor, because if event loop is stopped,
    // this is deleted
    WriterData.SetChannel(Session->WriteEventLoop.Register(writeSocket, this, WriteCookie));
    WriterData.SocketVersion = 1;
    ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(readSocket, WriterData.SocketVersion));
}
TRemoteServerSession* TRemoteServerConnection::GetSession() {
    return CheckedCast<TRemoteServerSession*>(Session.Get());
}
void TRemoteServerConnection::HandleEvent(SOCKET socket, void* cookie) {
    Y_UNUSED(socket);
    Y_ASSERT(cookie == ReadCookie || cookie == WriteCookie);
    if (cookie == ReadCookie) {
        GetSession()->ServerOwnedMessages.Wait();
        ScheduleRead();
    } else {
        ScheduleWrite();
    }
}
bool TRemoteServerConnection::NeedInterruptRead() {
    return !GetSession()->ServerOwnedMessages.TryWait();
}
void TRemoteServerConnection::MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) {
    TInstant now = TInstant::Now();
    GetSession()->ReleaseInWorkResponses(messages);
    for (auto& message : messages) {
        TInstant recvTime = message.MessagePtr->RecvTime;
        GetSession()->ServerHandler->OnSent(message.MessagePtr.Release());
        TDuration d = now - recvTime;
        WriterData.Status.DurationCounter.AddDuration(d);
        WriterData.Status.Incremental.ProcessDurationHistogram.AddTime(d);
    }
}
void TRemoteServerConnection::ReaderProcessMessageUnknownVersion(TArrayRef<const char> dataRef) {
    TBusHeader header(dataRef);
    // TODO: full version hex
    LWPROBE(ServerUnknownVersion, ToString(PeerAddr), header.GetVersionInternal());
    WrongVersionRequests.Enqueue(header);
    GetWriterActor()->Schedule();
}
  |