aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexander Rutkovsky <alexander.rutkovsky@gmail.com>2022-06-23 19:33:27 +0300
committerAlexander Rutkovsky <alexander.rutkovsky@gmail.com>2022-06-23 19:33:27 +0300
commita77ae5f11416ba3add5205dea69d1efe9012a096 (patch)
tree8d45400465a373fe5babda2ad163e226c87c7cae
parent5b76ffead8d299254cded90dd586f64e81f90080 (diff)
downloadydb-a77ae5f11416ba3add5205dea69d1efe9012a096.tar.gz
Connect sequentially to all available addresses KIKIMR-14911
ref:6bb441beeb02359f1262541c1c41aa700130e53c
-rw-r--r--library/cpp/actors/interconnect/events_local.h2
-rw-r--r--library/cpp/actors/interconnect/interconnect_address.cpp12
-rw-r--r--library/cpp/actors/interconnect/interconnect_address.h2
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp61
-rw-r--r--library/cpp/actors/interconnect/interconnect_resolve.cpp54
-rw-r--r--library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp2
-rw-r--r--ydb/core/mind/node_broker_ut.cpp20
7 files changed, 91 insertions, 62 deletions
diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h
index 474b3dba8d..7edb444346 100644
--- a/library/cpp/actors/interconnect/events_local.h
+++ b/library/cpp/actors/interconnect/events_local.h
@@ -232,7 +232,7 @@ namespace NActors {
DEFINE_SIMPLE_LOCAL_EVENT(TEvLocalNodeInfo, "Network: TEvLocalNodeInfo")
ui32 NodeId;
- NAddr::IRemoteAddrPtr Address;
+ std::vector<NInterconnect::TAddress> Addresses;
};
struct TEvBunchOfEventsToDestroy : TEventLocal<TEvBunchOfEventsToDestroy, ui32(ENetwork::BunchOfEventsToDestroy)> {
diff --git a/library/cpp/actors/interconnect/interconnect_address.cpp b/library/cpp/actors/interconnect/interconnect_address.cpp
index 290aee1b55..d6adb8098d 100644
--- a/library/cpp/actors/interconnect/interconnect_address.cpp
+++ b/library/cpp/actors/interconnect/interconnect_address.cpp
@@ -70,6 +70,18 @@ namespace NInterconnect {
: TAddress(addr.data(), port)
{}
+ TAddress::TAddress(in_addr addr, ui16 port) {
+ Addr.Ipv4.sin_family = AF_INET;
+ Addr.Ipv4.sin_port = htons(port);
+ Addr.Ipv4.sin_addr = addr;
+ }
+
+ TAddress::TAddress(in6_addr addr, ui16 port) {
+ Addr.Ipv6.sin6_family = AF_INET6;
+ Addr.Ipv6.sin6_port = htons(port);
+ Addr.Ipv6.sin6_addr = addr;
+ }
+
TString TAddress::GetAddress() const {
const void *src;
socklen_t size;
diff --git a/library/cpp/actors/interconnect/interconnect_address.h b/library/cpp/actors/interconnect/interconnect_address.h
index 5a78193abb..b19d751806 100644
--- a/library/cpp/actors/interconnect/interconnect_address.h
+++ b/library/cpp/actors/interconnect/interconnect_address.h
@@ -17,6 +17,8 @@ namespace NInterconnect {
TAddress();
TAddress(const char* addr, ui16 port);
TAddress(const TString& addr, ui16 port);
+ TAddress(in_addr addr, ui16 port);
+ TAddress(in6_addr addr, ui16 port);
TAddress(NAddr::IRemoteAddr& addr);
int GetFamily() const;
socklen_t Size() const;
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index 78e114a574..116ccc2724 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -832,59 +832,66 @@ namespace NActors {
Now() + ResolveTimeout);
// extract address from the result
- NInterconnect::TAddress address;
+ std::vector<NInterconnect::TAddress> addresses;
if (!ev) {
ResolveTimedOut = true;
if (auto peerNodeInfo = GetPeerNodeInfo(); peerNodeInfo && peerNodeInfo->Address) {
- address = {peerNodeInfo->Address, peerNodeInfo->Port};
+ addresses.emplace_back(peerNodeInfo->Address, peerNodeInfo->Port);
} else {
Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve timed out and no static address defined", true);
}
} else if (auto *p = ev->CastAsLocal<TEvLocalNodeInfo>()) {
- if (!p->Address) {
+ addresses = std::move(p->Addresses);
+ if (addresses.empty()) {
Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: no address returned", true);
}
- address = {*p->Address};
} else if (auto *p = ev->CastAsLocal<TEvInterconnect::TEvNodeAddress>()) {
const auto& r = p->Record;
if (!r.HasAddress() || !r.HasPort()) {
Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: no address returned", true);
}
- address = {r.GetAddress(), static_cast<ui16>(r.GetPort())};
+ addresses.emplace_back(r.GetAddress(), static_cast<ui16>(r.GetPort()));
} else {
Y_VERIFY(ev->GetTypeRewrite() == ui32(ENetwork::ResolveError));
Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: " + ev->Get<TEvResolveError>()->Explain, true);
}
- // create the socket with matching address family
- Socket = NInterconnect::TStreamSocket::Make(address.GetFamily());
- if (*Socket == -1) {
- Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "System error: failed to create socket");
- }
-
- // extract peer address
- if (updatePeerAddr) {
- PeerAddr = address.ToString();
- }
+ for (const NInterconnect::TAddress& address : addresses) {
+ // create the socket with matching address family
+ int err = 0;
+ Socket = NInterconnect::TStreamSocket::Make(address.GetFamily(), &err);
+ if (err == EAFNOSUPPORT) {
+ continue;
+ } else if (*Socket == -1) {
+ Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "System error: failed to create socket");
+ }
- // set up socket parameters
- SetupSocket();
+ // extract peer address
+ if (updatePeerAddr) {
+ PeerAddr = address.ToString();
+ }
- // start connecting
- switch (int err = -Socket->Connect(address)) {
- case 0: // successful connection
- break;
+ // set up socket parameters
+ SetupSocket();
- case EINPROGRESS: // connection in progress
+ // start connecting
+ err = -Socket->Connect(address);
+ if (err == EINPROGRESS) {
WaitPoller(false, true, "WaitConnect");
err = Socket->GetConnectStatus();
- if (err) {
- Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, Sprintf("Connection failed: %s", strerror(err)), true);
- }
- break;
+ }
- default:
+ // check if connection succeeded
+ if (err) {
+ Socket.Reset();
+ PollerToken.Reset();
+ } else {
break;
+ }
+ }
+
+ if (!Socket) {
+ Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Couldn't connect to any resolved address", true);
}
auto it = LastLogNotice.find(PeerNodeId);
diff --git a/library/cpp/actors/interconnect/interconnect_resolve.cpp b/library/cpp/actors/interconnect/interconnect_resolve.cpp
index 14296194df..502de446c7 100644
--- a/library/cpp/actors/interconnect/interconnect_resolve.cpp
+++ b/library/cpp/actors/interconnect/interconnect_resolve.cpp
@@ -41,7 +41,11 @@ namespace NActors {
void Bootstrap() {
TMaybe<TString> errorText;
if (auto addr = ExtractDefaultAddr(errorText)) {
- return SendAddrAndDie(std::move(addr));
+ if (NodeId) {
+ return SendLocalNodeInfoAndDie({{*addr}});
+ } else {
+ return SendAddressInfoAndDie(std::move(addr));
+ }
}
if (errorText) {
@@ -55,8 +59,10 @@ namespace NActors {
}
Send(MakeDnsResolverActorId(),
- new TEvDns::TEvGetAddr(Host, AF_UNSPEC),
- IEventHandle::FlagTrackDelivery);
+ NodeId
+ ? static_cast<IEventBase*>(new TEvDns::TEvGetHostByName(Host, AF_UNSPEC))
+ : static_cast<IEventBase*>(new TEvDns::TEvGetAddr(Host, AF_UNSPEC)),
+ IEventHandle::FlagTrackDelivery);
if (Deadline != TInstant::Max()) {
Schedule(Deadline, new TEvents::TEvWakeup);
@@ -69,6 +75,7 @@ namespace NActors {
sFunc(TEvents::TEvWakeup, HandleTimeout);
sFunc(TEvents::TEvUndelivered, HandleUndelivered);
hFunc(TEvDns::TEvGetAddrResult, Handle);
+ hFunc(TEvDns::TEvGetHostByNameResult, Handle);
});
void HandleTimeout() {
@@ -81,23 +88,40 @@ namespace NActors {
void Handle(TEvDns::TEvGetAddrResult::TPtr& ev) {
if (auto addr = ExtractAddr(ev->Get())) {
- return SendAddrAndDie(std::move(addr));
+ SendAddressInfoAndDie(std::move(addr));
+ } else {
+ SendErrorAndDie(ev->Get()->ErrorText);
}
-
- SendErrorAndDie(ev->Get()->ErrorText);
}
- void SendAddrAndDie(NAddr::IRemoteAddrPtr addr) {
- if (NodeId) {
- auto reply = new TEvLocalNodeInfo;
- reply->NodeId = *NodeId;
- reply->Address = std::move(addr);
- TActivationContext::Send(new IEventHandle(ReplyTo, ReplyFrom, reply));
+ void Handle(TEvDns::TEvGetHostByNameResult::TPtr& ev) {
+ auto& msg = *ev->Get();
+ if (msg.Status) {
+ SendErrorAndDie(msg.ErrorText);
} else {
- auto reply = new TEvAddressInfo;
- reply->Address = std::move(addr);
- TActivationContext::Send(new IEventHandle(ReplyTo, ReplyFrom, reply));
+ std::vector<NInterconnect::TAddress> addresses;
+ for (const auto& ipv6 : msg.AddrsV6) {
+ addresses.emplace_back(ipv6, Port);
+ }
+ for (const auto& ipv4 : msg.AddrsV4) {
+ addresses.emplace_back(ipv4, Port);
+ }
+ SendLocalNodeInfoAndDie(std::move(addresses));
}
+ }
+
+ void SendAddressInfoAndDie(NAddr::IRemoteAddrPtr addr) {
+ auto reply = new TEvAddressInfo;
+ reply->Address = std::move(addr);
+ TActivationContext::Send(new IEventHandle(ReplyTo, ReplyFrom, reply));
+ PassAway();
+ }
+
+ void SendLocalNodeInfoAndDie(std::vector<NInterconnect::TAddress> addresses) {
+ auto reply = std::make_unique<TEvLocalNodeInfo>();
+ reply->NodeId = *NodeId;
+ reply->Addresses = std::move(addresses);
+ TActivationContext::Send(new IEventHandle(ReplyTo, ReplyFrom, reply.release()));
PassAway();
}
diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
index 565a511859..9b47f5b592 100644
--- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
@@ -23,7 +23,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) {
auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0);
auto& ch = scheduler.GetOutputChannel(channel);
const bool wasWorking = ch.IsWorking();
- ch.Push(*ev);
+ ch.Push(*ev, TInstant::Now());
if (!wasWorking) {
scheduler.AddToHeap(ch, 0);
}
diff --git a/ydb/core/mind/node_broker_ut.cpp b/ydb/core/mind/node_broker_ut.cpp
index d3186b5a6f..6293413ef1 100644
--- a/ydb/core/mind/node_broker_ut.cpp
+++ b/ydb/core/mind/node_broker_ut.cpp
@@ -492,22 +492,6 @@ void CheckLeaseExtension(TTestActorRuntime &runtime,
}
}
-TString AddrToString(const struct sockaddr *sa)
-{
- char str[INET6_ADDRSTRLEN];
- switch(sa->sa_family) {
- case AF_INET:
- inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr), str, INET6_ADDRSTRLEN);
- break;
- case AF_INET6:
- inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr), str, INET6_ADDRSTRLEN);
- break;
- default:
- return "unknown";
- }
- return str;
-}
-
void CheckResolveNode(TTestActorRuntime &runtime,
TActorId sender,
ui32 nodeId,
@@ -521,7 +505,7 @@ void CheckResolveNode(TTestActorRuntime &runtime,
UNIT_ASSERT(reply);
UNIT_ASSERT_VALUES_EQUAL(reply->NodeId, nodeId);
- UNIT_ASSERT_VALUES_EQUAL(AddrToString(reply->Address->Addr()), addr);
+ UNIT_ASSERT_VALUES_EQUAL(reply->Addresses[0].GetAddress(), addr);
}
void CheckResolveUnknownNode(TTestActorRuntime &runtime,
@@ -536,7 +520,7 @@ void CheckResolveUnknownNode(TTestActorRuntime &runtime,
UNIT_ASSERT(reply);
UNIT_ASSERT_VALUES_EQUAL(reply->NodeId, nodeId);
- UNIT_ASSERT(!reply->Address);
+ UNIT_ASSERT(reply->Addresses.empty());
}
void GetNameserverNodesList(TTestActorRuntime &runtime,