diff options
author | Alexander Rutkovsky <alexander.rutkovsky@gmail.com> | 2022-06-01 02:44:57 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-06-14 14:50:33 +0300 |
commit | db8ce766560aefc563049ecad19d05bdda6d48b6 (patch) | |
tree | 2f5c84845dc78bbe9ea51374f39adc8d672b3d29 | |
parent | 9a90a0b585f215a1ac2e35c8fb28258eb24416f7 (diff) | |
download | ydb-22.2.38.1.tar.gz |
Support for automatic IPv6/v4 selection in IC KIKIMR-1497722.2.38.1
ref:9fc307f23e6bd58288b342be1ced17b0f82fbb9a
5 files changed, 59 insertions, 38 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_address.h b/library/cpp/actors/interconnect/interconnect_address.h index e9e0faec81..5a78193abb 100644 --- a/library/cpp/actors/interconnect/interconnect_address.h +++ b/library/cpp/actors/interconnect/interconnect_address.h @@ -25,5 +25,21 @@ namespace NInterconnect { ui16 GetPort() const; TString GetAddress() const; TString ToString() const; + + static TAddress AnyIPv4(ui16 port) { + TAddress res; + res.Addr.Ipv4.sin_family = AF_INET; + res.Addr.Ipv4.sin_port = htons(port); + res.Addr.Ipv4.sin_addr.s_addr = htonl(INADDR_ANY); + return res; + } + + static TAddress AnyIPv6(ui16 port) { + TAddress res; + res.Addr.Ipv6.sin6_family = AF_INET6; + res.Addr.Ipv6.sin6_port = htons(port); + res.Addr.Ipv6.sin6_addr = in6addr_any; + return res; + } }; } diff --git a/library/cpp/actors/interconnect/interconnect_stream.cpp b/library/cpp/actors/interconnect/interconnect_stream.cpp index 158ebc9e1d..ad46453acb 100644 --- a/library/cpp/actors/interconnect/interconnect_stream.cpp +++ b/library/cpp/actors/interconnect/interconnect_stream.cpp @@ -92,11 +92,14 @@ namespace NInterconnect { ///////////////////////////////////////////////////////////////// - TIntrusivePtr<TStreamSocket> TStreamSocket::Make(int domain) { + TIntrusivePtr<TStreamSocket> TStreamSocket::Make(int domain, int *error) { const SOCKET res = ::socket(domain, SOCK_STREAM | SOCK_NONBLOCK, 0); if (res == -1) { const int err = LastSocketError(); Y_VERIFY(err != EMFILE && err != ENFILE); + if (error) { + *error = err; + } } return MakeIntrusive<TStreamSocket>(res); } diff --git a/library/cpp/actors/interconnect/interconnect_stream.h b/library/cpp/actors/interconnect/interconnect_stream.h index 074adc6e74..3ba7914f77 100644 --- a/library/cpp/actors/interconnect/interconnect_stream.h +++ b/library/cpp/actors/interconnect/interconnect_stream.h @@ -46,7 +46,7 @@ namespace NInterconnect { public: TStreamSocket(SOCKET fd); - static TIntrusivePtr<TStreamSocket> Make(int domain); + static TIntrusivePtr<TStreamSocket> Make(int domain, int *error = nullptr); virtual ssize_t Send(const void* msg, size_t len, TString *err = nullptr) const; virtual ssize_t Recv(void* buf, size_t len, TString *err = nullptr) const; diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp index b95c994598..aad8677ca4 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp @@ -10,7 +10,8 @@ namespace NActors { TInterconnectListenerTCP::TInterconnectListenerTCP(const TString& address, ui16 port, TInterconnectProxyCommon::TPtr common, const TMaybe<SOCKET>& socket) : TActor(&TThis::Initial) , TInterconnectLoggingBase(Sprintf("ICListener: %s", SelfId().ToString().data())) - , Address(address.c_str(), port) + , Address(address) + , Port(port) , Listener( socket ? new NInterconnect::TStreamSocket(*socket) @@ -33,54 +34,54 @@ namespace NActors { } int TInterconnectListenerTCP::Bind() { - NInterconnect::TAddress addr = Address; - - if (ProxyCommonCtx->Settings.BindOnAllAddresses) { - switch (addr.GetFamily()) { - case AF_INET: { - auto *sa = reinterpret_cast<sockaddr_in*>(addr.SockAddr()); - sa->sin_addr = {INADDR_ANY}; - break; - } - - case AF_INET6: { - auto *sa = reinterpret_cast<sockaddr_in6*>(addr.SockAddr()); - sa->sin6_addr = in6addr_any; - break; - } - - default: - Y_FAIL("Unsupported address family"); + auto doTry = [&](NInterconnect::TAddress addr) { + int error; + Listener = NInterconnect::TStreamSocket::Make(addr.GetFamily(), &error); + if (*Listener == -1) { + return error; } - } + SetNonBlock(*Listener); + Listener->SetSendBufferSize(ProxyCommonCtx->Settings.GetSendBufferSize()); // TODO(alexvru): WTF? + SetSockOpt(*Listener, SOL_SOCKET, SO_REUSEADDR, 1); + if (addr.GetFamily() == AF_INET6) { + SetSockOpt(*Listener, IPPROTO_IPV6, IPV6_V6ONLY, 0); + } + if (const auto e = -Listener->Bind(addr)) { + return e; + } else if (const auto e = -Listener->Listen(SOMAXCONN)) { + return e; + } else { + return 0; + } + }; - Listener = NInterconnect::TStreamSocket::Make(addr.GetFamily()); - if (*Listener == -1) { - return errno; - } - SetNonBlock(*Listener); - Listener->SetSendBufferSize(ProxyCommonCtx->Settings.GetSendBufferSize()); // TODO(alexvru): WTF? - SetSockOpt(*Listener, SOL_SOCKET, SO_REUSEADDR, 1); - if (const auto e = -Listener->Bind(addr)) { - return e; - } else if (const auto e = -Listener->Listen(SOMAXCONN)) { - return e; + if (Address) { + NInterconnect::TAddress addr(Address, Port); + if (ProxyCommonCtx->Settings.BindOnAllAddresses) { + addr = addr.GetFamily() == AF_INET ? NInterconnect::TAddress::AnyIPv4(Port) : + addr.GetFamily() == AF_INET6 ? NInterconnect::TAddress::AnyIPv6(Port) : addr; + } + return doTry(addr); } else { - return 0; + int error = doTry(NInterconnect::TAddress::AnyIPv6(Port)); + if (error == EAFNOSUPPORT || error == EPROTONOSUPPORT) { + error = doTry(NInterconnect::TAddress::AnyIPv4(Port)); + } + return error; } } void TInterconnectListenerTCP::Bootstrap(const TActorContext& ctx) { if (!Listener) { if (const int err = Bind()) { - LOG_ERROR_IC("ICL01", "Bind failed: %s (%s)", strerror(err), Address.ToString().data()); + LOG_ERROR_IC("ICL01", "Bind failed: %s (%s:%u)", strerror(err), Address.data(), Port); Listener.Reset(); Become(&TThis::Initial, TDuration::Seconds(1), new TEvents::TEvBootstrap); return; } } if (const auto& callback = ProxyCommonCtx->InitWhiteboard) { - callback(Address.GetPort(), TlsActivationContext->ExecutorThread.ActorSystem); + callback(Port, TlsActivationContext->ExecutorThread.ActorSystem); } const bool success = ctx.Send(MakePollerActorId(), new TEvPollerRegister(Listener, SelfId(), {})); Y_VERIFY(success); @@ -103,7 +104,7 @@ namespace NActors { continue; } else if (-r != EAGAIN && -r != EWOULDBLOCK) { Y_VERIFY(-r != ENFILE && -r != EMFILE && !ExternalSocket); - LOG_ERROR_IC("ICL06", "Listen failed: %s (%s)", strerror(-r), Address.ToString().data()); + LOG_ERROR_IC("ICL06", "Listen failed: %s (%s:%u)", strerror(-r), Address.data(), Port); Listener.Reset(); PollerToken.Reset(); Become(&TThis::Initial, TDuration::Seconds(1), new TEvents::TEvBootstrap); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.h b/library/cpp/actors/interconnect/interconnect_tcp_server.h index fc71073c2d..d3538940b7 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_server.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_server.h @@ -43,7 +43,8 @@ namespace NActors { void Process(const TActorContext& ctx); - const NInterconnect::TAddress Address; + const TString Address; + const ui16 Port; TIntrusivePtr<NInterconnect::TStreamSocket> Listener; const bool ExternalSocket; TPollerToken::TPtr PollerToken; |