| author | Alexander Rutkovsky <alexvru@mail.ru> | 2022-02-10 16:47:40 +0300 |
|---|---|---|
| committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:40 +0300 |
| commit | 667a4ee7da2e004784b9c3cfab824a81e96f4d66 (patch) | |
| tree | c0748b5dcbade83af788c0abfa89c0383d6b779c /library/cpp/actors/interconnect/interconnect_tcp_session.cpp | |
| parent | f3646f91e0de459836a7800b9ce3e8dc57a2ab3a (diff) | |
| download | ydb-667a4ee7da2e004784b9c3cfab824a81e96f4d66.tar.gz | |
Restoring authorship annotation for Alexander Rutkovsky <alexvru@mail.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_tcp_session.cpp')
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_tcp_session.cpp | 1620 |
1 file changed, 810 insertions, 810 deletions
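
The file re-annotated in the diff below defines a small variadic `Coalesce` helper that the session uses to pick the first non-default setting: `GetDeadPeerTimeout()`, `GetLostConnectionTimeout()` and `GetTotalInflightAmountOfData()` all fall back to compile-time defaults when the corresponding setting is left at its zero value. The following is a minimal standalone sketch of that pattern, not the actor-system code itself: the `TDuration` values are replaced with `std::chrono::milliseconds` so it compiles without YDB headers, and the variable names and the 10-second default are hypothetical.

```cpp
#include <chrono>
#include <iostream>
#include <type_traits>
#include <utility>

// Base case: a single argument is returned as-is.
template <typename T>
T Coalesce(T&& x) {
    return x;
}

// Recursive case: return the first argument that differs from its type's
// default-constructed ("unset") value, otherwise keep scanning the rest.
template <typename T, typename T2, typename... TRest>
typename std::common_type<T, T2, TRest...>::type Coalesce(T&& first, T2&& mid, TRest&&... rest) {
    if (first != typename std::remove_reference<T>::type()) {
        return first;
    }
    return Coalesce(std::forward<T2>(mid), std::forward<TRest>(rest)...);
}

int main() {
    using namespace std::chrono_literals;

    // Hypothetical stand-ins for a configured setting and its default.
    std::chrono::milliseconds configuredDeadPeer{0}; // zero means "not configured"
    std::chrono::milliseconds defaultDeadPeer = 10s;

    // Picks the default because the configured value equals milliseconds{}.
    auto effective = Coalesce(configuredDeadPeer, defaultDeadPeer);
    std::cout << effective.count() << " ms\n"; // prints 10000 ms
    return 0;
}
```
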
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 8f13d89f35..2ded7f9f53 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -1,40 +1,40 @@ -#include "interconnect_tcp_proxy.h" -#include "interconnect_tcp_session.h" -#include "interconnect_handshake.h" - +#include "interconnect_tcp_proxy.h" +#include "interconnect_tcp_session.h" +#include "interconnect_handshake.h" + #include <library/cpp/actors/core/probes.h> #include <library/cpp/actors/core/log.h> #include <library/cpp/actors/core/interconnect.h> #include <library/cpp/actors/util/datetime.h> #include <library/cpp/actors/protos/services_common.pb.h> #include <library/cpp/monlib/service/pages/templates.h> - -namespace NActors { + +namespace NActors { LWTRACE_USING(ACTORLIB_PROVIDER); - + DECLARE_WILSON_EVENT(OutputQueuePush, (ui32, QueueSizeInEvents), (ui64, QueueSizeInBytes)); - - template<typename T> - T Coalesce(T&& x) { - return x; + + template<typename T> + T Coalesce(T&& x) { + return x; } - - template<typename T, typename T2, typename... TRest> - typename std::common_type<T, T2, TRest...>::type Coalesce(T&& first, T2&& mid, TRest&&... rest) { - if (first != typename std::remove_reference<T>::type()) { - return first; - } else { - return Coalesce(std::forward<T2>(mid), std::forward<TRest>(rest)...); - } + + template<typename T, typename T2, typename... TRest> + typename std::common_type<T, T2, TRest...>::type Coalesce(T&& first, T2&& mid, TRest&&... rest) { + if (first != typename std::remove_reference<T>::type()) { + return first; + } else { + return Coalesce(std::forward<T2>(mid), std::forward<TRest>(rest)...); + } } - - TInterconnectSessionTCP::TInterconnectSessionTCP(TInterconnectProxyTCP* const proxy, TSessionParams params) + + TInterconnectSessionTCP::TInterconnectSessionTCP(TInterconnectProxyTCP* const proxy, TSessionParams params) : TActor(&TInterconnectSessionTCP::StateFunc) - , Created(TInstant::Now()) + , Created(TInstant::Now()) , Proxy(proxy) - , CloseOnIdleWatchdog(GetCloseOnIdleTimeout(), std::bind(&TThis::OnCloseOnIdleTimerHit, this)) - , LostConnectionWatchdog(GetLostConnectionTimeout(), std::bind(&TThis::OnLostConnectionTimerHit, this)) - , Params(std::move(params)) + , CloseOnIdleWatchdog(GetCloseOnIdleTimeout(), std::bind(&TThis::OnCloseOnIdleTimerHit, this)) + , LostConnectionWatchdog(GetLostConnectionTimeout(), std::bind(&TThis::OnLostConnectionTimerHit, this)) + , Params(std::move(params)) , TotalOutputQueueSize(0) , OutputStuckFlag(false) , OutputQueueUtilization(16) @@ -42,92 +42,92 @@ namespace NActors { { Proxy->Metrics->SetConnected(0); ReceiveContext.Reset(new TReceiveContext); - } - - TInterconnectSessionTCP::~TInterconnectSessionTCP() { - // close socket ASAP when actor system is being shut down - if (Socket) { - Socket->Shutdown(SHUT_RDWR); - } - } - - void TInterconnectSessionTCP::Init() { - auto destroyCallback = [as = TlsActivationContext->ExecutorThread.ActorSystem, id = Proxy->Common->DestructorId](THolder<IEventBase> event) { - as->Send(id, event.Release()); - }; - Pool.ConstructInPlace(Proxy->Common, std::move(destroyCallback)); + } + + TInterconnectSessionTCP::~TInterconnectSessionTCP() { + // close socket ASAP when actor system is being shut down + if (Socket) { + Socket->Shutdown(SHUT_RDWR); + } + } + + void TInterconnectSessionTCP::Init() { + auto destroyCallback = [as = 
TlsActivationContext->ExecutorThread.ActorSystem, id = Proxy->Common->DestructorId](THolder<IEventBase> event) { + as->Send(id, event.Release()); + }; + Pool.ConstructInPlace(Proxy->Common, std::move(destroyCallback)); ChannelScheduler.ConstructInPlace(Proxy->PeerNodeId, Proxy->Common->ChannelsConfig, Proxy->Metrics, *Pool, - Proxy->Common->Settings.MaxSerializedEventSize, Params); - - LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session created", Proxy->PeerNodeId); + Proxy->Common->Settings.MaxSerializedEventSize, Params); + + LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session created", Proxy->PeerNodeId); SetPrefix(Sprintf("Session %s [node %" PRIu32 "]", SelfId().ToString().data(), Proxy->PeerNodeId)); - SendUpdateToWhiteboard(); + SendUpdateToWhiteboard(); } - - void TInterconnectSessionTCP::CloseInputSession() { - Send(ReceiverId, new TEvInterconnect::TEvCloseInputSession); - } - - void TInterconnectSessionTCP::Handle(TEvTerminate::TPtr& ev) { - Terminate(ev->Get()->Reason); - } - - void TInterconnectSessionTCP::HandlePoison() { - Terminate(TDisconnectReason()); - } - - void TInterconnectSessionTCP::Terminate(TDisconnectReason reason) { - LOG_INFO_IC_SESSION("ICS01", "socket: %" PRIi64, (Socket ? i64(*Socket) : -1)); - - IActor::InvokeOtherActor(*Proxy, &TInterconnectProxyTCP::UnregisterSession, this); - ShutdownSocket(std::move(reason)); - + + void TInterconnectSessionTCP::CloseInputSession() { + Send(ReceiverId, new TEvInterconnect::TEvCloseInputSession); + } + + void TInterconnectSessionTCP::Handle(TEvTerminate::TPtr& ev) { + Terminate(ev->Get()->Reason); + } + + void TInterconnectSessionTCP::HandlePoison() { + Terminate(TDisconnectReason()); + } + + void TInterconnectSessionTCP::Terminate(TDisconnectReason reason) { + LOG_INFO_IC_SESSION("ICS01", "socket: %" PRIi64, (Socket ? 
i64(*Socket) : -1)); + + IActor::InvokeOtherActor(*Proxy, &TInterconnectProxyTCP::UnregisterSession, this); + ShutdownSocket(std::move(reason)); + for (const auto& kv : Subscribers) { Send(kv.first, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, kv.second); } Proxy->Metrics->SubSubscribersCount(Subscribers.size()); - Subscribers.clear(); - - ChannelScheduler->ForEach([&](TEventOutputChannel& channel) { - channel.NotifyUndelivered(); - }); - - if (ReceiverId) { - Send(ReceiverId, new TEvents::TEvPoisonPill); - } - - SendUpdateToWhiteboard(false); - + Subscribers.clear(); + + ChannelScheduler->ForEach([&](TEventOutputChannel& channel) { + channel.NotifyUndelivered(); + }); + + if (ReceiverId) { + Send(ReceiverId, new TEvents::TEvPoisonPill); + } + + SendUpdateToWhiteboard(false); + Proxy->Metrics->SubOutputBuffersTotalSize(TotalOutputQueueSize); Proxy->Metrics->SubInflightDataAmount(InflightDataAmount); - - LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session destroyed", Proxy->PeerNodeId); - if (!Subscribers.empty()) { + LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session destroyed", Proxy->PeerNodeId); + + if (!Subscribers.empty()) { Proxy->Metrics->SubSubscribersCount(Subscribers.size()); - } - - TActor::PassAway(); + } + + TActor::PassAway(); + } + + void TInterconnectSessionTCP::PassAway() { + Y_FAIL("TInterconnectSessionTCP::PassAway() can't be called directly"); } - - void TInterconnectSessionTCP::PassAway() { - Y_FAIL("TInterconnectSessionTCP::PassAway() can't be called directly"); - } - - void TInterconnectSessionTCP::Forward(STATEFN_SIG) { - Proxy->ValidateEvent(ev, "Forward"); - + + void TInterconnectSessionTCP::Forward(STATEFN_SIG) { + Proxy->ValidateEvent(ev, "Forward"); + LOG_DEBUG_IC_SESSION("ICS02", "send event from: %s to: %s", ev->Sender.ToString().data(), ev->Recipient.ToString().data()); ++MessagesGot; - + if (ev->Flags & IEventHandle::FlagSubscribeOnSession) { - Subscribe(ev); + Subscribe(ev); } - + ui16 evChannel = ev->GetChannel(); - auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel); - const bool wasWorking = oChannel.IsWorking(); - + auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel); + const bool wasWorking = oChannel.IsWorking(); + const auto [dataSize, event] = oChannel.Push(*ev); LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize); @@ -137,268 +137,268 @@ namespace NActors { // this channel has returned to work -- it was empty and this we have just put first event in the queue ChannelScheduler->AddToHeap(oChannel, EqualizeCounter); } - + SetOutputStuckFlag(true); - ++NumEventsInReadyChannels; - + ++NumEventsInReadyChannels; + LWTRACK(EnqueueEvent, event->Orbit, Proxy->PeerNodeId, NumEventsInReadyChannels, GetWriteBlockedTotal(), evChannel, oChannel.GetQueueSize(), oChannel.GetBufferedAmountOfData()); - WILSON_TRACE(*TlsActivationContext, &ev->TraceId, OutputQueuePush, + WILSON_TRACE(*TlsActivationContext, &ev->TraceId, OutputQueuePush, QueueSizeInEvents = oChannel.GetQueueSize(), - QueueSizeInBytes = oChannel.GetBufferedAmountOfData()); - + QueueSizeInBytes = oChannel.GetBufferedAmountOfData()); + // check for overloaded queues - ui64 sendBufferDieLimit = Proxy->Common->Settings.SendBufferDieLimitInMB * ui64(1 << 20); + ui64 sendBufferDieLimit = Proxy->Common->Settings.SendBufferDieLimitInMB * ui64(1 << 
20); if (sendBufferDieLimit != 0 && TotalOutputQueueSize > sendBufferDieLimit) { - LOG_ERROR_IC_SESSION("ICS03", "socket: %" PRIi64 " output queue is overloaded, actual %" PRIu64 " bytes, limit is %" PRIu64, + LOG_ERROR_IC_SESSION("ICS03", "socket: %" PRIi64 " output queue is overloaded, actual %" PRIu64 " bytes, limit is %" PRIu64, Socket ? i64(*Socket) : -1, TotalOutputQueueSize, sendBufferDieLimit); - return Terminate(TDisconnectReason::QueueOverload()); + return Terminate(TDisconnectReason::QueueOverload()); } - - ui64 outputBuffersTotalSizeLimit = Proxy->Common->Settings.OutputBuffersTotalSizeLimitInMB * ui64(1 << 20); + + ui64 outputBuffersTotalSizeLimit = Proxy->Common->Settings.OutputBuffersTotalSizeLimitInMB * ui64(1 << 20); if (outputBuffersTotalSizeLimit != 0 && static_cast<ui64>(Proxy->Metrics->GetOutputBuffersTotalSize()) > outputBuffersTotalSizeLimit) { - LOG_ERROR_IC_SESSION("ICS77", "Exceeded total limit on output buffers size"); - if (AtomicTryLock(&Proxy->Common->StartedSessionKiller)) { - CreateSessionKillingActor(Proxy->Common); + LOG_ERROR_IC_SESSION("ICS77", "Exceeded total limit on output buffers size"); + if (AtomicTryLock(&Proxy->Common->StartedSessionKiller)) { + CreateSessionKillingActor(Proxy->Common); } } - - if (RamInQueue && !RamInQueue->Batching) { - // we have pending TEvRam, so GenerateTraffic will be called no matter what - } else if (InflightDataAmount >= GetTotalInflightAmountOfData() || !Socket || ReceiveContext->WriteBlockedByFullSendBuffer) { - // we can't issue more traffic now; GenerateTraffic will be called upon unblocking - } else if (TotalOutputQueueSize >= 64 * 1024) { - // output queue size is quite big to issue some traffic - GenerateTraffic(); - } else if (!RamInQueue) { - Y_VERIFY_DEBUG(NumEventsInReadyChannels == 1); - RamInQueue = new TEvRam(true); - auto *ev = new IEventHandle(SelfId(), {}, RamInQueue); - const TDuration batchPeriod = Proxy->Common->Settings.BatchPeriod; - if (batchPeriod != TDuration()) { - TActivationContext::Schedule(batchPeriod, ev); - } else { - TActivationContext::Send(ev); - } + + if (RamInQueue && !RamInQueue->Batching) { + // we have pending TEvRam, so GenerateTraffic will be called no matter what + } else if (InflightDataAmount >= GetTotalInflightAmountOfData() || !Socket || ReceiveContext->WriteBlockedByFullSendBuffer) { + // we can't issue more traffic now; GenerateTraffic will be called upon unblocking + } else if (TotalOutputQueueSize >= 64 * 1024) { + // output queue size is quite big to issue some traffic + GenerateTraffic(); + } else if (!RamInQueue) { + Y_VERIFY_DEBUG(NumEventsInReadyChannels == 1); + RamInQueue = new TEvRam(true); + auto *ev = new IEventHandle(SelfId(), {}, RamInQueue); + const TDuration batchPeriod = Proxy->Common->Settings.BatchPeriod; + if (batchPeriod != TDuration()) { + TActivationContext::Schedule(batchPeriod, ev); + } else { + TActivationContext::Send(ev); + } LWPROBE(StartBatching, Proxy->PeerNodeId, batchPeriod.MillisecondsFloat()); - LOG_DEBUG_IC_SESSION("ICS17", "batching started"); - } + LOG_DEBUG_IC_SESSION("ICS17", "batching started"); + } } - void TInterconnectSessionTCP::Subscribe(STATEFN_SIG) { + void TInterconnectSessionTCP::Subscribe(STATEFN_SIG) { LOG_DEBUG_IC_SESSION("ICS04", "subscribe for session state for %s", ev->Sender.ToString().data()); const auto [it, inserted] = Subscribers.emplace(ev->Sender, ev->Cookie); - if (inserted) { + if (inserted) { Proxy->Metrics->IncSubscribersCount(); } else { it->second = ev->Cookie; - } + } Send(ev->Sender, new 
TEvInterconnect::TEvNodeConnected(Proxy->PeerNodeId), 0, ev->Cookie); - } - - void TInterconnectSessionTCP::Unsubscribe(STATEFN_SIG) { + } + + void TInterconnectSessionTCP::Unsubscribe(STATEFN_SIG) { LOG_DEBUG_IC_SESSION("ICS05", "unsubscribe for session state for %s", ev->Sender.ToString().data()); Proxy->Metrics->SubSubscribersCount( Subscribers.erase(ev->Sender)); } - - THolder<TEvHandshakeAck> TInterconnectSessionTCP::ProcessHandshakeRequest(TEvHandshakeAsk::TPtr& ev) { - TEvHandshakeAsk *msg = ev->Get(); - + + THolder<TEvHandshakeAck> TInterconnectSessionTCP::ProcessHandshakeRequest(TEvHandshakeAsk::TPtr& ev) { + TEvHandshakeAsk *msg = ev->Get(); + // close existing input session, if any, and do nothing upon its destruction - ReestablishConnection({}, false, TDisconnectReason::NewSession()); - const ui64 lastInputSerial = ReceiveContext->LockLastProcessedPacketSerial(); - - LOG_INFO_IC_SESSION("ICS08", "incoming handshake Self# %s Peer# %s Counter# %" PRIu64 " LastInputSerial# %" PRIu64, - msg->Self.ToString().data(), msg->Peer.ToString().data(), msg->Counter, lastInputSerial); - - return MakeHolder<TEvHandshakeAck>(msg->Peer, lastInputSerial, Params); + ReestablishConnection({}, false, TDisconnectReason::NewSession()); + const ui64 lastInputSerial = ReceiveContext->LockLastProcessedPacketSerial(); + + LOG_INFO_IC_SESSION("ICS08", "incoming handshake Self# %s Peer# %s Counter# %" PRIu64 " LastInputSerial# %" PRIu64, + msg->Self.ToString().data(), msg->Peer.ToString().data(), msg->Counter, lastInputSerial); + + return MakeHolder<TEvHandshakeAck>(msg->Peer, lastInputSerial, Params); } - - void TInterconnectSessionTCP::SetNewConnection(TEvHandshakeDone::TPtr& ev) { + + void TInterconnectSessionTCP::SetNewConnection(TEvHandshakeDone::TPtr& ev) { if (ReceiverId) { // upon destruction of input session actor invoke this callback again - ReestablishConnection(std::move(ev), false, TDisconnectReason::NewSession()); + ReestablishConnection(std::move(ev), false, TDisconnectReason::NewSession()); return; } - - LOG_INFO_IC_SESSION("ICS09", "handshake done sender: %s self: %s peer: %s socket: %" PRIi64, - ev->Sender.ToString().data(), ev->Get()->Self.ToString().data(), ev->Get()->Peer.ToString().data(), - i64(*ev->Get()->Socket)); - - NewConnectionSet = TActivationContext::Now(); + + LOG_INFO_IC_SESSION("ICS09", "handshake done sender: %s self: %s peer: %s socket: %" PRIi64, + ev->Sender.ToString().data(), ev->Get()->Self.ToString().data(), ev->Get()->Peer.ToString().data(), + i64(*ev->Get()->Socket)); + + NewConnectionSet = TActivationContext::Now(); PacketsWrittenToSocket = 0; - - SendBufferSize = ev->Get()->Socket->GetSendBufferSize(); - Socket = std::move(ev->Get()->Socket); - - // there may be a race - const ui64 nextPacket = Max(LastConfirmed, ev->Get()->NextPacket); - + + SendBufferSize = ev->Get()->Socket->GetSendBufferSize(); + Socket = std::move(ev->Get()->Socket); + + // there may be a race + const ui64 nextPacket = Max(LastConfirmed, ev->Get()->NextPacket); + // arm watchdogs - CloseOnIdleWatchdog.Arm(SelfId()); - + CloseOnIdleWatchdog.Arm(SelfId()); + // reset activity timestamps - LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Now(); - - LOG_INFO_IC_SESSION("ICS10", "traffic start"); - + LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Now(); + + LOG_INFO_IC_SESSION("ICS10", "traffic start"); + // create input session actor - auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, ReceiveContext, Proxy->Common, + auto 
actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, ReceiveContext, Proxy->Common, Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), Params); - ReceiveContext->UnlockLastProcessedPacketSerial(); + ReceiveContext->UnlockLastProcessedPacketSerial(); ReceiverId = Params.Encryption ? RegisterWithSameMailbox(actor.Release()) : Register(actor.Release(), TMailboxType::ReadAsFilled); - + // register our socket in poller actor - LOG_DEBUG_IC_SESSION("ICS11", "registering socket in PollerActor"); - const bool success = Send(MakePollerActorId(), new TEvPollerRegister(Socket, ReceiverId, SelfId())); - Y_VERIFY(success); - ReceiveContext->WriteBlockedByFullSendBuffer = false; - - LostConnectionWatchdog.Disarm(); + LOG_DEBUG_IC_SESSION("ICS11", "registering socket in PollerActor"); + const bool success = Send(MakePollerActorId(), new TEvPollerRegister(Socket, ReceiverId, SelfId())); + Y_VERIFY(success); + ReceiveContext->WriteBlockedByFullSendBuffer = false; + + LostConnectionWatchdog.Disarm(); Proxy->Metrics->SetConnected(1); - LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] connected", Proxy->PeerNodeId); - + LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] connected", Proxy->PeerNodeId); + // arm pinger timer - ResetFlushLogic(); - + ResetFlushLogic(); + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // REINITIALIZE SEND QUEUE // // scan through send queue and leave only those packets who have data -- we will simply resend them; drop all other // auxiliary packets; also reset packet metrics to zero to start sending from the beginning // also reset SendQueuePos - + // drop confirmed packets first as we do not need unwanted retransmissions SendQueuePos = SendQueue.end(); - DropConfirmed(nextPacket); - - for (TSendQueue::iterator it = SendQueue.begin(); it != SendQueue.end(); ) { + DropConfirmed(nextPacket); + + for (TSendQueue::iterator it = SendQueue.begin(); it != SendQueue.end(); ) { const TSendQueue::iterator next = std::next(it); if (it->IsEmpty()) { - SendQueueCache.splice(SendQueueCache.begin(), SendQueue, it); + SendQueueCache.splice(SendQueueCache.begin(), SendQueue, it); } else { it->ResetBufs(); } it = next; } - TrimSendQueueCache(); + TrimSendQueueCache(); SendQueuePos = SendQueue.begin(); - - TMaybe<ui64> s; - for (auto it = SendQueuePos; it != SendQueue.end(); ++it) { - if (!it->IsEmpty()) { - s = it->GetSerial(); - } - } - const ui64 serial = s.GetOrElse(Max<ui64>()); - + + TMaybe<ui64> s; + for (auto it = SendQueuePos; it != SendQueue.end(); ++it) { + if (!it->IsEmpty()) { + s = it->GetSerial(); + } + } + const ui64 serial = s.GetOrElse(Max<ui64>()); + Y_VERIFY(serial > LastConfirmed, "%s serial# %" PRIu64 " LastConfirmed# %" PRIu64, LogPrefix.data(), serial, LastConfirmed); - LOG_DEBUG_IC_SESSION("ICS06", "rewind SendQueue size# %zu LastConfirmed# %" PRIu64 " SendQueuePos.Serial# %" PRIu64 "\n", - SendQueue.size(), LastConfirmed, serial); - + LOG_DEBUG_IC_SESSION("ICS06", "rewind SendQueue size# %zu LastConfirmed# %" PRIu64 " SendQueuePos.Serial# %" PRIu64 "\n", + SendQueue.size(), LastConfirmed, serial); + BytesUnwritten = 0; for (const auto& packet : SendQueue) { - BytesUnwritten += (Params.UseModernFrame ? sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1)) + - packet.GetDataSize(); + BytesUnwritten += (Params.UseModernFrame ? 
sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1)) + + packet.GetDataSize(); } - + SwitchStuckPeriod(); - - LastHandshakeDone = TActivationContext::Now(); - - RamInQueue = nullptr; - GenerateTraffic(); - } - - void TInterconnectSessionTCP::Handle(TEvUpdateFromInputSession::TPtr& ev) { + + LastHandshakeDone = TActivationContext::Now(); + + RamInQueue = nullptr; + GenerateTraffic(); + } + + void TInterconnectSessionTCP::Handle(TEvUpdateFromInputSession::TPtr& ev) { if (ev->Sender == ReceiverId) { TEvUpdateFromInputSession& msg = *ev->Get(); - - // update ping time - Ping = msg.Ping; + + // update ping time + Ping = msg.Ping; LWPROBE(UpdateFromInputSession, Proxy->PeerNodeId, Ping.MillisecondsFloat()); - + bool needConfirm = false; - + // update activity timer for dead peer checker - LastInputActivityTimestamp = TActivationContext::Now(); - - if (msg.NumDataBytes) { + LastInputActivityTimestamp = TActivationContext::Now(); + + if (msg.NumDataBytes) { UnconfirmedBytes += msg.NumDataBytes; - if (UnconfirmedBytes >= GetTotalInflightAmountOfData() / 4) { + if (UnconfirmedBytes >= GetTotalInflightAmountOfData() / 4) { needConfirm = true; } else { - SetForcePacketTimestamp(Proxy->Common->Settings.ForceConfirmPeriod); + SetForcePacketTimestamp(Proxy->Common->Settings.ForceConfirmPeriod); } - + // reset payload watchdog that controls close-on-idle behaviour - LastPayloadActivityTimestamp = TActivationContext::Now(); - CloseOnIdleWatchdog.Reset(); + LastPayloadActivityTimestamp = TActivationContext::Now(); + CloseOnIdleWatchdog.Reset(); } - - bool unblockedSomething = false; - LWPROBE_IF_TOO_LONG(SlowICDropConfirmed, Proxy->PeerNodeId, ms) { - unblockedSomething = DropConfirmed(msg.ConfirmedByInput); + + bool unblockedSomething = false; + LWPROBE_IF_TOO_LONG(SlowICDropConfirmed, Proxy->PeerNodeId, ms) { + unblockedSomething = DropConfirmed(msg.ConfirmedByInput); } - - // generate more traffic if we have unblocked state now - if (unblockedSomething) { + + // generate more traffic if we have unblocked state now + if (unblockedSomething) { LWPROBE(UnblockByDropConfirmed, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - ev->SendTime) * 1000.0); - GenerateTraffic(); + GenerateTraffic(); } - + // if we haven't generated any packets, then make a lone Flush packet without any data if (needConfirm && Socket) { ++ConfirmPacketsForcedBySize; - MakePacket(false); - } - - for (;;) { - switch (EUpdateState state = ReceiveContext->UpdateState) { - case EUpdateState::NONE: - case EUpdateState::CONFIRMING: - Y_FAIL("unexpected state"); - - case EUpdateState::INFLIGHT: - // this message we are processing was the only one in flight, so we can reset state to NONE here - if (ReceiveContext->UpdateState.compare_exchange_weak(state, EUpdateState::NONE)) { - return; - } - break; - - case EUpdateState::INFLIGHT_AND_PENDING: - // there is more messages pending from the input session actor, so we have to inform it to release - // that message - if (ReceiveContext->UpdateState.compare_exchange_weak(state, EUpdateState::CONFIRMING)) { - Send(ev->Sender, new TEvConfirmUpdate); - return; - } - break; - } - } - } + MakePacket(false); + } + + for (;;) { + switch (EUpdateState state = ReceiveContext->UpdateState) { + case EUpdateState::NONE: + case EUpdateState::CONFIRMING: + Y_FAIL("unexpected state"); + + case EUpdateState::INFLIGHT: + // this message we are processing was the only one in flight, so we can reset state to NONE here + if (ReceiveContext->UpdateState.compare_exchange_weak(state, 
EUpdateState::NONE)) { + return; + } + break; + + case EUpdateState::INFLIGHT_AND_PENDING: + // there is more messages pending from the input session actor, so we have to inform it to release + // that message + if (ReceiveContext->UpdateState.compare_exchange_weak(state, EUpdateState::CONFIRMING)) { + Send(ev->Sender, new TEvConfirmUpdate); + return; + } + break; + } + } + } } - - void TInterconnectSessionTCP::HandleRam(TEvRam::TPtr& ev) { - if (ev->Get() == RamInQueue) { + + void TInterconnectSessionTCP::HandleRam(TEvRam::TPtr& ev) { + if (ev->Get() == RamInQueue) { LWPROBE(FinishRam, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - ev->SendTime) * 1000.0); - RamInQueue = nullptr; - GenerateTraffic(); - } - } - - void TInterconnectSessionTCP::GenerateTraffic() { - // generate ping request, if needed - IssuePingRequest(); - - if (RamInQueue && !RamInQueue->Batching) { + RamInQueue = nullptr; + GenerateTraffic(); + } + } + + void TInterconnectSessionTCP::GenerateTraffic() { + // generate ping request, if needed + IssuePingRequest(); + + if (RamInQueue && !RamInQueue->Batching) { LWPROBE(SkipGenerateTraffic, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - RamStartedCycles) * 1000.0); - return; // we'll do it a bit later - } else { - RamInQueue = nullptr; - } - - LOG_DEBUG_IC_SESSION("ICS19", "GenerateTraffic"); - + return; // we'll do it a bit later + } else { + RamInQueue = nullptr; + } + + LOG_DEBUG_IC_SESSION("ICS19", "GenerateTraffic"); + // There is a tradeoff between fairness and efficiency. // The less traffic is generated here, the less buffering is after fair scheduler, // the more fair system is, the less latency is present. @@ -406,27 +406,27 @@ namespace NActors { // the less cpu is consumed. static const ui64 generateLimit = 64 * 1024; - const ui64 sizeBefore = TotalOutputQueueSize; + const ui64 sizeBefore = TotalOutputQueueSize; ui32 generatedPackets = 0; ui64 generatedBytes = 0; ui64 generateStarted = GetCycleCountFast(); - - // apply traffic changes - auto accountTraffic = [&] { ChannelScheduler->ForEach([](TEventOutputChannel& channel) { channel.AccountTraffic(); }); }; - + + // apply traffic changes + auto accountTraffic = [&] { ChannelScheduler->ForEach([](TEventOutputChannel& channel) { channel.AccountTraffic(); }); }; + // first, we create as many data packets as we can generate under certain conditions; they include presence // of events in channels queues and in flight fitting into requested limit; after we hit one of these conditions // we exit cycle while (Socket && NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData() && !ReceiveContext->WriteBlockedByFullSendBuffer) { if (generatedBytes >= generateLimit) { // resume later but ensure that we have issued at least one packet - RamInQueue = new TEvRam(false); - Send(SelfId(), RamInQueue); + RamInQueue = new TEvRam(false); + Send(SelfId(), RamInQueue); RamStartedCycles = GetCycleCountFast(); LWPROBE(StartRam, Proxy->PeerNodeId); break; - } - + } + try { generatedBytes += MakePacket(true); ++generatedPackets; @@ -435,292 +435,292 @@ namespace NActors { accountTraffic(); LOG_CRIT_IC("ICS31", "serialized event Type# 0x%08" PRIx32 " is too large", ex.Type); return Terminate(TDisconnectReason::EventTooLarge()); - } - } - + } + } + if (Socket) { WriteData(); } LWPROBE(GenerateTraffic, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - generateStarted) * 1000.0, sizeBefore - TotalOutputQueueSize, generatedPackets, generatedBytes); - accountTraffic(); - 
EqualizeCounter += ChannelScheduler->Equalize(); - } - - void TInterconnectSessionTCP::StartHandshake() { - LOG_INFO_IC_SESSION("ICS15", "start handshake"); - IActor::InvokeOtherActor(*Proxy, &TInterconnectProxyTCP::StartResumeHandshake, ReceiveContext->LockLastProcessedPacketSerial()); + accountTraffic(); + EqualizeCounter += ChannelScheduler->Equalize(); } - void TInterconnectSessionTCP::ReestablishConnectionWithHandshake(TDisconnectReason reason) { - ReestablishConnection({}, true, std::move(reason)); + void TInterconnectSessionTCP::StartHandshake() { + LOG_INFO_IC_SESSION("ICS15", "start handshake"); + IActor::InvokeOtherActor(*Proxy, &TInterconnectProxyTCP::StartResumeHandshake, ReceiveContext->LockLastProcessedPacketSerial()); } - - void TInterconnectSessionTCP::ReestablishConnection(TEvHandshakeDone::TPtr&& ev, bool startHandshakeOnSessionClose, - TDisconnectReason reason) { + + void TInterconnectSessionTCP::ReestablishConnectionWithHandshake(TDisconnectReason reason) { + ReestablishConnection({}, true, std::move(reason)); + } + + void TInterconnectSessionTCP::ReestablishConnection(TEvHandshakeDone::TPtr&& ev, bool startHandshakeOnSessionClose, + TDisconnectReason reason) { if (Socket) { - LOG_INFO_IC_SESSION("ICS13", "reestablish connection"); - ShutdownSocket(std::move(reason)); // stop sending/receiving on socket - PendingHandshakeDoneEvent = std::move(ev); - StartHandshakeOnSessionClose = startHandshakeOnSessionClose; + LOG_INFO_IC_SESSION("ICS13", "reestablish connection"); + ShutdownSocket(std::move(reason)); // stop sending/receiving on socket + PendingHandshakeDoneEvent = std::move(ev); + StartHandshakeOnSessionClose = startHandshakeOnSessionClose; if (!ReceiverId) { - ReestablishConnectionExecute(); + ReestablishConnectionExecute(); } - } - } - - void TInterconnectSessionTCP::OnDisconnect(TEvSocketDisconnect::TPtr& ev) { + } + } + + void TInterconnectSessionTCP::OnDisconnect(TEvSocketDisconnect::TPtr& ev) { if (ev->Sender == ReceiverId) { const bool wasConnected(Socket); - LOG_INFO_IC_SESSION("ICS07", "socket disconnect %" PRIi64 " reason# %s", Socket ? i64(*Socket) : -1, ev->Get()->Reason.ToString().data()); + LOG_INFO_IC_SESSION("ICS07", "socket disconnect %" PRIi64 " reason# %s", Socket ? 
i64(*Socket) : -1, ev->Get()->Reason.ToString().data()); ReceiverId = TActorId(); // reset receiver actor id as we have no more receiver yet if (wasConnected) { // we were sucessfully connected and did not expect failure, so it arrived from the input side; we should // restart handshake process, closing our part of socket first - ShutdownSocket(ev->Get()->Reason); - StartHandshake(); + ShutdownSocket(ev->Get()->Reason); + StartHandshake(); } else { - ReestablishConnectionExecute(); + ReestablishConnectionExecute(); } - } - } - - void TInterconnectSessionTCP::ShutdownSocket(TDisconnectReason reason) { + } + } + + void TInterconnectSessionTCP::ShutdownSocket(TDisconnectReason reason) { if (Socket) { - if (const TString& s = reason.ToString()) { + if (const TString& s = reason.ToString()) { Proxy->Metrics->IncDisconnectByReason(s); - } - - LOG_INFO_IC_SESSION("ICS25", "shutdown socket, reason# %s", reason.ToString().data()); - Proxy->UpdateErrorStateLog(TActivationContext::Now(), "close_socket", reason.ToString().data()); + } + + LOG_INFO_IC_SESSION("ICS25", "shutdown socket, reason# %s", reason.ToString().data()); + Proxy->UpdateErrorStateLog(TActivationContext::Now(), "close_socket", reason.ToString().data()); Socket->Shutdown(SHUT_RDWR); Socket.Reset(); Proxy->Metrics->IncDisconnections(); CloseOnIdleWatchdog.Disarm(); - LostConnectionWatchdog.Arm(SelfId()); + LostConnectionWatchdog.Arm(SelfId()); Proxy->Metrics->SetConnected(0); - LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] disconnected", Proxy->PeerNodeId); + LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] disconnected", Proxy->PeerNodeId); } - } - - void TInterconnectSessionTCP::ReestablishConnectionExecute() { - bool startHandshakeOnSessionClose = std::exchange(StartHandshakeOnSessionClose, false); - TEvHandshakeDone::TPtr ev = std::move(PendingHandshakeDoneEvent); - - if (startHandshakeOnSessionClose) { - StartHandshake(); - } else if (ev) { - SetNewConnection(ev); + } + + void TInterconnectSessionTCP::ReestablishConnectionExecute() { + bool startHandshakeOnSessionClose = std::exchange(StartHandshakeOnSessionClose, false); + TEvHandshakeDone::TPtr ev = std::move(PendingHandshakeDoneEvent); + + if (startHandshakeOnSessionClose) { + StartHandshake(); + } else if (ev) { + SetNewConnection(ev); } - } - - void TInterconnectSessionTCP::Handle(TEvPollerReady::TPtr& ev) { - LOG_DEBUG_IC_SESSION("ICS29", "HandleReadyWrite WriteBlockedByFullSendBuffer# %s", - ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false"); - if (std::exchange(ReceiveContext->WriteBlockedByFullSendBuffer, false)) { + } + + void TInterconnectSessionTCP::Handle(TEvPollerReady::TPtr& ev) { + LOG_DEBUG_IC_SESSION("ICS29", "HandleReadyWrite WriteBlockedByFullSendBuffer# %s", + ReceiveContext->WriteBlockedByFullSendBuffer ? 
"true" : "false"); + if (std::exchange(ReceiveContext->WriteBlockedByFullSendBuffer, false)) { Proxy->Metrics->IncUsefulWriteWakeups(); ui64 nowCycles = GetCycleCountFast(); double blockedUs = NHPTimer::GetSeconds(nowCycles - WriteBlockedCycles) * 1000000.0; LWPROBE(ReadyWrite, Proxy->PeerNodeId, NHPTimer::GetSeconds(nowCycles - ev->SendTime) * 1000.0, blockedUs / 1000.0); WriteBlockedTotal += TDuration::MicroSeconds(blockedUs); - GenerateTraffic(); - } else if (!ev->Cookie) { + GenerateTraffic(); + } else if (!ev->Cookie) { Proxy->Metrics->IncSpuriousWriteWakeups(); } - if (Params.Encryption && ReceiveContext->ReadPending && !ev->Cookie) { - Send(ReceiverId, ev->Release().Release(), 0, 1); - } - } - - void TInterconnectSessionTCP::Handle(TEvPollerRegisterResult::TPtr ev) { - PollerToken = std::move(ev->Get()->PollerToken); - if (ReceiveContext->WriteBlockedByFullSendBuffer) { - if (Params.Encryption) { - auto *secure = static_cast<NInterconnect::TSecureSocket*>(Socket.Get()); - PollerToken->Request(secure->WantRead(), secure->WantWrite()); - } else { - PollerToken->Request(false, true); - } - } - } - - void TInterconnectSessionTCP::WriteData() { - ui64 written = 0; - + if (Params.Encryption && ReceiveContext->ReadPending && !ev->Cookie) { + Send(ReceiverId, ev->Release().Release(), 0, 1); + } + } + + void TInterconnectSessionTCP::Handle(TEvPollerRegisterResult::TPtr ev) { + PollerToken = std::move(ev->Get()->PollerToken); + if (ReceiveContext->WriteBlockedByFullSendBuffer) { + if (Params.Encryption) { + auto *secure = static_cast<NInterconnect::TSecureSocket*>(Socket.Get()); + PollerToken->Request(secure->WantRead(), secure->WantWrite()); + } else { + PollerToken->Request(false, true); + } + } + } + + void TInterconnectSessionTCP::WriteData() { + ui64 written = 0; + Y_VERIFY(Socket); // ensure that socket wasn't closed - - LWPROBE_IF_TOO_LONG(SlowICWriteData, Proxy->PeerNodeId, ms) { + + LWPROBE_IF_TOO_LONG(SlowICWriteData, Proxy->PeerNodeId, ms) { constexpr ui32 iovLimit = 256; -#ifdef _linux_ - ui32 maxElementsInIOV = Min<ui32>(iovLimit, sysconf(_SC_IOV_MAX)); -#else - ui32 maxElementsInIOV = 64; -#endif - if (Params.Encryption) { - maxElementsInIOV = 1; - } - +#ifdef _linux_ + ui32 maxElementsInIOV = Min<ui32>(iovLimit, sysconf(_SC_IOV_MAX)); +#else + ui32 maxElementsInIOV = 64; +#endif + if (Params.Encryption) { + maxElementsInIOV = 1; + } + // vector of write buffers with preallocated stack space TStackVec<TConstIoVec, iovLimit> wbuffers; - - LOG_DEBUG_IC_SESSION("ICS30", "WriteData WriteBlockedByFullSendBuffer# %s SendQueue.size# %zu", - ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false", SendQueue.size()); - - // update last confirmed packet number if it has changed - if (SendQueuePos != SendQueue.end()) { - SendQueuePos->UpdateConfirmIfPossible(ReceiveContext->GetLastProcessedPacketSerial()); - } - - while (SendQueuePos != SendQueue.end() && !ReceiveContext->WriteBlockedByFullSendBuffer) { - for (auto it = SendQueuePos; it != SendQueue.end() && wbuffers.size() < maxElementsInIOV; ++it) { + + LOG_DEBUG_IC_SESSION("ICS30", "WriteData WriteBlockedByFullSendBuffer# %s SendQueue.size# %zu", + ReceiveContext->WriteBlockedByFullSendBuffer ? 
"true" : "false", SendQueue.size()); + + // update last confirmed packet number if it has changed + if (SendQueuePos != SendQueue.end()) { + SendQueuePos->UpdateConfirmIfPossible(ReceiveContext->GetLastProcessedPacketSerial()); + } + + while (SendQueuePos != SendQueue.end() && !ReceiveContext->WriteBlockedByFullSendBuffer) { + for (auto it = SendQueuePos; it != SendQueue.end() && wbuffers.size() < maxElementsInIOV; ++it) { it->AppendToIoVector(wbuffers, maxElementsInIOV); - } - + } + const struct iovec* iovec = reinterpret_cast<const struct iovec*>(wbuffers.data()); int iovcnt = wbuffers.size(); - + Y_VERIFY(iovcnt > 0); Y_VERIFY(iovec->iov_len > 0); - - TString err; - ssize_t r = 0; - do { -#ifndef _win_ - r = iovcnt == 1 ? Socket->Send(iovec[0].iov_base, iovec[0].iov_len, &err) : Socket->WriteV(iovec, iovcnt); -#else - r = Socket->Send(iovec[0].iov_base, iovec[0].iov_len, &err); -#endif + + TString err; + ssize_t r = 0; + do { +#ifndef _win_ + r = iovcnt == 1 ? Socket->Send(iovec[0].iov_base, iovec[0].iov_len, &err) : Socket->WriteV(iovec, iovcnt); +#else + r = Socket->Send(iovec[0].iov_base, iovec[0].iov_len, &err); +#endif Proxy->Metrics->IncSendSyscalls(); - } while (r == -EINTR); - - LOG_DEBUG_IC_SESSION("ICS16", "written# %zd iovcnt# %d err# %s", r, iovcnt, err.data()); - + } while (r == -EINTR); + + LOG_DEBUG_IC_SESSION("ICS16", "written# %zd iovcnt# %d err# %s", r, iovcnt, err.data()); + wbuffers.clear(); - + if (r > 0) { Y_VERIFY(static_cast<size_t>(r) <= BytesUnwritten); BytesUnwritten -= r; - written += r; + written += r; ui64 packets = 0; - + // advance SendQueuePos to eat all processed items for (size_t amount = r; amount && SendQueuePos->DropBufs(amount); ++SendQueuePos) { - if (!SendQueuePos->IsEmpty()) { - LastSentSerial = Max(LastSentSerial, SendQueuePos->GetSerial()); - } + if (!SendQueuePos->IsEmpty()) { + LastSentSerial = Max(LastSentSerial, SendQueuePos->GetSerial()); + } ++PacketsWrittenToSocket; ++packets; LWTRACK(PacketWrittenToSocket, SendQueuePos->Orbit, Proxy->PeerNodeId, PacketsWrittenToSocket, SendQueuePos->TriedWriting, SendQueuePos->GetDataSize(), BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket); } LWPROBE(WriteToSocket, Proxy->PeerNodeId, r, packets, PacketsWrittenToSocket, BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket); - } else if (-r != EAGAIN && -r != EWOULDBLOCK) { - const TString message = r == 0 ? "connection closed by peer" - : err ? err - : Sprintf("socket: %s", strerror(-r)); + } else if (-r != EAGAIN && -r != EWOULDBLOCK) { + const TString message = r == 0 ? "connection closed by peer" + : err ? err + : Sprintf("socket: %s", strerror(-r)); LOG_NOTICE_NET(Proxy->PeerNodeId, "%s", message.data()); - if (written) { + if (written) { Proxy->Metrics->AddTotalBytesWritten(written); - } - return ReestablishConnectionWithHandshake(r == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-r)); + } + return ReestablishConnectionWithHandshake(r == 0 ? 
TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-r)); } else { - // we have to do some hack for secure socket -- mark the packet as 'tried writing' - if (Params.Encryption) { - Y_VERIFY(SendQueuePos != SendQueue.end()); - SendQueuePos->MarkTriedWriting(); // do not try to replace buffer under SSL - } - + // we have to do some hack for secure socket -- mark the packet as 'tried writing' + if (Params.Encryption) { + Y_VERIFY(SendQueuePos != SendQueue.end()); + SendQueuePos->MarkTriedWriting(); // do not try to replace buffer under SSL + } + // we have received EAGAIN error code, this means that we can't issue more data until we have received // TEvPollerReadyWrite event from poller; set up flag meaning this and wait for that event - Y_VERIFY(!ReceiveContext->WriteBlockedByFullSendBuffer); - ReceiveContext->WriteBlockedByFullSendBuffer = true; + Y_VERIFY(!ReceiveContext->WriteBlockedByFullSendBuffer); + ReceiveContext->WriteBlockedByFullSendBuffer = true; WriteBlockedCycles = GetCycleCountFast(); LWPROBE(BlockedWrite, Proxy->PeerNodeId, SendQueue.size(), written); - LOG_DEBUG_IC_SESSION("ICS18", "hit send buffer limit"); - - if (PollerToken) { - if (Params.Encryption) { - auto *secure = static_cast<NInterconnect::TSecureSocket*>(Socket.Get()); - PollerToken->Request(secure->WantRead(), secure->WantWrite()); - } else { - PollerToken->Request(false, true); - } + LOG_DEBUG_IC_SESSION("ICS18", "hit send buffer limit"); + + if (PollerToken) { + if (Params.Encryption) { + auto *secure = static_cast<NInterconnect::TSecureSocket*>(Socket.Get()); + PollerToken->Request(secure->WantRead(), secure->WantWrite()); + } else { + PollerToken->Request(false, true); + } } - } - } - } - if (written) { + } + } + } + if (written) { Proxy->Metrics->AddTotalBytesWritten(written); - } - } - - void TInterconnectSessionTCP::SetForcePacketTimestamp(TDuration period) { + } + } + + void TInterconnectSessionTCP::SetForcePacketTimestamp(TDuration period) { if (period != TDuration::Max()) { - const TInstant when = TActivationContext::Now() + period; + const TInstant when = TActivationContext::Now() + period; if (when < ForcePacketTimestamp) { ForcePacketTimestamp = when; - ScheduleFlush(); + ScheduleFlush(); + } + } + } + + void TInterconnectSessionTCP::ScheduleFlush() { + if (FlushSchedule.empty() || ForcePacketTimestamp < FlushSchedule.top()) { + Schedule(ForcePacketTimestamp - TActivationContext::Now(), new TEvFlush); + FlushSchedule.push(ForcePacketTimestamp); + MaxFlushSchedule = Max(MaxFlushSchedule, FlushSchedule.size()); + ++FlushEventsScheduled; + } + } + + void TInterconnectSessionTCP::HandleFlush() { + const TInstant now = TActivationContext::Now(); + while (FlushSchedule && now >= FlushSchedule.top()) { + FlushSchedule.pop(); + } + IssuePingRequest(); + if (Socket) { + if (now >= ForcePacketTimestamp) { + ++ConfirmPacketsForcedByTimeout; + ++FlushEventsProcessed; + MakePacket(false); // just generate confirmation packet if we have preconditions for this + } else if (ForcePacketTimestamp != TInstant::Max()) { + ScheduleFlush(); } - } - } - - void TInterconnectSessionTCP::ScheduleFlush() { - if (FlushSchedule.empty() || ForcePacketTimestamp < FlushSchedule.top()) { - Schedule(ForcePacketTimestamp - TActivationContext::Now(), new TEvFlush); - FlushSchedule.push(ForcePacketTimestamp); - MaxFlushSchedule = Max(MaxFlushSchedule, FlushSchedule.size()); - ++FlushEventsScheduled; } - } - - void TInterconnectSessionTCP::HandleFlush() { - const TInstant now = TActivationContext::Now(); - while 
(FlushSchedule && now >= FlushSchedule.top()) { - FlushSchedule.pop(); - } - IssuePingRequest(); - if (Socket) { - if (now >= ForcePacketTimestamp) { - ++ConfirmPacketsForcedByTimeout; - ++FlushEventsProcessed; - MakePacket(false); // just generate confirmation packet if we have preconditions for this - } else if (ForcePacketTimestamp != TInstant::Max()) { - ScheduleFlush(); - } - } - } - - void TInterconnectSessionTCP::ResetFlushLogic() { + } + + void TInterconnectSessionTCP::ResetFlushLogic() { ForcePacketTimestamp = TInstant::Max(); UnconfirmedBytes = 0; - const TDuration ping = Proxy->Common->Settings.PingPeriod; - if (ping != TDuration::Zero() && !NumEventsInReadyChannels) { - SetForcePacketTimestamp(ping); + const TDuration ping = Proxy->Common->Settings.PingPeriod; + if (ping != TDuration::Zero() && !NumEventsInReadyChannels) { + SetForcePacketTimestamp(ping); + } + } + + void TInterconnectSessionTCP::TrimSendQueueCache() { + static constexpr size_t maxItems = 32; + static constexpr size_t trimThreshold = maxItems * 2; + if (SendQueueCache.size() >= trimThreshold) { + auto it = SendQueueCache.end(); + for (size_t n = SendQueueCache.size() - maxItems; n; --n) { + --it; + } + + auto ev = std::make_unique<TEvFreeItems>(); + ev->Items.splice(ev->Items.end(), SendQueueCache, it, SendQueueCache.end()); + ev->NumBytes = ev->Items.size() * sizeof(TTcpPacketOutTask); + if (ev->GetInLineForDestruction(Proxy->Common)) { + Send(Proxy->Common->DestructorId, ev.release()); + } } - } - - void TInterconnectSessionTCP::TrimSendQueueCache() { - static constexpr size_t maxItems = 32; - static constexpr size_t trimThreshold = maxItems * 2; - if (SendQueueCache.size() >= trimThreshold) { - auto it = SendQueueCache.end(); - for (size_t n = SendQueueCache.size() - maxItems; n; --n) { - --it; - } - - auto ev = std::make_unique<TEvFreeItems>(); - ev->Items.splice(ev->Items.end(), SendQueueCache, it, SendQueueCache.end()); - ev->NumBytes = ev->Items.size() * sizeof(TTcpPacketOutTask); - if (ev->GetInLineForDestruction(Proxy->Common)) { - Send(Proxy->Common->DestructorId, ev.release()); - } - } - } - + } + ui64 TInterconnectSessionTCP::MakePacket(bool data, TMaybe<ui64> pingMask) { Y_VERIFY(Socket); - + TSendQueue::iterator packet; if (SendQueueCache) { // we have entries in cache, take one and move it to the end of SendQueue @@ -729,191 +729,191 @@ namespace NActors { packet->Reuse(); // reset packet to initial state } else { // we have to allocate new packet, so just do it - LWPROBE_IF_TOO_LONG(SlowICAllocPacketBuffer, Proxy->PeerNodeId, ms) { - packet = SendQueue.emplace(SendQueue.end(), Params); + LWPROBE_IF_TOO_LONG(SlowICAllocPacketBuffer, Proxy->PeerNodeId, ms) { + packet = SendQueue.emplace(SendQueue.end(), Params); } - } - - // update send queue position - if (SendQueuePos == SendQueue.end()) { - SendQueuePos = packet; // start sending this packet if we are not sending anything for now - } - - ui64 serial = 0; - + } + + // update send queue position + if (SendQueuePos == SendQueue.end()) { + SendQueuePos = packet; // start sending this packet if we are not sending anything for now + } + + ui64 serial = 0; + if (data) { - // generate serial for this data packet - serial = ++OutputCounter; - - // fill the data packet - Y_VERIFY(NumEventsInReadyChannels); - LWPROBE_IF_TOO_LONG(SlowICFillSendingBuffer, Proxy->PeerNodeId, ms) { - FillSendingBuffer(*packet, serial); + // generate serial for this data packet + serial = ++OutputCounter; + + // fill the data packet + Y_VERIFY(NumEventsInReadyChannels); + 
LWPROBE_IF_TOO_LONG(SlowICFillSendingBuffer, Proxy->PeerNodeId, ms) { + FillSendingBuffer(*packet, serial); } - Y_VERIFY(!packet->IsEmpty()); - - InflightDataAmount += packet->GetDataSize(); + Y_VERIFY(!packet->IsEmpty()); + + InflightDataAmount += packet->GetDataSize(); Proxy->Metrics->AddInflightDataAmount(packet->GetDataSize()); - if (InflightDataAmount > GetTotalInflightAmountOfData()) { + if (InflightDataAmount > GetTotalInflightAmountOfData()) { Proxy->Metrics->IncInflyLimitReach(); } - + if (AtomicGet(ReceiveContext->ControlPacketId) == 0) { AtomicSet(ReceiveContext->ControlPacketSendTimer, GetCycleCountFast()); AtomicSet(ReceiveContext->ControlPacketId, OutputCounter); } // update payload activity timer - LastPayloadActivityTimestamp = TActivationContext::Now(); - } else if (pingMask) { - serial = *pingMask; - - // make this packet a priority one - if (SendQueuePos != packet) { - Y_VERIFY(SendQueuePos != SendQueue.end()); - if (SendQueuePos->IsAtBegin()) { - // insert this packet just before the next being sent and step back - SendQueue.splice(SendQueuePos, SendQueue, packet); - --SendQueuePos; - Y_VERIFY(SendQueuePos == packet); - } else { - // current packet is already being sent, so move new packet just after it - SendQueue.splice(std::next(SendQueuePos), SendQueue, packet); - } - } + LastPayloadActivityTimestamp = TActivationContext::Now(); + } else if (pingMask) { + serial = *pingMask; + + // make this packet a priority one + if (SendQueuePos != packet) { + Y_VERIFY(SendQueuePos != SendQueue.end()); + if (SendQueuePos->IsAtBegin()) { + // insert this packet just before the next being sent and step back + SendQueue.splice(SendQueuePos, SendQueue, packet); + --SendQueuePos; + Y_VERIFY(SendQueuePos == packet); + } else { + // current packet is already being sent, so move new packet just after it + SendQueue.splice(std::next(SendQueuePos), SendQueue, packet); + } + } } - - const ui64 lastInputSerial = ReceiveContext->GetLastProcessedPacketSerial(); - packet->SetMetadata(serial, lastInputSerial); + + const ui64 lastInputSerial = ReceiveContext->GetLastProcessedPacketSerial(); + packet->SetMetadata(serial, lastInputSerial); packet->Sign(); - + // count number of bytes pending for write ui64 packetSize = (Params.UseModernFrame ? 
sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1)) + packet->GetDataSize(); BytesUnwritten += packetSize; - - LOG_DEBUG_IC_SESSION("ICS22", "outgoing packet Serial# %" PRIu64 " Confirm# %" PRIu64 " DataSize# %zu" - " InflightDataAmount# %" PRIu64 " BytesUnwritten# %" PRIu64, serial, lastInputSerial, packet->GetDataSize(), - InflightDataAmount, BytesUnwritten); - + + LOG_DEBUG_IC_SESSION("ICS22", "outgoing packet Serial# %" PRIu64 " Confirm# %" PRIu64 " DataSize# %zu" + " InflightDataAmount# %" PRIu64 " BytesUnwritten# %" PRIu64, serial, lastInputSerial, packet->GetDataSize(), + InflightDataAmount, BytesUnwritten); + // reset forced packet sending timestamp as we have confirmed all received data - ResetFlushLogic(); - + ResetFlushLogic(); + ++PacketsGenerated; LWTRACK(PacketGenerated, packet->Orbit, Proxy->PeerNodeId, BytesUnwritten, InflightDataAmount, PacketsGenerated, packetSize); - + if (!data) { - WriteData(); + WriteData(); } return packetSize; - } - - bool TInterconnectSessionTCP::DropConfirmed(ui64 confirm) { - LOG_DEBUG_IC_SESSION("ICS23", "confirm count: %" PRIu64, confirm); - - Y_VERIFY(LastConfirmed <= confirm && confirm <= LastSentSerial && LastSentSerial <= OutputCounter, - "%s confirm# %" PRIu64 " LastConfirmed# %" PRIu64 " OutputCounter# %" PRIu64 " LastSentSerial# %" PRIu64, - LogPrefix.data(), confirm, LastConfirmed, OutputCounter, LastSentSerial); + } + + bool TInterconnectSessionTCP::DropConfirmed(ui64 confirm) { + LOG_DEBUG_IC_SESSION("ICS23", "confirm count: %" PRIu64, confirm); + + Y_VERIFY(LastConfirmed <= confirm && confirm <= LastSentSerial && LastSentSerial <= OutputCounter, + "%s confirm# %" PRIu64 " LastConfirmed# %" PRIu64 " OutputCounter# %" PRIu64 " LastSentSerial# %" PRIu64, + LogPrefix.data(), confirm, LastConfirmed, OutputCounter, LastSentSerial); LastConfirmed = confirm; - + ui64 droppedDataAmount = 0; ui32 numDropped = 0; - + // drop confirmed packets; this also includes any auxiliary packets as their serial is set to zero, effectively // making Serial <= confirm true TSendQueue::iterator it; - ui64 lastDroppedSerial = 0; - for (it = SendQueue.begin(); it != SendQueuePos && it->Confirmed(confirm); ++it) { - if (!it->IsEmpty()) { - lastDroppedSerial = it->GetSerial(); - } - droppedDataAmount += it->GetDataSize(); + ui64 lastDroppedSerial = 0; + for (it = SendQueue.begin(); it != SendQueuePos && it->Confirmed(confirm); ++it) { + if (!it->IsEmpty()) { + lastDroppedSerial = it->GetSerial(); + } + droppedDataAmount += it->GetDataSize(); ++numDropped; - } - SendQueueCache.splice(SendQueueCache.begin(), SendQueue, SendQueue.begin(), it); - TrimSendQueueCache(); - ChannelScheduler->ForEach([&](TEventOutputChannel& channel) { - channel.DropConfirmed(lastDroppedSerial); - }); - + } + SendQueueCache.splice(SendQueueCache.begin(), SendQueue, SendQueue.begin(), it); + TrimSendQueueCache(); + ChannelScheduler->ForEach([&](TEventOutputChannel& channel) { + channel.DropConfirmed(lastDroppedSerial); + }); + const ui64 current = InflightDataAmount; const ui64 limit = GetTotalInflightAmountOfData(); const bool unblockedSomething = current >= limit && current < limit + droppedDataAmount; - + PacketsConfirmed += numDropped; - InflightDataAmount -= droppedDataAmount; + InflightDataAmount -= droppedDataAmount; Proxy->Metrics->SubInflightDataAmount(droppedDataAmount); LWPROBE(DropConfirmed, Proxy->PeerNodeId, droppedDataAmount, InflightDataAmount); - - LOG_DEBUG_IC_SESSION("ICS24", "exit InflightDataAmount: %" PRIu64 " bytes droppedDataAmount: %" PRIu64 " bytes" - " 
dropped %" PRIu32 " packets", InflightDataAmount, droppedDataAmount, numDropped); - - Pool->Trim(); // send any unsent free requests - - return unblockedSomething; + + LOG_DEBUG_IC_SESSION("ICS24", "exit InflightDataAmount: %" PRIu64 " bytes droppedDataAmount: %" PRIu64 " bytes" + " dropped %" PRIu32 " packets", InflightDataAmount, droppedDataAmount, numDropped); + + Pool->Trim(); // send any unsent free requests + + return unblockedSomething; } - - void TInterconnectSessionTCP::FillSendingBuffer(TTcpPacketOutTask& task, ui64 serial) { - ui32 bytesGenerated = 0; - - Y_VERIFY(NumEventsInReadyChannels); - while (NumEventsInReadyChannels) { - TEventOutputChannel *channel = ChannelScheduler->PickChannelWithLeastConsumedWeight(); - Y_VERIFY_DEBUG(!channel->IsEmpty()); - - // generate some data within this channel - const ui64 netBefore = channel->GetBufferedAmountOfData(); - ui64 gross = 0; - const bool eventDone = channel->FeedBuf(task, serial, &gross); - channel->UnaccountedTraffic += gross; - const ui64 netAfter = channel->GetBufferedAmountOfData(); - Y_VERIFY_DEBUG(netAfter <= netBefore); // net amount should shrink - const ui64 net = netBefore - netAfter; // number of net bytes serialized - + + void TInterconnectSessionTCP::FillSendingBuffer(TTcpPacketOutTask& task, ui64 serial) { + ui32 bytesGenerated = 0; + + Y_VERIFY(NumEventsInReadyChannels); + while (NumEventsInReadyChannels) { + TEventOutputChannel *channel = ChannelScheduler->PickChannelWithLeastConsumedWeight(); + Y_VERIFY_DEBUG(!channel->IsEmpty()); + + // generate some data within this channel + const ui64 netBefore = channel->GetBufferedAmountOfData(); + ui64 gross = 0; + const bool eventDone = channel->FeedBuf(task, serial, &gross); + channel->UnaccountedTraffic += gross; + const ui64 netAfter = channel->GetBufferedAmountOfData(); + Y_VERIFY_DEBUG(netAfter <= netBefore); // net amount should shrink + const ui64 net = netBefore - netAfter; // number of net bytes serialized + // adjust metrics for local and global queue size - TotalOutputQueueSize -= net; + TotalOutputQueueSize -= net; Proxy->Metrics->SubOutputBuffersTotalSize(net); - bytesGenerated += gross; - Y_VERIFY_DEBUG(!!net == !!gross && gross >= net, "net# %" PRIu64 " gross# %" PRIu64, net, gross); - - // return it back to queue or delete, depending on whether this channel is still working or not - ChannelScheduler->FinishPick(gross, EqualizeCounter); - - // update some stats if the packet was fully serialized - if (eventDone) { - ++MessagesWrittenToBuffer; - - Y_VERIFY(NumEventsInReadyChannels); - --NumEventsInReadyChannels; - - if (!NumEventsInReadyChannels) { - SetOutputStuckFlag(false); - } - } - - if (!gross) { // no progress -- almost full packet buffer - break; - } - } - + bytesGenerated += gross; + Y_VERIFY_DEBUG(!!net == !!gross && gross >= net, "net# %" PRIu64 " gross# %" PRIu64, net, gross); + + // return it back to queue or delete, depending on whether this channel is still working or not + ChannelScheduler->FinishPick(gross, EqualizeCounter); + + // update some stats if the packet was fully serialized + if (eventDone) { + ++MessagesWrittenToBuffer; + + Y_VERIFY(NumEventsInReadyChannels); + --NumEventsInReadyChannels; + + if (!NumEventsInReadyChannels) { + SetOutputStuckFlag(false); + } + } + + if (!gross) { // no progress -- almost full packet buffer + break; + } + } + LWTRACK(FillSendingBuffer, task.Orbit, Proxy->PeerNodeId, bytesGenerated, NumEventsInReadyChannels, WriteBlockedTotal); - Y_VERIFY(bytesGenerated); // ensure we are not stalled in 
serialization - } - - ui32 TInterconnectSessionTCP::CalculateQueueUtilization() { - SwitchStuckPeriod(); - ui64 sumBusy = 0, sumPeriod = 0; - for (auto iter = OutputQueueUtilization.begin(); iter != OutputQueueUtilization.end() - 1; ++iter) { - sumBusy += iter->first; - sumPeriod += iter->second; - } - return sumBusy * 1000000 / sumPeriod; - } - - void TInterconnectSessionTCP::SendUpdateToWhiteboard(bool connected) { - const ui32 utilization = Socket ? CalculateQueueUtilization() : 0; - - if (const auto& callback = Proxy->Common->UpdateWhiteboard) { + Y_VERIFY(bytesGenerated); // ensure we are not stalled in serialization + } + + ui32 TInterconnectSessionTCP::CalculateQueueUtilization() { + SwitchStuckPeriod(); + ui64 sumBusy = 0, sumPeriod = 0; + for (auto iter = OutputQueueUtilization.begin(); iter != OutputQueueUtilization.end() - 1; ++iter) { + sumBusy += iter->first; + sumPeriod += iter->second; + } + return sumBusy * 1000000 / sumPeriod; + } + + void TInterconnectSessionTCP::SendUpdateToWhiteboard(bool connected) { + const ui32 utilization = Socket ? CalculateQueueUtilization() : 0; + + if (const auto& callback = Proxy->Common->UpdateWhiteboard) { enum class EFlag { GREEN, YELLOW, @@ -921,59 +921,59 @@ namespace NActors { RED, }; EFlag flagState = EFlag::RED; - + if (Socket) { flagState = EFlag::GREEN; - + do { - auto lastInputDelay = TActivationContext::Now() - LastInputActivityTimestamp; + auto lastInputDelay = TActivationContext::Now() - LastInputActivityTimestamp; if (lastInputDelay * 4 >= GetDeadPeerTimeout() * 3) { flagState = EFlag::ORANGE; break; } else if (lastInputDelay * 2 >= GetDeadPeerTimeout()) { flagState = EFlag::YELLOW; } - + // check utilization - if (utilization > 875000) { // 7/8 + if (utilization > 875000) { // 7/8 flagState = EFlag::ORANGE; break; - } else if (utilization > 500000) { // 1/2 + } else if (utilization > 500000) { // 1/2 flagState = EFlag::YELLOW; } } while (false); } - + callback(Proxy->Metrics->GetHumanFriendlyPeerHostName(), connected, flagState == EFlag::GREEN, flagState == EFlag::YELLOW, flagState == EFlag::ORANGE, flagState == EFlag::RED, - TlsActivationContext->ExecutorThread.ActorSystem); - } - - if (connected) { - Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup); - } - } - + TlsActivationContext->ExecutorThread.ActorSystem); + } + + if (connected) { + Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup); + } + } + void TInterconnectSessionTCP::SetOutputStuckFlag(bool state) { if (OutputStuckFlag == state) return; - + if (OutputQueueUtilization.Size() == 0) return; - + auto& lastpair = OutputQueueUtilization.Last(); if (state) lastpair.first -= GetCycleCountFast(); else lastpair.first += GetCycleCountFast(); - + OutputStuckFlag = state; } - + void TInterconnectSessionTCP::SwitchStuckPeriod() { auto now = GetCycleCountFast(); if (OutputQueueUtilization.Size() != 0) { @@ -984,51 +984,51 @@ namespace NActors { } OutputQueueUtilization.Push(std::pair<ui64, ui64>(0, now)); - if (OutputStuckFlag) + if (OutputStuckFlag) OutputQueueUtilization.Last().first -= now; - } - - TDuration TInterconnectSessionTCP::GetDeadPeerTimeout() const { - return Coalesce(Proxy->Common->Settings.DeadPeer, DEFAULT_DEADPEER_TIMEOUT); - } - - TDuration TInterconnectSessionTCP::GetCloseOnIdleTimeout() const { - return Proxy->Common->Settings.CloseOnIdle; - } - - TDuration TInterconnectSessionTCP::GetLostConnectionTimeout() const { - return Coalesce(Proxy->Common->Settings.LostConnection, DEFAULT_LOST_CONNECTION_TIMEOUT); - } - - ui32 
TInterconnectSessionTCP::GetTotalInflightAmountOfData() const { - return Coalesce(Proxy->Common->Settings.TotalInflightAmountOfData, DEFAULT_TOTAL_INFLIGHT_DATA); - } - - ui64 TInterconnectSessionTCP::GetMaxCyclesPerEvent() const { + } + + TDuration TInterconnectSessionTCP::GetDeadPeerTimeout() const { + return Coalesce(Proxy->Common->Settings.DeadPeer, DEFAULT_DEADPEER_TIMEOUT); + } + + TDuration TInterconnectSessionTCP::GetCloseOnIdleTimeout() const { + return Proxy->Common->Settings.CloseOnIdle; + } + + TDuration TInterconnectSessionTCP::GetLostConnectionTimeout() const { + return Coalesce(Proxy->Common->Settings.LostConnection, DEFAULT_LOST_CONNECTION_TIMEOUT); + } + + ui32 TInterconnectSessionTCP::GetTotalInflightAmountOfData() const { + return Coalesce(Proxy->Common->Settings.TotalInflightAmountOfData, DEFAULT_TOTAL_INFLIGHT_DATA); + } + + ui64 TInterconnectSessionTCP::GetMaxCyclesPerEvent() const { return DurationToCycles(TDuration::MicroSeconds(50)); - } - - void TInterconnectSessionTCP::IssuePingRequest() { - const TInstant now = TActivationContext::Now(); - if (now >= LastPingTimestamp + PingPeriodicity) { - LOG_DEBUG_IC_SESSION("ICS22", "Issuing ping request"); - if (Socket) { + } + + void TInterconnectSessionTCP::IssuePingRequest() { + const TInstant now = TActivationContext::Now(); + if (now >= LastPingTimestamp + PingPeriodicity) { + LOG_DEBUG_IC_SESSION("ICS22", "Issuing ping request"); + if (Socket) { MakePacket(false, GetCycleCountFast() | TTcpPacketBuf::PingRequestMask); - } - if (Socket) { - MakePacket(false, TInstant::Now().MicroSeconds() | TTcpPacketBuf::ClockMask); - } - LastPingTimestamp = now; - } - } - - void TInterconnectSessionTCP::Handle(TEvProcessPingRequest::TPtr ev) { - if (Socket) { - MakePacket(false, ev->Get()->Payload | TTcpPacketBuf::PingResponseMask); - } - } - - void TInterconnectSessionTCP::GenerateHttpInfo(TStringStream& str) { + } + if (Socket) { + MakePacket(false, TInstant::Now().MicroSeconds() | TTcpPacketBuf::ClockMask); + } + LastPingTimestamp = now; + } + } + + void TInterconnectSessionTCP::Handle(TEvProcessPingRequest::TPtr ev) { + if (Socket) { + MakePacket(false, ev->Get()->Payload | TTcpPacketBuf::PingResponseMask); + } + } + + void TInterconnectSessionTCP::GenerateHttpInfo(TStringStream& str) { HTML(str) { DIV_CLASS("panel panel-info") { DIV_CLASS("panel-heading") { @@ -1045,76 +1045,76 @@ namespace NActors { str << "Value"; } } - } + } TABLEBODY() { TABLER() { TABLED() { - str << "Encryption"; - } - TABLED() { - str << (Params.Encryption ? 
"<font color=green>Enabled</font>" : "<font color=red>Disabled</font>"); - } - } - if (auto *x = dynamic_cast<NInterconnect::TSecureSocket*>(Socket.Get())) { - TABLER() { - TABLED() { - str << "Cipher name"; - } - TABLED() { - str << x->GetCipherName(); - } - } - TABLER() { - TABLED() { - str << "Cipher bits"; - } - TABLED() { - str << x->GetCipherBits(); - } - } - TABLER() { - TABLED() { - str << "Protocol"; - } - TABLED() { - str << x->GetProtocolName(); - } - } - TABLER() { - TABLED() { - str << "Peer CN"; - } - TABLED() { - str << x->GetPeerCommonName(); - } - } - } - TABLER() { - TABLED() { str << "AuthOnly CN"; } - TABLED() { str << Params.AuthCN; } - } - TABLER() { - TABLED() { - str << "Local scope id"; - } - TABLED() { - str << ScopeIdToString(Proxy->Common->LocalScopeId); - } - } - TABLER() { - TABLED() { - str << "Peer scope id"; - } - TABLED() { - str << ScopeIdToString(Params.PeerScopeId); - } - } - TABLER() { - TABLED() { + str << "Encryption"; + } + TABLED() { + str << (Params.Encryption ? "<font color=green>Enabled</font>" : "<font color=red>Disabled</font>"); + } + } + if (auto *x = dynamic_cast<NInterconnect::TSecureSocket*>(Socket.Get())) { + TABLER() { + TABLED() { + str << "Cipher name"; + } + TABLED() { + str << x->GetCipherName(); + } + } + TABLER() { + TABLED() { + str << "Cipher bits"; + } + TABLED() { + str << x->GetCipherBits(); + } + } + TABLER() { + TABLED() { + str << "Protocol"; + } + TABLED() { + str << x->GetProtocolName(); + } + } + TABLER() { + TABLED() { + str << "Peer CN"; + } + TABLED() { + str << x->GetPeerCommonName(); + } + } + } + TABLER() { + TABLED() { str << "AuthOnly CN"; } + TABLED() { str << Params.AuthCN; } + } + TABLER() { + TABLED() { + str << "Local scope id"; + } + TABLED() { + str << ScopeIdToString(Proxy->Common->LocalScopeId); + } + } + TABLER() { + TABLED() { + str << "Peer scope id"; + } + TABLED() { + str << ScopeIdToString(Params.PeerScopeId); + } + } + TABLER() { + TABLED() { str << "This page generated at"; } TABLED() { - str << TActivationContext::Now() << " / " << Now(); + str << TActivationContext::Now() << " / " << Now(); } } TABLER() { @@ -1122,13 +1122,13 @@ namespace NActors { str << "SelfID"; } TABLED() { - str << SelfId().ToString(); + str << SelfId().ToString(); } } - TABLER() { - TABLED() { str << "Frame version/Checksum"; } - TABLED() { str << (!Params.UseModernFrame ? "v1/crc32c" : Params.Encryption ? "v2/none" : "v2/crc32c"); } - } + TABLER() { + TABLED() { str << "Frame version/Checksum"; } + TABLED() { str << (!Params.UseModernFrame ? "v1/crc32c" : Params.Encryption ? "v2/none" : "v2/crc32c"); } + } #define MON_VAR(NAME) \ TABLER() { \ TABLED() { \ @@ -1138,7 +1138,7 @@ namespace NActors { str << NAME; \ } \ } - + MON_VAR(Created) MON_VAR(NewConnectionSet) MON_VAR(ReceiverId) @@ -1150,7 +1150,7 @@ namespace NActors { MON_VAR(AtomicGet(ReceiveContext->PacketsReadFromSocket)) MON_VAR(ConfirmPacketsForcedBySize) MON_VAR(ConfirmPacketsForcedByTimeout) - + TABLER() { TABLED() { str << "Virtual self ID"; @@ -1158,7 +1158,7 @@ namespace NActors { TABLED() { str << Proxy->SessionVirtualId.ToString(); } - } + } TABLER() { TABLED() { str << "Virtual peer ID"; @@ -1175,54 +1175,54 @@ namespace NActors { str << (Socket ? i64(*Socket) : -1); } } - - ui32 unsentQueueSize = Socket ? Socket->GetUnsentQueueSize() : 0; - + + ui32 unsentQueueSize = Socket ? 
Socket->GetUnsentQueueSize() : 0; + MON_VAR(OutputStuckFlag) MON_VAR(SendQueue.size()) MON_VAR(SendQueueCache.size()) - MON_VAR(NumEventsInReadyChannels) + MON_VAR(NumEventsInReadyChannels) MON_VAR(TotalOutputQueueSize) MON_VAR(BytesUnwritten) - MON_VAR(InflightDataAmount) - MON_VAR(unsentQueueSize) - MON_VAR(SendBufferSize) + MON_VAR(InflightDataAmount) + MON_VAR(unsentQueueSize) + MON_VAR(SendBufferSize) MON_VAR(LastInputActivityTimestamp) MON_VAR(LastPayloadActivityTimestamp) MON_VAR(LastHandshakeDone) MON_VAR(OutputCounter) - MON_VAR(LastSentSerial) - MON_VAR(ReceiveContext->GetLastProcessedPacketSerial()) + MON_VAR(LastSentSerial) + MON_VAR(ReceiveContext->GetLastProcessedPacketSerial()) MON_VAR(LastConfirmed) - MON_VAR(FlushSchedule.size()) - MON_VAR(MaxFlushSchedule) - MON_VAR(FlushEventsScheduled) - MON_VAR(FlushEventsProcessed) - - TString clockSkew; - i64 x = GetClockSkew(); - if (x < 0) { - clockSkew = Sprintf("-%s", TDuration::MicroSeconds(-x).ToString().data()); - } else { - clockSkew = Sprintf("+%s", TDuration::MicroSeconds(x).ToString().data()); - } - - MON_VAR(LastPingTimestamp) - MON_VAR(GetPingRTT()) - MON_VAR(clockSkew) - + MON_VAR(FlushSchedule.size()) + MON_VAR(MaxFlushSchedule) + MON_VAR(FlushEventsScheduled) + MON_VAR(FlushEventsProcessed) + + TString clockSkew; + i64 x = GetClockSkew(); + if (x < 0) { + clockSkew = Sprintf("-%s", TDuration::MicroSeconds(-x).ToString().data()); + } else { + clockSkew = Sprintf("+%s", TDuration::MicroSeconds(x).ToString().data()); + } + + MON_VAR(LastPingTimestamp) + MON_VAR(GetPingRTT()) + MON_VAR(clockSkew) + MON_VAR(GetDeadPeerTimeout()) - MON_VAR(GetTotalInflightAmountOfData()) - MON_VAR(GetCloseOnIdleTimeout()) - MON_VAR(Subscribers.size()) + MON_VAR(GetTotalInflightAmountOfData()) + MON_VAR(GetCloseOnIdleTimeout()) + MON_VAR(Subscribers.size()) } - } - } - } - } - } - - void CreateSessionKillingActor(TInterconnectProxyCommon::TPtr common) { - TlsActivationContext->ExecutorThread.ActorSystem->Register(new TInterconnectSessionKiller(common)); + } + } + } + } + } + + void CreateSessionKillingActor(TInterconnectProxyCommon::TPtr common) { + TlsActivationContext->ExecutorThread.ActorSystem->Register(new TInterconnectSessionKiller(common)); } -} +} |
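Editor's note (illustrative, not part of the commit): the hunks above restore, among other things, the session's self-monitoring logic — TInterconnectSessionTCP::CalculateQueueUtilization() reports output-queue busyness in parts per million, and SendUpdateToWhiteboard() maps that value plus the time since the last received packet onto a GREEN/YELLOW/ORANGE/RED flag. The standalone C++ sketch below restates that logic under assumptions: the function names CalculateUtilizationPpm and ClassifyConnection are invented for the example, and plain std types stand in for the session's OutputQueueUtilization ring buffer and actor-system timestamps; the real implementation lives in the class shown in the diff.

    // Sketch of the restored utilization/flag logic (names are illustrative).
    #include <cstdint>
    #include <utility>
    #include <vector>

    enum class EFlag { GREEN, YELLOW, ORANGE, RED };

    // Mirrors CalculateQueueUtilization(): sum the closed (busy, period) samples,
    // skip the still-open last sample, and scale to parts per million.
    uint32_t CalculateUtilizationPpm(const std::vector<std::pair<uint64_t, uint64_t>>& samples) {
        uint64_t sumBusy = 0, sumPeriod = 0;
        for (size_t i = 0; i + 1 < samples.size(); ++i) {
            sumBusy += samples[i].first;    // cycles spent with the output stuck
            sumPeriod += samples[i].second; // total cycles in the sample period
        }
        // Division guard added for the standalone example.
        return sumPeriod ? static_cast<uint32_t>(sumBusy * 1000000 / sumPeriod) : 0;
    }

    // Mirrors the flag selection in SendUpdateToWhiteboard().
    EFlag ClassifyConnection(bool hasSocket,
                             uint64_t lastInputDelayUs,
                             uint64_t deadPeerTimeoutUs,
                             uint32_t utilizationPpm) {
        if (!hasSocket) {
            return EFlag::RED;
        }
        EFlag flag = EFlag::GREEN;
        // Input silence: >= 3/4 of the dead-peer timeout is ORANGE, >= 1/2 is YELLOW.
        if (lastInputDelayUs * 4 >= deadPeerTimeoutUs * 3) {
            return EFlag::ORANGE;
        } else if (lastInputDelayUs * 2 >= deadPeerTimeoutUs) {
            flag = EFlag::YELLOW;
        }
        // Queue utilization: > 7/8 (875000 ppm) is ORANGE, > 1/2 (500000 ppm) is YELLOW.
        if (utilizationPpm > 875000) {
            return EFlag::ORANGE;
        } else if (utilizationPpm > 500000) {
            flag = EFlag::YELLOW;
        }
        return flag;
    }

Usage is straightforward: compute the ppm value once per whiteboard update and pass it together with the input delay and the dead-peer timeout; only GREEN is reported as a fully healthy link, matching the callback arguments in the diff.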