diff options
author | alexvru <alexvru@ydb.tech> | 2023-04-13 17:44:21 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-04-13 17:44:21 +0300 |
commit | edbc6cad1fc50b1237b88cd16a78fd44777a6601 (patch) | |
tree | e708ea5a81108faed9173c71c1f37e09b29cef53 | |
parent | 782ed55b35b9805318a8d26ef040ff62a3875b42 (diff) | |
download | ydb-edbc6cad1fc50b1237b88cd16a78fd44777a6601.tar.gz |
Implement external data channel: connection
9 files changed, 206 insertions, 14 deletions
diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h index 966cdb763e..465899c335 100644 --- a/library/cpp/actors/interconnect/events_local.h +++ b/library/cpp/actors/interconnect/events_local.h @@ -56,6 +56,10 @@ namespace NActors { HandshakeBrokerFree, HandshakeBrokerPermit, + // external data channel messages + EvSubscribeForConnection, + EvReportConnection, + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // nonlocal messages; their indices must be preserved in order to work properly while doing rolling update //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -197,13 +201,15 @@ namespace NActors { const TActorId& self, ui64 nextPacket, TAutoPtr<TProgramInfo>&& programInfo, - TSessionParams params) + TSessionParams params, + TIntrusivePtr<NInterconnect::TStreamSocket> xdcSocket) : Socket(std::move(socket)) , Peer(peer) , Self(self) , NextPacket(nextPacket) , ProgramInfo(std::move(programInfo)) , Params(std::move(params)) + , XdcSocket(std::move(xdcSocket)) { } @@ -213,6 +219,7 @@ namespace NActors { const ui64 NextPacket; TAutoPtr<TProgramInfo> ProgramInfo; const TSessionParams Params; + TIntrusivePtr<NInterconnect::TStreamSocket> XdcSocket; }; struct TEvHandshakeFail: public TEventLocal<TEvHandshakeFail, ui32(ENetwork::HandshakeFail)> { @@ -409,4 +416,23 @@ namespace NActors { {} }; + struct TEvSubscribeForConnection : TEventLocal<TEvSubscribeForConnection, (ui32)ENetwork::EvSubscribeForConnection> { + TString HandshakeId; + bool Subscribe; + + TEvSubscribeForConnection(TString handshakeId, bool subscribe) + : HandshakeId(std::move(handshakeId)) + , Subscribe(subscribe) + {} + }; + + struct TEvReportConnection : TEventLocal<TEvReportConnection, (ui32)ENetwork::EvReportConnection> { + TString HandshakeId; + TIntrusivePtr<NInterconnect::TStreamSocket> Socket; + + TEvReportConnection(TString handshakeId, TIntrusivePtr<NInterconnect::TStreamSocket> socket) + : HandshakeId(std::move(handshakeId)) + , Socket(std::move(socket)) + {} + }; } diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index 797d709749..c75e27ff5e 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -6,6 +6,7 @@ #include <library/cpp/actors/core/log.h> #include <library/cpp/actors/protos/services_common.pb.h> #include <util/system/getpid.h> +#include <util/random/entropy.h> #include <google/protobuf/text_format.h> @@ -256,6 +257,7 @@ namespace NActors { TString PeerHostName; TString PeerAddr; TConnection MainChannel; + TConnection ExternalDataChannel; TString State; TString HandshakeKind; TMaybe<THolder<TProgramInfo>> ProgramInfo; // filled in in case of successful handshake; even if null @@ -265,6 +267,8 @@ namespace NActors { TMonotonic Deadline; TActorId HandshakeBroker; std::optional<TBrokerLeaseHolder> BrokerLeaseHolder; + std::optional<TString> HandshakeId; // for XDC + bool SubscribedForConnection = false; public: THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer, @@ -277,6 +281,7 @@ namespace NActors { , NextPacketToPeer(nextPacket) , PeerHostName(std::move(peerHostName)) , MainChannel(this, nullptr) + , ExternalDataChannel(this, nullptr) , HandshakeKind("outgoing handshake") , Params(std::move(params)) { @@ -284,12 +289,17 @@ namespace NActors { Y_VERIFY(SelfVirtualId.NodeId()); Y_VERIFY(PeerNodeId); HandshakeBroker = MakeHandshakeBrokerOutId(); + + // generate random handshake id + HandshakeId = TString::Uninitialized(32); + EntropyPool().Read(HandshakeId->Detach(), HandshakeId->size()); } THandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket) : TActorCoroImpl(StackSize, true) , Common(std::move(common)) , MainChannel(this, std::move(socket)) + , ExternalDataChannel(this, nullptr) , HandshakeKind("incoming handshake") { Y_VERIFY(MainChannel); @@ -314,9 +324,12 @@ namespace NActors { } throw; } catch (const TExPoison&) { - return; // just stop execution + // just stop execution, do nothing } catch (...) { - throw; + Y_FAIL("unhandled exception"); + } + if (SubscribedForConnection) { + SendToProxy(MakeHolder<TEvSubscribeForConnection>(*HandshakeId, false)); } } @@ -348,7 +361,8 @@ namespace NActors { Schedule(Deadline, new TEvents::TEvWakeup); try { - if (MainChannel) { + const bool incoming = MainChannel; + if (incoming) { PerformIncomingHandshake(); } else { PerformOutgoingHandshake(); @@ -356,8 +370,26 @@ namespace NActors { // establish encrypted channel, or, in case when encryption is disabled, check if it matches settings if (ProgramInfo) { + if (Params.UseExternalDataChannel) { + if (incoming) { + Y_VERIFY(SubscribedForConnection); + auto ev = WaitForSpecificEvent<TEvReportConnection>("WaitInboundXdcStream"); + SubscribedForConnection = false; + if (ev->Get()->HandshakeId != *HandshakeId) { + Y_VERIFY_DEBUG(false); + Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Mismatching HandshakeId in external data channel"); + } + ExternalDataChannel.GetSocketRef() = std::move(ev->Get()->Socket); + } else { + EstablishExternalDataChannel(); + } + } + if (Params.Encryption) { EstablishSecureConnection(MainChannel); + if (ExternalDataChannel) { + EstablishSecureConnection(ExternalDataChannel); + } } else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) { Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required"); } @@ -370,11 +402,14 @@ namespace NActors { LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded"); Y_VERIFY(NextPacketFromPeer); MainChannel.ResetPollerToken(); + ExternalDataChannel.ResetPollerToken(); + Y_VERIFY(!ExternalDataChannel == !Params.UseExternalDataChannel); SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(MainChannel.GetSocketRef()), PeerVirtualId, SelfVirtualId, - *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params))); + *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params), std::move(ExternalDataChannel.GetSocketRef()))); } MainChannel.Reset(); + ExternalDataChannel.Reset(); } void EstablishSecureConnection(TConnection& connection) { @@ -651,6 +686,16 @@ namespace NActors { return success; } + void EstablishExternalDataChannel() { + ExternalDataChannel.Connect(nullptr); + char buf[12] = {'x', 'd', 'c', ' ', 'p', 'a', 'y', 'l', 'o', 'a', 'd', 0}; + TInitialPacket packet(TActorId(SelfActorId.NodeId(), TStringBuf(buf, 12)), {}, 0, INTERCONNECT_XDC_STREAM_VERSION); + ExternalDataChannel.SendData(&packet, sizeof(packet), "SendXdcStream"); + NActorsInterconnect::TExternalDataChannelParams params; + params.SetHandshakeId(*HandshakeId); + SendExBlock(ExternalDataChannel, params, "ExternalDataChannelParams"); + } + void PerformOutgoingHandshake() { LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH01", NLog::PRI_DEBUG, "starting outgoing handshake"); @@ -661,7 +706,16 @@ namespace NActors { LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH05", logPriority, "connected to peer"); // send initial request packet - SendInitialPacket(MainChannel); + if (Params.UseExternalDataChannel && PeerVirtualId) { // special case for XDC continuation + TInitialPacket packet(SelfVirtualId, PeerVirtualId, NextPacketToPeer, INTERCONNECT_XDC_CONTINUATION_VERSION); + MainChannel.SendData(&packet, sizeof(packet), "SendInitialPacket"); + NActorsInterconnect::TContinuationParams request; + request.SetHandshakeId(*HandshakeId); + SendExBlock(MainChannel, request, "ExRequest"); + } else { + TInitialPacket packet(SelfVirtualId, PeerVirtualId, NextPacketToPeer, INTERCONNECT_PROTOCOL_VERSION); + MainChannel.SendData(&packet, sizeof(packet), "SendInitialPacket"); + } TInitialPacket response; MainChannel.ReceiveData(&response, sizeof(response), "ReceiveResponse"); @@ -727,6 +781,7 @@ namespace NActors { request.SetRequestAuthOnly(Common->Settings.TlsAuthOnly); request.SetRequestExtendedTraceFmt(true); request.SetRequestExternalDataChannel(Common->Settings.EnableExternalDataChannel); + request.SetHandshakeId(*HandshakeId); SendExBlock(MainChannel, request, "ExRequest"); @@ -789,7 +844,9 @@ namespace NActors { MainChannel.ReceiveData(&request, sizeof(request), "ReceiveRequest"); if (!request.Check()) { Fail(TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT, "Initial packet CRC error"); - } else if (request.Header.Version != INTERCONNECT_PROTOCOL_VERSION) { + } else if (request.Header.Version != INTERCONNECT_PROTOCOL_VERSION && + request.Header.Version != INTERCONNECT_XDC_CONTINUATION_VERSION && + request.Header.Version != INTERCONNECT_XDC_STREAM_VERSION) { Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, Sprintf("Incompatible protocol %" PRIu64, request.Header.Version)); } @@ -804,6 +861,29 @@ namespace NActors { // extract next packet NextPacketFromPeer = request.Header.NextPacket; + // process some extra payload, if necessary + switch (request.Header.Version) { + case INTERCONNECT_XDC_CONTINUATION_VERSION: { + NActorsInterconnect::TContinuationParams params; + if (!params.ParseFromString(ReceiveExBlock(MainChannel, "ContinuationParams")) || !params.HasHandshakeId()) { + Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Incorrect packet from peer"); + } + HandshakeId = params.GetHandshakeId(); + SendToProxy(MakeHolder<TEvSubscribeForConnection>(*HandshakeId, true)); + SubscribedForConnection = true; + break; + } + case INTERCONNECT_XDC_STREAM_VERSION: { + NActorsInterconnect::TExternalDataChannelParams params; + if (!params.ParseFromString(ReceiveExBlock(MainChannel, "ExternalDataChannelParams")) || !params.HasHandshakeId()) { + Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Incorrect packet from peer"); + } + MainChannel.ResetPollerToken(); + SendToProxy(MakeHolder<TEvReportConnection>(params.GetHandshakeId(), std::move(MainChannel.GetSocketRef()))); + throw TExHandshakeFailed(); + } + } + if (request.Header.PeerVirtualId) { // issue request to the proxy and wait for the response auto reply = AskProxy<TEvHandshakeAck, TEvHandshakeNak>(MakeHolder<TEvHandshakeAsk>( @@ -918,6 +998,16 @@ namespace NActors { Params.UseExtendedTraceFmt = request.GetRequestExtendedTraceFmt(); Params.UseExternalDataChannel = request.GetRequestExternalDataChannel() && Common->Settings.EnableExternalDataChannel; + if (Params.UseExternalDataChannel) { + if (request.HasHandshakeId()) { + HandshakeId = request.GetHandshakeId(); + SendToProxy(MakeHolder<TEvSubscribeForConnection>(*HandshakeId, true)); + SubscribedForConnection = true; + } else { + generateError("Peer has requested ExternalDataChannel feature, but did not provide HandshakeId"); + } + } + if (request.HasClientScopeId()) { ParsePeerScopeId(request.GetClientScopeId()); } diff --git a/library/cpp/actors/interconnect/interconnect_handshake.h b/library/cpp/actors/interconnect/interconnect_handshake.h index fc37f11251..c8a7437b80 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.h +++ b/library/cpp/actors/interconnect/interconnect_handshake.h @@ -12,6 +12,8 @@ namespace NActors { static constexpr TDuration DEFAULT_HANDSHAKE_TIMEOUT = TDuration::Seconds(5); static constexpr ui64 INTERCONNECT_PROTOCOL_VERSION = 2; + static constexpr ui64 INTERCONNECT_XDC_CONTINUATION_VERSION = 3; + static constexpr ui64 INTERCONNECT_XDC_STREAM_VERSION = 4; using TSocketPtr = TIntrusivePtr<NInterconnect::TStreamSocket>; diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index 32f10c727b..ee85f72dd0 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -7,11 +7,12 @@ namespace NActors { LWTRACE_USING(ACTORLIB_PROVIDER); TInputSessionTCP::TInputSessionTCP(const TActorId& sessionId, TIntrusivePtr<NInterconnect::TStreamSocket> socket, - TIntrusivePtr<TReceiveContext> context, TInterconnectProxyCommon::TPtr common, - std::shared_ptr<IInterconnectMetrics> metrics, ui32 nodeId, ui64 lastConfirmed, - TDuration deadPeerTimeout, TSessionParams params) + TIntrusivePtr<NInterconnect::TStreamSocket> xdcSocket, TIntrusivePtr<TReceiveContext> context, + TInterconnectProxyCommon::TPtr common, std::shared_ptr<IInterconnectMetrics> metrics, ui32 nodeId, + ui64 lastConfirmed, TDuration deadPeerTimeout, TSessionParams params) : SessionId(sessionId) , Socket(std::move(socket)) + , XdcSocket(std::move(xdcSocket)) , Context(std::move(context)) , Common(std::move(common)) , NodeId(nodeId) @@ -62,7 +63,12 @@ namespace NActors { } void TInputSessionTCP::Handle(TEvPollerRegisterResult::TPtr ev) { - PollerToken = std::move(ev->Get()->PollerToken); + const auto& sk = ev->Get()->Socket; + if (auto *token = sk == Socket ? &PollerToken : sk == XdcSocket ? &XdcPollerToken : nullptr) { + *token = std::move(ev->Get()->PollerToken); + } else { + return; + } ReceiveData(); } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp index 8562f6a440..d70ea1aa73 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp @@ -339,6 +339,9 @@ namespace NActors { return; } + // drop any pending XDC subscriptions + ConnectionSubscriptions.clear(); + Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor); SwitchToState(__LINE__, "StateWork", &TThis::StateWork); @@ -886,7 +889,7 @@ namespace NActors { stats.LastSessionDieTime = LastSessionDieTime; stats.TotalOutputQueueSize = Session ? Session->TotalOutputQueueSize : 0; stats.Connected = Session ? (bool)Session->Socket : false; - stats.ExternalDataChannel = Session && Session->Params.UseExternalDataChannel; + stats.ExternalDataChannel = Session && Session->XdcSocket; stats.Host = TechnicalPeerHostName; stats.Port = 0; ui32 rep = 0; diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h index 71edfccbe2..8b30dde0ae 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h @@ -126,6 +126,8 @@ namespace NActors { hFunc(TEvQueryStats, Handle) \ cFunc(TEvInterconnect::EvTerminate, HandleTerminate) \ cFunc(EvPassAwayIfNeeded, HandlePassAwayIfNeeded) \ + hFunc(TEvSubscribeForConnection, Handle); \ + hFunc(TEvReportConnection, Handle); \ default: \ Y_FAIL("unexpected event Type# 0x%08" PRIx32, type); \ } \ @@ -491,6 +493,28 @@ namespace NActors { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + std::unordered_map<TString, TActorId> ConnectionSubscriptions; + + void Handle(TEvSubscribeForConnection::TPtr ev) { + auto& msg = *ev->Get(); + if (msg.Subscribe) { + if (const auto [it, inserted] = ConnectionSubscriptions.emplace(msg.HandshakeId, ev->Sender); !inserted) { + Y_VERIFY_DEBUG(false); + ConnectionSubscriptions.erase(it); // collision happened somehow? + } + } else { + ConnectionSubscriptions.erase(msg.HandshakeId); + } + } + + void Handle(TEvReportConnection::TPtr ev) { + if (auto nh = ConnectionSubscriptions.extract(ev->Get()->HandshakeId)) { + TActivationContext::Send(IEventHandle::Forward(ev, nh.mapped())); + } + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + TActorId OutgoingHandshakeActor; TInstant OutgoingHandshakeActorCreated; TInstant OutgoingHandshakeActorReset; diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index a336e4a89f..9d6a8ba9c1 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -47,6 +47,9 @@ namespace NActors { if (Socket) { Socket->Shutdown(SHUT_RDWR); } + if (XdcSocket) { + XdcSocket->Shutdown(SHUT_RDWR); + } } void TInterconnectSessionTCP::Init() { @@ -224,6 +227,7 @@ namespace NActors { SendBufferSize = ev->Get()->Socket->GetSendBufferSize(); Socket = std::move(ev->Get()->Socket); + XdcSocket = std::move(ev->Get()->XdcSocket); // there may be a race const ui64 nextPacket = Max(LastConfirmed, ev->Get()->NextPacket); @@ -237,7 +241,7 @@ namespace NActors { LOG_INFO_IC_SESSION("ICS10", "traffic start"); // create input session actor - auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, ReceiveContext, Proxy->Common, + auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, XdcSocket, ReceiveContext, Proxy->Common, Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), Params); ReceiveContext->UnlockLastProcessedPacketSerial(); ReceiverId = Params.Encryption ? RegisterWithSameMailbox(actor.Release()) : Register(actor.Release(), TMailboxType::ReadAsFilled); @@ -246,6 +250,10 @@ namespace NActors { LOG_DEBUG_IC_SESSION("ICS11", "registering socket in PollerActor"); const bool success = Send(MakePollerActorId(), new TEvPollerRegister(Socket, ReceiverId, SelfId())); Y_VERIFY(success); + if (XdcSocket) { + const bool success = Send(MakePollerActorId(), new TEvPollerRegister(XdcSocket, ReceiverId, SelfId())); + Y_VERIFY(success); + } ReceiveContext->WriteBlockedByFullSendBuffer = false; LostConnectionWatchdog.Disarm(); @@ -499,6 +507,10 @@ namespace NActors { Proxy->Metrics->SetConnected(0); LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] disconnected", Proxy->PeerNodeId); } + if (XdcSocket) { + XdcSocket->Shutdown(SHUT_RDWR); + XdcSocket.Reset(); + } } void TInterconnectSessionTCP::ReestablishConnectionExecute() { @@ -531,7 +543,13 @@ namespace NActors { } void TInterconnectSessionTCP::Handle(TEvPollerRegisterResult::TPtr ev) { - PollerToken = std::move(ev->Get()->PollerToken); + const auto& sk = ev->Get()->Socket; + if (auto *token = sk == Socket ? &PollerToken : sk == XdcSocket ? &XdcPollerToken : nullptr) { + *token = std::move(ev->Get()->PollerToken); + } else { + return; + } + if (ReceiveContext->WriteBlockedByFullSendBuffer) { if (Params.Encryption) { auto *secure = static_cast<NInterconnect::TSecureSocket*>(Socket.Get()); @@ -1175,6 +1193,14 @@ namespace NActors { str << (Socket ? i64(*Socket) : -1); } } + TABLER() { + TABLED() { + str << "XDC socket"; + } + TABLED() { + str << (XdcSocket ? i64(*XdcSocket) : -1); + } + } ui32 unsentQueueSize = Socket ? Socket->GetUnsentQueueSize() : 0; diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index 598a5c9220..f836bdd1af 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -180,6 +180,7 @@ namespace NActors { TInputSessionTCP(const TActorId& sessionId, TIntrusivePtr<NInterconnect::TStreamSocket> socket, + TIntrusivePtr<NInterconnect::TStreamSocket> xdcSocket, TIntrusivePtr<TReceiveContext> context, TInterconnectProxyCommon::TPtr common, std::shared_ptr<IInterconnectMetrics> metrics, @@ -208,7 +209,9 @@ namespace NActors { const TActorId SessionId; TIntrusivePtr<NInterconnect::TStreamSocket> Socket; + TIntrusivePtr<NInterconnect::TStreamSocket> XdcSocket; TPollerToken::TPtr PollerToken; + TPollerToken::TPtr XdcPollerToken; TIntrusivePtr<TReceiveContext> Context; TInterconnectProxyCommon::TPtr Common; const ui32 NodeId; @@ -473,7 +476,9 @@ namespace NActors { TInstant LastHandshakeDone; TIntrusivePtr<NInterconnect::TStreamSocket> Socket; + TIntrusivePtr<NInterconnect::TStreamSocket> XdcSocket; TPollerToken::TPtr PollerToken; + TPollerToken::TPtr XdcPollerToken; ui32 SendBufferSize; ui64 InflightDataAmount = 0; diff --git a/library/cpp/actors/protos/interconnect.proto b/library/cpp/actors/protos/interconnect.proto index eed00fc551..3108b4f24c 100644 --- a/library/cpp/actors/protos/interconnect.proto +++ b/library/cpp/actors/protos/interconnect.proto @@ -75,6 +75,8 @@ message THandshakeRequest { optional bool RequestExternalDataChannel = 21; optional bytes CompatibilityInfo = 22; + + optional bytes HandshakeId = 23; } message THandshakeSuccess { @@ -117,3 +119,11 @@ message TEvLoadMessage { optional string Id = 3; // message identifier optional bytes Payload = 4; // data payload } + +message TContinuationParams { + optional bytes HandshakeId = 1; +} + +message TExternalDataChannelParams { + optional bytes HandshakeId = 1; +} |