diff options
author | alexvru <alexvru@ydb.tech> | 2023-04-11 17:47:06 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-04-11 17:47:06 +0300 |
commit | 1fcae74aa73236d42a80519af04e8f5d0ef2592a (patch) | |
tree | d33561771718a91dab06f2c7bbc46f9a41c4dad7 /library/cpp | |
parent | 8c676fc6282c40bc884f51ed1255e99deb6bffa9 (diff) | |
download | ydb-1fcae74aa73236d42a80519af04e8f5d0ef2592a.tar.gz |
Prepare for XDC support
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_handshake.cpp | 456 |
1 files changed, 232 insertions, 224 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index b0c4e79b323..797d7097494 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -80,6 +80,173 @@ namespace NActors { #pragma pack(pop) private: + class TConnection : TNonCopyable { + THandshakeActor *Actor = nullptr; + TIntrusivePtr<NInterconnect::TStreamSocket> Socket; + TPollerToken::TPtr PollerToken; + + public: + TConnection(THandshakeActor *actor, TIntrusivePtr<NInterconnect::TStreamSocket> socket) + : Actor(actor) + , Socket(std::move(socket)) + {} + + void Connect(TString *peerAddr) { + // issue request to a nameservice to resolve peer node address + const auto mono = TActivationContext::Monotonic(); + Actor->Send(Actor->Common->NameserviceId, new TEvInterconnect::TEvResolveNode(Actor->PeerNodeId, + TActivationContext::Now() + (Actor->Deadline - mono))); + + // wait for the result + auto ev = Actor->WaitForSpecificEvent<TEvResolveError, TEvLocalNodeInfo, TEvInterconnect::TEvNodeAddress>( + "ResolveNode", mono + ResolveTimeout); + + // extract address from the result + std::vector<NInterconnect::TAddress> addresses; + if (!ev) { + Actor->Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve timed out", true); + } else if (auto *p = ev->CastAsLocal<TEvLocalNodeInfo>()) { + addresses = std::move(p->Addresses); + if (addresses.empty()) { + Actor->Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: no address returned", true); + } + } else if (auto *p = ev->CastAsLocal<TEvInterconnect::TEvNodeAddress>()) { + const auto& r = p->Record; + if (!r.HasAddress() || !r.HasPort()) { + Actor->Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: no address returned", true); + } + addresses.emplace_back(r.GetAddress(), static_cast<ui16>(r.GetPort())); + } else { + Y_VERIFY(ev->GetTypeRewrite() == ui32(ENetwork::ResolveError)); + Actor->Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: " + ev->Get<TEvResolveError>()->Explain + + ", Unresolved host# " + ev->Get<TEvResolveError>()->Host, true); + } + + for (const NInterconnect::TAddress& address : addresses) { + // create the socket with matching address family + int err = 0; + Socket = NInterconnect::TStreamSocket::Make(address.GetFamily(), &err); + if (err == EAFNOSUPPORT) { + Reset(); + continue; + } else if (*Socket == -1) { + Actor->Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "System error: failed to create socket"); + } + + // extract peer address + if (peerAddr) { + *peerAddr = address.ToString(); + } + + // set up socket parameters + SetupSocket(); + + // start connecting + err = -Socket->Connect(address); + if (err == EINPROGRESS) { + RegisterInPoller(); + WaitPoller(false, true, "WaitConnect"); + err = Socket->GetConnectStatus(); + } else if (!err) { + RegisterInPoller(); + } + + // check if connection succeeded + if (err) { + Reset(); + } else { + break; + } + } + + if (!Socket) { + Actor->Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Couldn't connect to any resolved address", true); + } + } + + void Reset() { + Socket.Reset(); + PollerToken.Reset(); + } + + void SetupSocket() { + // switch to nonblocking mode + try { + SetNonBlock(*Socket); + SetNoDelay(*Socket, true); + } catch (...) { + Actor->Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "System error: can't up nonblocking mode for socket"); + } + + // setup send buffer size + Socket->SetSendBufferSize(Actor->Common->Settings.GetSendBufferSize()); + } + + void RegisterInPoller() { + Y_VERIFY(!PollerToken); + const bool success = Actor->Send(MakePollerActorId(), new TEvPollerRegister(Socket, Actor->SelfActorId, Actor->SelfActorId)); + Y_VERIFY(success); + auto result = Actor->WaitForSpecificEvent<TEvPollerRegisterResult>("RegisterPoller"); + PollerToken = std::move(result->Get()->PollerToken); + Y_VERIFY(PollerToken); + Y_VERIFY(PollerToken->RefCount() == 1); // ensure exclusive ownership + } + + void WaitPoller(bool read, bool write, TString state) { + PollerToken->Request(read, write); + Actor->WaitForSpecificEvent<TEvPollerReady>(std::move(state)); + } + + template <typename TDataPtr, typename TSendRecvFunc> + void Process(TDataPtr buffer, size_t len, TSendRecvFunc&& sendRecv, bool read, bool write, TString state) { + Y_VERIFY(Socket); + NInterconnect::TStreamSocket* sock = Socket.Get(); + ssize_t (NInterconnect::TStreamSocket::*pfn)(TDataPtr, size_t, TString*) const = sendRecv; + size_t processed = 0; + + auto error = [&](TString msg) { + Actor->Fail(TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT, + Sprintf("Socket error# %s state# %s processed# %zu remain# %zu", + msg.data(), state.data(), processed, len), true); + }; + + while (len) { + TString err; + ssize_t nbytes = (sock->*pfn)(buffer, len, &err); + if (nbytes > 0) { + buffer = (char*)buffer + nbytes; + len -= nbytes; + processed += nbytes; + } else if (-nbytes == EAGAIN || -nbytes == EWOULDBLOCK) { + WaitPoller(read, write, state); + } else if (!nbytes) { + error("connection unexpectedly closed"); + } else if (-nbytes != EINTR) { + error(err ? err : TString(strerror(-nbytes))); + } + } + } + + void SendData(const void* buffer, size_t len, TString state) { + Process(buffer, len, &NInterconnect::TStreamSocket::Send, false, true, std::move(state)); + } + + void ReceiveData(void* buffer, size_t len, TString state) { + Process(buffer, len, &NInterconnect::TStreamSocket::Recv, true, false, std::move(state)); + } + + void ResetPollerToken() { + if (PollerToken) { + Y_VERIFY(PollerToken->RefCount() == 1); + PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor + } + } + + TIntrusivePtr<NInterconnect::TStreamSocket>& GetSocketRef() { return Socket; } + operator bool() const { return static_cast<bool>(Socket); } + }; + + private: TInterconnectProxyCommon::TPtr Common; TActorId SelfVirtualId; TActorId PeerVirtualId; @@ -88,14 +255,12 @@ namespace NActors { TMaybe<ui64> NextPacketFromPeer; // will be obtained from incoming initial packet TString PeerHostName; TString PeerAddr; - TSocketPtr Socket; - TPollerToken::TPtr PollerToken; + TConnection MainChannel; TString State; TString HandshakeKind; TMaybe<THolder<TProgramInfo>> ProgramInfo; // filled in in case of successful handshake; even if null TSessionParams Params; - bool ResolveTimedOut = false; - THashMap<ui32, TInstant> LastLogNotice; + std::optional<TInstant> LastLogNotice; const TDuration MuteDuration = TDuration::Seconds(15); TMonotonic Deadline; TActorId HandshakeBroker; @@ -111,6 +276,7 @@ namespace NActors { , PeerNodeId(nodeId) , NextPacketToPeer(nextPacket) , PeerHostName(std::move(peerHostName)) + , MainChannel(this, nullptr) , HandshakeKind("outgoing handshake") , Params(std::move(params)) { @@ -123,12 +289,12 @@ namespace NActors { THandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket) : TActorCoroImpl(StackSize, true) , Common(std::move(common)) - , Socket(std::move(socket)) + , MainChannel(this, std::move(socket)) , HandshakeKind("incoming handshake") { - Y_VERIFY(Socket); + Y_VERIFY(MainChannel); PeerAddr = TString::Uninitialized(1024); - if (GetRemoteAddr(*Socket, PeerAddr.Detach(), PeerAddr.size())) { + if (GetRemoteAddr(*MainChannel.GetSocketRef(), PeerAddr.Detach(), PeerAddr.size())) { PeerAddr.resize(strlen(PeerAddr.data())); } else { PeerAddr.clear(); @@ -157,7 +323,7 @@ namespace NActors { void RunImpl() { UpdatePrefix(); - if (!Socket && Common->OutgoingHandshakeInflightLimit) { + if (!MainChannel && Common->OutgoingHandshakeInflightLimit) { // Create holder, which sends request to broker and automatically frees the place when destroyed BrokerLeaseHolder.emplace(SelfActorId, HandshakeBroker); } @@ -173,7 +339,7 @@ namespace NActors { } timeout += ResolveTimeout * 2; - if (Socket) { + if (MainChannel) { // Incoming handshakes have shorter timeout than outgoing timeout *= 0.9; } @@ -182,7 +348,7 @@ namespace NActors { Schedule(Deadline, new TEvents::TEvWakeup); try { - if (Socket) { + if (MainChannel) { PerformIncomingHandshake(); } else { PerformOutgoingHandshake(); @@ -191,7 +357,7 @@ namespace NActors { // establish encrypted channel, or, in case when encryption is disabled, check if it matches settings if (ProgramInfo) { if (Params.Encryption) { - EstablishSecureConnection(); + EstablishSecureConnection(MainChannel); } else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) { Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required"); } @@ -203,36 +369,35 @@ namespace NActors { if (ProgramInfo) { LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded"); Y_VERIFY(NextPacketFromPeer); - if (PollerToken) { - Y_VERIFY(PollerToken->RefCount() == 1); - PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor - } - SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(Socket), PeerVirtualId, SelfVirtualId, + MainChannel.ResetPollerToken(); + SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(MainChannel.GetSocketRef()), PeerVirtualId, SelfVirtualId, *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params))); } - Socket.Reset(); + MainChannel.Reset(); } - void EstablishSecureConnection() { - Y_VERIFY(PollerToken && PollerToken->RefCount() == 1); - PollerToken.Reset(); - auto ev = AskProxy<TEvSecureSocket>(MakeHolder<TEvGetSecureSocket>(Socket), "AskProxy(TEvSecureContext)"); - Socket = std::move(ev->Get()->Socket); - RegisterInPoller(); + void EstablishSecureConnection(TConnection& connection) { + // wrap current socket with secure one + connection.ResetPollerToken(); + TIntrusivePtr<NInterconnect::TStreamSocket>& socketRef = connection.GetSocketRef(); + auto ev = AskProxy<TEvSecureSocket>(MakeHolder<TEvGetSecureSocket>(socketRef), "AskProxy(TEvSecureContext)"); + TIntrusivePtr<NInterconnect::TSecureSocket> secure = std::move(ev->Get()->Socket); // remember for further use + socketRef = secure; // replace the socket within the connection + connection.RegisterInPoller(); // re-register in poller + const ui32 myNodeId = GetActorSystem()->NodeId; const bool server = myNodeId < PeerNodeId; // keep server/client role permanent to enable easy TLS session resuming for (;;) { TString err; - auto& secure = static_cast<NInterconnect::TSecureSocket&>(*Socket); - switch (secure.Establish(server, Params.AuthOnly, err)) { + switch (secure->Establish(server, Params.AuthOnly, err)) { case NInterconnect::TSecureSocket::EStatus::SUCCESS: if (Params.AuthOnly) { Params.Encryption = false; - Params.AuthCN = secure.GetPeerCommonName(); - Y_VERIFY(PollerToken && PollerToken->RefCount() == 1); - PollerToken.Reset(); - Socket = secure.Detach(); + Params.AuthCN = secure->GetPeerCommonName(); + connection.ResetPollerToken(); + socketRef = secure->Detach(); + connection.RegisterInPoller(); } return; @@ -241,11 +406,11 @@ namespace NActors { [[fallthrough]]; case NInterconnect::TSecureSocket::EStatus::WANT_READ: - WaitPoller(true, false, "ReadEstablish"); + connection.WaitPoller(true, false, "ReadEstablish"); break; case NInterconnect::TSecureSocket::EStatus::WANT_WRITE: - WaitPoller(false, true, "WriteEstablish"); + connection.WaitPoller(false, true, "WriteEstablish"); break; } } @@ -437,12 +602,6 @@ namespace NActors { } bool CheckPeerCookie(const TString& cookie, TString *error) { - // create a temporary socket to connect to the peer - TSocketPtr tempSocket; - std::swap(tempSocket, Socket); - TPollerToken::TPtr tempPollerToken; - std::swap(tempPollerToken, PollerToken); - // set up virtual self id to ensure peer will not drop our connection char buf[12] = {'c', 'o', 'o', 'k', 'i', 'e', ' ', 'c', 'h', 'e', 'c', 'k'}; SelfVirtualId = TActorId(SelfActorId.NodeId(), TStringBuf(buf, 12)); @@ -450,12 +609,13 @@ namespace NActors { bool success = true; try { // issue connection and send initial packet - Connect(false); - SendInitialPacket(); + TConnection tempConnection(this, nullptr); + tempConnection.Connect(nullptr); + SendInitialPacket(tempConnection); // wait for basic response TInitialPacket response; - ReceiveData(&response, sizeof(response), "ReceiveResponse"); + tempConnection.ReceiveData(&response, sizeof(response), "ReceiveResponse"); if (!response.Check()) { Fail(TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT, "Initial packet CRC error"); } else if (response.Header.Version != INTERCONNECT_PROTOCOL_VERSION) { @@ -472,11 +632,11 @@ namespace NActors { request.SetSenderActorId(TString()); request.SetCookie(cookie); request.SetDoCheckCookie(true); - SendExBlock(request, "SendExBlockDoCheckCookie"); + SendExBlock(tempConnection, request, "ExBlockDoCheckCookie"); // process cookie check reply NActorsInterconnect::THandshakeReply reply; - if (!reply.ParseFromString(ReceiveExBlock("ReceiveExBlockDoCheckCookie"))) { + if (!reply.ParseFromString(ReceiveExBlock(tempConnection, "ExBlockDoCheckCookie"))) { Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Incorrect packet from peer"); } else if (reply.HasCookieCheckResult() && !reply.GetCookieCheckResult()) { Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Cookie check error -- possible network problem"); @@ -488,8 +648,6 @@ namespace NActors { // restore state SelfVirtualId = TActorId(); - std::swap(tempSocket, Socket); - std::swap(tempPollerToken, PollerToken); return success; } @@ -497,14 +655,16 @@ namespace NActors { LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH01", NLog::PRI_DEBUG, "starting outgoing handshake"); - // perform connection - Connect(true); + // perform connection and log its result + MainChannel.Connect(&PeerAddr); + auto logPriority = std::exchange(LastLogNotice, std::nullopt) ? NActors::NLog::PRI_NOTICE : NActors::NLog::PRI_DEBUG; + LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH05", logPriority, "connected to peer"); // send initial request packet - SendInitialPacket(); + SendInitialPacket(MainChannel); TInitialPacket response; - ReceiveData(&response, sizeof(response), "ReceiveResponse"); + MainChannel.ReceiveData(&response, sizeof(response), "ReceiveResponse"); if (!response.Check()) { Fail(TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT, "Initial packet CRC error"); } else if (response.Header.Version != INTERCONNECT_PROTOCOL_VERSION) { @@ -568,10 +728,10 @@ namespace NActors { request.SetRequestExtendedTraceFmt(true); request.SetRequestExternalDataChannel(Common->Settings.EnableExternalDataChannel); - SendExBlock(request, "ExRequest"); + SendExBlock(MainChannel, request, "ExRequest"); NActorsInterconnect::THandshakeReply reply; - if (!reply.ParseFromString(ReceiveExBlock("ExReply"))) { + if (!reply.ParseFromString(ReceiveExBlock(MainChannel, "ExReply"))) { Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Incorrect THandshakeReply"); } ReportProto(reply, "ReceiveExBlock ExReply"); @@ -621,12 +781,12 @@ namespace NActors { "starting incoming handshake"); // set up incoming socket - SetupSocket(); - RegisterInPoller(); + MainChannel.SetupSocket(); + MainChannel.RegisterInPoller(); // wait for initial request packet TInitialPacket request; - ReceiveData(&request, sizeof(request), "ReceiveRequest"); + MainChannel.ReceiveData(&request, sizeof(request), "ReceiveRequest"); if (!request.Check()) { Fail(TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT, "Initial packet CRC error"); } else if (request.Header.Version != INTERCONNECT_PROTOCOL_VERSION) { @@ -670,18 +830,18 @@ namespace NActors { } // issue response to the peer - SendInitialPacket(); + SendInitialPacket(MainChannel); } else { // peer wants a new session, clear fields and send initial packet SelfVirtualId = TActorId(); PeerVirtualId = TActorId(); NextPacketToPeer = 0; - SendInitialPacket(); + SendInitialPacket(MainChannel); // wait for extended request auto ev = MakeHolder<TEvHandshakeRequest>(); auto& request = ev->Record; - if (!request.ParseFromString(ReceiveExBlock("ExRequest"))) { + if (!request.ParseFromString(ReceiveExBlock(MainChannel, "ExRequest"))) { Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Incorrect THandshakeRequest"); } ReportProto(request, "ReceiveExBlock ExRequest"); @@ -690,7 +850,7 @@ namespace NActors { // issue reply to the peer to prevent repeating connection retries NActorsInterconnect::THandshakeReply reply; reply.SetErrorExplaination(msg); - SendExBlock(reply, "ExReply"); + SendExBlock(MainChannel, reply, "ExReply"); // terminate ths handshake Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, msg); @@ -701,7 +861,7 @@ namespace NActors { if (request.HasDoCheckCookie()) { NActorsInterconnect::THandshakeReply reply; reply.SetCookieCheckResult(request.GetCookie() == Common->Cookie); - SendExBlock(reply, "ExReplyDoCheckCookie"); + SendExBlock(MainChannel, reply, "ExReplyDoCheckCookie"); throw TExHandshakeFailed(); } else if (request.HasCookie() && !CheckPeerCookie(request.GetCookie(), &error)) { generateError(TStringBuilder() << "Peer connectivity-checking failed, error# " << error); @@ -785,14 +945,14 @@ namespace NActors { success.SetAuthOnly(Params.AuthOnly); success.SetUseExtendedTraceFmt(Params.UseExtendedTraceFmt); success.SetUseExternalDataChannel(Params.UseExternalDataChannel); - SendExBlock(record, "ExReply"); + SendExBlock(MainChannel, record, "ExReply"); // extract sender actor id (self virtual id) const auto& str = success.GetSenderActorId(); SelfVirtualId.Parse(str.data(), str.size()); } else if (auto ev = reply->CastAsLocal<TEvHandshakeReplyError>()) { // in case of error just send reply to the peer and terminate handshake - SendExBlock(ev->Record, "ExReply"); + SendExBlock(MainChannel, ev->Record, "ExReply"); ProgramInfo.Clear(); // do not issue reply to the proxy } else { Y_FAIL("unexpected event Type# 0x%08" PRIx32, reply->GetTypeRewrite()); @@ -801,7 +961,7 @@ namespace NActors { } template <typename T> - void SendExBlock(const T& proto, const char* what) { + void SendExBlock(TConnection& connection, const T& proto, const char* what) { TString data; Y_PROTOBUF_SUPPRESS_NODISCARD proto.SerializeToString(&data); Y_VERIFY(data.size() <= TExHeader::MaxSize); @@ -811,20 +971,20 @@ namespace NActors { TExHeader header; header.Size = data.size(); header.Sign(data.data(), data.size()); - SendData(&header, sizeof(header), Sprintf("Send%sHeader", what)); - SendData(data.data(), data.size(), Sprintf("Send%sData", what)); + connection.SendData(&header, sizeof(header), Sprintf("Send%sHeader", what)); + connection.SendData(data.data(), data.size(), Sprintf("Send%sData", what)); } - TString ReceiveExBlock(const char* what) { + TString ReceiveExBlock(TConnection& connection, const char* what) { TExHeader header; - ReceiveData(&header, sizeof(header), Sprintf("Receive%sHeader", what)); + connection.ReceiveData(&header, sizeof(header), Sprintf("Receive%sHeader", what)); if (header.Size > TExHeader::MaxSize) { Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Incorrect extended header size"); } TString data; data.resize(header.Size); - ReceiveData(data.Detach(), data.size(), Sprintf("Receive%sData", what)); + connection.ReceiveData(data.Detach(), data.size(), Sprintf("Receive%sData", what)); if (!header.Check(data.data(), data.size())) { Fail(TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT, "Extended header CRC error"); @@ -864,17 +1024,15 @@ namespace NActors { } void Fail(TEvHandshakeFail::EnumHandshakeFail reason, TString explanation, bool network = false) { - TString msg = Sprintf("%s Peer# %s(%s) %s%s", HandshakeKind.data(), PeerHostName ? PeerHostName.data() : "<unknown>", - PeerAddr.size() ? PeerAddr.data() : "<unknown>", ResolveTimedOut ? "[resolve timeout] " : "", - explanation.data()); + TString msg = Sprintf("%s Peer# %s(%s) %s", HandshakeKind.data(), PeerHostName ? PeerHostName.data() : "<unknown>", + PeerAddr.size() ? PeerAddr.data() : "<unknown>", explanation.data()); if (network) { TInstant now = Now(); - TInstant prevLog = LastLogNotice[PeerNodeId]; NActors::NLog::EPriority logPriority = NActors::NLog::PRI_DEBUG; - if (now - prevLog > MuteDuration) { + if (!LastLogNotice || now - *LastLogNotice > MuteDuration) { logPriority = NActors::NLog::PRI_NOTICE; - LastLogNotice[PeerNodeId] = now; + LastLogNotice.emplace(now); } LOG_LOG_NET_X(logPriority, PeerNodeId, "network-related error occured on handshake: %s", msg.data()); } else { @@ -898,159 +1056,9 @@ namespace NActors { // COMMUNICATION BLOCK //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - void Connect(bool updatePeerAddr) { - // issue request to a nameservice to resolve peer node address - const auto mono = TActivationContext::Monotonic(); - Send(Common->NameserviceId, new TEvInterconnect::TEvResolveNode(PeerNodeId, TActivationContext::Now() + (Deadline - mono))); - - // wait for the result - auto ev = WaitForSpecificEvent<TEvResolveError, TEvLocalNodeInfo, TEvInterconnect::TEvNodeAddress>("ResolveNode", - mono + ResolveTimeout); - - // extract address from the result - std::vector<NInterconnect::TAddress> addresses; - if (!ev) { - ResolveTimedOut = true; - if (auto peerNodeInfo = GetPeerNodeInfo(); peerNodeInfo && peerNodeInfo->Address) { - addresses.emplace_back(peerNodeInfo->Address, peerNodeInfo->Port); - } else { - Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve timed out and no static address defined", true); - } - } else if (auto *p = ev->CastAsLocal<TEvLocalNodeInfo>()) { - addresses = std::move(p->Addresses); - if (addresses.empty()) { - Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: no address returned", true); - } - } else if (auto *p = ev->CastAsLocal<TEvInterconnect::TEvNodeAddress>()) { - const auto& r = p->Record; - if (!r.HasAddress() || !r.HasPort()) { - Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: no address returned", true); - } - addresses.emplace_back(r.GetAddress(), static_cast<ui16>(r.GetPort())); - } else { - Y_VERIFY(ev->GetTypeRewrite() == ui32(ENetwork::ResolveError)); - Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "DNS resolve error: " + ev->Get<TEvResolveError>()->Explain - + ", Unresolved host# " + ev->Get<TEvResolveError>()->Host, true); - } - - for (const NInterconnect::TAddress& address : addresses) { - // create the socket with matching address family - int err = 0; - Socket = NInterconnect::TStreamSocket::Make(address.GetFamily(), &err); - if (err == EAFNOSUPPORT) { - Socket.Reset(); - continue; - } else if (*Socket == -1) { - Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "System error: failed to create socket"); - } - - // extract peer address - if (updatePeerAddr) { - PeerAddr = address.ToString(); - } - - // set up socket parameters - SetupSocket(); - - // start connecting - err = -Socket->Connect(address); - if (err == EINPROGRESS) { - RegisterInPoller(); - WaitPoller(false, true, "WaitConnect"); - err = Socket->GetConnectStatus(); - } else if (!err) { - RegisterInPoller(); - } - - // check if connection succeeded - if (err) { - Socket.Reset(); - PollerToken.Reset(); - } else { - break; - } - } - - if (!Socket) { - Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Couldn't connect to any resolved address", true); - } - - auto it = LastLogNotice.find(PeerNodeId); - NActors::NLog::EPriority logPriority = NActors::NLog::PRI_DEBUG; - if (it != LastLogNotice.end()) { - LastLogNotice.erase(it); - logPriority = NActors::NLog::PRI_NOTICE; - } - LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH05", logPriority, "connected to peer"); - } - - void SetupSocket() { - // switch to nonblocking mode - try { - SetNonBlock(*Socket); - SetNoDelay(*Socket, true); - } catch (...) { - Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "System error: can't up nonblocking mode for socket"); - } - - // setup send buffer size - Socket->SetSendBufferSize(Common->Settings.GetSendBufferSize()); - } - - void RegisterInPoller() { - const bool success = Send(MakePollerActorId(), new TEvPollerRegister(Socket, SelfActorId, SelfActorId)); - Y_VERIFY(success); - auto result = WaitForSpecificEvent<TEvPollerRegisterResult>("RegisterPoller"); - PollerToken = std::move(result->Get()->PollerToken); - Y_VERIFY(PollerToken); - Y_VERIFY(PollerToken->RefCount() == 1); // ensure exclusive ownership - } - - void SendInitialPacket() { + void SendInitialPacket(TConnection& connection) { TInitialPacket packet(SelfVirtualId, PeerVirtualId, NextPacketToPeer, INTERCONNECT_PROTOCOL_VERSION); - SendData(&packet, sizeof(packet), "SendInitialPacket"); - } - - void WaitPoller(bool read, bool write, TString state) { - PollerToken->Request(read, write); - WaitForSpecificEvent<TEvPollerReady>(std::move(state)); - } - - template <typename TDataPtr, typename TSendRecvFunc> - void Process(TDataPtr buffer, size_t len, TSendRecvFunc&& sendRecv, bool read, bool write, TString state) { - Y_VERIFY(Socket); - NInterconnect::TStreamSocket* sock = Socket.Get(); - ssize_t (NInterconnect::TStreamSocket::*pfn)(TDataPtr, size_t, TString*) const = sendRecv; - size_t processed = 0; - - auto error = [&](TString msg) { - Fail(TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT, Sprintf("Socket error# %s state# %s processed# %zu remain# %zu", - msg.data(), state.data(), processed, len), true); - }; - - while (len) { - TString err; - ssize_t nbytes = (sock->*pfn)(buffer, len, &err); - if (nbytes > 0) { - buffer = (char*)buffer + nbytes; - len -= nbytes; - processed += nbytes; - } else if (-nbytes == EAGAIN || -nbytes == EWOULDBLOCK) { - WaitPoller(read, write, state); - } else if (!nbytes) { - error("connection unexpectedly closed"); - } else if (-nbytes != EINTR) { - error(err ? err : TString(strerror(-nbytes))); - } - } - } - - void SendData(const void* buffer, size_t len, TString state) { - Process(buffer, len, &NInterconnect::TStreamSocket::Send, false, true, std::move(state)); - } - - void ReceiveData(void* buffer, size_t len, TString state) { - Process(buffer, len, &NInterconnect::TStreamSocket::Recv, true, false, std::move(state)); + connection.SendData(&packet, sizeof(packet), "SendInitialPacket"); } THolder<TEvInterconnect::TNodeInfo> GetPeerNodeInfo() { |