aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-04-13 17:44:21 +0300
committeralexvru <alexvru@ydb.tech>2023-04-13 17:44:21 +0300
commitedbc6cad1fc50b1237b88cd16a78fd44777a6601 (patch)
treee708ea5a81108faed9173c71c1f37e09b29cef53
parent782ed55b35b9805318a8d26ef040ff62a3875b42 (diff)
downloadydb-edbc6cad1fc50b1237b88cd16a78fd44777a6601.tar.gz
Implement external data channel: connection
-rw-r--r--library/cpp/actors/interconnect/events_local.h28
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp102
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.h2
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp14
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp5
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_proxy.h24
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp30
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h5
-rw-r--r--library/cpp/actors/protos/interconnect.proto10
9 files changed, 206 insertions, 14 deletions
diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h
index 966cdb763e..465899c335 100644
--- a/library/cpp/actors/interconnect/events_local.h
+++ b/library/cpp/actors/interconnect/events_local.h
@@ -56,6 +56,10 @@ namespace NActors {
HandshakeBrokerFree,
HandshakeBrokerPermit,
+ // external data channel messages
+ EvSubscribeForConnection,
+ EvReportConnection,
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// nonlocal messages; their indices must be preserved in order to work properly while doing rolling update
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -197,13 +201,15 @@ namespace NActors {
const TActorId& self,
ui64 nextPacket,
TAutoPtr<TProgramInfo>&& programInfo,
- TSessionParams params)
+ TSessionParams params,
+ TIntrusivePtr<NInterconnect::TStreamSocket> xdcSocket)
: Socket(std::move(socket))
, Peer(peer)
, Self(self)
, NextPacket(nextPacket)
, ProgramInfo(std::move(programInfo))
, Params(std::move(params))
+ , XdcSocket(std::move(xdcSocket))
{
}
@@ -213,6 +219,7 @@ namespace NActors {
const ui64 NextPacket;
TAutoPtr<TProgramInfo> ProgramInfo;
const TSessionParams Params;
+ TIntrusivePtr<NInterconnect::TStreamSocket> XdcSocket;
};
struct TEvHandshakeFail: public TEventLocal<TEvHandshakeFail, ui32(ENetwork::HandshakeFail)> {
@@ -409,4 +416,23 @@ namespace NActors {
{}
};
+ struct TEvSubscribeForConnection : TEventLocal<TEvSubscribeForConnection, (ui32)ENetwork::EvSubscribeForConnection> {
+ TString HandshakeId;
+ bool Subscribe;
+
+ TEvSubscribeForConnection(TString handshakeId, bool subscribe)
+ : HandshakeId(std::move(handshakeId))
+ , Subscribe(subscribe)
+ {}
+ };
+
+ struct TEvReportConnection : TEventLocal<TEvReportConnection, (ui32)ENetwork::EvReportConnection> {
+ TString HandshakeId;
+ TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
+
+ TEvReportConnection(TString handshakeId, TIntrusivePtr<NInterconnect::TStreamSocket> socket)
+ : HandshakeId(std::move(handshakeId))
+ , Socket(std::move(socket))
+ {}
+ };
}
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index 797d709749..c75e27ff5e 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -6,6 +6,7 @@
#include <library/cpp/actors/core/log.h>
#include <library/cpp/actors/protos/services_common.pb.h>
#include <util/system/getpid.h>
+#include <util/random/entropy.h>
#include <google/protobuf/text_format.h>
@@ -256,6 +257,7 @@ namespace NActors {
TString PeerHostName;
TString PeerAddr;
TConnection MainChannel;
+ TConnection ExternalDataChannel;
TString State;
TString HandshakeKind;
TMaybe<THolder<TProgramInfo>> ProgramInfo; // filled in in case of successful handshake; even if null
@@ -265,6 +267,8 @@ namespace NActors {
TMonotonic Deadline;
TActorId HandshakeBroker;
std::optional<TBrokerLeaseHolder> BrokerLeaseHolder;
+ std::optional<TString> HandshakeId; // for XDC
+ bool SubscribedForConnection = false;
public:
THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer,
@@ -277,6 +281,7 @@ namespace NActors {
, NextPacketToPeer(nextPacket)
, PeerHostName(std::move(peerHostName))
, MainChannel(this, nullptr)
+ , ExternalDataChannel(this, nullptr)
, HandshakeKind("outgoing handshake")
, Params(std::move(params))
{
@@ -284,12 +289,17 @@ namespace NActors {
Y_VERIFY(SelfVirtualId.NodeId());
Y_VERIFY(PeerNodeId);
HandshakeBroker = MakeHandshakeBrokerOutId();
+
+ // generate random handshake id
+ HandshakeId = TString::Uninitialized(32);
+ EntropyPool().Read(HandshakeId->Detach(), HandshakeId->size());
}
THandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket)
: TActorCoroImpl(StackSize, true)
, Common(std::move(common))
, MainChannel(this, std::move(socket))
+ , ExternalDataChannel(this, nullptr)
, HandshakeKind("incoming handshake")
{
Y_VERIFY(MainChannel);
@@ -314,9 +324,12 @@ namespace NActors {
}
throw;
} catch (const TExPoison&) {
- return; // just stop execution
+ // just stop execution, do nothing
} catch (...) {
- throw;
+ Y_FAIL("unhandled exception");
+ }
+ if (SubscribedForConnection) {
+ SendToProxy(MakeHolder<TEvSubscribeForConnection>(*HandshakeId, false));
}
}
@@ -348,7 +361,8 @@ namespace NActors {
Schedule(Deadline, new TEvents::TEvWakeup);
try {
- if (MainChannel) {
+ const bool incoming = MainChannel;
+ if (incoming) {
PerformIncomingHandshake();
} else {
PerformOutgoingHandshake();
@@ -356,8 +370,26 @@ namespace NActors {
// establish encrypted channel, or, in case when encryption is disabled, check if it matches settings
if (ProgramInfo) {
+ if (Params.UseExternalDataChannel) {
+ if (incoming) {
+ Y_VERIFY(SubscribedForConnection);
+ auto ev = WaitForSpecificEvent<TEvReportConnection>("WaitInboundXdcStream");
+ SubscribedForConnection = false;
+ if (ev->Get()->HandshakeId != *HandshakeId) {
+ Y_VERIFY_DEBUG(false);
+ Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Mismatching HandshakeId in external data channel");
+ }
+ ExternalDataChannel.GetSocketRef() = std::move(ev->Get()->Socket);
+ } else {
+ EstablishExternalDataChannel();
+ }
+ }
+
if (Params.Encryption) {
EstablishSecureConnection(MainChannel);
+ if (ExternalDataChannel) {
+ EstablishSecureConnection(ExternalDataChannel);
+ }
} else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) {
Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required");
}
@@ -370,11 +402,14 @@ namespace NActors {
LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded");
Y_VERIFY(NextPacketFromPeer);
MainChannel.ResetPollerToken();
+ ExternalDataChannel.ResetPollerToken();
+ Y_VERIFY(!ExternalDataChannel == !Params.UseExternalDataChannel);
SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(MainChannel.GetSocketRef()), PeerVirtualId, SelfVirtualId,
- *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params)));
+ *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params), std::move(ExternalDataChannel.GetSocketRef())));
}
MainChannel.Reset();
+ ExternalDataChannel.Reset();
}
void EstablishSecureConnection(TConnection& connection) {
@@ -651,6 +686,16 @@ namespace NActors {
return success;
}
+ void EstablishExternalDataChannel() {
+ ExternalDataChannel.Connect(nullptr);
+ char buf[12] = {'x', 'd', 'c', ' ', 'p', 'a', 'y', 'l', 'o', 'a', 'd', 0};
+ TInitialPacket packet(TActorId(SelfActorId.NodeId(), TStringBuf(buf, 12)), {}, 0, INTERCONNECT_XDC_STREAM_VERSION);
+ ExternalDataChannel.SendData(&packet, sizeof(packet), "SendXdcStream");
+ NActorsInterconnect::TExternalDataChannelParams params;
+ params.SetHandshakeId(*HandshakeId);
+ SendExBlock(ExternalDataChannel, params, "ExternalDataChannelParams");
+ }
+
void PerformOutgoingHandshake() {
LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH01", NLog::PRI_DEBUG,
"starting outgoing handshake");
@@ -661,7 +706,16 @@ namespace NActors {
LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH05", logPriority, "connected to peer");
// send initial request packet
- SendInitialPacket(MainChannel);
+ if (Params.UseExternalDataChannel && PeerVirtualId) { // special case for XDC continuation
+ TInitialPacket packet(SelfVirtualId, PeerVirtualId, NextPacketToPeer, INTERCONNECT_XDC_CONTINUATION_VERSION);
+ MainChannel.SendData(&packet, sizeof(packet), "SendInitialPacket");
+ NActorsInterconnect::TContinuationParams request;
+ request.SetHandshakeId(*HandshakeId);
+ SendExBlock(MainChannel, request, "ExRequest");
+ } else {
+ TInitialPacket packet(SelfVirtualId, PeerVirtualId, NextPacketToPeer, INTERCONNECT_PROTOCOL_VERSION);
+ MainChannel.SendData(&packet, sizeof(packet), "SendInitialPacket");
+ }
TInitialPacket response;
MainChannel.ReceiveData(&response, sizeof(response), "ReceiveResponse");
@@ -727,6 +781,7 @@ namespace NActors {
request.SetRequestAuthOnly(Common->Settings.TlsAuthOnly);
request.SetRequestExtendedTraceFmt(true);
request.SetRequestExternalDataChannel(Common->Settings.EnableExternalDataChannel);
+ request.SetHandshakeId(*HandshakeId);
SendExBlock(MainChannel, request, "ExRequest");
@@ -789,7 +844,9 @@ namespace NActors {
MainChannel.ReceiveData(&request, sizeof(request), "ReceiveRequest");
if (!request.Check()) {
Fail(TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT, "Initial packet CRC error");
- } else if (request.Header.Version != INTERCONNECT_PROTOCOL_VERSION) {
+ } else if (request.Header.Version != INTERCONNECT_PROTOCOL_VERSION &&
+ request.Header.Version != INTERCONNECT_XDC_CONTINUATION_VERSION &&
+ request.Header.Version != INTERCONNECT_XDC_STREAM_VERSION) {
Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, Sprintf("Incompatible protocol %" PRIu64, request.Header.Version));
}
@@ -804,6 +861,29 @@ namespace NActors {
// extract next packet
NextPacketFromPeer = request.Header.NextPacket;
+ // process some extra payload, if necessary
+ switch (request.Header.Version) {
+ case INTERCONNECT_XDC_CONTINUATION_VERSION: {
+ NActorsInterconnect::TContinuationParams params;
+ if (!params.ParseFromString(ReceiveExBlock(MainChannel, "ContinuationParams")) || !params.HasHandshakeId()) {
+ Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Incorrect packet from peer");
+ }
+ HandshakeId = params.GetHandshakeId();
+ SendToProxy(MakeHolder<TEvSubscribeForConnection>(*HandshakeId, true));
+ SubscribedForConnection = true;
+ break;
+ }
+ case INTERCONNECT_XDC_STREAM_VERSION: {
+ NActorsInterconnect::TExternalDataChannelParams params;
+ if (!params.ParseFromString(ReceiveExBlock(MainChannel, "ExternalDataChannelParams")) || !params.HasHandshakeId()) {
+ Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Incorrect packet from peer");
+ }
+ MainChannel.ResetPollerToken();
+ SendToProxy(MakeHolder<TEvReportConnection>(params.GetHandshakeId(), std::move(MainChannel.GetSocketRef())));
+ throw TExHandshakeFailed();
+ }
+ }
+
if (request.Header.PeerVirtualId) {
// issue request to the proxy and wait for the response
auto reply = AskProxy<TEvHandshakeAck, TEvHandshakeNak>(MakeHolder<TEvHandshakeAsk>(
@@ -918,6 +998,16 @@ namespace NActors {
Params.UseExtendedTraceFmt = request.GetRequestExtendedTraceFmt();
Params.UseExternalDataChannel = request.GetRequestExternalDataChannel() && Common->Settings.EnableExternalDataChannel;
+ if (Params.UseExternalDataChannel) {
+ if (request.HasHandshakeId()) {
+ HandshakeId = request.GetHandshakeId();
+ SendToProxy(MakeHolder<TEvSubscribeForConnection>(*HandshakeId, true));
+ SubscribedForConnection = true;
+ } else {
+ generateError("Peer has requested ExternalDataChannel feature, but did not provide HandshakeId");
+ }
+ }
+
if (request.HasClientScopeId()) {
ParsePeerScopeId(request.GetClientScopeId());
}
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.h b/library/cpp/actors/interconnect/interconnect_handshake.h
index fc37f11251..c8a7437b80 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.h
+++ b/library/cpp/actors/interconnect/interconnect_handshake.h
@@ -12,6 +12,8 @@
namespace NActors {
static constexpr TDuration DEFAULT_HANDSHAKE_TIMEOUT = TDuration::Seconds(5);
static constexpr ui64 INTERCONNECT_PROTOCOL_VERSION = 2;
+ static constexpr ui64 INTERCONNECT_XDC_CONTINUATION_VERSION = 3;
+ static constexpr ui64 INTERCONNECT_XDC_STREAM_VERSION = 4;
using TSocketPtr = TIntrusivePtr<NInterconnect::TStreamSocket>;
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index 32f10c727b..ee85f72dd0 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -7,11 +7,12 @@ namespace NActors {
LWTRACE_USING(ACTORLIB_PROVIDER);
TInputSessionTCP::TInputSessionTCP(const TActorId& sessionId, TIntrusivePtr<NInterconnect::TStreamSocket> socket,
- TIntrusivePtr<TReceiveContext> context, TInterconnectProxyCommon::TPtr common,
- std::shared_ptr<IInterconnectMetrics> metrics, ui32 nodeId, ui64 lastConfirmed,
- TDuration deadPeerTimeout, TSessionParams params)
+ TIntrusivePtr<NInterconnect::TStreamSocket> xdcSocket, TIntrusivePtr<TReceiveContext> context,
+ TInterconnectProxyCommon::TPtr common, std::shared_ptr<IInterconnectMetrics> metrics, ui32 nodeId,
+ ui64 lastConfirmed, TDuration deadPeerTimeout, TSessionParams params)
: SessionId(sessionId)
, Socket(std::move(socket))
+ , XdcSocket(std::move(xdcSocket))
, Context(std::move(context))
, Common(std::move(common))
, NodeId(nodeId)
@@ -62,7 +63,12 @@ namespace NActors {
}
void TInputSessionTCP::Handle(TEvPollerRegisterResult::TPtr ev) {
- PollerToken = std::move(ev->Get()->PollerToken);
+ const auto& sk = ev->Get()->Socket;
+ if (auto *token = sk == Socket ? &PollerToken : sk == XdcSocket ? &XdcPollerToken : nullptr) {
+ *token = std::move(ev->Get()->PollerToken);
+ } else {
+ return;
+ }
ReceiveData();
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
index 8562f6a440..d70ea1aa73 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
@@ -339,6 +339,9 @@ namespace NActors {
return;
}
+ // drop any pending XDC subscriptions
+ ConnectionSubscriptions.clear();
+
Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor);
SwitchToState(__LINE__, "StateWork", &TThis::StateWork);
@@ -886,7 +889,7 @@ namespace NActors {
stats.LastSessionDieTime = LastSessionDieTime;
stats.TotalOutputQueueSize = Session ? Session->TotalOutputQueueSize : 0;
stats.Connected = Session ? (bool)Session->Socket : false;
- stats.ExternalDataChannel = Session && Session->Params.UseExternalDataChannel;
+ stats.ExternalDataChannel = Session && Session->XdcSocket;
stats.Host = TechnicalPeerHostName;
stats.Port = 0;
ui32 rep = 0;
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
index 71edfccbe2..8b30dde0ae 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
@@ -126,6 +126,8 @@ namespace NActors {
hFunc(TEvQueryStats, Handle) \
cFunc(TEvInterconnect::EvTerminate, HandleTerminate) \
cFunc(EvPassAwayIfNeeded, HandlePassAwayIfNeeded) \
+ hFunc(TEvSubscribeForConnection, Handle); \
+ hFunc(TEvReportConnection, Handle); \
default: \
Y_FAIL("unexpected event Type# 0x%08" PRIx32, type); \
} \
@@ -491,6 +493,28 @@ namespace NActors {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ std::unordered_map<TString, TActorId> ConnectionSubscriptions;
+
+ void Handle(TEvSubscribeForConnection::TPtr ev) {
+ auto& msg = *ev->Get();
+ if (msg.Subscribe) {
+ if (const auto [it, inserted] = ConnectionSubscriptions.emplace(msg.HandshakeId, ev->Sender); !inserted) {
+ Y_VERIFY_DEBUG(false);
+ ConnectionSubscriptions.erase(it); // collision happened somehow?
+ }
+ } else {
+ ConnectionSubscriptions.erase(msg.HandshakeId);
+ }
+ }
+
+ void Handle(TEvReportConnection::TPtr ev) {
+ if (auto nh = ConnectionSubscriptions.extract(ev->Get()->HandshakeId)) {
+ TActivationContext::Send(IEventHandle::Forward(ev, nh.mapped()));
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
TActorId OutgoingHandshakeActor;
TInstant OutgoingHandshakeActorCreated;
TInstant OutgoingHandshakeActorReset;
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index a336e4a89f..9d6a8ba9c1 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -47,6 +47,9 @@ namespace NActors {
if (Socket) {
Socket->Shutdown(SHUT_RDWR);
}
+ if (XdcSocket) {
+ XdcSocket->Shutdown(SHUT_RDWR);
+ }
}
void TInterconnectSessionTCP::Init() {
@@ -224,6 +227,7 @@ namespace NActors {
SendBufferSize = ev->Get()->Socket->GetSendBufferSize();
Socket = std::move(ev->Get()->Socket);
+ XdcSocket = std::move(ev->Get()->XdcSocket);
// there may be a race
const ui64 nextPacket = Max(LastConfirmed, ev->Get()->NextPacket);
@@ -237,7 +241,7 @@ namespace NActors {
LOG_INFO_IC_SESSION("ICS10", "traffic start");
// create input session actor
- auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, ReceiveContext, Proxy->Common,
+ auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, XdcSocket, ReceiveContext, Proxy->Common,
Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), Params);
ReceiveContext->UnlockLastProcessedPacketSerial();
ReceiverId = Params.Encryption ? RegisterWithSameMailbox(actor.Release()) : Register(actor.Release(), TMailboxType::ReadAsFilled);
@@ -246,6 +250,10 @@ namespace NActors {
LOG_DEBUG_IC_SESSION("ICS11", "registering socket in PollerActor");
const bool success = Send(MakePollerActorId(), new TEvPollerRegister(Socket, ReceiverId, SelfId()));
Y_VERIFY(success);
+ if (XdcSocket) {
+ const bool success = Send(MakePollerActorId(), new TEvPollerRegister(XdcSocket, ReceiverId, SelfId()));
+ Y_VERIFY(success);
+ }
ReceiveContext->WriteBlockedByFullSendBuffer = false;
LostConnectionWatchdog.Disarm();
@@ -499,6 +507,10 @@ namespace NActors {
Proxy->Metrics->SetConnected(0);
LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] disconnected", Proxy->PeerNodeId);
}
+ if (XdcSocket) {
+ XdcSocket->Shutdown(SHUT_RDWR);
+ XdcSocket.Reset();
+ }
}
void TInterconnectSessionTCP::ReestablishConnectionExecute() {
@@ -531,7 +543,13 @@ namespace NActors {
}
void TInterconnectSessionTCP::Handle(TEvPollerRegisterResult::TPtr ev) {
- PollerToken = std::move(ev->Get()->PollerToken);
+ const auto& sk = ev->Get()->Socket;
+ if (auto *token = sk == Socket ? &PollerToken : sk == XdcSocket ? &XdcPollerToken : nullptr) {
+ *token = std::move(ev->Get()->PollerToken);
+ } else {
+ return;
+ }
+
if (ReceiveContext->WriteBlockedByFullSendBuffer) {
if (Params.Encryption) {
auto *secure = static_cast<NInterconnect::TSecureSocket*>(Socket.Get());
@@ -1175,6 +1193,14 @@ namespace NActors {
str << (Socket ? i64(*Socket) : -1);
}
}
+ TABLER() {
+ TABLED() {
+ str << "XDC socket";
+ }
+ TABLED() {
+ str << (XdcSocket ? i64(*XdcSocket) : -1);
+ }
+ }
ui32 unsentQueueSize = Socket ? Socket->GetUnsentQueueSize() : 0;
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index 598a5c9220..f836bdd1af 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -180,6 +180,7 @@ namespace NActors {
TInputSessionTCP(const TActorId& sessionId,
TIntrusivePtr<NInterconnect::TStreamSocket> socket,
+ TIntrusivePtr<NInterconnect::TStreamSocket> xdcSocket,
TIntrusivePtr<TReceiveContext> context,
TInterconnectProxyCommon::TPtr common,
std::shared_ptr<IInterconnectMetrics> metrics,
@@ -208,7 +209,9 @@ namespace NActors {
const TActorId SessionId;
TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
+ TIntrusivePtr<NInterconnect::TStreamSocket> XdcSocket;
TPollerToken::TPtr PollerToken;
+ TPollerToken::TPtr XdcPollerToken;
TIntrusivePtr<TReceiveContext> Context;
TInterconnectProxyCommon::TPtr Common;
const ui32 NodeId;
@@ -473,7 +476,9 @@ namespace NActors {
TInstant LastHandshakeDone;
TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
+ TIntrusivePtr<NInterconnect::TStreamSocket> XdcSocket;
TPollerToken::TPtr PollerToken;
+ TPollerToken::TPtr XdcPollerToken;
ui32 SendBufferSize;
ui64 InflightDataAmount = 0;
diff --git a/library/cpp/actors/protos/interconnect.proto b/library/cpp/actors/protos/interconnect.proto
index eed00fc551..3108b4f24c 100644
--- a/library/cpp/actors/protos/interconnect.proto
+++ b/library/cpp/actors/protos/interconnect.proto
@@ -75,6 +75,8 @@ message THandshakeRequest {
optional bool RequestExternalDataChannel = 21;
optional bytes CompatibilityInfo = 22;
+
+ optional bytes HandshakeId = 23;
}
message THandshakeSuccess {
@@ -117,3 +119,11 @@ message TEvLoadMessage {
optional string Id = 3; // message identifier
optional bytes Payload = 4; // data payload
}
+
+message TContinuationParams {
+ optional bytes HandshakeId = 1;
+}
+
+message TExternalDataChannelParams {
+ optional bytes HandshakeId = 1;
+}