diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:17 +0300 |
commit | d3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch) | |
tree | dd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp | |
parent | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff) | |
download | ydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp')
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp | 588 |
1 files changed, 294 insertions, 294 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp index 65c8ae6fa5..7e2d8ccb94 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp @@ -7,17 +7,17 @@ #include <util/system/getpid.h> namespace NActors { - static constexpr TDuration GetNodeRequestTimeout = TDuration::Seconds(5); + static constexpr TDuration GetNodeRequestTimeout = TDuration::Seconds(5); - static constexpr TDuration FirstErrorSleep = TDuration::MilliSeconds(10); - static constexpr TDuration MaxErrorSleep = TDuration::Seconds(10); - static constexpr ui32 SleepRetryMultiplier = 4; + static constexpr TDuration FirstErrorSleep = TDuration::MilliSeconds(10); + static constexpr TDuration MaxErrorSleep = TDuration::Seconds(10); + static constexpr ui32 SleepRetryMultiplier = 4; - static TString PeerNameForHuman(ui32 nodeNum, const TString& longName, ui16 port) { - TStringBuf token; - TStringBuf(longName).NextTok('.', token); - return ToString<ui32>(nodeNum) + ":" + (token.size() > 0 ? TString(token) : longName) + ":" + ToString<ui16>(port); - } + static TString PeerNameForHuman(ui32 nodeNum, const TString& longName, ui16 port) { + TStringBuf token; + TStringBuf(longName).NextTok('.', token); + return ToString<ui32>(nodeNum) + ":" + (token.size() > 0 ? TString(token) : longName) + ":" + ToString<ui16>(port); + } TInterconnectProxyTCP::TInterconnectProxyTCP(const ui32 node, TInterconnectProxyCommon::TPtr common, IActor **dynamicPtr) @@ -27,14 +27,14 @@ namespace NActors { , Common(std::move(common)) , SecureContext(new NInterconnect::TSecureSocketContext(Common->Settings.Certificate, Common->Settings.PrivateKey, Common->Settings.CaFilePath, Common->Settings.CipherList)) - { + { Y_VERIFY(Common); Y_VERIFY(Common->NameserviceId); if (DynamicPtr) { Y_VERIFY(!*DynamicPtr); *DynamicPtr = this; } - } + } void TInterconnectProxyTCP::Bootstrap() { SetPrefix(Sprintf("Proxy %s [node %" PRIu32 "]", SelfId().ToString().data(), PeerNodeId)); @@ -54,20 +54,20 @@ namespace NActors { TString path = Sprintf("peer%04" PRIu32, PeerNodeId); TString title = Sprintf("Peer #%04" PRIu32, PeerNodeId); mon(path, title, sys, SelfId()); - } + } } - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // PendingActivation - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // PendingActivation + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// void TInterconnectProxyTCP::RequestNodeInfo(STATEFN_SIG) { ICPROXY_PROFILED; - Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor && !PendingIncomingHandshakeEvents && !PendingSessionEvents); + Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor && !PendingIncomingHandshakeEvents && !PendingSessionEvents); EnqueueSessionEvent(ev); StartConfiguring(); - } + } void TInterconnectProxyTCP::RequestNodeInfoForIncomingHandshake(STATEFN_SIG) { ICPROXY_PROFILED; @@ -77,37 +77,37 @@ namespace NActors { EnqueueIncomingHandshakeEvent(ev); StartConfiguring(); } - } + } - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // PendingNodeInfo - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // PendingNodeInfo + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// void TInterconnectProxyTCP::StartConfiguring() { ICPROXY_PROFILED; - Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor); + Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor); - // issue node info request + // issue node info request Send(Common->NameserviceId, new TEvInterconnect::TEvGetNode(PeerNodeId)); - // arm configure timer; store pointer to event to ensure that we will handle correct one if there were any other - // wakeup events in flight + // arm configure timer; store pointer to event to ensure that we will handle correct one if there were any other + // wakeup events in flight SwitchToState(__LINE__, "PendingNodeInfo", &TThis::PendingNodeInfo, GetNodeRequestTimeout, - ConfigureTimeoutCookie = new TEvents::TEvWakeup); - } + ConfigureTimeoutCookie = new TEvents::TEvWakeup); + } void TInterconnectProxyTCP::Configure(TEvInterconnect::TEvNodeInfo::TPtr& ev) { ICPROXY_PROFILED; Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor && !Session); - if (!ev->Get()->Node) { + if (!ev->Get()->Node) { TransitToErrorState("cannot get node info"); - } else { - auto& info = *ev->Get()->Node; + } else { + auto& info = *ev->Get()->Node; TString name = PeerNameForHuman(PeerNodeId, info.Host, info.Port); - TechnicalPeerHostName = info.Host; + TechnicalPeerHostName = info.Host; if (!Metrics) { Metrics = Common->Metrics ? CreateInterconnectMetrics(Common) : CreateInterconnectCounters(Common); } @@ -122,9 +122,9 @@ namespace NActors { void TInterconnectProxyTCP::ConfigureTimeout(TEvents::TEvWakeup::TPtr& ev) { ICPROXY_PROFILED; - if (ev->Get() == ConfigureTimeoutCookie) { + if (ev->Get() == ConfigureTimeoutCookie) { TransitToErrorState("timed out while waiting for node info"); - } + } } void TInterconnectProxyTCP::ProcessConfigured() { @@ -152,72 +152,72 @@ namespace NActors { void TInterconnectProxyTCP::StartInitialHandshake() { ICPROXY_PROFILED; - // since we are starting initial handshake for some reason, we'll drop any existing handshakes, if any + // since we are starting initial handshake for some reason, we'll drop any existing handshakes, if any DropHandshakes(); - // create and register handshake actor + // create and register handshake actor OutgoingHandshakeActor = Register(CreateOutgoingHandshakeActor(Common, GenerateSessionVirtualId(), TActorId(), PeerNodeId, 0, TechnicalPeerHostName, TSessionParams()), TMailboxType::ReadAsFilled); OutgoingHandshakeActorCreated = TActivationContext::Now(); // prepare for new handshake PrepareNewSessionHandshake(); - } + } void TInterconnectProxyTCP::StartResumeHandshake(ui64 inputCounter) { ICPROXY_PROFILED; - // drop outgoing handshake if we have one; keep incoming handshakes as they may be useful + // drop outgoing handshake if we have one; keep incoming handshakes as they may be useful DropOutgoingHandshake(); - // ensure that we have session - Y_VERIFY(Session); + // ensure that we have session + Y_VERIFY(Session); - // ensure that we have both virtual ids - Y_VERIFY(SessionVirtualId); - Y_VERIFY(RemoteSessionVirtualId); + // ensure that we have both virtual ids + Y_VERIFY(SessionVirtualId); + Y_VERIFY(RemoteSessionVirtualId); - // create and register handshake actor + // create and register handshake actor OutgoingHandshakeActor = Register(CreateOutgoingHandshakeActor(Common, SessionVirtualId, RemoteSessionVirtualId, PeerNodeId, inputCounter, TechnicalPeerHostName, Session->Params), TMailboxType::ReadAsFilled); OutgoingHandshakeActorCreated = TActivationContext::Now(); - } + } void TInterconnectProxyTCP::IssueIncomingHandshakeReply(const TActorId& handshakeId, ui64 peerLocalId, THolder<IEventBase> event) { ICPROXY_PROFILED; - Y_VERIFY(!IncomingHandshakeActor); - IncomingHandshakeActor = handshakeId; + Y_VERIFY(!IncomingHandshakeActor); + IncomingHandshakeActor = handshakeId; IncomingHandshakeActorFilledIn = TActivationContext::Now(); Y_VERIFY(!LastSerialFromIncomingHandshake || *LastSerialFromIncomingHandshake <= peerLocalId); LastSerialFromIncomingHandshake = peerLocalId; if (OutgoingHandshakeActor && SelfId().NodeId() < PeerNodeId) { - // Both outgoing and incoming handshake are in progress. To prevent race condition during semultanous handshake - // incoming handshake must be held till outgoing handshake is complete or failed + // Both outgoing and incoming handshake are in progress. To prevent race condition during semultanous handshake + // incoming handshake must be held till outgoing handshake is complete or failed LOG_DEBUG_IC("ICP06", "reply for incoming handshake (actor %s) is held", IncomingHandshakeActor.ToString().data()); - HeldHandshakeReply = std::move(event); + HeldHandshakeReply = std::move(event); - // Check that we are in one of acceptable states that would properly handle handshake statuses. - const auto state = CurrentStateFunc(); - Y_VERIFY(state == &TThis::PendingConnection || state == &TThis::StateWork, "invalid handshake request in state# %s", State); - } else { - LOG_DEBUG_IC("ICP07", "issued incoming handshake reply"); + // Check that we are in one of acceptable states that would properly handle handshake statuses. + const auto state = CurrentStateFunc(); + Y_VERIFY(state == &TThis::PendingConnection || state == &TThis::StateWork, "invalid handshake request in state# %s", State); + } else { + LOG_DEBUG_IC("ICP07", "issued incoming handshake reply"); - // No race, so we can send reply immediately. - Y_VERIFY(!HeldHandshakeReply); + // No race, so we can send reply immediately. + Y_VERIFY(!HeldHandshakeReply); Send(IncomingHandshakeActor, event.Release()); - // Start waiting for handshake reply, if not yet started; also, if session is already created, then we don't - // switch from working state. - if (!Session) { + // Start waiting for handshake reply, if not yet started; also, if session is already created, then we don't + // switch from working state. + if (!Session) { LOG_INFO_IC("ICP08", "No active sessions, becoming PendingConnection"); SwitchToState(__LINE__, "PendingConnection", &TThis::PendingConnection); - } else { - Y_VERIFY(CurrentStateFunc() == &TThis::StateWork); - } + } else { + Y_VERIFY(CurrentStateFunc() == &TThis::StateWork); + } } } @@ -243,8 +243,8 @@ namespace NActors { } else { // if we already have incoming handshake, then terminate existing one DropIncomingHandshake(); - - // issue reply to the sender, possibly holding it while outgoing handshake is at race + + // issue reply to the sender, possibly holding it while outgoing handshake is at race THolder<IEventBase> reply = IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::ProcessHandshakeRequest, ev); return IssueIncomingHandshakeReply(ev->Sender, RemoteSessionVirtualId.LocalId(), std::move(reply)); } @@ -263,27 +263,27 @@ namespace NActors { ui64 remoteStartTime = record.GetProgramStartTime(); ui64 remoteSerial = record.GetSerial(); - if (RemoteProgramInfo && remotePID == RemoteProgramInfo->PID && remoteStartTime == RemoteProgramInfo->StartTime) { - if (remoteSerial < RemoteProgramInfo->Serial) { + if (RemoteProgramInfo && remotePID == RemoteProgramInfo->PID && remoteStartTime == RemoteProgramInfo->StartTime) { + if (remoteSerial < RemoteProgramInfo->Serial) { LOG_INFO_IC("ICP18", "handshake (actor %s) is too old", ev->Sender.ToString().data()); Send(ev->Sender, new TEvents::TEvPoisonPill); - return; - } else { - RemoteProgramInfo->Serial = remoteSerial; - } + return; + } else { + RemoteProgramInfo->Serial = remoteSerial; + } } else { - const auto ptr = new TProgramInfo; - ptr->PID = remotePID; - ptr->StartTime = remoteStartTime; - ptr->Serial = remoteSerial; - RemoteProgramInfo.Reset(ptr); + const auto ptr = new TProgramInfo; + ptr->PID = remotePID; + ptr->StartTime = remoteStartTime; + ptr->Serial = remoteSerial; + RemoteProgramInfo.Reset(ptr); } - /* Let's check peer technical hostname */ + /* Let's check peer technical hostname */ if (record.HasSenderHostName() && TechnicalPeerHostName != record.GetSenderHostName()) { Send(ev->Sender, new TEvHandshakeReplyError("host name mismatch")); - return; - } + return; + } // check sender actor id and check if it is not very old if (LastSerialFromIncomingHandshake) { @@ -303,60 +303,60 @@ namespace NActors { } } - // drop incoming handshake as this is definitely more recent + // drop incoming handshake as this is definitely more recent DropIncomingHandshake(); - // prepare for new session + // prepare for new session PrepareNewSessionHandshake(); - auto event = MakeHolder<TEvHandshakeReplyOK>(); - auto* pb = event->Record.MutableSuccess(); + auto event = MakeHolder<TEvHandshakeReplyOK>(); + auto* pb = event->Record.MutableSuccess(); const TActorId virtualId = GenerateSessionVirtualId(); - pb->SetProtocol(INTERCONNECT_PROTOCOL_VERSION); + pb->SetProtocol(INTERCONNECT_PROTOCOL_VERSION); pb->SetSenderActorId(virtualId.ToString()); - pb->SetProgramPID(GetPID()); + pb->SetProgramPID(GetPID()); pb->SetProgramStartTime(Common->StartTime); - pb->SetSerial(virtualId.LocalId()); + pb->SetSerial(virtualId.LocalId()); IssueIncomingHandshakeReply(ev->Sender, 0, std::move(event)); - } + } void TInterconnectProxyTCP::HandleHandshakeStatus(TEvHandshakeDone::TPtr& ev) { ICPROXY_PROFILED; TEvHandshakeDone *msg = ev->Get(); - // Terminate handshake actor working in opposite direction, if set up. - if (ev->Sender == IncomingHandshakeActor) { - LOG_INFO_IC("ICP19", "incoming handshake succeeded"); + // Terminate handshake actor working in opposite direction, if set up. + if (ev->Sender == IncomingHandshakeActor) { + LOG_INFO_IC("ICP19", "incoming handshake succeeded"); DropIncomingHandshake(false); DropOutgoingHandshake(); - } else if (ev->Sender == OutgoingHandshakeActor) { - LOG_INFO_IC("ICP20", "outgoing handshake succeeded"); + } else if (ev->Sender == OutgoingHandshakeActor) { + LOG_INFO_IC("ICP20", "outgoing handshake succeeded"); DropIncomingHandshake(); DropOutgoingHandshake(false); - } else { + } else { /* It seems to be an old handshake. */ - return; + return; } - Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor); + Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor); SwitchToState(__LINE__, "StateWork", &TThis::StateWork); - if (Session) { + if (Session) { // this is continuation request, check that virtual ids match Y_VERIFY(SessionVirtualId == msg->Self && RemoteSessionVirtualId == msg->Peer); - } else { + } else { // this is initial request, check that we have virtual ids not filled in Y_VERIFY(!SessionVirtualId && !RemoteSessionVirtualId); - } + } auto error = [&](const char* description) { TransitToErrorState(description); }; - // If session is not created, then create new one. - if (!Session) { + // If session is not created, then create new one. + if (!Session) { RemoteProgramInfo = std::move(msg->ProgramInfo); if (!RemoteProgramInfo) { // we have received resume handshake, but session was closed concurrently while handshaking @@ -374,15 +374,15 @@ namespace NActors { // ensure that we have session local/peer virtual ids Y_VERIFY(Session && SessionVirtualId && RemoteSessionVirtualId); - // Set up new connection for the session. + // Set up new connection for the session. IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::SetNewConnection, ev); - // Reset retry timer - HoldByErrorWakeupDuration = TDuration::Zero(); + // Reset retry timer + HoldByErrorWakeupDuration = TDuration::Zero(); - /* Forward all held events */ + /* Forward all held events */ ProcessPendingSessionEvents(); - } + } void TInterconnectProxyTCP::HandleHandshakeStatus(TEvHandshakeFail::TPtr& ev) { ICPROXY_PROFILED; @@ -392,42 +392,42 @@ namespace NActors { (IncomingHandshakeActor && OutgoingHandshakeActor); LogHandshakeFail(ev, inconclusive); - if (ev->Sender == IncomingHandshakeActor) { - LOG_NOTICE_IC("ICP24", "incoming handshake failed, temporary: %" PRIu32 " explanation: %s outgoing: %s", + if (ev->Sender == IncomingHandshakeActor) { + LOG_NOTICE_IC("ICP24", "incoming handshake failed, temporary: %" PRIu32 " explanation: %s outgoing: %s", ui32(ev->Get()->Temporary), ev->Get()->Explanation.data(), OutgoingHandshakeActor.ToString().data()); DropIncomingHandshake(false); - } else if (ev->Sender == OutgoingHandshakeActor) { - LOG_NOTICE_IC("ICP25", "outgoing handshake failed, temporary: %" PRIu32 " explanation: %s incoming: %s held: %s", + } else if (ev->Sender == OutgoingHandshakeActor) { + LOG_NOTICE_IC("ICP25", "outgoing handshake failed, temporary: %" PRIu32 " explanation: %s incoming: %s held: %s", ui32(ev->Get()->Temporary), ev->Get()->Explanation.data(), IncomingHandshakeActor.ToString().data(), - HeldHandshakeReply ? "yes" : "no"); + HeldHandshakeReply ? "yes" : "no"); DropOutgoingHandshake(false); - if (IEventBase* reply = HeldHandshakeReply.Release()) { - Y_VERIFY(IncomingHandshakeActor); + if (IEventBase* reply = HeldHandshakeReply.Release()) { + Y_VERIFY(IncomingHandshakeActor); LOG_DEBUG_IC("ICP26", "sent held handshake reply to %s", IncomingHandshakeActor.ToString().data()); Send(IncomingHandshakeActor, reply); - } + } // if we have no current session, then we have to drop all pending events as the outgoing handshake has failed ProcessPendingSessionEvents(); - } else { - /* It seems to be an old fail, just ignore it */ + } else { + /* It seems to be an old fail, just ignore it */ LOG_NOTICE_IC("ICP27", "obsolete handshake fail ignored"); - return; + return; } if (Metrics) { Metrics->IncHandshakeFails(); - } + } - if (IncomingHandshakeActor || OutgoingHandshakeActor) { - // one of handshakes is still going on - LOG_DEBUG_IC("ICP28", "other handshake is still going on"); - return; - } + if (IncomingHandshakeActor || OutgoingHandshakeActor) { + // one of handshakes is still going on + LOG_DEBUG_IC("ICP28", "other handshake is still going on"); + return; + } - switch (ev->Get()->Temporary) { - case TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT: + switch (ev->Get()->Temporary) { + case TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT: if (!Session) { if (PendingSessionEvents) { // try to start outgoing handshake as we have some events enqueued @@ -444,22 +444,22 @@ namespace NActors { // we have no active connection in that session, so just restart handshake from last known position IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::StartHandshake); } - break; - - case TEvHandshakeFail::HANDSHAKE_FAIL_SESSION_MISMATCH: + break; + + case TEvHandshakeFail::HANDSHAKE_FAIL_SESSION_MISMATCH: StartInitialHandshake(); - break; - - case TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT: + break; + + case TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT: TString timeExplanation = " LastSessionDieTime# " + LastSessionDieTime.ToString(); - if (Session) { + if (Session) { InvokeOtherActor(*Session, &TInterconnectSessionTCP::Terminate, TDisconnectReason::HandshakeFailPermanent()); } TransitToErrorState(ev->Get()->Explanation + timeExplanation, false); - break; - } - } + break; + } + } void TInterconnectProxyTCP::LogHandshakeFail(TEvHandshakeFail::TPtr& ev, bool inconclusive) { ICPROXY_PROFILED; @@ -487,69 +487,69 @@ namespace NActors { void TInterconnectProxyTCP::ProcessPendingSessionEvents() { ICPROXY_PROFILED; - while (PendingSessionEvents) { + while (PendingSessionEvents) { TPendingSessionEvent ev = std::move(PendingSessionEvents.front()); PendingSessionEventsSize -= ev.Size; TAutoPtr<IEventHandle> event(ev.Event.Release()); - PendingSessionEvents.pop_front(); + PendingSessionEvents.pop_front(); if (Session) { ForwardSessionEventToSession(event); - } else { + } else { DropSessionEvent(event); } - } + } } void TInterconnectProxyTCP::DropSessionEvent(STATEFN_SIG) { ICPROXY_PROFILED; ValidateEvent(ev, "DropSessionEvent"); - switch (ev->GetTypeRewrite()) { - case TEvInterconnect::EvForward: - if (ev->Flags & IEventHandle::FlagSubscribeOnSession) { + switch (ev->GetTypeRewrite()) { + case TEvInterconnect::EvForward: + if (ev->Flags & IEventHandle::FlagSubscribeOnSession) { Send(ev->Sender, new TEvInterconnect::TEvNodeDisconnected(PeerNodeId), 0, ev->Cookie); - } + } TActivationContext::Send(ev->ForwardOnNondelivery(TEvents::TEvUndelivered::Disconnected)); - break; + break; - case TEvInterconnect::TEvConnectNode::EventType: - case TEvents::TEvSubscribe::EventType: + case TEvInterconnect::TEvConnectNode::EventType: + case TEvents::TEvSubscribe::EventType: Send(ev->Sender, new TEvInterconnect::TEvNodeDisconnected(PeerNodeId), 0, ev->Cookie); - break; + break; - case TEvents::TEvUnsubscribe::EventType: - /* Do nothing */ - break; + case TEvents::TEvUnsubscribe::EventType: + /* Do nothing */ + break; - default: - Y_FAIL("Unexpected type of event in held event queue"); - } + default: + Y_FAIL("Unexpected type of event in held event queue"); + } } void TInterconnectProxyTCP::UnregisterSession(TInterconnectSessionTCP* session) { ICPROXY_PROFILED; - Y_VERIFY(Session && Session == session && SessionID); + Y_VERIFY(Session && Session == session && SessionID); LOG_INFO_IC("ICP30", "unregister session Session# %s VirtualId# %s", SessionID.ToString().data(), SessionVirtualId.ToString().data()); - Session = nullptr; + Session = nullptr; SessionID = TActorId(); - // drop all pending events as we are closed + // drop all pending events as we are closed ProcessPendingSessionEvents(); - // reset virtual ids as this session is terminated + // reset virtual ids as this session is terminated SessionVirtualId = TActorId(); RemoteSessionVirtualId = TActorId(); if (Metrics) { Metrics->IncSessionDeaths(); - } + } LastSessionDieTime = TActivationContext::Now(); - + if (IncomingHandshakeActor || OutgoingHandshakeActor) { PrepareNewSessionHandshake(); } else { @@ -566,22 +566,22 @@ namespace NActors { PendingSessionEvents.emplace_back(TActivationContext::Now() + Common->Settings.MessagePendingTimeout, size, ev); ScheduleCleanupEventQueue(); CleanupEventQueue(); - } + } void TInterconnectProxyTCP::EnqueueIncomingHandshakeEvent(STATEFN_SIG) { ICPROXY_PROFILED; // enqueue handshake request Y_UNUSED(); - PendingIncomingHandshakeEvents.emplace_back(ev); - } + PendingIncomingHandshakeEvents.emplace_back(ev); + } void TInterconnectProxyTCP::EnqueueIncomingHandshakeEvent(TEvHandshakeDone::TPtr& /*ev*/) { ICPROXY_PROFILED; // TEvHandshakeDone can't get into the queue, because we have to process handshake request first; this may be the // race with the previous handshakes, so simply ignore it - } + } void TInterconnectProxyTCP::EnqueueIncomingHandshakeEvent(TEvHandshakeFail::TPtr& ev) { ICPROXY_PROFILED; @@ -599,140 +599,140 @@ namespace NActors { break; } } - } + } void TInterconnectProxyTCP::ForwardSessionEventToSession(STATEFN_SIG) { ICPROXY_PROFILED; - Y_VERIFY(Session && SessionID); + Y_VERIFY(Session && SessionID); ValidateEvent(ev, "ForwardSessionEventToSession"); InvokeOtherActor(*Session, &TInterconnectSessionTCP::Receive, ev, TActivationContext::ActorContextFor(SessionID)); - } + } void TInterconnectProxyTCP::GenerateHttpInfo(NMon::TEvHttpInfo::TPtr& ev) { ICPROXY_PROFILED; LOG_INFO_IC("ICP31", "proxy http called"); - TStringStream str; - - HTML(str) { - DIV_CLASS("panel panel-info") { - DIV_CLASS("panel-heading") { - str << "Proxy"; - } - DIV_CLASS("panel-body") { - TABLE_CLASS("table") { - TABLEHEAD() { - TABLER() { - TABLEH() { - str << "Sensor"; - } - TABLEH() { - str << "Value"; - } - } - } -#define MON_VAR(NAME) \ - TABLER() { \ - TABLED() { \ - str << #NAME; \ - } \ - TABLED() { \ - str << NAME; \ - } \ - } - - TABLEBODY() { + TStringStream str; + + HTML(str) { + DIV_CLASS("panel panel-info") { + DIV_CLASS("panel-heading") { + str << "Proxy"; + } + DIV_CLASS("panel-body") { + TABLE_CLASS("table") { + TABLEHEAD() { + TABLER() { + TABLEH() { + str << "Sensor"; + } + TABLEH() { + str << "Value"; + } + } + } +#define MON_VAR(NAME) \ + TABLER() { \ + TABLED() { \ + str << #NAME; \ + } \ + TABLED() { \ + str << NAME; \ + } \ + } + + TABLEBODY() { MON_VAR(TActivationContext::Now()) - MON_VAR(SessionID) - MON_VAR(LastSessionDieTime) - MON_VAR(IncomingHandshakeActor) - MON_VAR(IncomingHandshakeActorFilledIn) - MON_VAR(IncomingHandshakeActorReset) - MON_VAR(OutgoingHandshakeActor) - MON_VAR(OutgoingHandshakeActorCreated) - MON_VAR(OutgoingHandshakeActorReset) - MON_VAR(State) - MON_VAR(StateSwitchTime) + MON_VAR(SessionID) + MON_VAR(LastSessionDieTime) + MON_VAR(IncomingHandshakeActor) + MON_VAR(IncomingHandshakeActorFilledIn) + MON_VAR(IncomingHandshakeActorReset) + MON_VAR(OutgoingHandshakeActor) + MON_VAR(OutgoingHandshakeActorCreated) + MON_VAR(OutgoingHandshakeActorReset) + MON_VAR(State) + MON_VAR(StateSwitchTime) } } } } - DIV_CLASS("panel panel-info") { - DIV_CLASS("panel-heading") { - str << "Error Log"; - } - DIV_CLASS("panel-body") { - TABLE_CLASS("table") { - TABLEHEAD() { + DIV_CLASS("panel panel-info") { + DIV_CLASS("panel-heading") { + str << "Error Log"; + } + DIV_CLASS("panel-body") { + TABLE_CLASS("table") { + TABLEHEAD() { TABLER() { - TABLEH() { - str << "Timestamp"; + TABLEH() { + str << "Timestamp"; } - TABLEH() { - str << "Elapsed"; + TABLEH() { + str << "Elapsed"; } - TABLEH() { - str << "Kind"; + TABLEH() { + str << "Kind"; } - TABLEH() { - str << "Explanation"; + TABLEH() { + str << "Explanation"; } } } - TABLEBODY() { + TABLEBODY() { const TInstant now = TActivationContext::Now(); - const TInstant barrier = now - TDuration::Minutes(1); - for (auto it = ErrorStateLog.rbegin(); it != ErrorStateLog.rend(); ++it) { - auto wrapper = [&](const auto& lambda) { - if (std::get<0>(*it) > barrier) { - str << "<strong>"; - lambda(); - str << "</strong>"; - } else { - lambda(); - } - }; - TABLER() { - TABLED() { - wrapper([&] { - str << std::get<0>(*it); - }); - } - TABLED() { - wrapper([&] { - str << now - std::get<0>(*it); - }); - } - TABLED() { - wrapper([&] { - str << std::get<1>(*it); - }); - } - TABLED() { - wrapper([&] { - str << std::get<2>(*it); - }); + const TInstant barrier = now - TDuration::Minutes(1); + for (auto it = ErrorStateLog.rbegin(); it != ErrorStateLog.rend(); ++it) { + auto wrapper = [&](const auto& lambda) { + if (std::get<0>(*it) > barrier) { + str << "<strong>"; + lambda(); + str << "</strong>"; + } else { + lambda(); + } + }; + TABLER() { + TABLED() { + wrapper([&] { + str << std::get<0>(*it); + }); + } + TABLED() { + wrapper([&] { + str << now - std::get<0>(*it); + }); + } + TABLED() { + wrapper([&] { + str << std::get<1>(*it); + }); + } + TABLED() { + wrapper([&] { + str << std::get<2>(*it); + }); ui32 rep = std::get<3>(*it); if (rep != 1) { str << " <strong>x" << rep << "</strong>"; } - } - } - } - } + } + } + } + } } } } } - if (Session != nullptr) { + if (Session != nullptr) { Session->GenerateHttpInfo(str); - } - + } + Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str())); } @@ -746,16 +746,16 @@ namespace NActors { UpdateErrorStateLog(TActivationContext::Now(), "permanent conclusive", explanation); } - Y_VERIFY(Session == nullptr); - Y_VERIFY(!SessionID); + Y_VERIFY(Session == nullptr); + Y_VERIFY(!SessionID); - // recalculate wakeup timeout -- if this is the first failure, then we sleep for default timeout; otherwise we - // sleep N times longer than the previous try, but not longer than desired number of seconds - HoldByErrorWakeupDuration = HoldByErrorWakeupDuration != TDuration::Zero() - ? Min(HoldByErrorWakeupDuration * SleepRetryMultiplier, MaxErrorSleep) - : FirstErrorSleep; + // recalculate wakeup timeout -- if this is the first failure, then we sleep for default timeout; otherwise we + // sleep N times longer than the previous try, but not longer than desired number of seconds + HoldByErrorWakeupDuration = HoldByErrorWakeupDuration != TDuration::Zero() + ? Min(HoldByErrorWakeupDuration * SleepRetryMultiplier, MaxErrorSleep) + : FirstErrorSleep; - // transit to required state and arm wakeup timer + // transit to required state and arm wakeup timer if (Terminated) { // switch to this state permanently SwitchToState(__LINE__, "HoldByError", &TThis::HoldByError); @@ -765,21 +765,21 @@ namespace NActors { HoldByErrorWakeupCookie = new TEvents::TEvWakeup); } - /* Process all pending events. */ + /* Process all pending events. */ ProcessPendingSessionEvents(); - /* Terminate handshakes */ + /* Terminate handshakes */ DropHandshakes(); - /* Terminate pending incoming handshake requests. */ - for (auto& ev : PendingIncomingHandshakeEvents) { + /* Terminate pending incoming handshake requests. */ + for (auto& ev : PendingIncomingHandshakeEvents) { Send(ev->Sender, new TEvents::TEvPoisonPill); if (ev->GetTypeRewrite() == TEvHandshakeFail::EventType) { TEvHandshakeFail::TPtr tmp(static_cast<TEventHandle<TEvHandshakeFail>*>(ev.Release())); LogHandshakeFail(tmp, true); } - } - PendingIncomingHandshakeEvents.clear(); + } + PendingIncomingHandshakeEvents.clear(); } void TInterconnectProxyTCP::WakeupFromErrorState(TEvents::TEvWakeup::TPtr& ev) { @@ -787,39 +787,39 @@ namespace NActors { LOG_INFO_IC("ICP33", "wake up from error state"); - if (ev->Get() == HoldByErrorWakeupCookie) { + if (ev->Get() == HoldByErrorWakeupCookie) { SwitchToInitialState(); - } + } } void TInterconnectProxyTCP::Disconnect() { ICPROXY_PROFILED; - // terminate handshakes (if any) + // terminate handshakes (if any) DropHandshakes(); - if (Session) { + if (Session) { IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::Terminate, TDisconnectReason::UserRequest()); - } else { + } else { TransitToErrorState("forced disconnect"); - } + } } void TInterconnectProxyTCP::ScheduleCleanupEventQueue() { ICPROXY_PROFILED; - if (!CleanupEventQueueScheduled && PendingSessionEvents) { + if (!CleanupEventQueueScheduled && PendingSessionEvents) { // apply batching at 50 ms granularity Schedule(Max(TDuration::MilliSeconds(50), PendingSessionEvents.front().Deadline - TActivationContext::Now()), new TEvCleanupEventQueue); - CleanupEventQueueScheduled = true; - } + CleanupEventQueueScheduled = true; + } } void TInterconnectProxyTCP::HandleCleanupEventQueue() { ICPROXY_PROFILED; - Y_VERIFY(CleanupEventQueueScheduled); - CleanupEventQueueScheduled = false; + Y_VERIFY(CleanupEventQueueScheduled); + CleanupEventQueueScheduled = false; CleanupEventQueue(); ScheduleCleanupEventQueue(); } @@ -838,24 +838,24 @@ namespace NActors { } else { break; } - } + } } void TInterconnectProxyTCP::HandleClosePeerSocket() { ICPROXY_PROFILED; - if (Session && Session->Socket) { + if (Session && Session->Socket) { LOG_INFO_IC("ICP34", "closed connection by debug command"); - Session->Socket->Shutdown(SHUT_RDWR); - } + Session->Socket->Shutdown(SHUT_RDWR); + } } void TInterconnectProxyTCP::HandleCloseInputSession() { ICPROXY_PROFILED; - if (Session) { + if (Session) { IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::CloseInputSession); - } + } } void TInterconnectProxyTCP::HandlePoisonSession() { @@ -869,11 +869,11 @@ namespace NActors { void TInterconnectProxyTCP::HandleSessionBufferSizeRequest(TEvSessionBufferSizeRequest::TPtr& ev) { ICPROXY_PROFILED; - ui64 bufSize = 0; - if (Session) { - bufSize = Session->TotalOutputQueueSize; - } - + ui64 bufSize = 0; + if (Session) { + bufSize = Session->TotalOutputQueueSize; + } + Send(ev->Sender, new TEvSessionBufferSizeResponse(SessionID, bufSize)); } |