diff options
author | serxa <serxa@yandex-team.ru> | 2022-02-10 16:49:08 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:49:08 +0300 |
commit | d6d7db348c2cc64e71243cab9940ee6778f4317d (patch) | |
tree | bac67f42a02f9368eb4d329f5d79b77d0a6adc18 /library/cpp/actors/interconnect | |
parent | 8d57b69dee81198a59c39e64704f7dc9f04b4fbf (diff) | |
download | ydb-d6d7db348c2cc64e71243cab9940ee6778f4317d.tar.gz |
Restoring authorship annotation for <serxa@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect')
10 files changed, 128 insertions, 128 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp index a66ba2a154..9aee8fd5a2 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.cpp +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -3,13 +3,13 @@ #include <library/cpp/actors/core/events.h> #include <library/cpp/actors/core/executor_thread.h> #include <library/cpp/actors/core/log.h> -#include <library/cpp/actors/core/probes.h> +#include <library/cpp/actors/core/probes.h> #include <library/cpp/actors/protos/services_common.pb.h> #include <library/cpp/actors/prof/tag.h> #include <library/cpp/digest/crc32c/crc32c.h> -LWTRACE_USING(ACTORLIB_PROVIDER); - +LWTRACE_USING(ACTORLIB_PROVIDER); + namespace NActors { DECLARE_WILSON_EVENT(EventSentToSocket); DECLARE_WILSON_EVENT(EventReceivedFromSocket); @@ -25,8 +25,8 @@ namespace NActors { // WILSON_TRACE(*ctx, &traceId, EventSentToSocket); // } traceId.Serialize(&event.Descr.TraceId); - LWTRACK(SerializeToPacketEnd, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize, task.GetDataSize()); - task.Orbit.Take(event.Orbit); + LWTRACK(SerializeToPacketEnd, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize, task.GetDataSize()); + task.Orbit.Take(event.Orbit); event.Descr.Flags = (event.Descr.Flags & ~IEventHandle::FlagForwardOnNondelivery) | (ExtendedFormat ? IEventHandle::FlagExtendedFormat : 0); @@ -58,7 +58,7 @@ namespace NActors { switch (State) { case EState::INITIAL: event.InitChecksum(); - LWTRACK(SerializeToPacketBegin, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize); + LWTRACK(SerializeToPacketBegin, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize); if (event.Event) { State = EState::CHUNKER; IEventBase *base = event.Event.Get(); diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h index e4a0ae3cda..a26cb7a92c 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.h +++ b/library/cpp/actors/interconnect/interconnect_channel.h @@ -42,11 +42,11 @@ namespace NActors { class TEventOutputChannel : public TInterconnectLoggingBase { public: - TEventOutputChannel(TEventHolderPool& pool, ui16 id, ui32 peerNodeId, ui32 maxSerializedEventSize, + TEventOutputChannel(TEventHolderPool& pool, ui16 id, ui32 peerNodeId, ui32 maxSerializedEventSize, std::shared_ptr<IInterconnectMetrics> metrics, TSessionParams params) - : TInterconnectLoggingBase(Sprintf("OutputChannel %" PRIu16 " [node %" PRIu32 "]", id, peerNodeId)) + : TInterconnectLoggingBase(Sprintf("OutputChannel %" PRIu16 " [node %" PRIu32 "]", id, peerNodeId)) , Pool(pool) - , PeerNodeId(peerNodeId) + , PeerNodeId(peerNodeId) , ChannelId(id) , Metrics(std::move(metrics)) , Params(std::move(params)) @@ -56,11 +56,11 @@ namespace NActors { ~TEventOutputChannel() { } - std::pair<ui32, TEventHolder*> Push(IEventHandle& ev) { + std::pair<ui32, TEventHolder*> Push(IEventHandle& ev) { TEventHolder& event = Pool.Allocate(Queue); const ui32 bytes = event.Fill(ev) + sizeof(TEventDescr); OutputQueueSize += bytes; - return std::make_pair(bytes, &event); + return std::make_pair(bytes, &event); } void DropConfirmed(ui64 confirm); @@ -86,7 +86,7 @@ namespace NActors { void NotifyUndelivered(); TEventHolderPool& Pool; - const ui32 PeerNodeId; + const ui32 PeerNodeId; const ui16 ChannelId; std::shared_ptr<IInterconnectMetrics> Metrics; const TSessionParams Params; diff --git a/library/cpp/actors/interconnect/interconnect_common.h b/library/cpp/actors/interconnect/interconnect_common.h index 285709a00c..104573daaa 100644 --- a/library/cpp/actors/interconnect/interconnect_common.h +++ b/library/cpp/actors/interconnect/interconnect_common.h @@ -2,7 +2,7 @@ #include <library/cpp/actors/core/actorid.h> #include <library/cpp/actors/core/actorsystem.h> -#include <library/cpp/actors/util/datetime.h> +#include <library/cpp/actors/util/datetime.h> #include <library/cpp/monlib/dynamic_counters/counters.h> #include <library/cpp/monlib/metrics/metric_registry.h> #include <util/generic/map.h> @@ -82,7 +82,7 @@ namespace NActors { TAtomicBase MaxDestructorQueueSize = 1024 * 1024 * 1024; TString ClusterUUID; TVector<TString> AcceptUUID; - ui64 StartTime = GetCycleCountFast(); + ui64 StartTime = GetCycleCountFast(); TString TechnicalSelfHostName; TInitWhiteboardCallback InitWhiteboard; TUpdateWhiteboardCallback UpdateWhiteboard; diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index 0abe9fe659..e445c42f70 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -1,7 +1,7 @@ #include "interconnect_tcp_session.h" #include "interconnect_tcp_proxy.h" #include <library/cpp/actors/core/probes.h> -#include <library/cpp/actors/util/datetime.h> +#include <library/cpp/actors/util/datetime.h> namespace NActors { LWTRACE_USING(ACTORLIB_PROVIDER); @@ -195,7 +195,7 @@ namespace NActors { ConfirmedByInput = HeaderConfirm; if (AtomicGet(Context->ControlPacketId) <= HeaderConfirm && !NewPingProtocol) { ui64 sendTime = AtomicGet(Context->ControlPacketSendTimer); - TDuration duration = CyclesToDuration(GetCycleCountFast() - sendTime); + TDuration duration = CyclesToDuration(GetCycleCountFast() - sendTime); const auto durationUs = duration.MicroSeconds(); Metrics->UpdateLegacyPingTimeHist(durationUs); PingQ.push_back(duration); @@ -217,7 +217,7 @@ namespace NActors { Send(SessionId, new TEvProcessPingRequest(HeaderSerial & ~TTcpPacketBuf::PingRequestMask)); } else if (HeaderSerial & TTcpPacketBuf::PingResponseMask) { const ui64 sent = HeaderSerial & ~TTcpPacketBuf::PingResponseMask; - const ui64 received = GetCycleCountFast(); + const ui64 received = GetCycleCountFast(); HandlePingResponse(CyclesToDuration(received - sent)); } else if (HeaderSerial & TTcpPacketBuf::ClockMask) { HandleClock(TInstant::MicroSeconds(HeaderSerial & ~TTcpPacketBuf::ClockMask)); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 2ded7f9f53..8da77bcd1d 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -5,7 +5,7 @@ #include <library/cpp/actors/core/probes.h> #include <library/cpp/actors/core/log.h> #include <library/cpp/actors/core/interconnect.h> -#include <library/cpp/actors/util/datetime.h> +#include <library/cpp/actors/util/datetime.h> #include <library/cpp/actors/protos/services_common.pb.h> #include <library/cpp/monlib/service/pages/templates.h> @@ -128,20 +128,20 @@ namespace NActors { auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel); const bool wasWorking = oChannel.IsWorking(); - const auto [dataSize, event] = oChannel.Push(*ev); - LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize); - - TotalOutputQueueSize += dataSize; + const auto [dataSize, event] = oChannel.Push(*ev); + LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize); + + TotalOutputQueueSize += dataSize; Proxy->Metrics->AddOutputBuffersTotalSize(dataSize); - if (!wasWorking) { - // this channel has returned to work -- it was empty and this we have just put first event in the queue - ChannelScheduler->AddToHeap(oChannel, EqualizeCounter); + if (!wasWorking) { + // this channel has returned to work -- it was empty and this we have just put first event in the queue + ChannelScheduler->AddToHeap(oChannel, EqualizeCounter); } SetOutputStuckFlag(true); ++NumEventsInReadyChannels; - LWTRACK(EnqueueEvent, event->Orbit, Proxy->PeerNodeId, NumEventsInReadyChannels, GetWriteBlockedTotal(), evChannel, oChannel.GetQueueSize(), oChannel.GetBufferedAmountOfData()); + LWTRACK(EnqueueEvent, event->Orbit, Proxy->PeerNodeId, NumEventsInReadyChannels, GetWriteBlockedTotal(), evChannel, oChannel.GetQueueSize(), oChannel.GetBufferedAmountOfData()); WILSON_TRACE(*TlsActivationContext, &ev->TraceId, OutputQueuePush, QueueSizeInEvents = oChannel.GetQueueSize(), QueueSizeInBytes = oChannel.GetBufferedAmountOfData()); @@ -179,7 +179,7 @@ namespace NActors { } else { TActivationContext::Send(ev); } - LWPROBE(StartBatching, Proxy->PeerNodeId, batchPeriod.MillisecondsFloat()); + LWPROBE(StartBatching, Proxy->PeerNodeId, batchPeriod.MillisecondsFloat()); LOG_DEBUG_IC_SESSION("ICS17", "batching started"); } } @@ -315,7 +315,7 @@ namespace NActors { // update ping time Ping = msg.Ping; - LWPROBE(UpdateFromInputSession, Proxy->PeerNodeId, Ping.MillisecondsFloat()); + LWPROBE(UpdateFromInputSession, Proxy->PeerNodeId, Ping.MillisecondsFloat()); bool needConfirm = false; @@ -342,7 +342,7 @@ namespace NActors { // generate more traffic if we have unblocked state now if (unblockedSomething) { - LWPROBE(UnblockByDropConfirmed, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - ev->SendTime) * 1000.0); + LWPROBE(UnblockByDropConfirmed, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - ev->SendTime) * 1000.0); GenerateTraffic(); } @@ -380,7 +380,7 @@ namespace NActors { void TInterconnectSessionTCP::HandleRam(TEvRam::TPtr& ev) { if (ev->Get() == RamInQueue) { - LWPROBE(FinishRam, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - ev->SendTime) * 1000.0); + LWPROBE(FinishRam, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - ev->SendTime) * 1000.0); RamInQueue = nullptr; GenerateTraffic(); } @@ -391,7 +391,7 @@ namespace NActors { IssuePingRequest(); if (RamInQueue && !RamInQueue->Batching) { - LWPROBE(SkipGenerateTraffic, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - RamStartedCycles) * 1000.0); + LWPROBE(SkipGenerateTraffic, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - RamStartedCycles) * 1000.0); return; // we'll do it a bit later } else { RamInQueue = nullptr; @@ -399,51 +399,51 @@ namespace NActors { LOG_DEBUG_IC_SESSION("ICS19", "GenerateTraffic"); - // There is a tradeoff between fairness and efficiency. - // The less traffic is generated here, the less buffering is after fair scheduler, - // the more fair system is, the less latency is present. - // The more traffic is generated here, the less syscalls and actor-system overhead occurs, - // the less cpu is consumed. - static const ui64 generateLimit = 64 * 1024; - + // There is a tradeoff between fairness and efficiency. + // The less traffic is generated here, the less buffering is after fair scheduler, + // the more fair system is, the less latency is present. + // The more traffic is generated here, the less syscalls and actor-system overhead occurs, + // the less cpu is consumed. + static const ui64 generateLimit = 64 * 1024; + const ui64 sizeBefore = TotalOutputQueueSize; - ui32 generatedPackets = 0; - ui64 generatedBytes = 0; - ui64 generateStarted = GetCycleCountFast(); + ui32 generatedPackets = 0; + ui64 generatedBytes = 0; + ui64 generateStarted = GetCycleCountFast(); // apply traffic changes auto accountTraffic = [&] { ChannelScheduler->ForEach([](TEventOutputChannel& channel) { channel.AccountTraffic(); }); }; - // first, we create as many data packets as we can generate under certain conditions; they include presence - // of events in channels queues and in flight fitting into requested limit; after we hit one of these conditions - // we exit cycle - while (Socket && NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData() && !ReceiveContext->WriteBlockedByFullSendBuffer) { - if (generatedBytes >= generateLimit) { - // resume later but ensure that we have issued at least one packet + // first, we create as many data packets as we can generate under certain conditions; they include presence + // of events in channels queues and in flight fitting into requested limit; after we hit one of these conditions + // we exit cycle + while (Socket && NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData() && !ReceiveContext->WriteBlockedByFullSendBuffer) { + if (generatedBytes >= generateLimit) { + // resume later but ensure that we have issued at least one packet RamInQueue = new TEvRam(false); Send(SelfId(), RamInQueue); - RamStartedCycles = GetCycleCountFast(); - LWPROBE(StartRam, Proxy->PeerNodeId); - break; + RamStartedCycles = GetCycleCountFast(); + LWPROBE(StartRam, Proxy->PeerNodeId); + break; } - try { - generatedBytes += MakePacket(true); - ++generatedPackets; - } catch (const TExSerializedEventTooLarge& ex) { - // terminate session if the event can't be serialized properly - accountTraffic(); - LOG_CRIT_IC("ICS31", "serialized event Type# 0x%08" PRIx32 " is too large", ex.Type); - return Terminate(TDisconnectReason::EventTooLarge()); + try { + generatedBytes += MakePacket(true); + ++generatedPackets; + } catch (const TExSerializedEventTooLarge& ex) { + // terminate session if the event can't be serialized properly + accountTraffic(); + LOG_CRIT_IC("ICS31", "serialized event Type# 0x%08" PRIx32 " is too large", ex.Type); + return Terminate(TDisconnectReason::EventTooLarge()); } } - if (Socket) { - WriteData(); - } - - LWPROBE(GenerateTraffic, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - generateStarted) * 1000.0, sizeBefore - TotalOutputQueueSize, generatedPackets, generatedBytes); - + if (Socket) { + WriteData(); + } + + LWPROBE(GenerateTraffic, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - generateStarted) * 1000.0, sizeBefore - TotalOutputQueueSize, generatedPackets, generatedBytes); + accountTraffic(); EqualizeCounter += ChannelScheduler->Equalize(); } @@ -520,10 +520,10 @@ namespace NActors { ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false"); if (std::exchange(ReceiveContext->WriteBlockedByFullSendBuffer, false)) { Proxy->Metrics->IncUsefulWriteWakeups(); - ui64 nowCycles = GetCycleCountFast(); - double blockedUs = NHPTimer::GetSeconds(nowCycles - WriteBlockedCycles) * 1000000.0; - LWPROBE(ReadyWrite, Proxy->PeerNodeId, NHPTimer::GetSeconds(nowCycles - ev->SendTime) * 1000.0, blockedUs / 1000.0); - WriteBlockedTotal += TDuration::MicroSeconds(blockedUs); + ui64 nowCycles = GetCycleCountFast(); + double blockedUs = NHPTimer::GetSeconds(nowCycles - WriteBlockedCycles) * 1000000.0; + LWPROBE(ReadyWrite, Proxy->PeerNodeId, NHPTimer::GetSeconds(nowCycles - ev->SendTime) * 1000.0, blockedUs / 1000.0); + WriteBlockedTotal += TDuration::MicroSeconds(blockedUs); GenerateTraffic(); } else if (!ev->Cookie) { Proxy->Metrics->IncSpuriousWriteWakeups(); @@ -602,7 +602,7 @@ namespace NActors { Y_VERIFY(static_cast<size_t>(r) <= BytesUnwritten); BytesUnwritten -= r; written += r; - ui64 packets = 0; + ui64 packets = 0; // advance SendQueuePos to eat all processed items for (size_t amount = r; amount && SendQueuePos->DropBufs(amount); ++SendQueuePos) { @@ -610,11 +610,11 @@ namespace NActors { LastSentSerial = Max(LastSentSerial, SendQueuePos->GetSerial()); } ++PacketsWrittenToSocket; - ++packets; - LWTRACK(PacketWrittenToSocket, SendQueuePos->Orbit, Proxy->PeerNodeId, PacketsWrittenToSocket, SendQueuePos->TriedWriting, SendQueuePos->GetDataSize(), BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket); + ++packets; + LWTRACK(PacketWrittenToSocket, SendQueuePos->Orbit, Proxy->PeerNodeId, PacketsWrittenToSocket, SendQueuePos->TriedWriting, SendQueuePos->GetDataSize(), BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket); } - - LWPROBE(WriteToSocket, Proxy->PeerNodeId, r, packets, PacketsWrittenToSocket, BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket); + + LWPROBE(WriteToSocket, Proxy->PeerNodeId, r, packets, PacketsWrittenToSocket, BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket); } else if (-r != EAGAIN && -r != EWOULDBLOCK) { const TString message = r == 0 ? "connection closed by peer" : err ? err @@ -635,8 +635,8 @@ namespace NActors { // TEvPollerReadyWrite event from poller; set up flag meaning this and wait for that event Y_VERIFY(!ReceiveContext->WriteBlockedByFullSendBuffer); ReceiveContext->WriteBlockedByFullSendBuffer = true; - WriteBlockedCycles = GetCycleCountFast(); - LWPROBE(BlockedWrite, Proxy->PeerNodeId, SendQueue.size(), written); + WriteBlockedCycles = GetCycleCountFast(); + LWPROBE(BlockedWrite, Proxy->PeerNodeId, SendQueue.size(), written); LOG_DEBUG_IC_SESSION("ICS18", "hit send buffer limit"); if (PollerToken) { @@ -718,7 +718,7 @@ namespace NActors { } } - ui64 TInterconnectSessionTCP::MakePacket(bool data, TMaybe<ui64> pingMask) { + ui64 TInterconnectSessionTCP::MakePacket(bool data, TMaybe<ui64> pingMask) { Y_VERIFY(Socket); TSendQueue::iterator packet; @@ -759,7 +759,7 @@ namespace NActors { } if (AtomicGet(ReceiveContext->ControlPacketId) == 0) { - AtomicSet(ReceiveContext->ControlPacketSendTimer, GetCycleCountFast()); + AtomicSet(ReceiveContext->ControlPacketSendTimer, GetCycleCountFast()); AtomicSet(ReceiveContext->ControlPacketId, OutputCounter); } @@ -788,8 +788,8 @@ namespace NActors { packet->Sign(); // count number of bytes pending for write - ui64 packetSize = (Params.UseModernFrame ? sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1)) + packet->GetDataSize(); - BytesUnwritten += packetSize; + ui64 packetSize = (Params.UseModernFrame ? sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1)) + packet->GetDataSize(); + BytesUnwritten += packetSize; LOG_DEBUG_IC_SESSION("ICS22", "outgoing packet Serial# %" PRIu64 " Confirm# %" PRIu64 " DataSize# %zu" " InflightDataAmount# %" PRIu64 " BytesUnwritten# %" PRIu64, serial, lastInputSerial, packet->GetDataSize(), @@ -799,13 +799,13 @@ namespace NActors { ResetFlushLogic(); ++PacketsGenerated; - LWTRACK(PacketGenerated, packet->Orbit, Proxy->PeerNodeId, BytesUnwritten, InflightDataAmount, PacketsGenerated, packetSize); + LWTRACK(PacketGenerated, packet->Orbit, Proxy->PeerNodeId, BytesUnwritten, InflightDataAmount, PacketsGenerated, packetSize); if (!data) { WriteData(); } - - return packetSize; + + return packetSize; } bool TInterconnectSessionTCP::DropConfirmed(ui64 confirm) { @@ -836,14 +836,14 @@ namespace NActors { channel.DropConfirmed(lastDroppedSerial); }); - const ui64 current = InflightDataAmount; - const ui64 limit = GetTotalInflightAmountOfData(); - const bool unblockedSomething = current >= limit && current < limit + droppedDataAmount; + const ui64 current = InflightDataAmount; + const ui64 limit = GetTotalInflightAmountOfData(); + const bool unblockedSomething = current >= limit && current < limit + droppedDataAmount; PacketsConfirmed += numDropped; InflightDataAmount -= droppedDataAmount; Proxy->Metrics->SubInflightDataAmount(droppedDataAmount); - LWPROBE(DropConfirmed, Proxy->PeerNodeId, droppedDataAmount, InflightDataAmount); + LWPROBE(DropConfirmed, Proxy->PeerNodeId, droppedDataAmount, InflightDataAmount); LOG_DEBUG_IC_SESSION("ICS24", "exit InflightDataAmount: %" PRIu64 " bytes droppedDataAmount: %" PRIu64 " bytes" " dropped %" PRIu32 " packets", InflightDataAmount, droppedDataAmount, numDropped); @@ -896,7 +896,7 @@ namespace NActors { } } - LWTRACK(FillSendingBuffer, task.Orbit, Proxy->PeerNodeId, bytesGenerated, NumEventsInReadyChannels, WriteBlockedTotal); + LWTRACK(FillSendingBuffer, task.Orbit, Proxy->PeerNodeId, bytesGenerated, NumEventsInReadyChannels, WriteBlockedTotal); Y_VERIFY(bytesGenerated); // ensure we are not stalled in serialization } @@ -967,15 +967,15 @@ namespace NActors { auto& lastpair = OutputQueueUtilization.Last(); if (state) - lastpair.first -= GetCycleCountFast(); + lastpair.first -= GetCycleCountFast(); else - lastpair.first += GetCycleCountFast(); + lastpair.first += GetCycleCountFast(); OutputStuckFlag = state; } void TInterconnectSessionTCP::SwitchStuckPeriod() { - auto now = GetCycleCountFast(); + auto now = GetCycleCountFast(); if (OutputQueueUtilization.Size() != 0) { auto& lastpair = OutputQueueUtilization.Last(); lastpair.second = now - lastpair.second; @@ -1005,7 +1005,7 @@ namespace NActors { } ui64 TInterconnectSessionTCP::GetMaxCyclesPerEvent() const { - return DurationToCycles(TDuration::MicroSeconds(50)); + return DurationToCycles(TDuration::MicroSeconds(50)); } void TInterconnectSessionTCP::IssuePingRequest() { @@ -1013,7 +1013,7 @@ namespace NActors { if (now >= LastPingTimestamp + PingPeriodicity) { LOG_DEBUG_IC_SESSION("ICS22", "Issuing ping request"); if (Socket) { - MakePacket(false, GetCycleCountFast() | TTcpPacketBuf::PingRequestMask); + MakePacket(false, GetCycleCountFast() | TTcpPacketBuf::PingRequestMask); } if (Socket) { MakePacket(false, TInstant::Now().MicroSeconds() | TTcpPacketBuf::ClockMask); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index 7fc00dbcc5..e347d9b799 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -6,7 +6,7 @@ #include <library/cpp/actors/core/log.h> #include <library/cpp/actors/helpers/mon_histogram_helper.h> #include <library/cpp/actors/protos/services_common.pb.h> -#include <library/cpp/actors/util/datetime.h> +#include <library/cpp/actors/util/datetime.h> #include <library/cpp/actors/util/rope.h> #include <library/cpp/actors/util/funnel_queue.h> #include <library/cpp/actors/util/recentwnd.h> @@ -38,12 +38,12 @@ namespace NActors { public: TSlowPathChecker(TTraceCallback&& callback) : Callback(std::move(callback)) - , Start(GetCycleCountFast()) + , Start(GetCycleCountFast()) { } ~TSlowPathChecker() { - const NHPTimer::STime end = GetCycleCountFast(); + const NHPTimer::STime end = GetCycleCountFast(); const NHPTimer::STime elapsed = end - Start; if (elapsed > 1000000) { Callback(NHPTimer::GetSeconds(elapsed) * 1000); @@ -63,7 +63,7 @@ namespace NActors { class TTimeLimit { public: TTimeLimit(ui64 limitInCycles) - : UpperLimit(limitInCycles == 0 ? 0 : GetCycleCountFast() + limitInCycles) + : UpperLimit(limitInCycles == 0 ? 0 : GetCycleCountFast() + limitInCycles) { } @@ -73,7 +73,7 @@ namespace NActors { } bool CheckExceeded() { - return UpperLimit != 0 && GetCycleCountFast() > UpperLimit; + return UpperLimit != 0 && GetCycleCountFast() > UpperLimit; } const ui64 UpperLimit; @@ -125,7 +125,7 @@ namespace NActors { std::unordered_map<ui16, TRope> ChannelMap; TReceiveContext() { - GetTimeFast(&StartTime); + GetTimeFast(&StartTime); } // returns false if sessions needs to be terminated and packet not to be processed @@ -378,7 +378,7 @@ namespace NActors { void SetNewConnection(TEvHandshakeDone::TPtr& ev); TEvRam* RamInQueue = nullptr; - ui64 RamStartedCycles = 0; + ui64 RamStartedCycles = 0; void HandleRam(TEvRam::TPtr& ev); void GenerateTraffic(); @@ -389,7 +389,7 @@ namespace NActors { void Handle(TEvPollerRegisterResult::TPtr ev); void WriteData(); - ui64 MakePacket(bool data, TMaybe<ui64> pingMask = {}); + ui64 MakePacket(bool data, TMaybe<ui64> pingMask = {}); void FillSendingBuffer(TTcpPacketOutTask& packet, ui64 serial); bool DropConfirmed(ui64 confirm); void ShutdownSocket(TDisconnectReason reason); @@ -452,21 +452,21 @@ namespace NActors { TSendQueue SendQueue; TSendQueue SendQueueCache; TSendQueue::iterator SendQueuePos; - ui64 WriteBlockedCycles = 0; // start of current block period - TDuration WriteBlockedTotal; // total incremental duration that session has been blocked + ui64 WriteBlockedCycles = 0; // start of current block period + TDuration WriteBlockedTotal; // total incremental duration that session has been blocked ui64 BytesUnwritten = 0; void TrimSendQueueCache(); - TDuration GetWriteBlockedTotal() const { + TDuration GetWriteBlockedTotal() const { if (ReceiveContext->WriteBlockedByFullSendBuffer) { - double blockedUs = NHPTimer::GetSeconds(GetCycleCountFast() - WriteBlockedCycles) * 1000000.0; - return WriteBlockedTotal + TDuration::MicroSeconds(blockedUs); // append current blocking period if any - } else { - return WriteBlockedTotal; - } - } - + double blockedUs = NHPTimer::GetSeconds(GetCycleCountFast() - WriteBlockedCycles) * 1000000.0; + return WriteBlockedTotal + TDuration::MicroSeconds(blockedUs); // append current blocking period if any + } else { + return WriteBlockedTotal; + } + } + ui64 OutputCounter; ui64 LastSentSerial = 0; diff --git a/library/cpp/actors/interconnect/packet.cpp b/library/cpp/actors/interconnect/packet.cpp index e2c289ed59..55c9679414 100644 --- a/library/cpp/actors/interconnect/packet.cpp +++ b/library/cpp/actors/interconnect/packet.cpp @@ -1,11 +1,11 @@ #include "packet.h" -#include <library/cpp/actors/core/probes.h> - +#include <library/cpp/actors/core/probes.h> + #include <util/system/datetime.h> -LWTRACE_USING(ACTORLIB_PROVIDER); - +LWTRACE_USING(ACTORLIB_PROVIDER); + ui32 TEventHolder::Fill(IEventHandle& ev) { Serial = 0; Descr.Type = ev.Type; @@ -27,6 +27,6 @@ ui32 TEventHolder::Fill(IEventHandle& ev) { } else { EventSerializedSize = 0; } - + return EventSerializedSize; } diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h index 4ba50a2b5f..f062633b0b 100644 --- a/library/cpp/actors/interconnect/packet.h +++ b/library/cpp/actors/interconnect/packet.h @@ -8,7 +8,7 @@ #include <library/cpp/actors/util/rope.h> #include <library/cpp/actors/prof/tag.h> #include <library/cpp/digest/crc32c/crc32c.h> -#include <library/cpp/lwtrace/shuttle.h> +#include <library/cpp/lwtrace/shuttle.h> #include <util/generic/string.h> #include <util/generic/list.h> @@ -108,7 +108,7 @@ struct TEventHolder : TNonCopyable { ui64 Serial; ui32 EventSerializedSize; ui32 EventActuallySerialized; - mutable NLWTrace::TOrbit Orbit; + mutable NLWTrace::TOrbit Orbit; ui32 Fill(IEventHandle& ev); @@ -136,7 +136,7 @@ struct TEventHolder : TNonCopyable { void Clear() { Event.Reset(); Buffer.Reset(); - Orbit.Reset(); + Orbit.Reset(); } }; @@ -154,7 +154,7 @@ struct TTcpPacketOutTask : TNonCopyable { bool TriedWriting; char *FreeArea; char *End; - mutable NLWTrace::TOrbit Orbit; + mutable NLWTrace::TOrbit Orbit; public: TTcpPacketOutTask(const TSessionParams& params) @@ -189,7 +189,7 @@ public: TriedWriting = false; FreeArea = Params.UseModernFrame ? Packet.v2.Data : Packet.v1.Data; End = FreeArea + TTcpPacketBuf::PacketDataLen; - Orbit.Reset(); + Orbit.Reset(); } bool IsEmpty() const { diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp index e75cbcaef4..ef363b208e 100644 --- a/library/cpp/actors/interconnect/poller_actor.cpp +++ b/library/cpp/actors/interconnect/poller_actor.cpp @@ -5,7 +5,7 @@ #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/log.h> -#include <library/cpp/actors/core/probes.h> +#include <library/cpp/actors/core/probes.h> #include <library/cpp/actors/protos/services_common.pb.h> #include <library/cpp/actors/util/funnel_queue.h> @@ -17,9 +17,9 @@ #include <variant> namespace NActors { - - LWTRACE_USING(ACTORLIB_PROVIDER); - + + LWTRACE_USING(ACTORLIB_PROVIDER); + namespace { int LastSocketError() { #if defined(_win_) diff --git a/library/cpp/actors/interconnect/profiler.h b/library/cpp/actors/interconnect/profiler.h index 77a59e3179..d9192e1559 100644 --- a/library/cpp/actors/interconnect/profiler.h +++ b/library/cpp/actors/interconnect/profiler.h @@ -1,7 +1,7 @@ #pragma once -#include <library/cpp/actors/util/datetime.h> - +#include <library/cpp/actors/util/datetime.h> + namespace NActors { class TProfiled { @@ -14,7 +14,7 @@ namespace NActors { EType Type; // entry kind int Line; const char *Marker; // name of the profiled function/part - ui64 Timestamp; // cycles + ui64 Timestamp; // cycles }; bool Enable = false; @@ -44,7 +44,7 @@ namespace NActors { type, line, marker, - GetCycleCountFast() + GetCycleCountFast() }); } } |