diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:17 +0300 |
commit | d3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch) | |
tree | dd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/actors/interconnect/interconnect_tcp_session.cpp | |
parent | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff) | |
download | ydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_tcp_session.cpp')
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_tcp_session.cpp | 598 |
1 files changed, 299 insertions, 299 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 0e18a20072..2ded7f9f53 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -10,14 +10,14 @@ #include <library/cpp/monlib/service/pages/templates.h> namespace NActors { - LWTRACE_USING(ACTORLIB_PROVIDER); + LWTRACE_USING(ACTORLIB_PROVIDER); - DECLARE_WILSON_EVENT(OutputQueuePush, (ui32, QueueSizeInEvents), (ui64, QueueSizeInBytes)); + DECLARE_WILSON_EVENT(OutputQueuePush, (ui32, QueueSizeInEvents), (ui64, QueueSizeInBytes)); template<typename T> T Coalesce(T&& x) { return x; - } + } template<typename T, typename T2, typename... TRest> typename std::common_type<T, T2, TRest...>::type Coalesce(T&& first, T2&& mid, TRest&&... rest) { @@ -26,22 +26,22 @@ namespace NActors { } else { return Coalesce(std::forward<T2>(mid), std::forward<TRest>(rest)...); } - } + } TInterconnectSessionTCP::TInterconnectSessionTCP(TInterconnectProxyTCP* const proxy, TSessionParams params) - : TActor(&TInterconnectSessionTCP::StateFunc) + : TActor(&TInterconnectSessionTCP::StateFunc) , Created(TInstant::Now()) - , Proxy(proxy) + , Proxy(proxy) , CloseOnIdleWatchdog(GetCloseOnIdleTimeout(), std::bind(&TThis::OnCloseOnIdleTimerHit, this)) , LostConnectionWatchdog(GetLostConnectionTimeout(), std::bind(&TThis::OnLostConnectionTimerHit, this)) , Params(std::move(params)) - , TotalOutputQueueSize(0) - , OutputStuckFlag(false) - , OutputQueueUtilization(16) - , OutputCounter(0ULL) - { + , TotalOutputQueueSize(0) + , OutputStuckFlag(false) + , OutputQueueUtilization(16) + , OutputCounter(0ULL) + { Proxy->Metrics->SetConnected(0); - ReceiveContext.Reset(new TReceiveContext); + ReceiveContext.Reset(new TReceiveContext); } TInterconnectSessionTCP::~TInterconnectSessionTCP() { @@ -62,7 +62,7 @@ namespace NActors { LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session created", Proxy->PeerNodeId); SetPrefix(Sprintf("Session %s [node %" PRIu32 "]", SelfId().ToString().data(), Proxy->PeerNodeId)); SendUpdateToWhiteboard(); - } + } void TInterconnectSessionTCP::CloseInputSession() { Send(ReceiverId, new TEvInterconnect::TEvCloseInputSession); @@ -84,7 +84,7 @@ namespace NActors { for (const auto& kv : Subscribers) { Send(kv.first, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, kv.second); - } + } Proxy->Metrics->SubSubscribersCount(Subscribers.size()); Subscribers.clear(); @@ -108,7 +108,7 @@ namespace NActors { } TActor::PassAway(); - } + } void TInterconnectSessionTCP::PassAway() { Y_FAIL("TInterconnectSessionTCP::PassAway() can't be called directly"); @@ -118,13 +118,13 @@ namespace NActors { Proxy->ValidateEvent(ev, "Forward"); LOG_DEBUG_IC_SESSION("ICS02", "send event from: %s to: %s", ev->Sender.ToString().data(), ev->Recipient.ToString().data()); - ++MessagesGot; + ++MessagesGot; - if (ev->Flags & IEventHandle::FlagSubscribeOnSession) { + if (ev->Flags & IEventHandle::FlagSubscribeOnSession) { Subscribe(ev); - } + } - ui16 evChannel = ev->GetChannel(); + ui16 evChannel = ev->GetChannel(); auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel); const bool wasWorking = oChannel.IsWorking(); @@ -136,31 +136,31 @@ namespace NActors { if (!wasWorking) { // this channel has returned to work -- it was empty and this we have just put first event in the queue ChannelScheduler->AddToHeap(oChannel, EqualizeCounter); - } + } - SetOutputStuckFlag(true); + SetOutputStuckFlag(true); ++NumEventsInReadyChannels; LWTRACK(EnqueueEvent, event->Orbit, Proxy->PeerNodeId, NumEventsInReadyChannels, GetWriteBlockedTotal(), evChannel, oChannel.GetQueueSize(), oChannel.GetBufferedAmountOfData()); WILSON_TRACE(*TlsActivationContext, &ev->TraceId, OutputQueuePush, - QueueSizeInEvents = oChannel.GetQueueSize(), + QueueSizeInEvents = oChannel.GetQueueSize(), QueueSizeInBytes = oChannel.GetBufferedAmountOfData()); - // check for overloaded queues + // check for overloaded queues ui64 sendBufferDieLimit = Proxy->Common->Settings.SendBufferDieLimitInMB * ui64(1 << 20); - if (sendBufferDieLimit != 0 && TotalOutputQueueSize > sendBufferDieLimit) { + if (sendBufferDieLimit != 0 && TotalOutputQueueSize > sendBufferDieLimit) { LOG_ERROR_IC_SESSION("ICS03", "socket: %" PRIi64 " output queue is overloaded, actual %" PRIu64 " bytes, limit is %" PRIu64, - Socket ? i64(*Socket) : -1, TotalOutputQueueSize, sendBufferDieLimit); + Socket ? i64(*Socket) : -1, TotalOutputQueueSize, sendBufferDieLimit); return Terminate(TDisconnectReason::QueueOverload()); - } + } ui64 outputBuffersTotalSizeLimit = Proxy->Common->Settings.OutputBuffersTotalSizeLimitInMB * ui64(1 << 20); if (outputBuffersTotalSizeLimit != 0 && static_cast<ui64>(Proxy->Metrics->GetOutputBuffersTotalSize()) > outputBuffersTotalSizeLimit) { LOG_ERROR_IC_SESSION("ICS77", "Exceeded total limit on output buffers size"); if (AtomicTryLock(&Proxy->Common->StartedSessionKiller)) { CreateSessionKillingActor(Proxy->Common); - } - } + } + } if (RamInQueue && !RamInQueue->Batching) { // we have pending TEvRam, so GenerateTraffic will be called no matter what @@ -198,12 +198,12 @@ namespace NActors { void TInterconnectSessionTCP::Unsubscribe(STATEFN_SIG) { LOG_DEBUG_IC_SESSION("ICS05", "unsubscribe for session state for %s", ev->Sender.ToString().data()); Proxy->Metrics->SubSubscribersCount( Subscribers.erase(ev->Sender)); - } + } THolder<TEvHandshakeAck> TInterconnectSessionTCP::ProcessHandshakeRequest(TEvHandshakeAsk::TPtr& ev) { TEvHandshakeAsk *msg = ev->Get(); - // close existing input session, if any, and do nothing upon its destruction + // close existing input session, if any, and do nothing upon its destruction ReestablishConnection({}, false, TDisconnectReason::NewSession()); const ui64 lastInputSerial = ReceiveContext->LockLastProcessedPacketSerial(); @@ -211,21 +211,21 @@ namespace NActors { msg->Self.ToString().data(), msg->Peer.ToString().data(), msg->Counter, lastInputSerial); return MakeHolder<TEvHandshakeAck>(msg->Peer, lastInputSerial, Params); - } + } void TInterconnectSessionTCP::SetNewConnection(TEvHandshakeDone::TPtr& ev) { - if (ReceiverId) { - // upon destruction of input session actor invoke this callback again + if (ReceiverId) { + // upon destruction of input session actor invoke this callback again ReestablishConnection(std::move(ev), false, TDisconnectReason::NewSession()); - return; - } + return; + } LOG_INFO_IC_SESSION("ICS09", "handshake done sender: %s self: %s peer: %s socket: %" PRIi64, ev->Sender.ToString().data(), ev->Get()->Self.ToString().data(), ev->Get()->Peer.ToString().data(), i64(*ev->Get()->Socket)); NewConnectionSet = TActivationContext::Now(); - PacketsWrittenToSocket = 0; + PacketsWrittenToSocket = 0; SendBufferSize = ev->Get()->Socket->GetSendBufferSize(); Socket = std::move(ev->Get()->Socket); @@ -233,21 +233,21 @@ namespace NActors { // there may be a race const ui64 nextPacket = Max(LastConfirmed, ev->Get()->NextPacket); - // arm watchdogs + // arm watchdogs CloseOnIdleWatchdog.Arm(SelfId()); - // reset activity timestamps + // reset activity timestamps LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Now(); LOG_INFO_IC_SESSION("ICS10", "traffic start"); - // create input session actor + // create input session actor auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, ReceiveContext, Proxy->Common, Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), Params); ReceiveContext->UnlockLastProcessedPacketSerial(); ReceiverId = Params.Encryption ? RegisterWithSameMailbox(actor.Release()) : Register(actor.Release(), TMailboxType::ReadAsFilled); - // register our socket in poller actor + // register our socket in poller actor LOG_DEBUG_IC_SESSION("ICS11", "registering socket in PollerActor"); const bool success = Send(MakePollerActorId(), new TEvPollerRegister(Socket, ReceiverId, SelfId())); Y_VERIFY(success); @@ -257,31 +257,31 @@ namespace NActors { Proxy->Metrics->SetConnected(1); LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] connected", Proxy->PeerNodeId); - // arm pinger timer + // arm pinger timer ResetFlushLogic(); - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // REINITIALIZE SEND QUEUE - // - // scan through send queue and leave only those packets who have data -- we will simply resend them; drop all other + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // REINITIALIZE SEND QUEUE + // + // scan through send queue and leave only those packets who have data -- we will simply resend them; drop all other // auxiliary packets; also reset packet metrics to zero to start sending from the beginning - // also reset SendQueuePos + // also reset SendQueuePos - // drop confirmed packets first as we do not need unwanted retransmissions - SendQueuePos = SendQueue.end(); + // drop confirmed packets first as we do not need unwanted retransmissions + SendQueuePos = SendQueue.end(); DropConfirmed(nextPacket); for (TSendQueue::iterator it = SendQueue.begin(); it != SendQueue.end(); ) { - const TSendQueue::iterator next = std::next(it); - if (it->IsEmpty()) { + const TSendQueue::iterator next = std::next(it); + if (it->IsEmpty()) { SendQueueCache.splice(SendQueueCache.begin(), SendQueue, it); - } else { - it->ResetBufs(); - } - it = next; - } + } else { + it->ResetBufs(); + } + it = next; + } TrimSendQueueCache(); - SendQueuePos = SendQueue.begin(); + SendQueuePos = SendQueue.begin(); TMaybe<ui64> s; for (auto it = SendQueuePos; it != SendQueue.end(); ++it) { @@ -295,13 +295,13 @@ namespace NActors { LOG_DEBUG_IC_SESSION("ICS06", "rewind SendQueue size# %zu LastConfirmed# %" PRIu64 " SendQueuePos.Serial# %" PRIu64 "\n", SendQueue.size(), LastConfirmed, serial); - BytesUnwritten = 0; - for (const auto& packet : SendQueue) { + BytesUnwritten = 0; + for (const auto& packet : SendQueue) { BytesUnwritten += (Params.UseModernFrame ? sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1)) + packet.GetDataSize(); - } + } - SwitchStuckPeriod(); + SwitchStuckPeriod(); LastHandshakeDone = TActivationContext::Now(); @@ -310,45 +310,45 @@ namespace NActors { } void TInterconnectSessionTCP::Handle(TEvUpdateFromInputSession::TPtr& ev) { - if (ev->Sender == ReceiverId) { - TEvUpdateFromInputSession& msg = *ev->Get(); + if (ev->Sender == ReceiverId) { + TEvUpdateFromInputSession& msg = *ev->Get(); // update ping time Ping = msg.Ping; LWPROBE(UpdateFromInputSession, Proxy->PeerNodeId, Ping.MillisecondsFloat()); - bool needConfirm = false; + bool needConfirm = false; - // update activity timer for dead peer checker + // update activity timer for dead peer checker LastInputActivityTimestamp = TActivationContext::Now(); if (msg.NumDataBytes) { - UnconfirmedBytes += msg.NumDataBytes; + UnconfirmedBytes += msg.NumDataBytes; if (UnconfirmedBytes >= GetTotalInflightAmountOfData() / 4) { - needConfirm = true; - } else { + needConfirm = true; + } else { SetForcePacketTimestamp(Proxy->Common->Settings.ForceConfirmPeriod); - } + } - // reset payload watchdog that controls close-on-idle behaviour + // reset payload watchdog that controls close-on-idle behaviour LastPayloadActivityTimestamp = TActivationContext::Now(); CloseOnIdleWatchdog.Reset(); - } + } bool unblockedSomething = false; LWPROBE_IF_TOO_LONG(SlowICDropConfirmed, Proxy->PeerNodeId, ms) { unblockedSomething = DropConfirmed(msg.ConfirmedByInput); - } + } // generate more traffic if we have unblocked state now if (unblockedSomething) { LWPROBE(UnblockByDropConfirmed, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - ev->SendTime) * 1000.0); GenerateTraffic(); - } + } - // if we haven't generated any packets, then make a lone Flush packet without any data - if (needConfirm && Socket) { - ++ConfirmPacketsForcedBySize; + // if we haven't generated any packets, then make a lone Flush packet without any data + if (needConfirm && Socket) { + ++ConfirmPacketsForcedBySize; MakePacket(false); } @@ -376,7 +376,7 @@ namespace NActors { } } } - } + } void TInterconnectSessionTCP::HandleRam(TEvRam::TPtr& ev) { if (ev->Get() == RamInQueue) { @@ -451,57 +451,57 @@ namespace NActors { void TInterconnectSessionTCP::StartHandshake() { LOG_INFO_IC_SESSION("ICS15", "start handshake"); IActor::InvokeOtherActor(*Proxy, &TInterconnectProxyTCP::StartResumeHandshake, ReceiveContext->LockLastProcessedPacketSerial()); - } + } void TInterconnectSessionTCP::ReestablishConnectionWithHandshake(TDisconnectReason reason) { ReestablishConnection({}, true, std::move(reason)); - } + } void TInterconnectSessionTCP::ReestablishConnection(TEvHandshakeDone::TPtr&& ev, bool startHandshakeOnSessionClose, TDisconnectReason reason) { - if (Socket) { + if (Socket) { LOG_INFO_IC_SESSION("ICS13", "reestablish connection"); ShutdownSocket(std::move(reason)); // stop sending/receiving on socket PendingHandshakeDoneEvent = std::move(ev); StartHandshakeOnSessionClose = startHandshakeOnSessionClose; - if (!ReceiverId) { + if (!ReceiverId) { ReestablishConnectionExecute(); - } + } } } void TInterconnectSessionTCP::OnDisconnect(TEvSocketDisconnect::TPtr& ev) { - if (ev->Sender == ReceiverId) { - const bool wasConnected(Socket); + if (ev->Sender == ReceiverId) { + const bool wasConnected(Socket); LOG_INFO_IC_SESSION("ICS07", "socket disconnect %" PRIi64 " reason# %s", Socket ? i64(*Socket) : -1, ev->Get()->Reason.ToString().data()); ReceiverId = TActorId(); // reset receiver actor id as we have no more receiver yet - if (wasConnected) { - // we were sucessfully connected and did not expect failure, so it arrived from the input side; we should - // restart handshake process, closing our part of socket first + if (wasConnected) { + // we were sucessfully connected and did not expect failure, so it arrived from the input side; we should + // restart handshake process, closing our part of socket first ShutdownSocket(ev->Get()->Reason); StartHandshake(); - } else { + } else { ReestablishConnectionExecute(); - } + } } } void TInterconnectSessionTCP::ShutdownSocket(TDisconnectReason reason) { - if (Socket) { + if (Socket) { if (const TString& s = reason.ToString()) { Proxy->Metrics->IncDisconnectByReason(s); } LOG_INFO_IC_SESSION("ICS25", "shutdown socket, reason# %s", reason.ToString().data()); Proxy->UpdateErrorStateLog(TActivationContext::Now(), "close_socket", reason.ToString().data()); - Socket->Shutdown(SHUT_RDWR); - Socket.Reset(); + Socket->Shutdown(SHUT_RDWR); + Socket.Reset(); Proxy->Metrics->IncDisconnections(); - CloseOnIdleWatchdog.Disarm(); + CloseOnIdleWatchdog.Disarm(); LostConnectionWatchdog.Arm(SelfId()); Proxy->Metrics->SetConnected(0); LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] disconnected", Proxy->PeerNodeId); - } + } } void TInterconnectSessionTCP::ReestablishConnectionExecute() { @@ -512,7 +512,7 @@ namespace NActors { StartHandshake(); } else if (ev) { SetNewConnection(ev); - } + } } void TInterconnectSessionTCP::Handle(TEvPollerReady::TPtr& ev) { @@ -527,7 +527,7 @@ namespace NActors { GenerateTraffic(); } else if (!ev->Cookie) { Proxy->Metrics->IncSpuriousWriteWakeups(); - } + } if (Params.Encryption && ReceiveContext->ReadPending && !ev->Cookie) { Send(ReceiverId, ev->Release().Release(), 0, 1); } @@ -548,10 +548,10 @@ namespace NActors { void TInterconnectSessionTCP::WriteData() { ui64 written = 0; - Y_VERIFY(Socket); // ensure that socket wasn't closed + Y_VERIFY(Socket); // ensure that socket wasn't closed LWPROBE_IF_TOO_LONG(SlowICWriteData, Proxy->PeerNodeId, ms) { - constexpr ui32 iovLimit = 256; + constexpr ui32 iovLimit = 256; #ifdef _linux_ ui32 maxElementsInIOV = Min<ui32>(iovLimit, sysconf(_SC_IOV_MAX)); #else @@ -561,8 +561,8 @@ namespace NActors { maxElementsInIOV = 1; } - // vector of write buffers with preallocated stack space - TStackVec<TConstIoVec, iovLimit> wbuffers; + // vector of write buffers with preallocated stack space + TStackVec<TConstIoVec, iovLimit> wbuffers; LOG_DEBUG_IC_SESSION("ICS30", "WriteData WriteBlockedByFullSendBuffer# %s SendQueue.size# %zu", ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false", SendQueue.size()); @@ -574,14 +574,14 @@ namespace NActors { while (SendQueuePos != SendQueue.end() && !ReceiveContext->WriteBlockedByFullSendBuffer) { for (auto it = SendQueuePos; it != SendQueue.end() && wbuffers.size() < maxElementsInIOV; ++it) { - it->AppendToIoVector(wbuffers, maxElementsInIOV); + it->AppendToIoVector(wbuffers, maxElementsInIOV); } - const struct iovec* iovec = reinterpret_cast<const struct iovec*>(wbuffers.data()); - int iovcnt = wbuffers.size(); + const struct iovec* iovec = reinterpret_cast<const struct iovec*>(wbuffers.data()); + int iovcnt = wbuffers.size(); - Y_VERIFY(iovcnt > 0); - Y_VERIFY(iovec->iov_len > 0); + Y_VERIFY(iovcnt > 0); + Y_VERIFY(iovec->iov_len > 0); TString err; ssize_t r = 0; @@ -596,23 +596,23 @@ namespace NActors { LOG_DEBUG_IC_SESSION("ICS16", "written# %zd iovcnt# %d err# %s", r, iovcnt, err.data()); - wbuffers.clear(); + wbuffers.clear(); - if (r > 0) { - Y_VERIFY(static_cast<size_t>(r) <= BytesUnwritten); - BytesUnwritten -= r; + if (r > 0) { + Y_VERIFY(static_cast<size_t>(r) <= BytesUnwritten); + BytesUnwritten -= r; written += r; ui64 packets = 0; - // advance SendQueuePos to eat all processed items - for (size_t amount = r; amount && SendQueuePos->DropBufs(amount); ++SendQueuePos) { + // advance SendQueuePos to eat all processed items + for (size_t amount = r; amount && SendQueuePos->DropBufs(amount); ++SendQueuePos) { if (!SendQueuePos->IsEmpty()) { LastSentSerial = Max(LastSentSerial, SendQueuePos->GetSerial()); } - ++PacketsWrittenToSocket; + ++PacketsWrittenToSocket; ++packets; LWTRACK(PacketWrittenToSocket, SendQueuePos->Orbit, Proxy->PeerNodeId, PacketsWrittenToSocket, SendQueuePos->TriedWriting, SendQueuePos->GetDataSize(), BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket); - } + } LWPROBE(WriteToSocket, Proxy->PeerNodeId, r, packets, PacketsWrittenToSocket, BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket); } else if (-r != EAGAIN && -r != EWOULDBLOCK) { @@ -624,15 +624,15 @@ namespace NActors { Proxy->Metrics->AddTotalBytesWritten(written); } return ReestablishConnectionWithHandshake(r == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-r)); - } else { + } else { // we have to do some hack for secure socket -- mark the packet as 'tried writing' if (Params.Encryption) { Y_VERIFY(SendQueuePos != SendQueue.end()); SendQueuePos->MarkTriedWriting(); // do not try to replace buffer under SSL } - // we have received EAGAIN error code, this means that we can't issue more data until we have received - // TEvPollerReadyWrite event from poller; set up flag meaning this and wait for that event + // we have received EAGAIN error code, this means that we can't issue more data until we have received + // TEvPollerReadyWrite event from poller; set up flag meaning this and wait for that event Y_VERIFY(!ReceiveContext->WriteBlockedByFullSendBuffer); ReceiveContext->WriteBlockedByFullSendBuffer = true; WriteBlockedCycles = GetCycleCountFast(); @@ -646,7 +646,7 @@ namespace NActors { } else { PollerToken->Request(false, true); } - } + } } } } @@ -656,12 +656,12 @@ namespace NActors { } void TInterconnectSessionTCP::SetForcePacketTimestamp(TDuration period) { - if (period != TDuration::Max()) { + if (period != TDuration::Max()) { const TInstant when = TActivationContext::Now() + period; - if (when < ForcePacketTimestamp) { - ForcePacketTimestamp = when; + if (when < ForcePacketTimestamp) { + ForcePacketTimestamp = when; ScheduleFlush(); - } + } } } @@ -671,7 +671,7 @@ namespace NActors { FlushSchedule.push(ForcePacketTimestamp); MaxFlushSchedule = Max(MaxFlushSchedule, FlushSchedule.size()); ++FlushEventsScheduled; - } + } } void TInterconnectSessionTCP::HandleFlush() { @@ -692,12 +692,12 @@ namespace NActors { } void TInterconnectSessionTCP::ResetFlushLogic() { - ForcePacketTimestamp = TInstant::Max(); - UnconfirmedBytes = 0; + ForcePacketTimestamp = TInstant::Max(); + UnconfirmedBytes = 0; const TDuration ping = Proxy->Common->Settings.PingPeriod; if (ping != TDuration::Zero() && !NumEventsInReadyChannels) { SetForcePacketTimestamp(ping); - } + } } void TInterconnectSessionTCP::TrimSendQueueCache() { @@ -719,19 +719,19 @@ namespace NActors { } ui64 TInterconnectSessionTCP::MakePacket(bool data, TMaybe<ui64> pingMask) { - Y_VERIFY(Socket); - - TSendQueue::iterator packet; - if (SendQueueCache) { - // we have entries in cache, take one and move it to the end of SendQueue - packet = SendQueueCache.begin(); - SendQueue.splice(SendQueue.end(), SendQueueCache, packet); - packet->Reuse(); // reset packet to initial state - } else { - // we have to allocate new packet, so just do it + Y_VERIFY(Socket); + + TSendQueue::iterator packet; + if (SendQueueCache) { + // we have entries in cache, take one and move it to the end of SendQueue + packet = SendQueueCache.begin(); + SendQueue.splice(SendQueue.end(), SendQueueCache, packet); + packet->Reuse(); // reset packet to initial state + } else { + // we have to allocate new packet, so just do it LWPROBE_IF_TOO_LONG(SlowICAllocPacketBuffer, Proxy->PeerNodeId, ms) { packet = SendQueue.emplace(SendQueue.end(), Params); - } + } } // update send queue position @@ -741,7 +741,7 @@ namespace NActors { ui64 serial = 0; - if (data) { + if (data) { // generate serial for this data packet serial = ++OutputCounter; @@ -749,21 +749,21 @@ namespace NActors { Y_VERIFY(NumEventsInReadyChannels); LWPROBE_IF_TOO_LONG(SlowICFillSendingBuffer, Proxy->PeerNodeId, ms) { FillSendingBuffer(*packet, serial); - } + } Y_VERIFY(!packet->IsEmpty()); InflightDataAmount += packet->GetDataSize(); Proxy->Metrics->AddInflightDataAmount(packet->GetDataSize()); if (InflightDataAmount > GetTotalInflightAmountOfData()) { Proxy->Metrics->IncInflyLimitReach(); - } + } - if (AtomicGet(ReceiveContext->ControlPacketId) == 0) { + if (AtomicGet(ReceiveContext->ControlPacketId) == 0) { AtomicSet(ReceiveContext->ControlPacketSendTimer, GetCycleCountFast()); - AtomicSet(ReceiveContext->ControlPacketId, OutputCounter); - } - - // update payload activity timer + AtomicSet(ReceiveContext->ControlPacketId, OutputCounter); + } + + // update payload activity timer LastPayloadActivityTimestamp = TActivationContext::Now(); } else if (pingMask) { serial = *pingMask; @@ -781,13 +781,13 @@ namespace NActors { SendQueue.splice(std::next(SendQueuePos), SendQueue, packet); } } - } + } const ui64 lastInputSerial = ReceiveContext->GetLastProcessedPacketSerial(); packet->SetMetadata(serial, lastInputSerial); - packet->Sign(); + packet->Sign(); - // count number of bytes pending for write + // count number of bytes pending for write ui64 packetSize = (Params.UseModernFrame ? sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1)) + packet->GetDataSize(); BytesUnwritten += packetSize; @@ -795,15 +795,15 @@ namespace NActors { " InflightDataAmount# %" PRIu64 " BytesUnwritten# %" PRIu64, serial, lastInputSerial, packet->GetDataSize(), InflightDataAmount, BytesUnwritten); - // reset forced packet sending timestamp as we have confirmed all received data + // reset forced packet sending timestamp as we have confirmed all received data ResetFlushLogic(); - ++PacketsGenerated; + ++PacketsGenerated; LWTRACK(PacketGenerated, packet->Orbit, Proxy->PeerNodeId, BytesUnwritten, InflightDataAmount, PacketsGenerated, packetSize); - if (!data) { + if (!data) { WriteData(); - } + } return packetSize; } @@ -814,21 +814,21 @@ namespace NActors { Y_VERIFY(LastConfirmed <= confirm && confirm <= LastSentSerial && LastSentSerial <= OutputCounter, "%s confirm# %" PRIu64 " LastConfirmed# %" PRIu64 " OutputCounter# %" PRIu64 " LastSentSerial# %" PRIu64, LogPrefix.data(), confirm, LastConfirmed, OutputCounter, LastSentSerial); - LastConfirmed = confirm; + LastConfirmed = confirm; - ui64 droppedDataAmount = 0; - ui32 numDropped = 0; + ui64 droppedDataAmount = 0; + ui32 numDropped = 0; - // drop confirmed packets; this also includes any auxiliary packets as their serial is set to zero, effectively - // making Serial <= confirm true - TSendQueue::iterator it; + // drop confirmed packets; this also includes any auxiliary packets as their serial is set to zero, effectively + // making Serial <= confirm true + TSendQueue::iterator it; ui64 lastDroppedSerial = 0; for (it = SendQueue.begin(); it != SendQueuePos && it->Confirmed(confirm); ++it) { if (!it->IsEmpty()) { lastDroppedSerial = it->GetSerial(); } droppedDataAmount += it->GetDataSize(); - ++numDropped; + ++numDropped; } SendQueueCache.splice(SendQueueCache.begin(), SendQueue, SendQueue.begin(), it); TrimSendQueueCache(); @@ -840,7 +840,7 @@ namespace NActors { const ui64 limit = GetTotalInflightAmountOfData(); const bool unblockedSomething = current >= limit && current < limit + droppedDataAmount; - PacketsConfirmed += numDropped; + PacketsConfirmed += numDropped; InflightDataAmount -= droppedDataAmount; Proxy->Metrics->SubInflightDataAmount(droppedDataAmount); LWPROBE(DropConfirmed, Proxy->PeerNodeId, droppedDataAmount, InflightDataAmount); @@ -851,7 +851,7 @@ namespace NActors { Pool->Trim(); // send any unsent free requests return unblockedSomething; - } + } void TInterconnectSessionTCP::FillSendingBuffer(TTcpPacketOutTask& task, ui64 serial) { ui32 bytesGenerated = 0; @@ -914,42 +914,42 @@ namespace NActors { const ui32 utilization = Socket ? CalculateQueueUtilization() : 0; if (const auto& callback = Proxy->Common->UpdateWhiteboard) { - enum class EFlag { - GREEN, - YELLOW, - ORANGE, - RED, - }; - EFlag flagState = EFlag::RED; - - if (Socket) { - flagState = EFlag::GREEN; - - do { + enum class EFlag { + GREEN, + YELLOW, + ORANGE, + RED, + }; + EFlag flagState = EFlag::RED; + + if (Socket) { + flagState = EFlag::GREEN; + + do { auto lastInputDelay = TActivationContext::Now() - LastInputActivityTimestamp; - if (lastInputDelay * 4 >= GetDeadPeerTimeout() * 3) { - flagState = EFlag::ORANGE; - break; - } else if (lastInputDelay * 2 >= GetDeadPeerTimeout()) { - flagState = EFlag::YELLOW; - } - - // check utilization + if (lastInputDelay * 4 >= GetDeadPeerTimeout() * 3) { + flagState = EFlag::ORANGE; + break; + } else if (lastInputDelay * 2 >= GetDeadPeerTimeout()) { + flagState = EFlag::YELLOW; + } + + // check utilization if (utilization > 875000) { // 7/8 - flagState = EFlag::ORANGE; - break; + flagState = EFlag::ORANGE; + break; } else if (utilization > 500000) { // 1/2 - flagState = EFlag::YELLOW; - } - } while (false); - } + flagState = EFlag::YELLOW; + } + } while (false); + } callback(Proxy->Metrics->GetHumanFriendlyPeerHostName(), - connected, - flagState == EFlag::GREEN, - flagState == EFlag::YELLOW, - flagState == EFlag::ORANGE, - flagState == EFlag::RED, + connected, + flagState == EFlag::GREEN, + flagState == EFlag::YELLOW, + flagState == EFlag::ORANGE, + flagState == EFlag::RED, TlsActivationContext->ExecutorThread.ActorSystem); } @@ -958,34 +958,34 @@ namespace NActors { } } - void TInterconnectSessionTCP::SetOutputStuckFlag(bool state) { - if (OutputStuckFlag == state) - return; + void TInterconnectSessionTCP::SetOutputStuckFlag(bool state) { + if (OutputStuckFlag == state) + return; - if (OutputQueueUtilization.Size() == 0) - return; + if (OutputQueueUtilization.Size() == 0) + return; - auto& lastpair = OutputQueueUtilization.Last(); - if (state) + auto& lastpair = OutputQueueUtilization.Last(); + if (state) lastpair.first -= GetCycleCountFast(); - else + else lastpair.first += GetCycleCountFast(); - OutputStuckFlag = state; - } + OutputStuckFlag = state; + } - void TInterconnectSessionTCP::SwitchStuckPeriod() { + void TInterconnectSessionTCP::SwitchStuckPeriod() { auto now = GetCycleCountFast(); - if (OutputQueueUtilization.Size() != 0) { - auto& lastpair = OutputQueueUtilization.Last(); - lastpair.second = now - lastpair.second; - if (OutputStuckFlag) - lastpair.first += now; - } - - OutputQueueUtilization.Push(std::pair<ui64, ui64>(0, now)); + if (OutputQueueUtilization.Size() != 0) { + auto& lastpair = OutputQueueUtilization.Last(); + lastpair.second = now - lastpair.second; + if (OutputStuckFlag) + lastpair.first += now; + } + + OutputQueueUtilization.Push(std::pair<ui64, ui64>(0, now)); if (OutputStuckFlag) - OutputQueueUtilization.Last().first -= now; + OutputQueueUtilization.Last().first -= now; } TDuration TInterconnectSessionTCP::GetDeadPeerTimeout() const { @@ -1029,26 +1029,26 @@ namespace NActors { } void TInterconnectSessionTCP::GenerateHttpInfo(TStringStream& str) { - HTML(str) { - DIV_CLASS("panel panel-info") { - DIV_CLASS("panel-heading") { - str << "Session"; - } - DIV_CLASS("panel-body") { - TABLE_CLASS("table") { - TABLEHEAD() { - TABLER() { - TABLEH() { - str << "Sensor"; - } - TABLEH() { - str << "Value"; - } - } + HTML(str) { + DIV_CLASS("panel panel-info") { + DIV_CLASS("panel-heading") { + str << "Session"; + } + DIV_CLASS("panel-body") { + TABLE_CLASS("table") { + TABLEHEAD() { + TABLER() { + TABLEH() { + str << "Sensor"; + } + TABLEH() { + str << "Value"; + } + } } - TABLEBODY() { - TABLER() { - TABLED() { + TABLEBODY() { + TABLER() { + TABLED() { str << "Encryption"; } TABLED() { @@ -1111,89 +1111,89 @@ namespace NActors { } TABLER() { TABLED() { - str << "This page generated at"; - } - TABLED() { + str << "This page generated at"; + } + TABLED() { str << TActivationContext::Now() << " / " << Now(); - } - } - TABLER() { - TABLED() { - str << "SelfID"; - } - TABLED() { + } + } + TABLER() { + TABLED() { + str << "SelfID"; + } + TABLED() { str << SelfId().ToString(); - } - } + } + } TABLER() { TABLED() { str << "Frame version/Checksum"; } TABLED() { str << (!Params.UseModernFrame ? "v1/crc32c" : Params.Encryption ? "v2/none" : "v2/crc32c"); } } -#define MON_VAR(NAME) \ - TABLER() { \ - TABLED() { \ - str << #NAME; \ - } \ - TABLED() { \ - str << NAME; \ - } \ - } - - MON_VAR(Created) - MON_VAR(NewConnectionSet) - MON_VAR(ReceiverId) - MON_VAR(MessagesGot) - MON_VAR(MessagesWrittenToBuffer) - MON_VAR(PacketsGenerated) - MON_VAR(PacketsWrittenToSocket) - MON_VAR(PacketsConfirmed) - MON_VAR(AtomicGet(ReceiveContext->PacketsReadFromSocket)) - MON_VAR(ConfirmPacketsForcedBySize) - MON_VAR(ConfirmPacketsForcedByTimeout) - - TABLER() { - TABLED() { - str << "Virtual self ID"; - } - TABLED() { - str << Proxy->SessionVirtualId.ToString(); - } +#define MON_VAR(NAME) \ + TABLER() { \ + TABLED() { \ + str << #NAME; \ + } \ + TABLED() { \ + str << NAME; \ + } \ + } + + MON_VAR(Created) + MON_VAR(NewConnectionSet) + MON_VAR(ReceiverId) + MON_VAR(MessagesGot) + MON_VAR(MessagesWrittenToBuffer) + MON_VAR(PacketsGenerated) + MON_VAR(PacketsWrittenToSocket) + MON_VAR(PacketsConfirmed) + MON_VAR(AtomicGet(ReceiveContext->PacketsReadFromSocket)) + MON_VAR(ConfirmPacketsForcedBySize) + MON_VAR(ConfirmPacketsForcedByTimeout) + + TABLER() { + TABLED() { + str << "Virtual self ID"; + } + TABLED() { + str << Proxy->SessionVirtualId.ToString(); + } + } + TABLER() { + TABLED() { + str << "Virtual peer ID"; + } + TABLED() { + str << Proxy->RemoteSessionVirtualId.ToString(); + } + } + TABLER() { + TABLED() { + str << "Socket"; + } + TABLED() { + str << (Socket ? i64(*Socket) : -1); + } } - TABLER() { - TABLED() { - str << "Virtual peer ID"; - } - TABLED() { - str << Proxy->RemoteSessionVirtualId.ToString(); - } - } - TABLER() { - TABLED() { - str << "Socket"; - } - TABLED() { - str << (Socket ? i64(*Socket) : -1); - } - } ui32 unsentQueueSize = Socket ? Socket->GetUnsentQueueSize() : 0; - MON_VAR(OutputStuckFlag) - MON_VAR(SendQueue.size()) - MON_VAR(SendQueueCache.size()) + MON_VAR(OutputStuckFlag) + MON_VAR(SendQueue.size()) + MON_VAR(SendQueueCache.size()) MON_VAR(NumEventsInReadyChannels) - MON_VAR(TotalOutputQueueSize) - MON_VAR(BytesUnwritten) + MON_VAR(TotalOutputQueueSize) + MON_VAR(BytesUnwritten) MON_VAR(InflightDataAmount) MON_VAR(unsentQueueSize) MON_VAR(SendBufferSize) - MON_VAR(LastInputActivityTimestamp) - MON_VAR(LastPayloadActivityTimestamp) - MON_VAR(LastHandshakeDone) - MON_VAR(OutputCounter) + MON_VAR(LastInputActivityTimestamp) + MON_VAR(LastPayloadActivityTimestamp) + MON_VAR(LastHandshakeDone) + MON_VAR(OutputCounter) MON_VAR(LastSentSerial) MON_VAR(ReceiveContext->GetLastProcessedPacketSerial()) - MON_VAR(LastConfirmed) + MON_VAR(LastConfirmed) MON_VAR(FlushSchedule.size()) MON_VAR(MaxFlushSchedule) MON_VAR(FlushEventsScheduled) @@ -1211,11 +1211,11 @@ namespace NActors { MON_VAR(GetPingRTT()) MON_VAR(clockSkew) - MON_VAR(GetDeadPeerTimeout()) + MON_VAR(GetDeadPeerTimeout()) MON_VAR(GetTotalInflightAmountOfData()) MON_VAR(GetCloseOnIdleTimeout()) MON_VAR(Subscribers.size()) - } + } } } } @@ -1224,5 +1224,5 @@ namespace NActors { void CreateSessionKillingActor(TInterconnectProxyCommon::TPtr common) { TlsActivationContext->ExecutorThread.ActorSystem->Register(new TInterconnectSessionKiller(common)); - } + } } |