aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/remote_server_connection.cpp
blob: a87ca51e429938d06dda4b8e8f61f2d130e14238 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#include "remote_server_connection.h"

#include "mb_lwtrace.h" 
#include "remote_server_session.h"

#include <util/generic/cast.h> 

LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER)

using namespace NBus;
using namespace NBus::NPrivate;

TRemoteServerConnection::TRemoteServerConnection(TRemoteServerSessionPtr session, ui64 id, TNetAddr addr)
    : TRemoteConnection(session.Get(), id, addr)
{
}

void TRemoteServerConnection::Init(SOCKET socket, TInstant now) {
    WriterData.Status.ConnectTime = now;
    WriterData.Status.Connected = true;

    Y_VERIFY(socket != INVALID_SOCKET, "must be a valid socket");

    TSocket readSocket(socket);
    TSocket writeSocket = readSocket;

    // this must not be done in constructor, because if event loop is stopped,
    // this is deleted
    WriterData.SetChannel(Session->WriteEventLoop.Register(writeSocket, this, WriteCookie));
    WriterData.SocketVersion = 1;

    ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(readSocket, WriterData.SocketVersion));
}

TRemoteServerSession* TRemoteServerConnection::GetSession() {
    return CheckedCast<TRemoteServerSession*>(Session.Get());
}

void TRemoteServerConnection::HandleEvent(SOCKET socket, void* cookie) {
    Y_UNUSED(socket);
    Y_ASSERT(cookie == ReadCookie || cookie == WriteCookie);
    if (cookie == ReadCookie) {
        GetSession()->ServerOwnedMessages.Wait();
        ScheduleRead();
    } else {
        ScheduleWrite();
    }
}

bool TRemoteServerConnection::NeedInterruptRead() {
    return !GetSession()->ServerOwnedMessages.TryWait();
}

void TRemoteServerConnection::MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) {
    TInstant now = TInstant::Now();

    GetSession()->ReleaseInWorkResponses(messages);
    for (auto& message : messages) {
        TInstant recvTime = message.MessagePtr->RecvTime;
        GetSession()->ServerHandler->OnSent(message.MessagePtr.Release());
        TDuration d = now - recvTime;
        WriterData.Status.DurationCounter.AddDuration(d);
        WriterData.Status.Incremental.ProcessDurationHistogram.AddTime(d);
    }
}

void TRemoteServerConnection::ReaderProcessMessageUnknownVersion(TArrayRef<const char> dataRef) {
    TBusHeader header(dataRef);
    // TODO: full version hex
    LWPROBE(ServerUnknownVersion, ToString(PeerAddr), header.GetVersionInternal());
    WrongVersionRequests.Enqueue(header);
    GetWriterActor()->Schedule();
}