aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/session.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/session.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/session.cpp')
-rw-r--r--library/cpp/messagebus/session.cpp130
1 files changed, 130 insertions, 0 deletions
diff --git a/library/cpp/messagebus/session.cpp b/library/cpp/messagebus/session.cpp
new file mode 100644
index 0000000000..46a7ece6a8
--- /dev/null
+++ b/library/cpp/messagebus/session.cpp
@@ -0,0 +1,130 @@
+#include "ybus.h"
+
+#include <util/generic/cast.h>
+
+using namespace NBus;
+
+namespace NBus {
+ TBusSession::TBusSession() {
+ }
+
+ ////////////////////////////////////////////////////////////////////
+ /// \brief Adds peer of connection into connection list
+
+ int CompareByHost(const IRemoteAddr& l, const IRemoteAddr& r) noexcept {
+ if (l.Addr()->sa_family != r.Addr()->sa_family) {
+ return l.Addr()->sa_family < r.Addr()->sa_family ? -1 : +1;
+ }
+
+ switch (l.Addr()->sa_family) {
+ case AF_INET: {
+ return memcmp(&(((const sockaddr_in*)l.Addr())->sin_addr), &(((const sockaddr_in*)r.Addr())->sin_addr), sizeof(in_addr));
+ }
+
+ case AF_INET6: {
+ return memcmp(&(((const sockaddr_in6*)l.Addr())->sin6_addr), &(((const sockaddr_in6*)r.Addr())->sin6_addr), sizeof(in6_addr));
+ }
+ }
+
+ return memcmp(l.Addr(), r.Addr(), Min<size_t>(l.Len(), r.Len()));
+ }
+
+ bool operator<(const TNetAddr& a1, const TNetAddr& a2) {
+ return CompareByHost(a1, a2) < 0;
+ }
+
+ size_t TBusSession::GetInFlight(const TNetAddr& addr) const {
+ size_t r;
+ GetInFlightBulk({addr}, MakeArrayRef(&r, 1));
+ return r;
+ }
+
+ size_t TBusSession::GetConnectSyscallsNumForTest(const TNetAddr& addr) const {
+ size_t r;
+ GetConnectSyscallsNumBulkForTest({addr}, MakeArrayRef(&r, 1));
+ return r;
+ }
+
+ // Split 'host' into name and port taking into account that host can be specified
+ // as ipv6 address ('[<ipv6 address]:port' notion).
+ bool SplitHost(const TString& host, TString* hostName, TString* portNum) {
+ hostName->clear();
+ portNum->clear();
+
+ // Simple check that we have to deal with ipv6 address specification or
+ // just host name or ipv4 address.
+ if (!host.empty() && (host[0] == '[')) {
+ size_t pos = host.find(']');
+ if (pos < 2 || pos == TString::npos) {
+ // '[]' and '[<address>' are errors.
+ return false;
+ }
+
+ *hostName = host.substr(1, pos - 1);
+
+ pos++;
+ if (pos != host.length()) {
+ if (host[pos] != ':') {
+ // Do not allow '[...]a' but '[...]:' is ok (as for ipv4 before
+ return false;
+ }
+
+ *portNum = host.substr(pos + 1);
+ }
+ } else {
+ size_t pos = host.find(':');
+ if (pos != TString::npos) {
+ if (pos == 0) {
+ // Treat ':<port>' as errors but allow or '<host>:' for compatibility.
+ return false;
+ }
+
+ *portNum = host.substr(pos + 1);
+ }
+
+ *hostName = host.substr(0, pos);
+ }
+
+ return true;
+ }
+
+ /// registers external session on host:port with locator service
+ int TBusSession::RegisterService(const char* host, TBusKey start /*= YBUS_KEYMIN*/, TBusKey end /*= YBUS_KEYMAX*/, EIpVersion ipVersion) {
+ TString hostName;
+ TString port;
+ int portNum;
+
+ if (!SplitHost(host, &hostName, &port)) {
+ hostName = host;
+ }
+
+ if (port.empty()) {
+ portNum = GetProto()->GetPort();
+ } else {
+ try {
+ portNum = FromString<int>(port);
+ } catch (const TFromStringException&) {
+ return -1;
+ }
+ }
+
+ TBusService service = GetProto()->GetService();
+ return GetQueue()->GetLocator()->Register(service, hostName.data(), portNum, start, end, ipVersion);
+ }
+
+ TBusSession::~TBusSession() {
+ }
+
+}
+
+TBusClientSessionPtr TBusClientSession::Create(TBusProtocol* proto, IBusClientHandler* handler, const TBusClientSessionConfig& config, TBusMessageQueuePtr queue) {
+ return queue->CreateSource(proto, handler, config);
+}
+
+TBusServerSessionPtr TBusServerSession::Create(TBusProtocol* proto, IBusServerHandler* handler, const TBusServerSessionConfig& config, TBusMessageQueuePtr queue) {
+ return queue->CreateDestination(proto, handler, config);
+}
+
+TBusServerSessionPtr TBusServerSession::Create(TBusProtocol* proto, IBusServerHandler* handler, const TBusServerSessionConfig& config, TBusMessageQueuePtr queue, const TVector<TBindResult>& bindTo) {
+ return queue->CreateDestination(proto, handler, config, bindTo);
+}