summaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
diff options
context:
space:
mode:
authoralexvru <[email protected]>2023-04-13 17:44:21 +0300
committeralexvru <[email protected]>2023-04-13 17:44:21 +0300
commitedbc6cad1fc50b1237b88cd16a78fd44777a6601 (patch)
treee708ea5a81108faed9173c71c1f37e09b29cef53 /library/cpp/actors/interconnect/interconnect_tcp_session.cpp
parent782ed55b35b9805318a8d26ef040ff62a3875b42 (diff)
Implement external data channel: connection
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_tcp_session.cpp')
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp30
1 files changed, 28 insertions, 2 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index a336e4a89fe..9d6a8ba9c1e 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -47,6 +47,9 @@ namespace NActors {
if (Socket) {
Socket->Shutdown(SHUT_RDWR);
}
+ if (XdcSocket) {
+ XdcSocket->Shutdown(SHUT_RDWR);
+ }
}
void TInterconnectSessionTCP::Init() {
@@ -224,6 +227,7 @@ namespace NActors {
SendBufferSize = ev->Get()->Socket->GetSendBufferSize();
Socket = std::move(ev->Get()->Socket);
+ XdcSocket = std::move(ev->Get()->XdcSocket);
// there may be a race
const ui64 nextPacket = Max(LastConfirmed, ev->Get()->NextPacket);
@@ -237,7 +241,7 @@ namespace NActors {
LOG_INFO_IC_SESSION("ICS10", "traffic start");
// create input session actor
- auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, ReceiveContext, Proxy->Common,
+ auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, XdcSocket, ReceiveContext, Proxy->Common,
Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), Params);
ReceiveContext->UnlockLastProcessedPacketSerial();
ReceiverId = Params.Encryption ? RegisterWithSameMailbox(actor.Release()) : Register(actor.Release(), TMailboxType::ReadAsFilled);
@@ -246,6 +250,10 @@ namespace NActors {
LOG_DEBUG_IC_SESSION("ICS11", "registering socket in PollerActor");
const bool success = Send(MakePollerActorId(), new TEvPollerRegister(Socket, ReceiverId, SelfId()));
Y_VERIFY(success);
+ if (XdcSocket) {
+ const bool success = Send(MakePollerActorId(), new TEvPollerRegister(XdcSocket, ReceiverId, SelfId()));
+ Y_VERIFY(success);
+ }
ReceiveContext->WriteBlockedByFullSendBuffer = false;
LostConnectionWatchdog.Disarm();
@@ -499,6 +507,10 @@ namespace NActors {
Proxy->Metrics->SetConnected(0);
LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] disconnected", Proxy->PeerNodeId);
}
+ if (XdcSocket) {
+ XdcSocket->Shutdown(SHUT_RDWR);
+ XdcSocket.Reset();
+ }
}
void TInterconnectSessionTCP::ReestablishConnectionExecute() {
@@ -531,7 +543,13 @@ namespace NActors {
}
void TInterconnectSessionTCP::Handle(TEvPollerRegisterResult::TPtr ev) {
- PollerToken = std::move(ev->Get()->PollerToken);
+ const auto& sk = ev->Get()->Socket;
+ if (auto *token = sk == Socket ? &PollerToken : sk == XdcSocket ? &XdcPollerToken : nullptr) {
+ *token = std::move(ev->Get()->PollerToken);
+ } else {
+ return;
+ }
+
if (ReceiveContext->WriteBlockedByFullSendBuffer) {
if (Params.Encryption) {
auto *secure = static_cast<NInterconnect::TSecureSocket*>(Socket.Get());
@@ -1175,6 +1193,14 @@ namespace NActors {
str << (Socket ? i64(*Socket) : -1);
}
}
+ TABLER() {
+ TABLED() {
+ str << "XDC socket";
+ }
+ TABLED() {
+ str << (XdcSocket ? i64(*XdcSocket) : -1);
+ }
+ }
ui32 unsentQueueSize = Socket ? Socket->GetUnsentQueueSize() : 0;