diff options
author | alexvru <[email protected]> | 2023-04-13 17:44:21 +0300 |
---|---|---|
committer | alexvru <[email protected]> | 2023-04-13 17:44:21 +0300 |
commit | edbc6cad1fc50b1237b88cd16a78fd44777a6601 (patch) | |
tree | e708ea5a81108faed9173c71c1f37e09b29cef53 /library/cpp/actors/interconnect/interconnect_tcp_session.cpp | |
parent | 782ed55b35b9805318a8d26ef040ff62a3875b42 (diff) |
Implement external data channel: connection
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_tcp_session.cpp')
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_tcp_session.cpp | 30 |
1 files changed, 28 insertions, 2 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index a336e4a89fe..9d6a8ba9c1e 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -47,6 +47,9 @@ namespace NActors { if (Socket) { Socket->Shutdown(SHUT_RDWR); } + if (XdcSocket) { + XdcSocket->Shutdown(SHUT_RDWR); + } } void TInterconnectSessionTCP::Init() { @@ -224,6 +227,7 @@ namespace NActors { SendBufferSize = ev->Get()->Socket->GetSendBufferSize(); Socket = std::move(ev->Get()->Socket); + XdcSocket = std::move(ev->Get()->XdcSocket); // there may be a race const ui64 nextPacket = Max(LastConfirmed, ev->Get()->NextPacket); @@ -237,7 +241,7 @@ namespace NActors { LOG_INFO_IC_SESSION("ICS10", "traffic start"); // create input session actor - auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, ReceiveContext, Proxy->Common, + auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, XdcSocket, ReceiveContext, Proxy->Common, Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), Params); ReceiveContext->UnlockLastProcessedPacketSerial(); ReceiverId = Params.Encryption ? RegisterWithSameMailbox(actor.Release()) : Register(actor.Release(), TMailboxType::ReadAsFilled); @@ -246,6 +250,10 @@ namespace NActors { LOG_DEBUG_IC_SESSION("ICS11", "registering socket in PollerActor"); const bool success = Send(MakePollerActorId(), new TEvPollerRegister(Socket, ReceiverId, SelfId())); Y_VERIFY(success); + if (XdcSocket) { + const bool success = Send(MakePollerActorId(), new TEvPollerRegister(XdcSocket, ReceiverId, SelfId())); + Y_VERIFY(success); + } ReceiveContext->WriteBlockedByFullSendBuffer = false; LostConnectionWatchdog.Disarm(); @@ -499,6 +507,10 @@ namespace NActors { Proxy->Metrics->SetConnected(0); LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] disconnected", Proxy->PeerNodeId); } + if (XdcSocket) { + XdcSocket->Shutdown(SHUT_RDWR); + XdcSocket.Reset(); + } } void TInterconnectSessionTCP::ReestablishConnectionExecute() { @@ -531,7 +543,13 @@ namespace NActors { } void TInterconnectSessionTCP::Handle(TEvPollerRegisterResult::TPtr ev) { - PollerToken = std::move(ev->Get()->PollerToken); + const auto& sk = ev->Get()->Socket; + if (auto *token = sk == Socket ? &PollerToken : sk == XdcSocket ? &XdcPollerToken : nullptr) { + *token = std::move(ev->Get()->PollerToken); + } else { + return; + } + if (ReceiveContext->WriteBlockedByFullSendBuffer) { if (Params.Encryption) { auto *secure = static_cast<NInterconnect::TSecureSocket*>(Socket.Get()); @@ -1175,6 +1193,14 @@ namespace NActors { str << (Socket ? i64(*Socket) : -1); } } + TABLER() { + TABLED() { + str << "XDC socket"; + } + TABLED() { + str << (XdcSocket ? i64(*XdcSocket) : -1); + } + } ui32 unsentQueueSize = Socket ? Socket->GetUnsentQueueSize() : 0; |