diff options
author | Alexander Rutkovsky <alexander.rutkovsky@gmail.com> | 2022-06-23 19:33:27 +0300 |
---|---|---|
committer | Alexander Rutkovsky <alexander.rutkovsky@gmail.com> | 2022-06-23 19:33:27 +0300 |
commit | a77ae5f11416ba3add5205dea69d1efe9012a096 (patch) | |
tree | 8d45400465a373fe5babda2ad163e226c87c7cae | |
parent | 5b76ffead8d299254cded90dd586f64e81f90080 (diff) | |
download | ydb-a77ae5f11416ba3add5205dea69d1efe9012a096.tar.gz |
Connect sequentially to all available addresses KIKIMR-14911
ref:6bb441beeb02359f1262541c1c41aa700130e53c
7 files changed, 91 insertions, 62 deletions
diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h index 474b3dba8d..7edb444346 100644 --- a/library/cpp/actors/interconnect/events_local.h +++ b/library/cpp/actors/interconnect/events_local.h @@ -232,7 +232,7 @@ namespace NActors { DEFINE_SIMPLE_LOCAL_EVENT(TEvLocalNodeInfo, "Network: TEvLocalNodeInfo") ui32 NodeId; - NAddr::IRemoteAddrPtr Address; + std::vector<NInterconnect::TAddress> Addresses; }; struct TEvBunchOfEventsToDestroy : TEventLocal<TEvBunchOfEventsToDestroy, ui32(ENetwork::BunchOfEventsToDestroy)> { diff --git a/library/cpp/actors/interconnect/interconnect_address.cpp b/library/cpp/actors/interconnect/interconnect_address.cpp index 290aee1b55..d6adb8098d 100644 --- a/library/cpp/actors/interconnect/interconnect_address.cpp +++ b/library/cpp/actors/interconnect/interconnect_address.cpp @@ -70,6 +70,18 @@ namespace NInterconnect { : TAddress(addr.data(), port) {} + TAddress::TAddress(in_addr addr, ui16 port) { + Addr.Ipv4.sin_family = AF_INET; + Addr.Ipv4.sin_port = htons(port); + Addr.Ipv4.sin_addr = addr; + } + + TAddress::TAddress(in6_addr addr, ui16 port) { + Addr.Ipv6.sin6_family = AF_INET6; + Addr.Ipv6.sin6_port = htons(port); + Addr.Ipv6.sin6_addr = addr; + } + TString TAddress::GetAddress() const { const void *src; socklen_t size; diff --git a/library/cpp/actors/interconnect/interconnect_address.h b/library/cpp/actors/interconnect/interconnect_address.h index 5a78193abb..b19d751806 100644 --- a/library/cpp/actors/interconnect/interconnect_address.h +++ b/library/cpp/actors/interconnect/interconnect_address.h @@ -17,6 +17,8 @@ namespace NInterconnect { TAddress(); TAddress(const char* addr, ui16 port); TAddress(const TString& addr, ui16 port); + TAddress(in_addr addr, ui16 port); + TAddress(in6_addr addr, ui16 port); TAddress(NAddr::IRemoteAddr& addr); int GetFamily() const; socklen_t Size() const; diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index 78e114a574..116ccc2724 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -832,59 +832,66 @@ namespace NActors { Now() + ResolveTimeout); // extract address from the result - NInterconnect::TAddress address; + std::vector<NInterconnect::TAddress> addresses; if (!ev) { ResolveTimedOut = true; if (auto peerNodeInfo = GetPeerNodeInfo(); peerNodeInfo && peerNodeInfo->Address) { - address = {peerNodeInfo->Address, peerNodeInfo->Port}; + addresses.emplace_back(peerNodeInfo->Address, peerNodeInfo->Port); } else { Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve timed out and no static address defined", true); } } else if (auto *p = ev->CastAsLocal<TEvLocalNodeInfo>()) { - if (!p->Address) { + addresses = std::move(p->Addresses); + if (addresses.empty()) { Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: no address returned", true); } - address = {*p->Address}; } else if (auto *p = ev->CastAsLocal<TEvInterconnect::TEvNodeAddress>()) { const auto& r = p->Record; if (!r.HasAddress() || !r.HasPort()) { Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: no address returned", true); } - address = {r.GetAddress(), static_cast<ui16>(r.GetPort())}; + addresses.emplace_back(r.GetAddress(), static_cast<ui16>(r.GetPort())); } else { Y_VERIFY(ev->GetTypeRewrite() == ui32(ENetwork::ResolveError)); Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: " + ev->Get<TEvResolveError>()->Explain, true); } - // create the socket with matching address family - Socket = NInterconnect::TStreamSocket::Make(address.GetFamily()); - if (*Socket == -1) { - Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "System error: failed to create socket"); - } - - // extract peer address - if (updatePeerAddr) { - PeerAddr = address.ToString(); - } + for (const NInterconnect::TAddress& address : addresses) { + // create the socket with matching address family + int err = 0; + Socket = NInterconnect::TStreamSocket::Make(address.GetFamily(), &err); + if (err == EAFNOSUPPORT) { + continue; + } else if (*Socket == -1) { + Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "System error: failed to create socket"); + } - // set up socket parameters - SetupSocket(); + // extract peer address + if (updatePeerAddr) { + PeerAddr = address.ToString(); + } - // start connecting - switch (int err = -Socket->Connect(address)) { - case 0: // successful connection - break; + // set up socket parameters + SetupSocket(); - case EINPROGRESS: // connection in progress + // start connecting + err = -Socket->Connect(address); + if (err == EINPROGRESS) { WaitPoller(false, true, "WaitConnect"); err = Socket->GetConnectStatus(); - if (err) { - Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, Sprintf("Connection failed: %s", strerror(err)), true); - } - break; + } - default: + // check if connection succeeded + if (err) { + Socket.Reset(); + PollerToken.Reset(); + } else { break; + } + } + + if (!Socket) { + Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Couldn't connect to any resolved address", true); } auto it = LastLogNotice.find(PeerNodeId); diff --git a/library/cpp/actors/interconnect/interconnect_resolve.cpp b/library/cpp/actors/interconnect/interconnect_resolve.cpp index 14296194df..502de446c7 100644 --- a/library/cpp/actors/interconnect/interconnect_resolve.cpp +++ b/library/cpp/actors/interconnect/interconnect_resolve.cpp @@ -41,7 +41,11 @@ namespace NActors { void Bootstrap() { TMaybe<TString> errorText; if (auto addr = ExtractDefaultAddr(errorText)) { - return SendAddrAndDie(std::move(addr)); + if (NodeId) { + return SendLocalNodeInfoAndDie({{*addr}}); + } else { + return SendAddressInfoAndDie(std::move(addr)); + } } if (errorText) { @@ -55,8 +59,10 @@ namespace NActors { } Send(MakeDnsResolverActorId(), - new TEvDns::TEvGetAddr(Host, AF_UNSPEC), - IEventHandle::FlagTrackDelivery); + NodeId + ? static_cast<IEventBase*>(new TEvDns::TEvGetHostByName(Host, AF_UNSPEC)) + : static_cast<IEventBase*>(new TEvDns::TEvGetAddr(Host, AF_UNSPEC)), + IEventHandle::FlagTrackDelivery); if (Deadline != TInstant::Max()) { Schedule(Deadline, new TEvents::TEvWakeup); @@ -69,6 +75,7 @@ namespace NActors { sFunc(TEvents::TEvWakeup, HandleTimeout); sFunc(TEvents::TEvUndelivered, HandleUndelivered); hFunc(TEvDns::TEvGetAddrResult, Handle); + hFunc(TEvDns::TEvGetHostByNameResult, Handle); }); void HandleTimeout() { @@ -81,23 +88,40 @@ namespace NActors { void Handle(TEvDns::TEvGetAddrResult::TPtr& ev) { if (auto addr = ExtractAddr(ev->Get())) { - return SendAddrAndDie(std::move(addr)); + SendAddressInfoAndDie(std::move(addr)); + } else { + SendErrorAndDie(ev->Get()->ErrorText); } - - SendErrorAndDie(ev->Get()->ErrorText); } - void SendAddrAndDie(NAddr::IRemoteAddrPtr addr) { - if (NodeId) { - auto reply = new TEvLocalNodeInfo; - reply->NodeId = *NodeId; - reply->Address = std::move(addr); - TActivationContext::Send(new IEventHandle(ReplyTo, ReplyFrom, reply)); + void Handle(TEvDns::TEvGetHostByNameResult::TPtr& ev) { + auto& msg = *ev->Get(); + if (msg.Status) { + SendErrorAndDie(msg.ErrorText); } else { - auto reply = new TEvAddressInfo; - reply->Address = std::move(addr); - TActivationContext::Send(new IEventHandle(ReplyTo, ReplyFrom, reply)); + std::vector<NInterconnect::TAddress> addresses; + for (const auto& ipv6 : msg.AddrsV6) { + addresses.emplace_back(ipv6, Port); + } + for (const auto& ipv4 : msg.AddrsV4) { + addresses.emplace_back(ipv4, Port); + } + SendLocalNodeInfoAndDie(std::move(addresses)); } + } + + void SendAddressInfoAndDie(NAddr::IRemoteAddrPtr addr) { + auto reply = new TEvAddressInfo; + reply->Address = std::move(addr); + TActivationContext::Send(new IEventHandle(ReplyTo, ReplyFrom, reply)); + PassAway(); + } + + void SendLocalNodeInfoAndDie(std::vector<NInterconnect::TAddress> addresses) { + auto reply = std::make_unique<TEvLocalNodeInfo>(); + reply->NodeId = *NodeId; + reply->Addresses = std::move(addresses); + TActivationContext::Send(new IEventHandle(ReplyTo, ReplyFrom, reply.release())); PassAway(); } diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp index 565a511859..9b47f5b592 100644 --- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp +++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp @@ -23,7 +23,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) { auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0); auto& ch = scheduler.GetOutputChannel(channel); const bool wasWorking = ch.IsWorking(); - ch.Push(*ev); + ch.Push(*ev, TInstant::Now()); if (!wasWorking) { scheduler.AddToHeap(ch, 0); } diff --git a/ydb/core/mind/node_broker_ut.cpp b/ydb/core/mind/node_broker_ut.cpp index d3186b5a6f..6293413ef1 100644 --- a/ydb/core/mind/node_broker_ut.cpp +++ b/ydb/core/mind/node_broker_ut.cpp @@ -492,22 +492,6 @@ void CheckLeaseExtension(TTestActorRuntime &runtime, } } -TString AddrToString(const struct sockaddr *sa) -{ - char str[INET6_ADDRSTRLEN]; - switch(sa->sa_family) { - case AF_INET: - inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr), str, INET6_ADDRSTRLEN); - break; - case AF_INET6: - inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr), str, INET6_ADDRSTRLEN); - break; - default: - return "unknown"; - } - return str; -} - void CheckResolveNode(TTestActorRuntime &runtime, TActorId sender, ui32 nodeId, @@ -521,7 +505,7 @@ void CheckResolveNode(TTestActorRuntime &runtime, UNIT_ASSERT(reply); UNIT_ASSERT_VALUES_EQUAL(reply->NodeId, nodeId); - UNIT_ASSERT_VALUES_EQUAL(AddrToString(reply->Address->Addr()), addr); + UNIT_ASSERT_VALUES_EQUAL(reply->Addresses[0].GetAddress(), addr); } void CheckResolveUnknownNode(TTestActorRuntime &runtime, @@ -536,7 +520,7 @@ void CheckResolveUnknownNode(TTestActorRuntime &runtime, UNIT_ASSERT(reply); UNIT_ASSERT_VALUES_EQUAL(reply->NodeId, nodeId); - UNIT_ASSERT(!reply->Address); + UNIT_ASSERT(reply->Addresses.empty()); } void GetNameserverNodesList(TTestActorRuntime &runtime, |