aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/remote_server_connection.cpp
blob: 9af7fb21857a2b93b12464ebceee730f26d4f597 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#include "remote_server_connection.h" 
 
#include "mb_lwtrace.h"
#include "remote_server_session.h" 
 
#include <util/generic/cast.h>
 
LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER) 
 
using namespace NBus; 
using namespace NBus::NPrivate; 
 
// Constructs a server-side connection by forwarding the raw session pointer,
// the connection id and the peer address to the TRemoteConnection base.
// No socket is attached here; see Init().
TRemoteServerConnection::TRemoteServerConnection(TRemoteServerSessionPtr session, ui64 id, TNetAddr addr)
    : TRemoteConnection(session.Get(), id, addr)
{
}
 
// Attaches an already-open socket to this connection.
//
// socket - socket handle; must not be INVALID_SOCKET (enforced by Y_VERIFY).
// now    - timestamp stored as the connection's connect time.
//
// The writer side is registered with the session's write event loop, and the
// reader side receives the socket through the reader's socket queue.
void TRemoteServerConnection::Init(SOCKET socket, TInstant now) {
    // Mark the writer status as connected as of `now`.
    WriterData.Status.ConnectTime = now;
    WriterData.Status.Connected = true;

    Y_VERIFY(socket != INVALID_SOCKET, "must be a valid socket");

    // Both halves are built from the same socket handle.
    // NOTE(review): presumably the TSocket copy shares the underlying descriptor — confirm.
    TSocket readSocket(socket);
    TSocket writeSocket = readSocket;

    // This registration must not be done in the constructor: if the event loop
    // is stopped, this object is deleted.
    WriterData.SetChannel(Session->WriteEventLoop.Register(writeSocket, this, WriteCookie));
    WriterData.SocketVersion = 1;

    // Pass the read socket, tagged with the current socket version, to the reader.
    ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(readSocket, WriterData.SocketVersion));
}
 
// Returns the owning session, downcast to the server-side session type
// through CheckedCast.
TRemoteServerSession* TRemoteServerConnection::GetSession() {
    auto* const baseSession = Session.Get();
    return CheckedCast<TRemoteServerSession*>(baseSession);
}
 
// Dispatches a socket event according to the cookie it carries: WriteCookie
// schedules a write; ReadCookie first calls Wait() on the session's
// ServerOwnedMessages and then schedules a read. The socket argument is unused.
void TRemoteServerConnection::HandleEvent(SOCKET socket, void* cookie) {
    Y_UNUSED(socket);
    Y_ASSERT(cookie == ReadCookie || cookie == WriteCookie);

    if (cookie != ReadCookie) {
        // Write-side event.
        ScheduleWrite();
        return;
    }

    // Read-side event.
    // NOTE(review): Wait() presumably reserves capacity for server-owned
    // messages before more input is read — confirm against the session type.
    GetSession()->ServerOwnedMessages.Wait();
    ScheduleRead();
}
 
// Reports whether reading should be interrupted: true exactly when TryWait()
// on the session's ServerOwnedMessages does not succeed.
bool TRemoteServerConnection::NeedInterruptRead() {
    const bool acquired = GetSession()->ServerOwnedMessages.TryWait();
    return !acquired;
}
 
// Bookkeeping for a batch of messages that have been sent: releases the
// in-work responses on the session, passes each message to the server
// handler's OnSent(), and records the time from message receipt to now in the
// writer's duration counter and histogram.
void TRemoteServerConnection::MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) {
    const TInstant sentAt = TInstant::Now();
    TRemoteServerSession* const session = GetSession();

    session->ReleaseInWorkResponses(messages);

    for (auto& entry : messages) {
        // Read RecvTime first: MessagePtr is released into OnSent() on the
        // next line and must not be dereferenced after that.
        const TInstant receivedAt = entry.MessagePtr->RecvTime;
        session->ServerHandler->OnSent(entry.MessagePtr.Release());

        const TDuration elapsed = sentAt - receivedAt;
        WriterData.Status.DurationCounter.AddDuration(elapsed);
        WriterData.Status.Incremental.ProcessDurationHistogram.AddTime(elapsed);
    }
}
 
// Reader-side handling of a message with an unrecognised version: parses the
// header from the raw bytes, fires the ServerUnknownVersion trace probe,
// queues the header in WrongVersionRequests and schedules the writer actor.
void TRemoteServerConnection::ReaderProcessMessageUnknownVersion(TArrayRef<const char> dataRef) {
    TBusHeader unknownHeader(dataRef);

    // TODO: full version hex
    LWPROBE(ServerUnknownVersion, ToString(PeerAddr), unknownHeader.GetVersionInternal());

    // Queue the header and schedule the writer actor.
    WrongVersionRequests.Enqueue(unknownHeader);
    GetWriterActor()->Schedule();
}