aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:17 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:17 +0300
commitd3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch)
treedd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
parent72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff)
downloadydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp')
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp588
1 files changed, 294 insertions, 294 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
index 65c8ae6fa5..7e2d8ccb94 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
@@ -7,17 +7,17 @@
#include <util/system/getpid.h>
namespace NActors {
- static constexpr TDuration GetNodeRequestTimeout = TDuration::Seconds(5);
+ static constexpr TDuration GetNodeRequestTimeout = TDuration::Seconds(5);
- static constexpr TDuration FirstErrorSleep = TDuration::MilliSeconds(10);
- static constexpr TDuration MaxErrorSleep = TDuration::Seconds(10);
- static constexpr ui32 SleepRetryMultiplier = 4;
+ static constexpr TDuration FirstErrorSleep = TDuration::MilliSeconds(10);
+ static constexpr TDuration MaxErrorSleep = TDuration::Seconds(10);
+ static constexpr ui32 SleepRetryMultiplier = 4;
- static TString PeerNameForHuman(ui32 nodeNum, const TString& longName, ui16 port) {
- TStringBuf token;
- TStringBuf(longName).NextTok('.', token);
- return ToString<ui32>(nodeNum) + ":" + (token.size() > 0 ? TString(token) : longName) + ":" + ToString<ui16>(port);
- }
+ static TString PeerNameForHuman(ui32 nodeNum, const TString& longName, ui16 port) {
+ TStringBuf token;
+ TStringBuf(longName).NextTok('.', token);
+ return ToString<ui32>(nodeNum) + ":" + (token.size() > 0 ? TString(token) : longName) + ":" + ToString<ui16>(port);
+ }
TInterconnectProxyTCP::TInterconnectProxyTCP(const ui32 node, TInterconnectProxyCommon::TPtr common,
IActor **dynamicPtr)
@@ -27,14 +27,14 @@ namespace NActors {
, Common(std::move(common))
, SecureContext(new NInterconnect::TSecureSocketContext(Common->Settings.Certificate, Common->Settings.PrivateKey,
Common->Settings.CaFilePath, Common->Settings.CipherList))
- {
+ {
Y_VERIFY(Common);
Y_VERIFY(Common->NameserviceId);
if (DynamicPtr) {
Y_VERIFY(!*DynamicPtr);
*DynamicPtr = this;
}
- }
+ }
void TInterconnectProxyTCP::Bootstrap() {
SetPrefix(Sprintf("Proxy %s [node %" PRIu32 "]", SelfId().ToString().data(), PeerNodeId));
@@ -54,20 +54,20 @@ namespace NActors {
TString path = Sprintf("peer%04" PRIu32, PeerNodeId);
TString title = Sprintf("Peer #%04" PRIu32, PeerNodeId);
mon(path, title, sys, SelfId());
- }
+ }
}
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // PendingActivation
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // PendingActivation
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void TInterconnectProxyTCP::RequestNodeInfo(STATEFN_SIG) {
ICPROXY_PROFILED;
- Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor && !PendingIncomingHandshakeEvents && !PendingSessionEvents);
+ Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor && !PendingIncomingHandshakeEvents && !PendingSessionEvents);
EnqueueSessionEvent(ev);
StartConfiguring();
- }
+ }
void TInterconnectProxyTCP::RequestNodeInfoForIncomingHandshake(STATEFN_SIG) {
ICPROXY_PROFILED;
@@ -77,37 +77,37 @@ namespace NActors {
EnqueueIncomingHandshakeEvent(ev);
StartConfiguring();
}
- }
+ }
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // PendingNodeInfo
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // PendingNodeInfo
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void TInterconnectProxyTCP::StartConfiguring() {
ICPROXY_PROFILED;
- Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor);
+ Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor);
- // issue node info request
+ // issue node info request
Send(Common->NameserviceId, new TEvInterconnect::TEvGetNode(PeerNodeId));
- // arm configure timer; store pointer to event to ensure that we will handle correct one if there were any other
- // wakeup events in flight
+ // arm configure timer; store pointer to event to ensure that we will handle correct one if there were any other
+ // wakeup events in flight
SwitchToState(__LINE__, "PendingNodeInfo", &TThis::PendingNodeInfo, GetNodeRequestTimeout,
- ConfigureTimeoutCookie = new TEvents::TEvWakeup);
- }
+ ConfigureTimeoutCookie = new TEvents::TEvWakeup);
+ }
void TInterconnectProxyTCP::Configure(TEvInterconnect::TEvNodeInfo::TPtr& ev) {
ICPROXY_PROFILED;
Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor && !Session);
- if (!ev->Get()->Node) {
+ if (!ev->Get()->Node) {
TransitToErrorState("cannot get node info");
- } else {
- auto& info = *ev->Get()->Node;
+ } else {
+ auto& info = *ev->Get()->Node;
TString name = PeerNameForHuman(PeerNodeId, info.Host, info.Port);
- TechnicalPeerHostName = info.Host;
+ TechnicalPeerHostName = info.Host;
if (!Metrics) {
Metrics = Common->Metrics ? CreateInterconnectMetrics(Common) : CreateInterconnectCounters(Common);
}
@@ -122,9 +122,9 @@ namespace NActors {
void TInterconnectProxyTCP::ConfigureTimeout(TEvents::TEvWakeup::TPtr& ev) {
ICPROXY_PROFILED;
- if (ev->Get() == ConfigureTimeoutCookie) {
+ if (ev->Get() == ConfigureTimeoutCookie) {
TransitToErrorState("timed out while waiting for node info");
- }
+ }
}
void TInterconnectProxyTCP::ProcessConfigured() {
@@ -152,72 +152,72 @@ namespace NActors {
void TInterconnectProxyTCP::StartInitialHandshake() {
ICPROXY_PROFILED;
- // since we are starting initial handshake for some reason, we'll drop any existing handshakes, if any
+ // since we are starting initial handshake for some reason, we'll drop any existing handshakes, if any
DropHandshakes();
- // create and register handshake actor
+ // create and register handshake actor
OutgoingHandshakeActor = Register(CreateOutgoingHandshakeActor(Common, GenerateSessionVirtualId(),
TActorId(), PeerNodeId, 0, TechnicalPeerHostName, TSessionParams()), TMailboxType::ReadAsFilled);
OutgoingHandshakeActorCreated = TActivationContext::Now();
// prepare for new handshake
PrepareNewSessionHandshake();
- }
+ }
void TInterconnectProxyTCP::StartResumeHandshake(ui64 inputCounter) {
ICPROXY_PROFILED;
- // drop outgoing handshake if we have one; keep incoming handshakes as they may be useful
+ // drop outgoing handshake if we have one; keep incoming handshakes as they may be useful
DropOutgoingHandshake();
- // ensure that we have session
- Y_VERIFY(Session);
+ // ensure that we have session
+ Y_VERIFY(Session);
- // ensure that we have both virtual ids
- Y_VERIFY(SessionVirtualId);
- Y_VERIFY(RemoteSessionVirtualId);
+ // ensure that we have both virtual ids
+ Y_VERIFY(SessionVirtualId);
+ Y_VERIFY(RemoteSessionVirtualId);
- // create and register handshake actor
+ // create and register handshake actor
OutgoingHandshakeActor = Register(CreateOutgoingHandshakeActor(Common, SessionVirtualId,
RemoteSessionVirtualId, PeerNodeId, inputCounter, TechnicalPeerHostName, Session->Params),
TMailboxType::ReadAsFilled);
OutgoingHandshakeActorCreated = TActivationContext::Now();
- }
+ }
void TInterconnectProxyTCP::IssueIncomingHandshakeReply(const TActorId& handshakeId, ui64 peerLocalId,
THolder<IEventBase> event) {
ICPROXY_PROFILED;
- Y_VERIFY(!IncomingHandshakeActor);
- IncomingHandshakeActor = handshakeId;
+ Y_VERIFY(!IncomingHandshakeActor);
+ IncomingHandshakeActor = handshakeId;
IncomingHandshakeActorFilledIn = TActivationContext::Now();
Y_VERIFY(!LastSerialFromIncomingHandshake || *LastSerialFromIncomingHandshake <= peerLocalId);
LastSerialFromIncomingHandshake = peerLocalId;
if (OutgoingHandshakeActor && SelfId().NodeId() < PeerNodeId) {
- // Both outgoing and incoming handshake are in progress. To prevent race condition during semultanous handshake
- // incoming handshake must be held till outgoing handshake is complete or failed
+ // Both outgoing and incoming handshake are in progress. To prevent race condition during semultanous handshake
+ // incoming handshake must be held till outgoing handshake is complete or failed
LOG_DEBUG_IC("ICP06", "reply for incoming handshake (actor %s) is held", IncomingHandshakeActor.ToString().data());
- HeldHandshakeReply = std::move(event);
+ HeldHandshakeReply = std::move(event);
- // Check that we are in one of acceptable states that would properly handle handshake statuses.
- const auto state = CurrentStateFunc();
- Y_VERIFY(state == &TThis::PendingConnection || state == &TThis::StateWork, "invalid handshake request in state# %s", State);
- } else {
- LOG_DEBUG_IC("ICP07", "issued incoming handshake reply");
+ // Check that we are in one of acceptable states that would properly handle handshake statuses.
+ const auto state = CurrentStateFunc();
+ Y_VERIFY(state == &TThis::PendingConnection || state == &TThis::StateWork, "invalid handshake request in state# %s", State);
+ } else {
+ LOG_DEBUG_IC("ICP07", "issued incoming handshake reply");
- // No race, so we can send reply immediately.
- Y_VERIFY(!HeldHandshakeReply);
+ // No race, so we can send reply immediately.
+ Y_VERIFY(!HeldHandshakeReply);
Send(IncomingHandshakeActor, event.Release());
- // Start waiting for handshake reply, if not yet started; also, if session is already created, then we don't
- // switch from working state.
- if (!Session) {
+ // Start waiting for handshake reply, if not yet started; also, if session is already created, then we don't
+ // switch from working state.
+ if (!Session) {
LOG_INFO_IC("ICP08", "No active sessions, becoming PendingConnection");
SwitchToState(__LINE__, "PendingConnection", &TThis::PendingConnection);
- } else {
- Y_VERIFY(CurrentStateFunc() == &TThis::StateWork);
- }
+ } else {
+ Y_VERIFY(CurrentStateFunc() == &TThis::StateWork);
+ }
}
}
@@ -243,8 +243,8 @@ namespace NActors {
} else {
// if we already have incoming handshake, then terminate existing one
DropIncomingHandshake();
-
- // issue reply to the sender, possibly holding it while outgoing handshake is at race
+
+ // issue reply to the sender, possibly holding it while outgoing handshake is at race
THolder<IEventBase> reply = IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::ProcessHandshakeRequest, ev);
return IssueIncomingHandshakeReply(ev->Sender, RemoteSessionVirtualId.LocalId(), std::move(reply));
}
@@ -263,27 +263,27 @@ namespace NActors {
ui64 remoteStartTime = record.GetProgramStartTime();
ui64 remoteSerial = record.GetSerial();
- if (RemoteProgramInfo && remotePID == RemoteProgramInfo->PID && remoteStartTime == RemoteProgramInfo->StartTime) {
- if (remoteSerial < RemoteProgramInfo->Serial) {
+ if (RemoteProgramInfo && remotePID == RemoteProgramInfo->PID && remoteStartTime == RemoteProgramInfo->StartTime) {
+ if (remoteSerial < RemoteProgramInfo->Serial) {
LOG_INFO_IC("ICP18", "handshake (actor %s) is too old", ev->Sender.ToString().data());
Send(ev->Sender, new TEvents::TEvPoisonPill);
- return;
- } else {
- RemoteProgramInfo->Serial = remoteSerial;
- }
+ return;
+ } else {
+ RemoteProgramInfo->Serial = remoteSerial;
+ }
} else {
- const auto ptr = new TProgramInfo;
- ptr->PID = remotePID;
- ptr->StartTime = remoteStartTime;
- ptr->Serial = remoteSerial;
- RemoteProgramInfo.Reset(ptr);
+ const auto ptr = new TProgramInfo;
+ ptr->PID = remotePID;
+ ptr->StartTime = remoteStartTime;
+ ptr->Serial = remoteSerial;
+ RemoteProgramInfo.Reset(ptr);
}
- /* Let's check peer technical hostname */
+ /* Let's check peer technical hostname */
if (record.HasSenderHostName() && TechnicalPeerHostName != record.GetSenderHostName()) {
Send(ev->Sender, new TEvHandshakeReplyError("host name mismatch"));
- return;
- }
+ return;
+ }
// check sender actor id and check if it is not very old
if (LastSerialFromIncomingHandshake) {
@@ -303,60 +303,60 @@ namespace NActors {
}
}
- // drop incoming handshake as this is definitely more recent
+ // drop incoming handshake as this is definitely more recent
DropIncomingHandshake();
- // prepare for new session
+ // prepare for new session
PrepareNewSessionHandshake();
- auto event = MakeHolder<TEvHandshakeReplyOK>();
- auto* pb = event->Record.MutableSuccess();
+ auto event = MakeHolder<TEvHandshakeReplyOK>();
+ auto* pb = event->Record.MutableSuccess();
const TActorId virtualId = GenerateSessionVirtualId();
- pb->SetProtocol(INTERCONNECT_PROTOCOL_VERSION);
+ pb->SetProtocol(INTERCONNECT_PROTOCOL_VERSION);
pb->SetSenderActorId(virtualId.ToString());
- pb->SetProgramPID(GetPID());
+ pb->SetProgramPID(GetPID());
pb->SetProgramStartTime(Common->StartTime);
- pb->SetSerial(virtualId.LocalId());
+ pb->SetSerial(virtualId.LocalId());
IssueIncomingHandshakeReply(ev->Sender, 0, std::move(event));
- }
+ }
void TInterconnectProxyTCP::HandleHandshakeStatus(TEvHandshakeDone::TPtr& ev) {
ICPROXY_PROFILED;
TEvHandshakeDone *msg = ev->Get();
- // Terminate handshake actor working in opposite direction, if set up.
- if (ev->Sender == IncomingHandshakeActor) {
- LOG_INFO_IC("ICP19", "incoming handshake succeeded");
+ // Terminate handshake actor working in opposite direction, if set up.
+ if (ev->Sender == IncomingHandshakeActor) {
+ LOG_INFO_IC("ICP19", "incoming handshake succeeded");
DropIncomingHandshake(false);
DropOutgoingHandshake();
- } else if (ev->Sender == OutgoingHandshakeActor) {
- LOG_INFO_IC("ICP20", "outgoing handshake succeeded");
+ } else if (ev->Sender == OutgoingHandshakeActor) {
+ LOG_INFO_IC("ICP20", "outgoing handshake succeeded");
DropIncomingHandshake();
DropOutgoingHandshake(false);
- } else {
+ } else {
/* It seems to be an old handshake. */
- return;
+ return;
}
- Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor);
+ Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor);
SwitchToState(__LINE__, "StateWork", &TThis::StateWork);
- if (Session) {
+ if (Session) {
// this is continuation request, check that virtual ids match
Y_VERIFY(SessionVirtualId == msg->Self && RemoteSessionVirtualId == msg->Peer);
- } else {
+ } else {
// this is initial request, check that we have virtual ids not filled in
Y_VERIFY(!SessionVirtualId && !RemoteSessionVirtualId);
- }
+ }
auto error = [&](const char* description) {
TransitToErrorState(description);
};
- // If session is not created, then create new one.
- if (!Session) {
+ // If session is not created, then create new one.
+ if (!Session) {
RemoteProgramInfo = std::move(msg->ProgramInfo);
if (!RemoteProgramInfo) {
// we have received resume handshake, but session was closed concurrently while handshaking
@@ -374,15 +374,15 @@ namespace NActors {
// ensure that we have session local/peer virtual ids
Y_VERIFY(Session && SessionVirtualId && RemoteSessionVirtualId);
- // Set up new connection for the session.
+ // Set up new connection for the session.
IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::SetNewConnection, ev);
- // Reset retry timer
- HoldByErrorWakeupDuration = TDuration::Zero();
+ // Reset retry timer
+ HoldByErrorWakeupDuration = TDuration::Zero();
- /* Forward all held events */
+ /* Forward all held events */
ProcessPendingSessionEvents();
- }
+ }
void TInterconnectProxyTCP::HandleHandshakeStatus(TEvHandshakeFail::TPtr& ev) {
ICPROXY_PROFILED;
@@ -392,42 +392,42 @@ namespace NActors {
(IncomingHandshakeActor && OutgoingHandshakeActor);
LogHandshakeFail(ev, inconclusive);
- if (ev->Sender == IncomingHandshakeActor) {
- LOG_NOTICE_IC("ICP24", "incoming handshake failed, temporary: %" PRIu32 " explanation: %s outgoing: %s",
+ if (ev->Sender == IncomingHandshakeActor) {
+ LOG_NOTICE_IC("ICP24", "incoming handshake failed, temporary: %" PRIu32 " explanation: %s outgoing: %s",
ui32(ev->Get()->Temporary), ev->Get()->Explanation.data(), OutgoingHandshakeActor.ToString().data());
DropIncomingHandshake(false);
- } else if (ev->Sender == OutgoingHandshakeActor) {
- LOG_NOTICE_IC("ICP25", "outgoing handshake failed, temporary: %" PRIu32 " explanation: %s incoming: %s held: %s",
+ } else if (ev->Sender == OutgoingHandshakeActor) {
+ LOG_NOTICE_IC("ICP25", "outgoing handshake failed, temporary: %" PRIu32 " explanation: %s incoming: %s held: %s",
ui32(ev->Get()->Temporary), ev->Get()->Explanation.data(), IncomingHandshakeActor.ToString().data(),
- HeldHandshakeReply ? "yes" : "no");
+ HeldHandshakeReply ? "yes" : "no");
DropOutgoingHandshake(false);
- if (IEventBase* reply = HeldHandshakeReply.Release()) {
- Y_VERIFY(IncomingHandshakeActor);
+ if (IEventBase* reply = HeldHandshakeReply.Release()) {
+ Y_VERIFY(IncomingHandshakeActor);
LOG_DEBUG_IC("ICP26", "sent held handshake reply to %s", IncomingHandshakeActor.ToString().data());
Send(IncomingHandshakeActor, reply);
- }
+ }
// if we have no current session, then we have to drop all pending events as the outgoing handshake has failed
ProcessPendingSessionEvents();
- } else {
- /* It seems to be an old fail, just ignore it */
+ } else {
+ /* It seems to be an old fail, just ignore it */
LOG_NOTICE_IC("ICP27", "obsolete handshake fail ignored");
- return;
+ return;
}
if (Metrics) {
Metrics->IncHandshakeFails();
- }
+ }
- if (IncomingHandshakeActor || OutgoingHandshakeActor) {
- // one of handshakes is still going on
- LOG_DEBUG_IC("ICP28", "other handshake is still going on");
- return;
- }
+ if (IncomingHandshakeActor || OutgoingHandshakeActor) {
+ // one of handshakes is still going on
+ LOG_DEBUG_IC("ICP28", "other handshake is still going on");
+ return;
+ }
- switch (ev->Get()->Temporary) {
- case TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT:
+ switch (ev->Get()->Temporary) {
+ case TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT:
if (!Session) {
if (PendingSessionEvents) {
// try to start outgoing handshake as we have some events enqueued
@@ -444,22 +444,22 @@ namespace NActors {
// we have no active connection in that session, so just restart handshake from last known position
IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::StartHandshake);
}
- break;
-
- case TEvHandshakeFail::HANDSHAKE_FAIL_SESSION_MISMATCH:
+ break;
+
+ case TEvHandshakeFail::HANDSHAKE_FAIL_SESSION_MISMATCH:
StartInitialHandshake();
- break;
-
- case TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT:
+ break;
+
+ case TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT:
TString timeExplanation = " LastSessionDieTime# " + LastSessionDieTime.ToString();
- if (Session) {
+ if (Session) {
InvokeOtherActor(*Session, &TInterconnectSessionTCP::Terminate,
TDisconnectReason::HandshakeFailPermanent());
}
TransitToErrorState(ev->Get()->Explanation + timeExplanation, false);
- break;
- }
- }
+ break;
+ }
+ }
void TInterconnectProxyTCP::LogHandshakeFail(TEvHandshakeFail::TPtr& ev, bool inconclusive) {
ICPROXY_PROFILED;
@@ -487,69 +487,69 @@ namespace NActors {
void TInterconnectProxyTCP::ProcessPendingSessionEvents() {
ICPROXY_PROFILED;
- while (PendingSessionEvents) {
+ while (PendingSessionEvents) {
TPendingSessionEvent ev = std::move(PendingSessionEvents.front());
PendingSessionEventsSize -= ev.Size;
TAutoPtr<IEventHandle> event(ev.Event.Release());
- PendingSessionEvents.pop_front();
+ PendingSessionEvents.pop_front();
if (Session) {
ForwardSessionEventToSession(event);
- } else {
+ } else {
DropSessionEvent(event);
}
- }
+ }
}
void TInterconnectProxyTCP::DropSessionEvent(STATEFN_SIG) {
ICPROXY_PROFILED;
ValidateEvent(ev, "DropSessionEvent");
- switch (ev->GetTypeRewrite()) {
- case TEvInterconnect::EvForward:
- if (ev->Flags & IEventHandle::FlagSubscribeOnSession) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvInterconnect::EvForward:
+ if (ev->Flags & IEventHandle::FlagSubscribeOnSession) {
Send(ev->Sender, new TEvInterconnect::TEvNodeDisconnected(PeerNodeId), 0, ev->Cookie);
- }
+ }
TActivationContext::Send(ev->ForwardOnNondelivery(TEvents::TEvUndelivered::Disconnected));
- break;
+ break;
- case TEvInterconnect::TEvConnectNode::EventType:
- case TEvents::TEvSubscribe::EventType:
+ case TEvInterconnect::TEvConnectNode::EventType:
+ case TEvents::TEvSubscribe::EventType:
Send(ev->Sender, new TEvInterconnect::TEvNodeDisconnected(PeerNodeId), 0, ev->Cookie);
- break;
+ break;
- case TEvents::TEvUnsubscribe::EventType:
- /* Do nothing */
- break;
+ case TEvents::TEvUnsubscribe::EventType:
+ /* Do nothing */
+ break;
- default:
- Y_FAIL("Unexpected type of event in held event queue");
- }
+ default:
+ Y_FAIL("Unexpected type of event in held event queue");
+ }
}
void TInterconnectProxyTCP::UnregisterSession(TInterconnectSessionTCP* session) {
ICPROXY_PROFILED;
- Y_VERIFY(Session && Session == session && SessionID);
+ Y_VERIFY(Session && Session == session && SessionID);
LOG_INFO_IC("ICP30", "unregister session Session# %s VirtualId# %s", SessionID.ToString().data(),
SessionVirtualId.ToString().data());
- Session = nullptr;
+ Session = nullptr;
SessionID = TActorId();
- // drop all pending events as we are closed
+ // drop all pending events as we are closed
ProcessPendingSessionEvents();
- // reset virtual ids as this session is terminated
+ // reset virtual ids as this session is terminated
SessionVirtualId = TActorId();
RemoteSessionVirtualId = TActorId();
if (Metrics) {
Metrics->IncSessionDeaths();
- }
+ }
LastSessionDieTime = TActivationContext::Now();
-
+
if (IncomingHandshakeActor || OutgoingHandshakeActor) {
PrepareNewSessionHandshake();
} else {
@@ -566,22 +566,22 @@ namespace NActors {
PendingSessionEvents.emplace_back(TActivationContext::Now() + Common->Settings.MessagePendingTimeout, size, ev);
ScheduleCleanupEventQueue();
CleanupEventQueue();
- }
+ }
void TInterconnectProxyTCP::EnqueueIncomingHandshakeEvent(STATEFN_SIG) {
ICPROXY_PROFILED;
// enqueue handshake request
Y_UNUSED();
- PendingIncomingHandshakeEvents.emplace_back(ev);
- }
+ PendingIncomingHandshakeEvents.emplace_back(ev);
+ }
void TInterconnectProxyTCP::EnqueueIncomingHandshakeEvent(TEvHandshakeDone::TPtr& /*ev*/) {
ICPROXY_PROFILED;
// TEvHandshakeDone can't get into the queue, because we have to process handshake request first; this may be the
// race with the previous handshakes, so simply ignore it
- }
+ }
void TInterconnectProxyTCP::EnqueueIncomingHandshakeEvent(TEvHandshakeFail::TPtr& ev) {
ICPROXY_PROFILED;
@@ -599,140 +599,140 @@ namespace NActors {
break;
}
}
- }
+ }
void TInterconnectProxyTCP::ForwardSessionEventToSession(STATEFN_SIG) {
ICPROXY_PROFILED;
- Y_VERIFY(Session && SessionID);
+ Y_VERIFY(Session && SessionID);
ValidateEvent(ev, "ForwardSessionEventToSession");
InvokeOtherActor(*Session, &TInterconnectSessionTCP::Receive, ev, TActivationContext::ActorContextFor(SessionID));
- }
+ }
void TInterconnectProxyTCP::GenerateHttpInfo(NMon::TEvHttpInfo::TPtr& ev) {
ICPROXY_PROFILED;
LOG_INFO_IC("ICP31", "proxy http called");
- TStringStream str;
-
- HTML(str) {
- DIV_CLASS("panel panel-info") {
- DIV_CLASS("panel-heading") {
- str << "Proxy";
- }
- DIV_CLASS("panel-body") {
- TABLE_CLASS("table") {
- TABLEHEAD() {
- TABLER() {
- TABLEH() {
- str << "Sensor";
- }
- TABLEH() {
- str << "Value";
- }
- }
- }
-#define MON_VAR(NAME) \
- TABLER() { \
- TABLED() { \
- str << #NAME; \
- } \
- TABLED() { \
- str << NAME; \
- } \
- }
-
- TABLEBODY() {
+ TStringStream str;
+
+ HTML(str) {
+ DIV_CLASS("panel panel-info") {
+ DIV_CLASS("panel-heading") {
+ str << "Proxy";
+ }
+ DIV_CLASS("panel-body") {
+ TABLE_CLASS("table") {
+ TABLEHEAD() {
+ TABLER() {
+ TABLEH() {
+ str << "Sensor";
+ }
+ TABLEH() {
+ str << "Value";
+ }
+ }
+ }
+#define MON_VAR(NAME) \
+ TABLER() { \
+ TABLED() { \
+ str << #NAME; \
+ } \
+ TABLED() { \
+ str << NAME; \
+ } \
+ }
+
+ TABLEBODY() {
MON_VAR(TActivationContext::Now())
- MON_VAR(SessionID)
- MON_VAR(LastSessionDieTime)
- MON_VAR(IncomingHandshakeActor)
- MON_VAR(IncomingHandshakeActorFilledIn)
- MON_VAR(IncomingHandshakeActorReset)
- MON_VAR(OutgoingHandshakeActor)
- MON_VAR(OutgoingHandshakeActorCreated)
- MON_VAR(OutgoingHandshakeActorReset)
- MON_VAR(State)
- MON_VAR(StateSwitchTime)
+ MON_VAR(SessionID)
+ MON_VAR(LastSessionDieTime)
+ MON_VAR(IncomingHandshakeActor)
+ MON_VAR(IncomingHandshakeActorFilledIn)
+ MON_VAR(IncomingHandshakeActorReset)
+ MON_VAR(OutgoingHandshakeActor)
+ MON_VAR(OutgoingHandshakeActorCreated)
+ MON_VAR(OutgoingHandshakeActorReset)
+ MON_VAR(State)
+ MON_VAR(StateSwitchTime)
}
}
}
}
- DIV_CLASS("panel panel-info") {
- DIV_CLASS("panel-heading") {
- str << "Error Log";
- }
- DIV_CLASS("panel-body") {
- TABLE_CLASS("table") {
- TABLEHEAD() {
+ DIV_CLASS("panel panel-info") {
+ DIV_CLASS("panel-heading") {
+ str << "Error Log";
+ }
+ DIV_CLASS("panel-body") {
+ TABLE_CLASS("table") {
+ TABLEHEAD() {
TABLER() {
- TABLEH() {
- str << "Timestamp";
+ TABLEH() {
+ str << "Timestamp";
}
- TABLEH() {
- str << "Elapsed";
+ TABLEH() {
+ str << "Elapsed";
}
- TABLEH() {
- str << "Kind";
+ TABLEH() {
+ str << "Kind";
}
- TABLEH() {
- str << "Explanation";
+ TABLEH() {
+ str << "Explanation";
}
}
}
- TABLEBODY() {
+ TABLEBODY() {
const TInstant now = TActivationContext::Now();
- const TInstant barrier = now - TDuration::Minutes(1);
- for (auto it = ErrorStateLog.rbegin(); it != ErrorStateLog.rend(); ++it) {
- auto wrapper = [&](const auto& lambda) {
- if (std::get<0>(*it) > barrier) {
- str << "<strong>";
- lambda();
- str << "</strong>";
- } else {
- lambda();
- }
- };
- TABLER() {
- TABLED() {
- wrapper([&] {
- str << std::get<0>(*it);
- });
- }
- TABLED() {
- wrapper([&] {
- str << now - std::get<0>(*it);
- });
- }
- TABLED() {
- wrapper([&] {
- str << std::get<1>(*it);
- });
- }
- TABLED() {
- wrapper([&] {
- str << std::get<2>(*it);
- });
+ const TInstant barrier = now - TDuration::Minutes(1);
+ for (auto it = ErrorStateLog.rbegin(); it != ErrorStateLog.rend(); ++it) {
+ auto wrapper = [&](const auto& lambda) {
+ if (std::get<0>(*it) > barrier) {
+ str << "<strong>";
+ lambda();
+ str << "</strong>";
+ } else {
+ lambda();
+ }
+ };
+ TABLER() {
+ TABLED() {
+ wrapper([&] {
+ str << std::get<0>(*it);
+ });
+ }
+ TABLED() {
+ wrapper([&] {
+ str << now - std::get<0>(*it);
+ });
+ }
+ TABLED() {
+ wrapper([&] {
+ str << std::get<1>(*it);
+ });
+ }
+ TABLED() {
+ wrapper([&] {
+ str << std::get<2>(*it);
+ });
ui32 rep = std::get<3>(*it);
if (rep != 1) {
str << " <strong>x" << rep << "</strong>";
}
- }
- }
- }
- }
+ }
+ }
+ }
+ }
}
}
}
}
- if (Session != nullptr) {
+ if (Session != nullptr) {
Session->GenerateHttpInfo(str);
- }
-
+ }
+
Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str()));
}
@@ -746,16 +746,16 @@ namespace NActors {
UpdateErrorStateLog(TActivationContext::Now(), "permanent conclusive", explanation);
}
- Y_VERIFY(Session == nullptr);
- Y_VERIFY(!SessionID);
+ Y_VERIFY(Session == nullptr);
+ Y_VERIFY(!SessionID);
- // recalculate wakeup timeout -- if this is the first failure, then we sleep for default timeout; otherwise we
- // sleep N times longer than the previous try, but not longer than desired number of seconds
- HoldByErrorWakeupDuration = HoldByErrorWakeupDuration != TDuration::Zero()
- ? Min(HoldByErrorWakeupDuration * SleepRetryMultiplier, MaxErrorSleep)
- : FirstErrorSleep;
+ // recalculate wakeup timeout -- if this is the first failure, then we sleep for default timeout; otherwise we
+ // sleep N times longer than the previous try, but not longer than desired number of seconds
+ HoldByErrorWakeupDuration = HoldByErrorWakeupDuration != TDuration::Zero()
+ ? Min(HoldByErrorWakeupDuration * SleepRetryMultiplier, MaxErrorSleep)
+ : FirstErrorSleep;
- // transit to required state and arm wakeup timer
+ // transit to required state and arm wakeup timer
if (Terminated) {
// switch to this state permanently
SwitchToState(__LINE__, "HoldByError", &TThis::HoldByError);
@@ -765,21 +765,21 @@ namespace NActors {
HoldByErrorWakeupCookie = new TEvents::TEvWakeup);
}
- /* Process all pending events. */
+ /* Process all pending events. */
ProcessPendingSessionEvents();
- /* Terminate handshakes */
+ /* Terminate handshakes */
DropHandshakes();
- /* Terminate pending incoming handshake requests. */
- for (auto& ev : PendingIncomingHandshakeEvents) {
+ /* Terminate pending incoming handshake requests. */
+ for (auto& ev : PendingIncomingHandshakeEvents) {
Send(ev->Sender, new TEvents::TEvPoisonPill);
if (ev->GetTypeRewrite() == TEvHandshakeFail::EventType) {
TEvHandshakeFail::TPtr tmp(static_cast<TEventHandle<TEvHandshakeFail>*>(ev.Release()));
LogHandshakeFail(tmp, true);
}
- }
- PendingIncomingHandshakeEvents.clear();
+ }
+ PendingIncomingHandshakeEvents.clear();
}
void TInterconnectProxyTCP::WakeupFromErrorState(TEvents::TEvWakeup::TPtr& ev) {
@@ -787,39 +787,39 @@ namespace NActors {
LOG_INFO_IC("ICP33", "wake up from error state");
- if (ev->Get() == HoldByErrorWakeupCookie) {
+ if (ev->Get() == HoldByErrorWakeupCookie) {
SwitchToInitialState();
- }
+ }
}
void TInterconnectProxyTCP::Disconnect() {
ICPROXY_PROFILED;
- // terminate handshakes (if any)
+ // terminate handshakes (if any)
DropHandshakes();
- if (Session) {
+ if (Session) {
IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::Terminate, TDisconnectReason::UserRequest());
- } else {
+ } else {
TransitToErrorState("forced disconnect");
- }
+ }
}
void TInterconnectProxyTCP::ScheduleCleanupEventQueue() {
ICPROXY_PROFILED;
- if (!CleanupEventQueueScheduled && PendingSessionEvents) {
+ if (!CleanupEventQueueScheduled && PendingSessionEvents) {
// apply batching at 50 ms granularity
Schedule(Max(TDuration::MilliSeconds(50), PendingSessionEvents.front().Deadline - TActivationContext::Now()), new TEvCleanupEventQueue);
- CleanupEventQueueScheduled = true;
- }
+ CleanupEventQueueScheduled = true;
+ }
}
void TInterconnectProxyTCP::HandleCleanupEventQueue() {
ICPROXY_PROFILED;
- Y_VERIFY(CleanupEventQueueScheduled);
- CleanupEventQueueScheduled = false;
+ Y_VERIFY(CleanupEventQueueScheduled);
+ CleanupEventQueueScheduled = false;
CleanupEventQueue();
ScheduleCleanupEventQueue();
}
@@ -838,24 +838,24 @@ namespace NActors {
} else {
break;
}
- }
+ }
}
void TInterconnectProxyTCP::HandleClosePeerSocket() {
ICPROXY_PROFILED;
- if (Session && Session->Socket) {
+ if (Session && Session->Socket) {
LOG_INFO_IC("ICP34", "closed connection by debug command");
- Session->Socket->Shutdown(SHUT_RDWR);
- }
+ Session->Socket->Shutdown(SHUT_RDWR);
+ }
}
void TInterconnectProxyTCP::HandleCloseInputSession() {
ICPROXY_PROFILED;
- if (Session) {
+ if (Session) {
IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::CloseInputSession);
- }
+ }
}
void TInterconnectProxyTCP::HandlePoisonSession() {
@@ -869,11 +869,11 @@ namespace NActors {
void TInterconnectProxyTCP::HandleSessionBufferSizeRequest(TEvSessionBufferSizeRequest::TPtr& ev) {
ICPROXY_PROFILED;
- ui64 bufSize = 0;
- if (Session) {
- bufSize = Session->TotalOutputQueueSize;
- }
-
+ ui64 bufSize = 0;
+ if (Session) {
+ bufSize = Session->TotalOutputQueueSize;
+ }
+
Send(ev->Sender, new TEvSessionBufferSizeResponse(SessionID, bufSize));
}