aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect
diff options
context:
space:
mode:
authorserxa <serxa@yandex-team.ru>2022-02-10 16:49:08 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:49:08 +0300
commitd6d7db348c2cc64e71243cab9940ee6778f4317d (patch)
treebac67f42a02f9368eb4d329f5d79b77d0a6adc18 /library/cpp/actors/interconnect
parent8d57b69dee81198a59c39e64704f7dc9f04b4fbf (diff)
downloadydb-d6d7db348c2cc64e71243cab9940ee6778f4317d.tar.gz
Restoring authorship annotation for <serxa@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect')
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.cpp12
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.h12
-rw-r--r--library/cpp/actors/interconnect/interconnect_common.h4
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp6
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp150
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h36
-rw-r--r--library/cpp/actors/interconnect/packet.cpp10
-rw-r--r--library/cpp/actors/interconnect/packet.h10
-rw-r--r--library/cpp/actors/interconnect/poller_actor.cpp8
-rw-r--r--library/cpp/actors/interconnect/profiler.h8
10 files changed, 128 insertions, 128 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp
index a66ba2a154..9aee8fd5a2 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.cpp
+++ b/library/cpp/actors/interconnect/interconnect_channel.cpp
@@ -3,13 +3,13 @@
#include <library/cpp/actors/core/events.h>
#include <library/cpp/actors/core/executor_thread.h>
#include <library/cpp/actors/core/log.h>
-#include <library/cpp/actors/core/probes.h>
+#include <library/cpp/actors/core/probes.h>
#include <library/cpp/actors/protos/services_common.pb.h>
#include <library/cpp/actors/prof/tag.h>
#include <library/cpp/digest/crc32c/crc32c.h>
-LWTRACE_USING(ACTORLIB_PROVIDER);
-
+LWTRACE_USING(ACTORLIB_PROVIDER);
+
namespace NActors {
DECLARE_WILSON_EVENT(EventSentToSocket);
DECLARE_WILSON_EVENT(EventReceivedFromSocket);
@@ -25,8 +25,8 @@ namespace NActors {
// WILSON_TRACE(*ctx, &traceId, EventSentToSocket);
// }
traceId.Serialize(&event.Descr.TraceId);
- LWTRACK(SerializeToPacketEnd, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize, task.GetDataSize());
- task.Orbit.Take(event.Orbit);
+ LWTRACK(SerializeToPacketEnd, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize, task.GetDataSize());
+ task.Orbit.Take(event.Orbit);
event.Descr.Flags = (event.Descr.Flags & ~IEventHandle::FlagForwardOnNondelivery) |
(ExtendedFormat ? IEventHandle::FlagExtendedFormat : 0);
@@ -58,7 +58,7 @@ namespace NActors {
switch (State) {
case EState::INITIAL:
event.InitChecksum();
- LWTRACK(SerializeToPacketBegin, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize);
+ LWTRACK(SerializeToPacketBegin, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize);
if (event.Event) {
State = EState::CHUNKER;
IEventBase *base = event.Event.Get();
diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h
index e4a0ae3cda..a26cb7a92c 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.h
+++ b/library/cpp/actors/interconnect/interconnect_channel.h
@@ -42,11 +42,11 @@ namespace NActors {
class TEventOutputChannel : public TInterconnectLoggingBase {
public:
- TEventOutputChannel(TEventHolderPool& pool, ui16 id, ui32 peerNodeId, ui32 maxSerializedEventSize,
+ TEventOutputChannel(TEventHolderPool& pool, ui16 id, ui32 peerNodeId, ui32 maxSerializedEventSize,
std::shared_ptr<IInterconnectMetrics> metrics, TSessionParams params)
- : TInterconnectLoggingBase(Sprintf("OutputChannel %" PRIu16 " [node %" PRIu32 "]", id, peerNodeId))
+ : TInterconnectLoggingBase(Sprintf("OutputChannel %" PRIu16 " [node %" PRIu32 "]", id, peerNodeId))
, Pool(pool)
- , PeerNodeId(peerNodeId)
+ , PeerNodeId(peerNodeId)
, ChannelId(id)
, Metrics(std::move(metrics))
, Params(std::move(params))
@@ -56,11 +56,11 @@ namespace NActors {
~TEventOutputChannel() {
}
- std::pair<ui32, TEventHolder*> Push(IEventHandle& ev) {
+ std::pair<ui32, TEventHolder*> Push(IEventHandle& ev) {
TEventHolder& event = Pool.Allocate(Queue);
const ui32 bytes = event.Fill(ev) + sizeof(TEventDescr);
OutputQueueSize += bytes;
- return std::make_pair(bytes, &event);
+ return std::make_pair(bytes, &event);
}
void DropConfirmed(ui64 confirm);
@@ -86,7 +86,7 @@ namespace NActors {
void NotifyUndelivered();
TEventHolderPool& Pool;
- const ui32 PeerNodeId;
+ const ui32 PeerNodeId;
const ui16 ChannelId;
std::shared_ptr<IInterconnectMetrics> Metrics;
const TSessionParams Params;
diff --git a/library/cpp/actors/interconnect/interconnect_common.h b/library/cpp/actors/interconnect/interconnect_common.h
index 285709a00c..104573daaa 100644
--- a/library/cpp/actors/interconnect/interconnect_common.h
+++ b/library/cpp/actors/interconnect/interconnect_common.h
@@ -2,7 +2,7 @@
#include <library/cpp/actors/core/actorid.h>
#include <library/cpp/actors/core/actorsystem.h>
-#include <library/cpp/actors/util/datetime.h>
+#include <library/cpp/actors/util/datetime.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/monlib/metrics/metric_registry.h>
#include <util/generic/map.h>
@@ -82,7 +82,7 @@ namespace NActors {
TAtomicBase MaxDestructorQueueSize = 1024 * 1024 * 1024;
TString ClusterUUID;
TVector<TString> AcceptUUID;
- ui64 StartTime = GetCycleCountFast();
+ ui64 StartTime = GetCycleCountFast();
TString TechnicalSelfHostName;
TInitWhiteboardCallback InitWhiteboard;
TUpdateWhiteboardCallback UpdateWhiteboard;
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index 0abe9fe659..e445c42f70 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -1,7 +1,7 @@
#include "interconnect_tcp_session.h"
#include "interconnect_tcp_proxy.h"
#include <library/cpp/actors/core/probes.h>
-#include <library/cpp/actors/util/datetime.h>
+#include <library/cpp/actors/util/datetime.h>
namespace NActors {
LWTRACE_USING(ACTORLIB_PROVIDER);
@@ -195,7 +195,7 @@ namespace NActors {
ConfirmedByInput = HeaderConfirm;
if (AtomicGet(Context->ControlPacketId) <= HeaderConfirm && !NewPingProtocol) {
ui64 sendTime = AtomicGet(Context->ControlPacketSendTimer);
- TDuration duration = CyclesToDuration(GetCycleCountFast() - sendTime);
+ TDuration duration = CyclesToDuration(GetCycleCountFast() - sendTime);
const auto durationUs = duration.MicroSeconds();
Metrics->UpdateLegacyPingTimeHist(durationUs);
PingQ.push_back(duration);
@@ -217,7 +217,7 @@ namespace NActors {
Send(SessionId, new TEvProcessPingRequest(HeaderSerial & ~TTcpPacketBuf::PingRequestMask));
} else if (HeaderSerial & TTcpPacketBuf::PingResponseMask) {
const ui64 sent = HeaderSerial & ~TTcpPacketBuf::PingResponseMask;
- const ui64 received = GetCycleCountFast();
+ const ui64 received = GetCycleCountFast();
HandlePingResponse(CyclesToDuration(received - sent));
} else if (HeaderSerial & TTcpPacketBuf::ClockMask) {
HandleClock(TInstant::MicroSeconds(HeaderSerial & ~TTcpPacketBuf::ClockMask));
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 2ded7f9f53..8da77bcd1d 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -5,7 +5,7 @@
#include <library/cpp/actors/core/probes.h>
#include <library/cpp/actors/core/log.h>
#include <library/cpp/actors/core/interconnect.h>
-#include <library/cpp/actors/util/datetime.h>
+#include <library/cpp/actors/util/datetime.h>
#include <library/cpp/actors/protos/services_common.pb.h>
#include <library/cpp/monlib/service/pages/templates.h>
@@ -128,20 +128,20 @@ namespace NActors {
auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel);
const bool wasWorking = oChannel.IsWorking();
- const auto [dataSize, event] = oChannel.Push(*ev);
- LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize);
-
- TotalOutputQueueSize += dataSize;
+ const auto [dataSize, event] = oChannel.Push(*ev);
+ LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize);
+
+ TotalOutputQueueSize += dataSize;
Proxy->Metrics->AddOutputBuffersTotalSize(dataSize);
- if (!wasWorking) {
- // this channel has returned to work -- it was empty and this we have just put first event in the queue
- ChannelScheduler->AddToHeap(oChannel, EqualizeCounter);
+ if (!wasWorking) {
+ // this channel has returned to work -- it was empty and this we have just put first event in the queue
+ ChannelScheduler->AddToHeap(oChannel, EqualizeCounter);
}
SetOutputStuckFlag(true);
++NumEventsInReadyChannels;
- LWTRACK(EnqueueEvent, event->Orbit, Proxy->PeerNodeId, NumEventsInReadyChannels, GetWriteBlockedTotal(), evChannel, oChannel.GetQueueSize(), oChannel.GetBufferedAmountOfData());
+ LWTRACK(EnqueueEvent, event->Orbit, Proxy->PeerNodeId, NumEventsInReadyChannels, GetWriteBlockedTotal(), evChannel, oChannel.GetQueueSize(), oChannel.GetBufferedAmountOfData());
WILSON_TRACE(*TlsActivationContext, &ev->TraceId, OutputQueuePush,
QueueSizeInEvents = oChannel.GetQueueSize(),
QueueSizeInBytes = oChannel.GetBufferedAmountOfData());
@@ -179,7 +179,7 @@ namespace NActors {
} else {
TActivationContext::Send(ev);
}
- LWPROBE(StartBatching, Proxy->PeerNodeId, batchPeriod.MillisecondsFloat());
+ LWPROBE(StartBatching, Proxy->PeerNodeId, batchPeriod.MillisecondsFloat());
LOG_DEBUG_IC_SESSION("ICS17", "batching started");
}
}
@@ -315,7 +315,7 @@ namespace NActors {
// update ping time
Ping = msg.Ping;
- LWPROBE(UpdateFromInputSession, Proxy->PeerNodeId, Ping.MillisecondsFloat());
+ LWPROBE(UpdateFromInputSession, Proxy->PeerNodeId, Ping.MillisecondsFloat());
bool needConfirm = false;
@@ -342,7 +342,7 @@ namespace NActors {
// generate more traffic if we have unblocked state now
if (unblockedSomething) {
- LWPROBE(UnblockByDropConfirmed, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - ev->SendTime) * 1000.0);
+ LWPROBE(UnblockByDropConfirmed, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - ev->SendTime) * 1000.0);
GenerateTraffic();
}
@@ -380,7 +380,7 @@ namespace NActors {
void TInterconnectSessionTCP::HandleRam(TEvRam::TPtr& ev) {
if (ev->Get() == RamInQueue) {
- LWPROBE(FinishRam, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - ev->SendTime) * 1000.0);
+ LWPROBE(FinishRam, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - ev->SendTime) * 1000.0);
RamInQueue = nullptr;
GenerateTraffic();
}
@@ -391,7 +391,7 @@ namespace NActors {
IssuePingRequest();
if (RamInQueue && !RamInQueue->Batching) {
- LWPROBE(SkipGenerateTraffic, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - RamStartedCycles) * 1000.0);
+ LWPROBE(SkipGenerateTraffic, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - RamStartedCycles) * 1000.0);
return; // we'll do it a bit later
} else {
RamInQueue = nullptr;
@@ -399,51 +399,51 @@ namespace NActors {
LOG_DEBUG_IC_SESSION("ICS19", "GenerateTraffic");
- // There is a tradeoff between fairness and efficiency.
- // The less traffic is generated here, the less buffering is after fair scheduler,
- // the more fair system is, the less latency is present.
- // The more traffic is generated here, the less syscalls and actor-system overhead occurs,
- // the less cpu is consumed.
- static const ui64 generateLimit = 64 * 1024;
-
+ // There is a tradeoff between fairness and efficiency.
+ // The less traffic is generated here, the less buffering is after fair scheduler,
+ // the more fair system is, the less latency is present.
+ // The more traffic is generated here, the less syscalls and actor-system overhead occurs,
+ // the less cpu is consumed.
+ static const ui64 generateLimit = 64 * 1024;
+
const ui64 sizeBefore = TotalOutputQueueSize;
- ui32 generatedPackets = 0;
- ui64 generatedBytes = 0;
- ui64 generateStarted = GetCycleCountFast();
+ ui32 generatedPackets = 0;
+ ui64 generatedBytes = 0;
+ ui64 generateStarted = GetCycleCountFast();
// apply traffic changes
auto accountTraffic = [&] { ChannelScheduler->ForEach([](TEventOutputChannel& channel) { channel.AccountTraffic(); }); };
- // first, we create as many data packets as we can generate under certain conditions; they include presence
- // of events in channels queues and in flight fitting into requested limit; after we hit one of these conditions
- // we exit cycle
- while (Socket && NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData() && !ReceiveContext->WriteBlockedByFullSendBuffer) {
- if (generatedBytes >= generateLimit) {
- // resume later but ensure that we have issued at least one packet
+ // first, we create as many data packets as we can generate under certain conditions; they include presence
+ // of events in channels queues and in flight fitting into requested limit; after we hit one of these conditions
+ // we exit cycle
+ while (Socket && NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData() && !ReceiveContext->WriteBlockedByFullSendBuffer) {
+ if (generatedBytes >= generateLimit) {
+ // resume later but ensure that we have issued at least one packet
RamInQueue = new TEvRam(false);
Send(SelfId(), RamInQueue);
- RamStartedCycles = GetCycleCountFast();
- LWPROBE(StartRam, Proxy->PeerNodeId);
- break;
+ RamStartedCycles = GetCycleCountFast();
+ LWPROBE(StartRam, Proxy->PeerNodeId);
+ break;
}
- try {
- generatedBytes += MakePacket(true);
- ++generatedPackets;
- } catch (const TExSerializedEventTooLarge& ex) {
- // terminate session if the event can't be serialized properly
- accountTraffic();
- LOG_CRIT_IC("ICS31", "serialized event Type# 0x%08" PRIx32 " is too large", ex.Type);
- return Terminate(TDisconnectReason::EventTooLarge());
+ try {
+ generatedBytes += MakePacket(true);
+ ++generatedPackets;
+ } catch (const TExSerializedEventTooLarge& ex) {
+ // terminate session if the event can't be serialized properly
+ accountTraffic();
+ LOG_CRIT_IC("ICS31", "serialized event Type# 0x%08" PRIx32 " is too large", ex.Type);
+ return Terminate(TDisconnectReason::EventTooLarge());
}
}
- if (Socket) {
- WriteData();
- }
-
- LWPROBE(GenerateTraffic, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - generateStarted) * 1000.0, sizeBefore - TotalOutputQueueSize, generatedPackets, generatedBytes);
-
+ if (Socket) {
+ WriteData();
+ }
+
+ LWPROBE(GenerateTraffic, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - generateStarted) * 1000.0, sizeBefore - TotalOutputQueueSize, generatedPackets, generatedBytes);
+
accountTraffic();
EqualizeCounter += ChannelScheduler->Equalize();
}
@@ -520,10 +520,10 @@ namespace NActors {
ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false");
if (std::exchange(ReceiveContext->WriteBlockedByFullSendBuffer, false)) {
Proxy->Metrics->IncUsefulWriteWakeups();
- ui64 nowCycles = GetCycleCountFast();
- double blockedUs = NHPTimer::GetSeconds(nowCycles - WriteBlockedCycles) * 1000000.0;
- LWPROBE(ReadyWrite, Proxy->PeerNodeId, NHPTimer::GetSeconds(nowCycles - ev->SendTime) * 1000.0, blockedUs / 1000.0);
- WriteBlockedTotal += TDuration::MicroSeconds(blockedUs);
+ ui64 nowCycles = GetCycleCountFast();
+ double blockedUs = NHPTimer::GetSeconds(nowCycles - WriteBlockedCycles) * 1000000.0;
+ LWPROBE(ReadyWrite, Proxy->PeerNodeId, NHPTimer::GetSeconds(nowCycles - ev->SendTime) * 1000.0, blockedUs / 1000.0);
+ WriteBlockedTotal += TDuration::MicroSeconds(blockedUs);
GenerateTraffic();
} else if (!ev->Cookie) {
Proxy->Metrics->IncSpuriousWriteWakeups();
@@ -602,7 +602,7 @@ namespace NActors {
Y_VERIFY(static_cast<size_t>(r) <= BytesUnwritten);
BytesUnwritten -= r;
written += r;
- ui64 packets = 0;
+ ui64 packets = 0;
// advance SendQueuePos to eat all processed items
for (size_t amount = r; amount && SendQueuePos->DropBufs(amount); ++SendQueuePos) {
@@ -610,11 +610,11 @@ namespace NActors {
LastSentSerial = Max(LastSentSerial, SendQueuePos->GetSerial());
}
++PacketsWrittenToSocket;
- ++packets;
- LWTRACK(PacketWrittenToSocket, SendQueuePos->Orbit, Proxy->PeerNodeId, PacketsWrittenToSocket, SendQueuePos->TriedWriting, SendQueuePos->GetDataSize(), BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket);
+ ++packets;
+ LWTRACK(PacketWrittenToSocket, SendQueuePos->Orbit, Proxy->PeerNodeId, PacketsWrittenToSocket, SendQueuePos->TriedWriting, SendQueuePos->GetDataSize(), BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket);
}
-
- LWPROBE(WriteToSocket, Proxy->PeerNodeId, r, packets, PacketsWrittenToSocket, BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket);
+
+ LWPROBE(WriteToSocket, Proxy->PeerNodeId, r, packets, PacketsWrittenToSocket, BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket);
} else if (-r != EAGAIN && -r != EWOULDBLOCK) {
const TString message = r == 0 ? "connection closed by peer"
: err ? err
@@ -635,8 +635,8 @@ namespace NActors {
// TEvPollerReadyWrite event from poller; set up flag meaning this and wait for that event
Y_VERIFY(!ReceiveContext->WriteBlockedByFullSendBuffer);
ReceiveContext->WriteBlockedByFullSendBuffer = true;
- WriteBlockedCycles = GetCycleCountFast();
- LWPROBE(BlockedWrite, Proxy->PeerNodeId, SendQueue.size(), written);
+ WriteBlockedCycles = GetCycleCountFast();
+ LWPROBE(BlockedWrite, Proxy->PeerNodeId, SendQueue.size(), written);
LOG_DEBUG_IC_SESSION("ICS18", "hit send buffer limit");
if (PollerToken) {
@@ -718,7 +718,7 @@ namespace NActors {
}
}
- ui64 TInterconnectSessionTCP::MakePacket(bool data, TMaybe<ui64> pingMask) {
+ ui64 TInterconnectSessionTCP::MakePacket(bool data, TMaybe<ui64> pingMask) {
Y_VERIFY(Socket);
TSendQueue::iterator packet;
@@ -759,7 +759,7 @@ namespace NActors {
}
if (AtomicGet(ReceiveContext->ControlPacketId) == 0) {
- AtomicSet(ReceiveContext->ControlPacketSendTimer, GetCycleCountFast());
+ AtomicSet(ReceiveContext->ControlPacketSendTimer, GetCycleCountFast());
AtomicSet(ReceiveContext->ControlPacketId, OutputCounter);
}
@@ -788,8 +788,8 @@ namespace NActors {
packet->Sign();
// count number of bytes pending for write
- ui64 packetSize = (Params.UseModernFrame ? sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1)) + packet->GetDataSize();
- BytesUnwritten += packetSize;
+ ui64 packetSize = (Params.UseModernFrame ? sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1)) + packet->GetDataSize();
+ BytesUnwritten += packetSize;
LOG_DEBUG_IC_SESSION("ICS22", "outgoing packet Serial# %" PRIu64 " Confirm# %" PRIu64 " DataSize# %zu"
" InflightDataAmount# %" PRIu64 " BytesUnwritten# %" PRIu64, serial, lastInputSerial, packet->GetDataSize(),
@@ -799,13 +799,13 @@ namespace NActors {
ResetFlushLogic();
++PacketsGenerated;
- LWTRACK(PacketGenerated, packet->Orbit, Proxy->PeerNodeId, BytesUnwritten, InflightDataAmount, PacketsGenerated, packetSize);
+ LWTRACK(PacketGenerated, packet->Orbit, Proxy->PeerNodeId, BytesUnwritten, InflightDataAmount, PacketsGenerated, packetSize);
if (!data) {
WriteData();
}
-
- return packetSize;
+
+ return packetSize;
}
bool TInterconnectSessionTCP::DropConfirmed(ui64 confirm) {
@@ -836,14 +836,14 @@ namespace NActors {
channel.DropConfirmed(lastDroppedSerial);
});
- const ui64 current = InflightDataAmount;
- const ui64 limit = GetTotalInflightAmountOfData();
- const bool unblockedSomething = current >= limit && current < limit + droppedDataAmount;
+ const ui64 current = InflightDataAmount;
+ const ui64 limit = GetTotalInflightAmountOfData();
+ const bool unblockedSomething = current >= limit && current < limit + droppedDataAmount;
PacketsConfirmed += numDropped;
InflightDataAmount -= droppedDataAmount;
Proxy->Metrics->SubInflightDataAmount(droppedDataAmount);
- LWPROBE(DropConfirmed, Proxy->PeerNodeId, droppedDataAmount, InflightDataAmount);
+ LWPROBE(DropConfirmed, Proxy->PeerNodeId, droppedDataAmount, InflightDataAmount);
LOG_DEBUG_IC_SESSION("ICS24", "exit InflightDataAmount: %" PRIu64 " bytes droppedDataAmount: %" PRIu64 " bytes"
" dropped %" PRIu32 " packets", InflightDataAmount, droppedDataAmount, numDropped);
@@ -896,7 +896,7 @@ namespace NActors {
}
}
- LWTRACK(FillSendingBuffer, task.Orbit, Proxy->PeerNodeId, bytesGenerated, NumEventsInReadyChannels, WriteBlockedTotal);
+ LWTRACK(FillSendingBuffer, task.Orbit, Proxy->PeerNodeId, bytesGenerated, NumEventsInReadyChannels, WriteBlockedTotal);
Y_VERIFY(bytesGenerated); // ensure we are not stalled in serialization
}
@@ -967,15 +967,15 @@ namespace NActors {
auto& lastpair = OutputQueueUtilization.Last();
if (state)
- lastpair.first -= GetCycleCountFast();
+ lastpair.first -= GetCycleCountFast();
else
- lastpair.first += GetCycleCountFast();
+ lastpair.first += GetCycleCountFast();
OutputStuckFlag = state;
}
void TInterconnectSessionTCP::SwitchStuckPeriod() {
- auto now = GetCycleCountFast();
+ auto now = GetCycleCountFast();
if (OutputQueueUtilization.Size() != 0) {
auto& lastpair = OutputQueueUtilization.Last();
lastpair.second = now - lastpair.second;
@@ -1005,7 +1005,7 @@ namespace NActors {
}
ui64 TInterconnectSessionTCP::GetMaxCyclesPerEvent() const {
- return DurationToCycles(TDuration::MicroSeconds(50));
+ return DurationToCycles(TDuration::MicroSeconds(50));
}
void TInterconnectSessionTCP::IssuePingRequest() {
@@ -1013,7 +1013,7 @@ namespace NActors {
if (now >= LastPingTimestamp + PingPeriodicity) {
LOG_DEBUG_IC_SESSION("ICS22", "Issuing ping request");
if (Socket) {
- MakePacket(false, GetCycleCountFast() | TTcpPacketBuf::PingRequestMask);
+ MakePacket(false, GetCycleCountFast() | TTcpPacketBuf::PingRequestMask);
}
if (Socket) {
MakePacket(false, TInstant::Now().MicroSeconds() | TTcpPacketBuf::ClockMask);
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index 7fc00dbcc5..e347d9b799 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -6,7 +6,7 @@
#include <library/cpp/actors/core/log.h>
#include <library/cpp/actors/helpers/mon_histogram_helper.h>
#include <library/cpp/actors/protos/services_common.pb.h>
-#include <library/cpp/actors/util/datetime.h>
+#include <library/cpp/actors/util/datetime.h>
#include <library/cpp/actors/util/rope.h>
#include <library/cpp/actors/util/funnel_queue.h>
#include <library/cpp/actors/util/recentwnd.h>
@@ -38,12 +38,12 @@ namespace NActors {
public:
TSlowPathChecker(TTraceCallback&& callback)
: Callback(std::move(callback))
- , Start(GetCycleCountFast())
+ , Start(GetCycleCountFast())
{
}
~TSlowPathChecker() {
- const NHPTimer::STime end = GetCycleCountFast();
+ const NHPTimer::STime end = GetCycleCountFast();
const NHPTimer::STime elapsed = end - Start;
if (elapsed > 1000000) {
Callback(NHPTimer::GetSeconds(elapsed) * 1000);
@@ -63,7 +63,7 @@ namespace NActors {
class TTimeLimit {
public:
TTimeLimit(ui64 limitInCycles)
- : UpperLimit(limitInCycles == 0 ? 0 : GetCycleCountFast() + limitInCycles)
+ : UpperLimit(limitInCycles == 0 ? 0 : GetCycleCountFast() + limitInCycles)
{
}
@@ -73,7 +73,7 @@ namespace NActors {
}
bool CheckExceeded() {
- return UpperLimit != 0 && GetCycleCountFast() > UpperLimit;
+ return UpperLimit != 0 && GetCycleCountFast() > UpperLimit;
}
const ui64 UpperLimit;
@@ -125,7 +125,7 @@ namespace NActors {
std::unordered_map<ui16, TRope> ChannelMap;
TReceiveContext() {
- GetTimeFast(&StartTime);
+ GetTimeFast(&StartTime);
}
// returns false if sessions needs to be terminated and packet not to be processed
@@ -378,7 +378,7 @@ namespace NActors {
void SetNewConnection(TEvHandshakeDone::TPtr& ev);
TEvRam* RamInQueue = nullptr;
- ui64 RamStartedCycles = 0;
+ ui64 RamStartedCycles = 0;
void HandleRam(TEvRam::TPtr& ev);
void GenerateTraffic();
@@ -389,7 +389,7 @@ namespace NActors {
void Handle(TEvPollerRegisterResult::TPtr ev);
void WriteData();
- ui64 MakePacket(bool data, TMaybe<ui64> pingMask = {});
+ ui64 MakePacket(bool data, TMaybe<ui64> pingMask = {});
void FillSendingBuffer(TTcpPacketOutTask& packet, ui64 serial);
bool DropConfirmed(ui64 confirm);
void ShutdownSocket(TDisconnectReason reason);
@@ -452,21 +452,21 @@ namespace NActors {
TSendQueue SendQueue;
TSendQueue SendQueueCache;
TSendQueue::iterator SendQueuePos;
- ui64 WriteBlockedCycles = 0; // start of current block period
- TDuration WriteBlockedTotal; // total incremental duration that session has been blocked
+ ui64 WriteBlockedCycles = 0; // start of current block period
+ TDuration WriteBlockedTotal; // total incremental duration that session has been blocked
ui64 BytesUnwritten = 0;
void TrimSendQueueCache();
- TDuration GetWriteBlockedTotal() const {
+ TDuration GetWriteBlockedTotal() const {
if (ReceiveContext->WriteBlockedByFullSendBuffer) {
- double blockedUs = NHPTimer::GetSeconds(GetCycleCountFast() - WriteBlockedCycles) * 1000000.0;
- return WriteBlockedTotal + TDuration::MicroSeconds(blockedUs); // append current blocking period if any
- } else {
- return WriteBlockedTotal;
- }
- }
-
+ double blockedUs = NHPTimer::GetSeconds(GetCycleCountFast() - WriteBlockedCycles) * 1000000.0;
+ return WriteBlockedTotal + TDuration::MicroSeconds(blockedUs); // append current blocking period if any
+ } else {
+ return WriteBlockedTotal;
+ }
+ }
+
ui64 OutputCounter;
ui64 LastSentSerial = 0;
diff --git a/library/cpp/actors/interconnect/packet.cpp b/library/cpp/actors/interconnect/packet.cpp
index e2c289ed59..55c9679414 100644
--- a/library/cpp/actors/interconnect/packet.cpp
+++ b/library/cpp/actors/interconnect/packet.cpp
@@ -1,11 +1,11 @@
#include "packet.h"
-#include <library/cpp/actors/core/probes.h>
-
+#include <library/cpp/actors/core/probes.h>
+
#include <util/system/datetime.h>
-LWTRACE_USING(ACTORLIB_PROVIDER);
-
+LWTRACE_USING(ACTORLIB_PROVIDER);
+
ui32 TEventHolder::Fill(IEventHandle& ev) {
Serial = 0;
Descr.Type = ev.Type;
@@ -27,6 +27,6 @@ ui32 TEventHolder::Fill(IEventHandle& ev) {
} else {
EventSerializedSize = 0;
}
-
+
return EventSerializedSize;
}
diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h
index 4ba50a2b5f..f062633b0b 100644
--- a/library/cpp/actors/interconnect/packet.h
+++ b/library/cpp/actors/interconnect/packet.h
@@ -8,7 +8,7 @@
#include <library/cpp/actors/util/rope.h>
#include <library/cpp/actors/prof/tag.h>
#include <library/cpp/digest/crc32c/crc32c.h>
-#include <library/cpp/lwtrace/shuttle.h>
+#include <library/cpp/lwtrace/shuttle.h>
#include <util/generic/string.h>
#include <util/generic/list.h>
@@ -108,7 +108,7 @@ struct TEventHolder : TNonCopyable {
ui64 Serial;
ui32 EventSerializedSize;
ui32 EventActuallySerialized;
- mutable NLWTrace::TOrbit Orbit;
+ mutable NLWTrace::TOrbit Orbit;
ui32 Fill(IEventHandle& ev);
@@ -136,7 +136,7 @@ struct TEventHolder : TNonCopyable {
void Clear() {
Event.Reset();
Buffer.Reset();
- Orbit.Reset();
+ Orbit.Reset();
}
};
@@ -154,7 +154,7 @@ struct TTcpPacketOutTask : TNonCopyable {
bool TriedWriting;
char *FreeArea;
char *End;
- mutable NLWTrace::TOrbit Orbit;
+ mutable NLWTrace::TOrbit Orbit;
public:
TTcpPacketOutTask(const TSessionParams& params)
@@ -189,7 +189,7 @@ public:
TriedWriting = false;
FreeArea = Params.UseModernFrame ? Packet.v2.Data : Packet.v1.Data;
End = FreeArea + TTcpPacketBuf::PacketDataLen;
- Orbit.Reset();
+ Orbit.Reset();
}
bool IsEmpty() const {
diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp
index e75cbcaef4..ef363b208e 100644
--- a/library/cpp/actors/interconnect/poller_actor.cpp
+++ b/library/cpp/actors/interconnect/poller_actor.cpp
@@ -5,7 +5,7 @@
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
-#include <library/cpp/actors/core/probes.h>
+#include <library/cpp/actors/core/probes.h>
#include <library/cpp/actors/protos/services_common.pb.h>
#include <library/cpp/actors/util/funnel_queue.h>
@@ -17,9 +17,9 @@
#include <variant>
namespace NActors {
-
- LWTRACE_USING(ACTORLIB_PROVIDER);
-
+
+ LWTRACE_USING(ACTORLIB_PROVIDER);
+
namespace {
int LastSocketError() {
#if defined(_win_)
diff --git a/library/cpp/actors/interconnect/profiler.h b/library/cpp/actors/interconnect/profiler.h
index 77a59e3179..d9192e1559 100644
--- a/library/cpp/actors/interconnect/profiler.h
+++ b/library/cpp/actors/interconnect/profiler.h
@@ -1,7 +1,7 @@
#pragma once
-#include <library/cpp/actors/util/datetime.h>
-
+#include <library/cpp/actors/util/datetime.h>
+
namespace NActors {
class TProfiled {
@@ -14,7 +14,7 @@ namespace NActors {
EType Type; // entry kind
int Line;
const char *Marker; // name of the profiled function/part
- ui64 Timestamp; // cycles
+ ui64 Timestamp; // cycles
};
bool Enable = false;
@@ -44,7 +44,7 @@ namespace NActors {
type,
line,
marker,
- GetCycleCountFast()
+ GetCycleCountFast()
});
}
}