aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-04-11 17:47:06 +0300
committeralexvru <alexvru@ydb.tech>2023-04-11 17:47:06 +0300
commit1fcae74aa73236d42a80519af04e8f5d0ef2592a (patch)
treed33561771718a91dab06f2c7bbc46f9a41c4dad7 /library/cpp
parent8c676fc6282c40bc884f51ed1255e99deb6bffa9 (diff)
downloadydb-1fcae74aa73236d42a80519af04e8f5d0ef2592a.tar.gz
Prepare for XDC support
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp456
1 files changed, 232 insertions, 224 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index b0c4e79b323..797d7097494 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -80,6 +80,173 @@ namespace NActors {
#pragma pack(pop)
private:
+ class TConnection : TNonCopyable {
+ THandshakeActor *Actor = nullptr;
+ TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
+ TPollerToken::TPtr PollerToken;
+
+ public:
+ TConnection(THandshakeActor *actor, TIntrusivePtr<NInterconnect::TStreamSocket> socket)
+ : Actor(actor)
+ , Socket(std::move(socket))
+ {}
+
+ void Connect(TString *peerAddr) {
+ // issue request to a nameservice to resolve peer node address
+ const auto mono = TActivationContext::Monotonic();
+ Actor->Send(Actor->Common->NameserviceId, new TEvInterconnect::TEvResolveNode(Actor->PeerNodeId,
+ TActivationContext::Now() + (Actor->Deadline - mono)));
+
+ // wait for the result
+ auto ev = Actor->WaitForSpecificEvent<TEvResolveError, TEvLocalNodeInfo, TEvInterconnect::TEvNodeAddress>(
+ "ResolveNode", mono + ResolveTimeout);
+
+ // extract address from the result
+ std::vector<NInterconnect::TAddress> addresses;
+ if (!ev) {
+ Actor->Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve timed out", true);
+ } else if (auto *p = ev->CastAsLocal<TEvLocalNodeInfo>()) {
+ addresses = std::move(p->Addresses);
+ if (addresses.empty()) {
+ Actor->Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: no address returned", true);
+ }
+ } else if (auto *p = ev->CastAsLocal<TEvInterconnect::TEvNodeAddress>()) {
+ const auto& r = p->Record;
+ if (!r.HasAddress() || !r.HasPort()) {
+ Actor->Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: no address returned", true);
+ }
+ addresses.emplace_back(r.GetAddress(), static_cast<ui16>(r.GetPort()));
+ } else {
+ Y_VERIFY(ev->GetTypeRewrite() == ui32(ENetwork::ResolveError));
+ Actor->Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: " + ev->Get<TEvResolveError>()->Explain
+ + ", Unresolved host# " + ev->Get<TEvResolveError>()->Host, true);
+ }
+
+ for (const NInterconnect::TAddress& address : addresses) {
+ // create the socket with matching address family
+ int err = 0;
+ Socket = NInterconnect::TStreamSocket::Make(address.GetFamily(), &err);
+ if (err == EAFNOSUPPORT) {
+ Reset();
+ continue;
+ } else if (*Socket == -1) {
+ Actor->Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "System error: failed to create socket");
+ }
+
+ // extract peer address
+ if (peerAddr) {
+ *peerAddr = address.ToString();
+ }
+
+ // set up socket parameters
+ SetupSocket();
+
+ // start connecting
+ err = -Socket->Connect(address);
+ if (err == EINPROGRESS) {
+ RegisterInPoller();
+ WaitPoller(false, true, "WaitConnect");
+ err = Socket->GetConnectStatus();
+ } else if (!err) {
+ RegisterInPoller();
+ }
+
+ // check if connection succeeded
+ if (err) {
+ Reset();
+ } else {
+ break;
+ }
+ }
+
+ if (!Socket) {
+ Actor->Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Couldn't connect to any resolved address", true);
+ }
+ }
+
+ void Reset() {
+ Socket.Reset();
+ PollerToken.Reset();
+ }
+
+ void SetupSocket() {
+ // switch to nonblocking mode
+ try {
+ SetNonBlock(*Socket);
+ SetNoDelay(*Socket, true);
+ } catch (...) {
+ Actor->Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "System error: can't up nonblocking mode for socket");
+ }
+
+ // setup send buffer size
+ Socket->SetSendBufferSize(Actor->Common->Settings.GetSendBufferSize());
+ }
+
+ void RegisterInPoller() {
+ Y_VERIFY(!PollerToken);
+ const bool success = Actor->Send(MakePollerActorId(), new TEvPollerRegister(Socket, Actor->SelfActorId, Actor->SelfActorId));
+ Y_VERIFY(success);
+ auto result = Actor->WaitForSpecificEvent<TEvPollerRegisterResult>("RegisterPoller");
+ PollerToken = std::move(result->Get()->PollerToken);
+ Y_VERIFY(PollerToken);
+ Y_VERIFY(PollerToken->RefCount() == 1); // ensure exclusive ownership
+ }
+
+ void WaitPoller(bool read, bool write, TString state) {
+ PollerToken->Request(read, write);
+ Actor->WaitForSpecificEvent<TEvPollerReady>(std::move(state));
+ }
+
+ template <typename TDataPtr, typename TSendRecvFunc>
+ void Process(TDataPtr buffer, size_t len, TSendRecvFunc&& sendRecv, bool read, bool write, TString state) {
+ Y_VERIFY(Socket);
+ NInterconnect::TStreamSocket* sock = Socket.Get();
+ ssize_t (NInterconnect::TStreamSocket::*pfn)(TDataPtr, size_t, TString*) const = sendRecv;
+ size_t processed = 0;
+
+ auto error = [&](TString msg) {
+ Actor->Fail(TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT,
+ Sprintf("Socket error# %s state# %s processed# %zu remain# %zu",
+ msg.data(), state.data(), processed, len), true);
+ };
+
+ while (len) {
+ TString err;
+ ssize_t nbytes = (sock->*pfn)(buffer, len, &err);
+ if (nbytes > 0) {
+ buffer = (char*)buffer + nbytes;
+ len -= nbytes;
+ processed += nbytes;
+ } else if (-nbytes == EAGAIN || -nbytes == EWOULDBLOCK) {
+ WaitPoller(read, write, state);
+ } else if (!nbytes) {
+ error("connection unexpectedly closed");
+ } else if (-nbytes != EINTR) {
+ error(err ? err : TString(strerror(-nbytes)));
+ }
+ }
+ }
+
+ void SendData(const void* buffer, size_t len, TString state) {
+ Process(buffer, len, &NInterconnect::TStreamSocket::Send, false, true, std::move(state));
+ }
+
+ void ReceiveData(void* buffer, size_t len, TString state) {
+ Process(buffer, len, &NInterconnect::TStreamSocket::Recv, true, false, std::move(state));
+ }
+
+ void ResetPollerToken() {
+ if (PollerToken) {
+ Y_VERIFY(PollerToken->RefCount() == 1);
+ PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor
+ }
+ }
+
+ TIntrusivePtr<NInterconnect::TStreamSocket>& GetSocketRef() { return Socket; }
+ operator bool() const { return static_cast<bool>(Socket); }
+ };
+
+ private:
TInterconnectProxyCommon::TPtr Common;
TActorId SelfVirtualId;
TActorId PeerVirtualId;
@@ -88,14 +255,12 @@ namespace NActors {
TMaybe<ui64> NextPacketFromPeer; // will be obtained from incoming initial packet
TString PeerHostName;
TString PeerAddr;
- TSocketPtr Socket;
- TPollerToken::TPtr PollerToken;
+ TConnection MainChannel;
TString State;
TString HandshakeKind;
TMaybe<THolder<TProgramInfo>> ProgramInfo; // filled in in case of successful handshake; even if null
TSessionParams Params;
- bool ResolveTimedOut = false;
- THashMap<ui32, TInstant> LastLogNotice;
+ std::optional<TInstant> LastLogNotice;
const TDuration MuteDuration = TDuration::Seconds(15);
TMonotonic Deadline;
TActorId HandshakeBroker;
@@ -111,6 +276,7 @@ namespace NActors {
, PeerNodeId(nodeId)
, NextPacketToPeer(nextPacket)
, PeerHostName(std::move(peerHostName))
+ , MainChannel(this, nullptr)
, HandshakeKind("outgoing handshake")
, Params(std::move(params))
{
@@ -123,12 +289,12 @@ namespace NActors {
THandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket)
: TActorCoroImpl(StackSize, true)
, Common(std::move(common))
- , Socket(std::move(socket))
+ , MainChannel(this, std::move(socket))
, HandshakeKind("incoming handshake")
{
- Y_VERIFY(Socket);
+ Y_VERIFY(MainChannel);
PeerAddr = TString::Uninitialized(1024);
- if (GetRemoteAddr(*Socket, PeerAddr.Detach(), PeerAddr.size())) {
+ if (GetRemoteAddr(*MainChannel.GetSocketRef(), PeerAddr.Detach(), PeerAddr.size())) {
PeerAddr.resize(strlen(PeerAddr.data()));
} else {
PeerAddr.clear();
@@ -157,7 +323,7 @@ namespace NActors {
void RunImpl() {
UpdatePrefix();
- if (!Socket && Common->OutgoingHandshakeInflightLimit) {
+ if (!MainChannel && Common->OutgoingHandshakeInflightLimit) {
// Create holder, which sends request to broker and automatically frees the place when destroyed
BrokerLeaseHolder.emplace(SelfActorId, HandshakeBroker);
}
@@ -173,7 +339,7 @@ namespace NActors {
}
timeout += ResolveTimeout * 2;
- if (Socket) {
+ if (MainChannel) {
// Incoming handshakes have shorter timeout than outgoing
timeout *= 0.9;
}
@@ -182,7 +348,7 @@ namespace NActors {
Schedule(Deadline, new TEvents::TEvWakeup);
try {
- if (Socket) {
+ if (MainChannel) {
PerformIncomingHandshake();
} else {
PerformOutgoingHandshake();
@@ -191,7 +357,7 @@ namespace NActors {
// establish encrypted channel, or, in case when encryption is disabled, check if it matches settings
if (ProgramInfo) {
if (Params.Encryption) {
- EstablishSecureConnection();
+ EstablishSecureConnection(MainChannel);
} else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) {
Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required");
}
@@ -203,36 +369,35 @@ namespace NActors {
if (ProgramInfo) {
LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded");
Y_VERIFY(NextPacketFromPeer);
- if (PollerToken) {
- Y_VERIFY(PollerToken->RefCount() == 1);
- PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor
- }
- SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(Socket), PeerVirtualId, SelfVirtualId,
+ MainChannel.ResetPollerToken();
+ SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(MainChannel.GetSocketRef()), PeerVirtualId, SelfVirtualId,
*NextPacketFromPeer, ProgramInfo->Release(), std::move(Params)));
}
- Socket.Reset();
+ MainChannel.Reset();
}
- void EstablishSecureConnection() {
- Y_VERIFY(PollerToken && PollerToken->RefCount() == 1);
- PollerToken.Reset();
- auto ev = AskProxy<TEvSecureSocket>(MakeHolder<TEvGetSecureSocket>(Socket), "AskProxy(TEvSecureContext)");
- Socket = std::move(ev->Get()->Socket);
- RegisterInPoller();
+ void EstablishSecureConnection(TConnection& connection) {
+ // wrap current socket with secure one
+ connection.ResetPollerToken();
+ TIntrusivePtr<NInterconnect::TStreamSocket>& socketRef = connection.GetSocketRef();
+ auto ev = AskProxy<TEvSecureSocket>(MakeHolder<TEvGetSecureSocket>(socketRef), "AskProxy(TEvSecureContext)");
+ TIntrusivePtr<NInterconnect::TSecureSocket> secure = std::move(ev->Get()->Socket); // remember for further use
+ socketRef = secure; // replace the socket within the connection
+ connection.RegisterInPoller(); // re-register in poller
+
const ui32 myNodeId = GetActorSystem()->NodeId;
const bool server = myNodeId < PeerNodeId; // keep server/client role permanent to enable easy TLS session resuming
for (;;) {
TString err;
- auto& secure = static_cast<NInterconnect::TSecureSocket&>(*Socket);
- switch (secure.Establish(server, Params.AuthOnly, err)) {
+ switch (secure->Establish(server, Params.AuthOnly, err)) {
case NInterconnect::TSecureSocket::EStatus::SUCCESS:
if (Params.AuthOnly) {
Params.Encryption = false;
- Params.AuthCN = secure.GetPeerCommonName();
- Y_VERIFY(PollerToken && PollerToken->RefCount() == 1);
- PollerToken.Reset();
- Socket = secure.Detach();
+ Params.AuthCN = secure->GetPeerCommonName();
+ connection.ResetPollerToken();
+ socketRef = secure->Detach();
+ connection.RegisterInPoller();
}
return;
@@ -241,11 +406,11 @@ namespace NActors {
[[fallthrough]];
case NInterconnect::TSecureSocket::EStatus::WANT_READ:
- WaitPoller(true, false, "ReadEstablish");
+ connection.WaitPoller(true, false, "ReadEstablish");
break;
case NInterconnect::TSecureSocket::EStatus::WANT_WRITE:
- WaitPoller(false, true, "WriteEstablish");
+ connection.WaitPoller(false, true, "WriteEstablish");
break;
}
}
@@ -437,12 +602,6 @@ namespace NActors {
}
bool CheckPeerCookie(const TString& cookie, TString *error) {
- // create a temporary socket to connect to the peer
- TSocketPtr tempSocket;
- std::swap(tempSocket, Socket);
- TPollerToken::TPtr tempPollerToken;
- std::swap(tempPollerToken, PollerToken);
-
// set up virtual self id to ensure peer will not drop our connection
char buf[12] = {'c', 'o', 'o', 'k', 'i', 'e', ' ', 'c', 'h', 'e', 'c', 'k'};
SelfVirtualId = TActorId(SelfActorId.NodeId(), TStringBuf(buf, 12));
@@ -450,12 +609,13 @@ namespace NActors {
bool success = true;
try {
// issue connection and send initial packet
- Connect(false);
- SendInitialPacket();
+ TConnection tempConnection(this, nullptr);
+ tempConnection.Connect(nullptr);
+ SendInitialPacket(tempConnection);
// wait for basic response
TInitialPacket response;
- ReceiveData(&response, sizeof(response), "ReceiveResponse");
+ tempConnection.ReceiveData(&response, sizeof(response), "ReceiveResponse");
if (!response.Check()) {
Fail(TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT, "Initial packet CRC error");
} else if (response.Header.Version != INTERCONNECT_PROTOCOL_VERSION) {
@@ -472,11 +632,11 @@ namespace NActors {
request.SetSenderActorId(TString());
request.SetCookie(cookie);
request.SetDoCheckCookie(true);
- SendExBlock(request, "SendExBlockDoCheckCookie");
+ SendExBlock(tempConnection, request, "ExBlockDoCheckCookie");
// process cookie check reply
NActorsInterconnect::THandshakeReply reply;
- if (!reply.ParseFromString(ReceiveExBlock("ReceiveExBlockDoCheckCookie"))) {
+ if (!reply.ParseFromString(ReceiveExBlock(tempConnection, "ExBlockDoCheckCookie"))) {
Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Incorrect packet from peer");
} else if (reply.HasCookieCheckResult() && !reply.GetCookieCheckResult()) {
Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Cookie check error -- possible network problem");
@@ -488,8 +648,6 @@ namespace NActors {
// restore state
SelfVirtualId = TActorId();
- std::swap(tempSocket, Socket);
- std::swap(tempPollerToken, PollerToken);
return success;
}
@@ -497,14 +655,16 @@ namespace NActors {
LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH01", NLog::PRI_DEBUG,
"starting outgoing handshake");
- // perform connection
- Connect(true);
+ // perform connection and log its result
+ MainChannel.Connect(&PeerAddr);
+ auto logPriority = std::exchange(LastLogNotice, std::nullopt) ? NActors::NLog::PRI_NOTICE : NActors::NLog::PRI_DEBUG;
+ LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH05", logPriority, "connected to peer");
// send initial request packet
- SendInitialPacket();
+ SendInitialPacket(MainChannel);
TInitialPacket response;
- ReceiveData(&response, sizeof(response), "ReceiveResponse");
+ MainChannel.ReceiveData(&response, sizeof(response), "ReceiveResponse");
if (!response.Check()) {
Fail(TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT, "Initial packet CRC error");
} else if (response.Header.Version != INTERCONNECT_PROTOCOL_VERSION) {
@@ -568,10 +728,10 @@ namespace NActors {
request.SetRequestExtendedTraceFmt(true);
request.SetRequestExternalDataChannel(Common->Settings.EnableExternalDataChannel);
- SendExBlock(request, "ExRequest");
+ SendExBlock(MainChannel, request, "ExRequest");
NActorsInterconnect::THandshakeReply reply;
- if (!reply.ParseFromString(ReceiveExBlock("ExReply"))) {
+ if (!reply.ParseFromString(ReceiveExBlock(MainChannel, "ExReply"))) {
Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Incorrect THandshakeReply");
}
ReportProto(reply, "ReceiveExBlock ExReply");
@@ -621,12 +781,12 @@ namespace NActors {
"starting incoming handshake");
// set up incoming socket
- SetupSocket();
- RegisterInPoller();
+ MainChannel.SetupSocket();
+ MainChannel.RegisterInPoller();
// wait for initial request packet
TInitialPacket request;
- ReceiveData(&request, sizeof(request), "ReceiveRequest");
+ MainChannel.ReceiveData(&request, sizeof(request), "ReceiveRequest");
if (!request.Check()) {
Fail(TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT, "Initial packet CRC error");
} else if (request.Header.Version != INTERCONNECT_PROTOCOL_VERSION) {
@@ -670,18 +830,18 @@ namespace NActors {
}
// issue response to the peer
- SendInitialPacket();
+ SendInitialPacket(MainChannel);
} else {
// peer wants a new session, clear fields and send initial packet
SelfVirtualId = TActorId();
PeerVirtualId = TActorId();
NextPacketToPeer = 0;
- SendInitialPacket();
+ SendInitialPacket(MainChannel);
// wait for extended request
auto ev = MakeHolder<TEvHandshakeRequest>();
auto& request = ev->Record;
- if (!request.ParseFromString(ReceiveExBlock("ExRequest"))) {
+ if (!request.ParseFromString(ReceiveExBlock(MainChannel, "ExRequest"))) {
Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Incorrect THandshakeRequest");
}
ReportProto(request, "ReceiveExBlock ExRequest");
@@ -690,7 +850,7 @@ namespace NActors {
// issue reply to the peer to prevent repeating connection retries
NActorsInterconnect::THandshakeReply reply;
reply.SetErrorExplaination(msg);
- SendExBlock(reply, "ExReply");
+ SendExBlock(MainChannel, reply, "ExReply");
// terminate ths handshake
Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, msg);
@@ -701,7 +861,7 @@ namespace NActors {
if (request.HasDoCheckCookie()) {
NActorsInterconnect::THandshakeReply reply;
reply.SetCookieCheckResult(request.GetCookie() == Common->Cookie);
- SendExBlock(reply, "ExReplyDoCheckCookie");
+ SendExBlock(MainChannel, reply, "ExReplyDoCheckCookie");
throw TExHandshakeFailed();
} else if (request.HasCookie() && !CheckPeerCookie(request.GetCookie(), &error)) {
generateError(TStringBuilder() << "Peer connectivity-checking failed, error# " << error);
@@ -785,14 +945,14 @@ namespace NActors {
success.SetAuthOnly(Params.AuthOnly);
success.SetUseExtendedTraceFmt(Params.UseExtendedTraceFmt);
success.SetUseExternalDataChannel(Params.UseExternalDataChannel);
- SendExBlock(record, "ExReply");
+ SendExBlock(MainChannel, record, "ExReply");
// extract sender actor id (self virtual id)
const auto& str = success.GetSenderActorId();
SelfVirtualId.Parse(str.data(), str.size());
} else if (auto ev = reply->CastAsLocal<TEvHandshakeReplyError>()) {
// in case of error just send reply to the peer and terminate handshake
- SendExBlock(ev->Record, "ExReply");
+ SendExBlock(MainChannel, ev->Record, "ExReply");
ProgramInfo.Clear(); // do not issue reply to the proxy
} else {
Y_FAIL("unexpected event Type# 0x%08" PRIx32, reply->GetTypeRewrite());
@@ -801,7 +961,7 @@ namespace NActors {
}
template <typename T>
- void SendExBlock(const T& proto, const char* what) {
+ void SendExBlock(TConnection& connection, const T& proto, const char* what) {
TString data;
Y_PROTOBUF_SUPPRESS_NODISCARD proto.SerializeToString(&data);
Y_VERIFY(data.size() <= TExHeader::MaxSize);
@@ -811,20 +971,20 @@ namespace NActors {
TExHeader header;
header.Size = data.size();
header.Sign(data.data(), data.size());
- SendData(&header, sizeof(header), Sprintf("Send%sHeader", what));
- SendData(data.data(), data.size(), Sprintf("Send%sData", what));
+ connection.SendData(&header, sizeof(header), Sprintf("Send%sHeader", what));
+ connection.SendData(data.data(), data.size(), Sprintf("Send%sData", what));
}
- TString ReceiveExBlock(const char* what) {
+ TString ReceiveExBlock(TConnection& connection, const char* what) {
TExHeader header;
- ReceiveData(&header, sizeof(header), Sprintf("Receive%sHeader", what));
+ connection.ReceiveData(&header, sizeof(header), Sprintf("Receive%sHeader", what));
if (header.Size > TExHeader::MaxSize) {
Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Incorrect extended header size");
}
TString data;
data.resize(header.Size);
- ReceiveData(data.Detach(), data.size(), Sprintf("Receive%sData", what));
+ connection.ReceiveData(data.Detach(), data.size(), Sprintf("Receive%sData", what));
if (!header.Check(data.data(), data.size())) {
Fail(TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT, "Extended header CRC error");
@@ -864,17 +1024,15 @@ namespace NActors {
}
void Fail(TEvHandshakeFail::EnumHandshakeFail reason, TString explanation, bool network = false) {
- TString msg = Sprintf("%s Peer# %s(%s) %s%s", HandshakeKind.data(), PeerHostName ? PeerHostName.data() : "<unknown>",
- PeerAddr.size() ? PeerAddr.data() : "<unknown>", ResolveTimedOut ? "[resolve timeout] " : "",
- explanation.data());
+ TString msg = Sprintf("%s Peer# %s(%s) %s", HandshakeKind.data(), PeerHostName ? PeerHostName.data() : "<unknown>",
+ PeerAddr.size() ? PeerAddr.data() : "<unknown>", explanation.data());
if (network) {
TInstant now = Now();
- TInstant prevLog = LastLogNotice[PeerNodeId];
NActors::NLog::EPriority logPriority = NActors::NLog::PRI_DEBUG;
- if (now - prevLog > MuteDuration) {
+ if (!LastLogNotice || now - *LastLogNotice > MuteDuration) {
logPriority = NActors::NLog::PRI_NOTICE;
- LastLogNotice[PeerNodeId] = now;
+ LastLogNotice.emplace(now);
}
LOG_LOG_NET_X(logPriority, PeerNodeId, "network-related error occured on handshake: %s", msg.data());
} else {
@@ -898,159 +1056,9 @@ namespace NActors {
// COMMUNICATION BLOCK
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- void Connect(bool updatePeerAddr) {
- // issue request to a nameservice to resolve peer node address
- const auto mono = TActivationContext::Monotonic();
- Send(Common->NameserviceId, new TEvInterconnect::TEvResolveNode(PeerNodeId, TActivationContext::Now() + (Deadline - mono)));
-
- // wait for the result
- auto ev = WaitForSpecificEvent<TEvResolveError, TEvLocalNodeInfo, TEvInterconnect::TEvNodeAddress>("ResolveNode",
- mono + ResolveTimeout);
-
- // extract address from the result
- std::vector<NInterconnect::TAddress> addresses;
- if (!ev) {
- ResolveTimedOut = true;
- if (auto peerNodeInfo = GetPeerNodeInfo(); peerNodeInfo && peerNodeInfo->Address) {
- addresses.emplace_back(peerNodeInfo->Address, peerNodeInfo->Port);
- } else {
- Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve timed out and no static address defined", true);
- }
- } else if (auto *p = ev->CastAsLocal<TEvLocalNodeInfo>()) {
- addresses = std::move(p->Addresses);
- if (addresses.empty()) {
- Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: no address returned", true);
- }
- } else if (auto *p = ev->CastAsLocal<TEvInterconnect::TEvNodeAddress>()) {
- const auto& r = p->Record;
- if (!r.HasAddress() || !r.HasPort()) {
- Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: no address returned", true);
- }
- addresses.emplace_back(r.GetAddress(), static_cast<ui16>(r.GetPort()));
- } else {
- Y_VERIFY(ev->GetTypeRewrite() == ui32(ENetwork::ResolveError));
- Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: " + ev->Get<TEvResolveError>()->Explain
- + ", Unresolved host# " + ev->Get<TEvResolveError>()->Host, true);
- }
-
- for (const NInterconnect::TAddress& address : addresses) {
- // create the socket with matching address family
- int err = 0;
- Socket = NInterconnect::TStreamSocket::Make(address.GetFamily(), &err);
- if (err == EAFNOSUPPORT) {
- Socket.Reset();
- continue;
- } else if (*Socket == -1) {
- Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "System error: failed to create socket");
- }
-
- // extract peer address
- if (updatePeerAddr) {
- PeerAddr = address.ToString();
- }
-
- // set up socket parameters
- SetupSocket();
-
- // start connecting
- err = -Socket->Connect(address);
- if (err == EINPROGRESS) {
- RegisterInPoller();
- WaitPoller(false, true, "WaitConnect");
- err = Socket->GetConnectStatus();
- } else if (!err) {
- RegisterInPoller();
- }
-
- // check if connection succeeded
- if (err) {
- Socket.Reset();
- PollerToken.Reset();
- } else {
- break;
- }
- }
-
- if (!Socket) {
- Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Couldn't connect to any resolved address", true);
- }
-
- auto it = LastLogNotice.find(PeerNodeId);
- NActors::NLog::EPriority logPriority = NActors::NLog::PRI_DEBUG;
- if (it != LastLogNotice.end()) {
- LastLogNotice.erase(it);
- logPriority = NActors::NLog::PRI_NOTICE;
- }
- LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH05", logPriority, "connected to peer");
- }
-
- void SetupSocket() {
- // switch to nonblocking mode
- try {
- SetNonBlock(*Socket);
- SetNoDelay(*Socket, true);
- } catch (...) {
- Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "System error: can't up nonblocking mode for socket");
- }
-
- // setup send buffer size
- Socket->SetSendBufferSize(Common->Settings.GetSendBufferSize());
- }
-
- void RegisterInPoller() {
- const bool success = Send(MakePollerActorId(), new TEvPollerRegister(Socket, SelfActorId, SelfActorId));
- Y_VERIFY(success);
- auto result = WaitForSpecificEvent<TEvPollerRegisterResult>("RegisterPoller");
- PollerToken = std::move(result->Get()->PollerToken);
- Y_VERIFY(PollerToken);
- Y_VERIFY(PollerToken->RefCount() == 1); // ensure exclusive ownership
- }
-
- void SendInitialPacket() {
+ void SendInitialPacket(TConnection& connection) {
TInitialPacket packet(SelfVirtualId, PeerVirtualId, NextPacketToPeer, INTERCONNECT_PROTOCOL_VERSION);
- SendData(&packet, sizeof(packet), "SendInitialPacket");
- }
-
- void WaitPoller(bool read, bool write, TString state) {
- PollerToken->Request(read, write);
- WaitForSpecificEvent<TEvPollerReady>(std::move(state));
- }
-
- template <typename TDataPtr, typename TSendRecvFunc>
- void Process(TDataPtr buffer, size_t len, TSendRecvFunc&& sendRecv, bool read, bool write, TString state) {
- Y_VERIFY(Socket);
- NInterconnect::TStreamSocket* sock = Socket.Get();
- ssize_t (NInterconnect::TStreamSocket::*pfn)(TDataPtr, size_t, TString*) const = sendRecv;
- size_t processed = 0;
-
- auto error = [&](TString msg) {
- Fail(TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT, Sprintf("Socket error# %s state# %s processed# %zu remain# %zu",
- msg.data(), state.data(), processed, len), true);
- };
-
- while (len) {
- TString err;
- ssize_t nbytes = (sock->*pfn)(buffer, len, &err);
- if (nbytes > 0) {
- buffer = (char*)buffer + nbytes;
- len -= nbytes;
- processed += nbytes;
- } else if (-nbytes == EAGAIN || -nbytes == EWOULDBLOCK) {
- WaitPoller(read, write, state);
- } else if (!nbytes) {
- error("connection unexpectedly closed");
- } else if (-nbytes != EINTR) {
- error(err ? err : TString(strerror(-nbytes)));
- }
- }
- }
-
- void SendData(const void* buffer, size_t len, TString state) {
- Process(buffer, len, &NInterconnect::TStreamSocket::Send, false, true, std::move(state));
- }
-
- void ReceiveData(void* buffer, size_t len, TString state) {
- Process(buffer, len, &NInterconnect::TStreamSocket::Recv, true, false, std::move(state));
+ connection.SendData(&packet, sizeof(packet), "SendInitialPacket");
}
THolder<TEvInterconnect::TNodeInfo> GetPeerNodeInfo() {