summaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
diff options
context:
space:
mode:
authorakchernikov <[email protected]>2022-02-10 16:50:44 +0300
committerDaniil Cherednik <[email protected]>2022-02-10 16:50:44 +0300
commitea46c401e7900b229add3e6074dbf89adc84ebfc (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/actors/interconnect/interconnect_tcp_session.cpp
parent87cccadbd489f00bc6d81b27ad182277cbb25826 (diff)
Restoring authorship annotation for <[email protected]>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_tcp_session.cpp')
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp54
1 files changed, 27 insertions, 27 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 74e0be4b3e5..2ded7f9f537 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -40,7 +40,7 @@ namespace NActors {
, OutputQueueUtilization(16)
, OutputCounter(0ULL)
{
- Proxy->Metrics->SetConnected(0);
+ Proxy->Metrics->SetConnected(0);
ReceiveContext.Reset(new TReceiveContext);
}
@@ -56,7 +56,7 @@ namespace NActors {
as->Send(id, event.Release());
};
Pool.ConstructInPlace(Proxy->Common, std::move(destroyCallback));
- ChannelScheduler.ConstructInPlace(Proxy->PeerNodeId, Proxy->Common->ChannelsConfig, Proxy->Metrics, *Pool,
+ ChannelScheduler.ConstructInPlace(Proxy->PeerNodeId, Proxy->Common->ChannelsConfig, Proxy->Metrics, *Pool,
Proxy->Common->Settings.MaxSerializedEventSize, Params);
LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session created", Proxy->PeerNodeId);
@@ -85,7 +85,7 @@ namespace NActors {
for (const auto& kv : Subscribers) {
Send(kv.first, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, kv.second);
}
- Proxy->Metrics->SubSubscribersCount(Subscribers.size());
+ Proxy->Metrics->SubSubscribersCount(Subscribers.size());
Subscribers.clear();
ChannelScheduler->ForEach([&](TEventOutputChannel& channel) {
@@ -98,13 +98,13 @@ namespace NActors {
SendUpdateToWhiteboard(false);
- Proxy->Metrics->SubOutputBuffersTotalSize(TotalOutputQueueSize);
- Proxy->Metrics->SubInflightDataAmount(InflightDataAmount);
+ Proxy->Metrics->SubOutputBuffersTotalSize(TotalOutputQueueSize);
+ Proxy->Metrics->SubInflightDataAmount(InflightDataAmount);
LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session destroyed", Proxy->PeerNodeId);
if (!Subscribers.empty()) {
- Proxy->Metrics->SubSubscribersCount(Subscribers.size());
+ Proxy->Metrics->SubSubscribersCount(Subscribers.size());
}
TActor::PassAway();
@@ -132,7 +132,7 @@ namespace NActors {
LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize);
TotalOutputQueueSize += dataSize;
- Proxy->Metrics->AddOutputBuffersTotalSize(dataSize);
+ Proxy->Metrics->AddOutputBuffersTotalSize(dataSize);
if (!wasWorking) {
// this channel has returned to work -- it was empty and this we have just put first event in the queue
ChannelScheduler->AddToHeap(oChannel, EqualizeCounter);
@@ -155,7 +155,7 @@ namespace NActors {
}
ui64 outputBuffersTotalSizeLimit = Proxy->Common->Settings.OutputBuffersTotalSizeLimitInMB * ui64(1 << 20);
- if (outputBuffersTotalSizeLimit != 0 && static_cast<ui64>(Proxy->Metrics->GetOutputBuffersTotalSize()) > outputBuffersTotalSizeLimit) {
+ if (outputBuffersTotalSizeLimit != 0 && static_cast<ui64>(Proxy->Metrics->GetOutputBuffersTotalSize()) > outputBuffersTotalSizeLimit) {
LOG_ERROR_IC_SESSION("ICS77", "Exceeded total limit on output buffers size");
if (AtomicTryLock(&Proxy->Common->StartedSessionKiller)) {
CreateSessionKillingActor(Proxy->Common);
@@ -188,7 +188,7 @@ namespace NActors {
LOG_DEBUG_IC_SESSION("ICS04", "subscribe for session state for %s", ev->Sender.ToString().data());
const auto [it, inserted] = Subscribers.emplace(ev->Sender, ev->Cookie);
if (inserted) {
- Proxy->Metrics->IncSubscribersCount();
+ Proxy->Metrics->IncSubscribersCount();
} else {
it->second = ev->Cookie;
}
@@ -197,7 +197,7 @@ namespace NActors {
void TInterconnectSessionTCP::Unsubscribe(STATEFN_SIG) {
LOG_DEBUG_IC_SESSION("ICS05", "unsubscribe for session state for %s", ev->Sender.ToString().data());
- Proxy->Metrics->SubSubscribersCount( Subscribers.erase(ev->Sender));
+ Proxy->Metrics->SubSubscribersCount( Subscribers.erase(ev->Sender));
}
THolder<TEvHandshakeAck> TInterconnectSessionTCP::ProcessHandshakeRequest(TEvHandshakeAsk::TPtr& ev) {
@@ -243,7 +243,7 @@ namespace NActors {
// create input session actor
auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, ReceiveContext, Proxy->Common,
- Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), Params);
+ Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), Params);
ReceiveContext->UnlockLastProcessedPacketSerial();
ReceiverId = Params.Encryption ? RegisterWithSameMailbox(actor.Release()) : Register(actor.Release(), TMailboxType::ReadAsFilled);
@@ -254,7 +254,7 @@ namespace NActors {
ReceiveContext->WriteBlockedByFullSendBuffer = false;
LostConnectionWatchdog.Disarm();
- Proxy->Metrics->SetConnected(1);
+ Proxy->Metrics->SetConnected(1);
LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] connected", Proxy->PeerNodeId);
// arm pinger timer
@@ -264,7 +264,7 @@ namespace NActors {
// REINITIALIZE SEND QUEUE
//
// scan through send queue and leave only those packets who have data -- we will simply resend them; drop all other
- // auxiliary packets; also reset packet metrics to zero to start sending from the beginning
+ // auxiliary packets; also reset packet metrics to zero to start sending from the beginning
// also reset SendQueuePos
// drop confirmed packets first as we do not need unwanted retransmissions
@@ -489,17 +489,17 @@ namespace NActors {
void TInterconnectSessionTCP::ShutdownSocket(TDisconnectReason reason) {
if (Socket) {
if (const TString& s = reason.ToString()) {
- Proxy->Metrics->IncDisconnectByReason(s);
+ Proxy->Metrics->IncDisconnectByReason(s);
}
LOG_INFO_IC_SESSION("ICS25", "shutdown socket, reason# %s", reason.ToString().data());
Proxy->UpdateErrorStateLog(TActivationContext::Now(), "close_socket", reason.ToString().data());
Socket->Shutdown(SHUT_RDWR);
Socket.Reset();
- Proxy->Metrics->IncDisconnections();
+ Proxy->Metrics->IncDisconnections();
CloseOnIdleWatchdog.Disarm();
LostConnectionWatchdog.Arm(SelfId());
- Proxy->Metrics->SetConnected(0);
+ Proxy->Metrics->SetConnected(0);
LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] disconnected", Proxy->PeerNodeId);
}
}
@@ -519,14 +519,14 @@ namespace NActors {
LOG_DEBUG_IC_SESSION("ICS29", "HandleReadyWrite WriteBlockedByFullSendBuffer# %s",
ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false");
if (std::exchange(ReceiveContext->WriteBlockedByFullSendBuffer, false)) {
- Proxy->Metrics->IncUsefulWriteWakeups();
+ Proxy->Metrics->IncUsefulWriteWakeups();
ui64 nowCycles = GetCycleCountFast();
double blockedUs = NHPTimer::GetSeconds(nowCycles - WriteBlockedCycles) * 1000000.0;
LWPROBE(ReadyWrite, Proxy->PeerNodeId, NHPTimer::GetSeconds(nowCycles - ev->SendTime) * 1000.0, blockedUs / 1000.0);
WriteBlockedTotal += TDuration::MicroSeconds(blockedUs);
GenerateTraffic();
} else if (!ev->Cookie) {
- Proxy->Metrics->IncSpuriousWriteWakeups();
+ Proxy->Metrics->IncSpuriousWriteWakeups();
}
if (Params.Encryption && ReceiveContext->ReadPending && !ev->Cookie) {
Send(ReceiverId, ev->Release().Release(), 0, 1);
@@ -591,7 +591,7 @@ namespace NActors {
#else
r = Socket->Send(iovec[0].iov_base, iovec[0].iov_len, &err);
#endif
- Proxy->Metrics->IncSendSyscalls();
+ Proxy->Metrics->IncSendSyscalls();
} while (r == -EINTR);
LOG_DEBUG_IC_SESSION("ICS16", "written# %zd iovcnt# %d err# %s", r, iovcnt, err.data());
@@ -621,7 +621,7 @@ namespace NActors {
: Sprintf("socket: %s", strerror(-r));
LOG_NOTICE_NET(Proxy->PeerNodeId, "%s", message.data());
if (written) {
- Proxy->Metrics->AddTotalBytesWritten(written);
+ Proxy->Metrics->AddTotalBytesWritten(written);
}
return ReestablishConnectionWithHandshake(r == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-r));
} else {
@@ -651,7 +651,7 @@ namespace NActors {
}
}
if (written) {
- Proxy->Metrics->AddTotalBytesWritten(written);
+ Proxy->Metrics->AddTotalBytesWritten(written);
}
}
@@ -753,9 +753,9 @@ namespace NActors {
Y_VERIFY(!packet->IsEmpty());
InflightDataAmount += packet->GetDataSize();
- Proxy->Metrics->AddInflightDataAmount(packet->GetDataSize());
+ Proxy->Metrics->AddInflightDataAmount(packet->GetDataSize());
if (InflightDataAmount > GetTotalInflightAmountOfData()) {
- Proxy->Metrics->IncInflyLimitReach();
+ Proxy->Metrics->IncInflyLimitReach();
}
if (AtomicGet(ReceiveContext->ControlPacketId) == 0) {
@@ -842,7 +842,7 @@ namespace NActors {
PacketsConfirmed += numDropped;
InflightDataAmount -= droppedDataAmount;
- Proxy->Metrics->SubInflightDataAmount(droppedDataAmount);
+ Proxy->Metrics->SubInflightDataAmount(droppedDataAmount);
LWPROBE(DropConfirmed, Proxy->PeerNodeId, droppedDataAmount, InflightDataAmount);
LOG_DEBUG_IC_SESSION("ICS24", "exit InflightDataAmount: %" PRIu64 " bytes droppedDataAmount: %" PRIu64 " bytes"
@@ -870,9 +870,9 @@ namespace NActors {
Y_VERIFY_DEBUG(netAfter <= netBefore); // net amount should shrink
const ui64 net = netBefore - netAfter; // number of net bytes serialized
- // adjust metrics for local and global queue size
+ // adjust metrics for local and global queue size
TotalOutputQueueSize -= net;
- Proxy->Metrics->SubOutputBuffersTotalSize(net);
+ Proxy->Metrics->SubOutputBuffersTotalSize(net);
bytesGenerated += gross;
Y_VERIFY_DEBUG(!!net == !!gross && gross >= net, "net# %" PRIu64 " gross# %" PRIu64, net, gross);
@@ -944,7 +944,7 @@ namespace NActors {
} while (false);
}
- callback(Proxy->Metrics->GetHumanFriendlyPeerHostName(),
+ callback(Proxy->Metrics->GetHumanFriendlyPeerHostName(),
connected,
flagState == EFlag::GREEN,
flagState == EFlag::YELLOW,