aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/session.cpp
blob: 772984a0869b6d8c06518660155fba704cf774c3 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#include "ybus.h"

#include <util/generic/cast.h>

using namespace NBus;

namespace NBus {
    TBusSession::TBusSession() { 
    } 

    //////////////////////////////////////////////////////////////////// 
    /// \brief Adds peer of connection into connection list 

    int CompareByHost(const IRemoteAddr& l, const IRemoteAddr& r) noexcept { 
        if (l.Addr()->sa_family != r.Addr()->sa_family) { 
            return l.Addr()->sa_family < r.Addr()->sa_family ? -1 : +1; 
        } 
 
        switch (l.Addr()->sa_family) { 
            case AF_INET: { 
                return memcmp(&(((const sockaddr_in*)l.Addr())->sin_addr), &(((const sockaddr_in*)r.Addr())->sin_addr), sizeof(in_addr)); 
            } 

            case AF_INET6: { 
                return memcmp(&(((const sockaddr_in6*)l.Addr())->sin6_addr), &(((const sockaddr_in6*)r.Addr())->sin6_addr), sizeof(in6_addr)); 
            } 
        }

        return memcmp(l.Addr(), r.Addr(), Min<size_t>(l.Len(), r.Len())); 
    }

    bool operator<(const TNetAddr& a1, const TNetAddr& a2) { 
        return CompareByHost(a1, a2) < 0; 
    } 

    size_t TBusSession::GetInFlight(const TNetAddr& addr) const { 
        size_t r; 
        GetInFlightBulk({addr}, MakeArrayRef(&r, 1)); 
        return r; 
    } 

    size_t TBusSession::GetConnectSyscallsNumForTest(const TNetAddr& addr) const { 
        size_t r; 
        GetConnectSyscallsNumBulkForTest({addr}, MakeArrayRef(&r, 1)); 
        return r; 
    } 

    // Split 'host' into name and port taking into account that host can be specified 
    // as ipv6 address ('[<ipv6 address]:port' notion). 
    bool SplitHost(const TString& host, TString* hostName, TString* portNum) { 
        hostName->clear(); 
        portNum->clear(); 

        // Simple check that we have to deal with ipv6 address specification or 
        // just host name or ipv4 address. 
        if (!host.empty() && (host[0] == '[')) {
            size_t pos = host.find(']'); 
            if (pos < 2 || pos == TString::npos) { 
                // '[]' and '[<address>' are errors. 
                return false; 
            } 

            *hostName = host.substr(1, pos - 1); 

            pos++; 
            if (pos != host.length()) { 
                if (host[pos] != ':') { 
                    // Do not allow '[...]a' but '[...]:' is ok (as for ipv4 before 
                    return false; 
                } 

                *portNum = host.substr(pos + 1); 
            }
        } else { 
            size_t pos = host.find(':'); 
            if (pos != TString::npos) { 
                if (pos == 0) { 
                    // Treat ':<port>' as errors but allow or '<host>:' for compatibility. 
                    return false; 
                } 

                *portNum = host.substr(pos + 1); 
            }

            *hostName = host.substr(0, pos); 
        }

        return true; 
    }

    /// registers external session on host:port with locator service 
    int TBusSession::RegisterService(const char* host, TBusKey start /*= YBUS_KEYMIN*/, TBusKey end /*= YBUS_KEYMAX*/, EIpVersion ipVersion) { 
        TString hostName; 
        TString port; 
        int portNum; 

        if (!SplitHost(host, &hostName, &port)) { 
            hostName = host; 
        } 

        if (port.empty()) {
            portNum = GetProto()->GetPort(); 
        } else { 
            try { 
                portNum = FromString<int>(port); 
            } catch (const TFromStringException&) { 
                return -1; 
            } 
        } 

        TBusService service = GetProto()->GetService(); 
        return GetQueue()->GetLocator()->Register(service, hostName.data(), portNum, start, end, ipVersion);
    }

    TBusSession::~TBusSession() { 
    }

}

TBusClientSessionPtr TBusClientSession::Create(TBusProtocol* proto, IBusClientHandler* handler, const TBusClientSessionConfig& config, TBusMessageQueuePtr queue) { 
    return queue->CreateSource(proto, handler, config);
}

TBusServerSessionPtr TBusServerSession::Create(TBusProtocol* proto, IBusServerHandler* handler, const TBusServerSessionConfig& config, TBusMessageQueuePtr queue) { 
    return queue->CreateDestination(proto, handler, config);
}

TBusServerSessionPtr TBusServerSession::Create(TBusProtocol* proto, IBusServerHandler* handler, const TBusServerSessionConfig& config, TBusMessageQueuePtr queue, const TVector<TBindResult>& bindTo) { 
    return queue->CreateDestination(proto, handler, config, bindTo);
}