diff options
author | akchernikov <[email protected]> | 2022-02-10 16:50:44 +0300 |
---|---|---|
committer | Daniil Cherednik <[email protected]> | 2022-02-10 16:50:44 +0300 |
commit | ea46c401e7900b229add3e6074dbf89adc84ebfc (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/actors/interconnect/interconnect_tcp_session.cpp | |
parent | 87cccadbd489f00bc6d81b27ad182277cbb25826 (diff) |
Restoring authorship annotation for <[email protected]>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_tcp_session.cpp')
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_tcp_session.cpp | 54 |
1 files changed, 27 insertions, 27 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 74e0be4b3e5..2ded7f9f537 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -40,7 +40,7 @@ namespace NActors { , OutputQueueUtilization(16) , OutputCounter(0ULL) { - Proxy->Metrics->SetConnected(0); + Proxy->Metrics->SetConnected(0); ReceiveContext.Reset(new TReceiveContext); } @@ -56,7 +56,7 @@ namespace NActors { as->Send(id, event.Release()); }; Pool.ConstructInPlace(Proxy->Common, std::move(destroyCallback)); - ChannelScheduler.ConstructInPlace(Proxy->PeerNodeId, Proxy->Common->ChannelsConfig, Proxy->Metrics, *Pool, + ChannelScheduler.ConstructInPlace(Proxy->PeerNodeId, Proxy->Common->ChannelsConfig, Proxy->Metrics, *Pool, Proxy->Common->Settings.MaxSerializedEventSize, Params); LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session created", Proxy->PeerNodeId); @@ -85,7 +85,7 @@ namespace NActors { for (const auto& kv : Subscribers) { Send(kv.first, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, kv.second); } - Proxy->Metrics->SubSubscribersCount(Subscribers.size()); + Proxy->Metrics->SubSubscribersCount(Subscribers.size()); Subscribers.clear(); ChannelScheduler->ForEach([&](TEventOutputChannel& channel) { @@ -98,13 +98,13 @@ namespace NActors { SendUpdateToWhiteboard(false); - Proxy->Metrics->SubOutputBuffersTotalSize(TotalOutputQueueSize); - Proxy->Metrics->SubInflightDataAmount(InflightDataAmount); + Proxy->Metrics->SubOutputBuffersTotalSize(TotalOutputQueueSize); + Proxy->Metrics->SubInflightDataAmount(InflightDataAmount); LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session destroyed", Proxy->PeerNodeId); if (!Subscribers.empty()) { - Proxy->Metrics->SubSubscribersCount(Subscribers.size()); + Proxy->Metrics->SubSubscribersCount(Subscribers.size()); } TActor::PassAway(); @@ -132,7 +132,7 @@ namespace NActors { LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize); TotalOutputQueueSize += dataSize; - Proxy->Metrics->AddOutputBuffersTotalSize(dataSize); + Proxy->Metrics->AddOutputBuffersTotalSize(dataSize); if (!wasWorking) { // this channel has returned to work -- it was empty and this we have just put first event in the queue ChannelScheduler->AddToHeap(oChannel, EqualizeCounter); @@ -155,7 +155,7 @@ namespace NActors { } ui64 outputBuffersTotalSizeLimit = Proxy->Common->Settings.OutputBuffersTotalSizeLimitInMB * ui64(1 << 20); - if (outputBuffersTotalSizeLimit != 0 && static_cast<ui64>(Proxy->Metrics->GetOutputBuffersTotalSize()) > outputBuffersTotalSizeLimit) { + if (outputBuffersTotalSizeLimit != 0 && static_cast<ui64>(Proxy->Metrics->GetOutputBuffersTotalSize()) > outputBuffersTotalSizeLimit) { LOG_ERROR_IC_SESSION("ICS77", "Exceeded total limit on output buffers size"); if (AtomicTryLock(&Proxy->Common->StartedSessionKiller)) { CreateSessionKillingActor(Proxy->Common); @@ -188,7 +188,7 @@ namespace NActors { LOG_DEBUG_IC_SESSION("ICS04", "subscribe for session state for %s", ev->Sender.ToString().data()); const auto [it, inserted] = Subscribers.emplace(ev->Sender, ev->Cookie); if (inserted) { - Proxy->Metrics->IncSubscribersCount(); + Proxy->Metrics->IncSubscribersCount(); } else { it->second = ev->Cookie; } @@ -197,7 +197,7 @@ namespace NActors { void TInterconnectSessionTCP::Unsubscribe(STATEFN_SIG) { LOG_DEBUG_IC_SESSION("ICS05", "unsubscribe for session state for %s", ev->Sender.ToString().data()); - Proxy->Metrics->SubSubscribersCount( Subscribers.erase(ev->Sender)); + Proxy->Metrics->SubSubscribersCount( Subscribers.erase(ev->Sender)); } THolder<TEvHandshakeAck> TInterconnectSessionTCP::ProcessHandshakeRequest(TEvHandshakeAsk::TPtr& ev) { @@ -243,7 +243,7 @@ namespace NActors { // create input session actor auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, ReceiveContext, Proxy->Common, - Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), Params); + Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), Params); ReceiveContext->UnlockLastProcessedPacketSerial(); ReceiverId = Params.Encryption ? RegisterWithSameMailbox(actor.Release()) : Register(actor.Release(), TMailboxType::ReadAsFilled); @@ -254,7 +254,7 @@ namespace NActors { ReceiveContext->WriteBlockedByFullSendBuffer = false; LostConnectionWatchdog.Disarm(); - Proxy->Metrics->SetConnected(1); + Proxy->Metrics->SetConnected(1); LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] connected", Proxy->PeerNodeId); // arm pinger timer @@ -264,7 +264,7 @@ namespace NActors { // REINITIALIZE SEND QUEUE // // scan through send queue and leave only those packets who have data -- we will simply resend them; drop all other - // auxiliary packets; also reset packet metrics to zero to start sending from the beginning + // auxiliary packets; also reset packet metrics to zero to start sending from the beginning // also reset SendQueuePos // drop confirmed packets first as we do not need unwanted retransmissions @@ -489,17 +489,17 @@ namespace NActors { void TInterconnectSessionTCP::ShutdownSocket(TDisconnectReason reason) { if (Socket) { if (const TString& s = reason.ToString()) { - Proxy->Metrics->IncDisconnectByReason(s); + Proxy->Metrics->IncDisconnectByReason(s); } LOG_INFO_IC_SESSION("ICS25", "shutdown socket, reason# %s", reason.ToString().data()); Proxy->UpdateErrorStateLog(TActivationContext::Now(), "close_socket", reason.ToString().data()); Socket->Shutdown(SHUT_RDWR); Socket.Reset(); - Proxy->Metrics->IncDisconnections(); + Proxy->Metrics->IncDisconnections(); CloseOnIdleWatchdog.Disarm(); LostConnectionWatchdog.Arm(SelfId()); - Proxy->Metrics->SetConnected(0); + Proxy->Metrics->SetConnected(0); LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] disconnected", Proxy->PeerNodeId); } } @@ -519,14 +519,14 @@ namespace NActors { LOG_DEBUG_IC_SESSION("ICS29", "HandleReadyWrite WriteBlockedByFullSendBuffer# %s", ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false"); if (std::exchange(ReceiveContext->WriteBlockedByFullSendBuffer, false)) { - Proxy->Metrics->IncUsefulWriteWakeups(); + Proxy->Metrics->IncUsefulWriteWakeups(); ui64 nowCycles = GetCycleCountFast(); double blockedUs = NHPTimer::GetSeconds(nowCycles - WriteBlockedCycles) * 1000000.0; LWPROBE(ReadyWrite, Proxy->PeerNodeId, NHPTimer::GetSeconds(nowCycles - ev->SendTime) * 1000.0, blockedUs / 1000.0); WriteBlockedTotal += TDuration::MicroSeconds(blockedUs); GenerateTraffic(); } else if (!ev->Cookie) { - Proxy->Metrics->IncSpuriousWriteWakeups(); + Proxy->Metrics->IncSpuriousWriteWakeups(); } if (Params.Encryption && ReceiveContext->ReadPending && !ev->Cookie) { Send(ReceiverId, ev->Release().Release(), 0, 1); @@ -591,7 +591,7 @@ namespace NActors { #else r = Socket->Send(iovec[0].iov_base, iovec[0].iov_len, &err); #endif - Proxy->Metrics->IncSendSyscalls(); + Proxy->Metrics->IncSendSyscalls(); } while (r == -EINTR); LOG_DEBUG_IC_SESSION("ICS16", "written# %zd iovcnt# %d err# %s", r, iovcnt, err.data()); @@ -621,7 +621,7 @@ namespace NActors { : Sprintf("socket: %s", strerror(-r)); LOG_NOTICE_NET(Proxy->PeerNodeId, "%s", message.data()); if (written) { - Proxy->Metrics->AddTotalBytesWritten(written); + Proxy->Metrics->AddTotalBytesWritten(written); } return ReestablishConnectionWithHandshake(r == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-r)); } else { @@ -651,7 +651,7 @@ namespace NActors { } } if (written) { - Proxy->Metrics->AddTotalBytesWritten(written); + Proxy->Metrics->AddTotalBytesWritten(written); } } @@ -753,9 +753,9 @@ namespace NActors { Y_VERIFY(!packet->IsEmpty()); InflightDataAmount += packet->GetDataSize(); - Proxy->Metrics->AddInflightDataAmount(packet->GetDataSize()); + Proxy->Metrics->AddInflightDataAmount(packet->GetDataSize()); if (InflightDataAmount > GetTotalInflightAmountOfData()) { - Proxy->Metrics->IncInflyLimitReach(); + Proxy->Metrics->IncInflyLimitReach(); } if (AtomicGet(ReceiveContext->ControlPacketId) == 0) { @@ -842,7 +842,7 @@ namespace NActors { PacketsConfirmed += numDropped; InflightDataAmount -= droppedDataAmount; - Proxy->Metrics->SubInflightDataAmount(droppedDataAmount); + Proxy->Metrics->SubInflightDataAmount(droppedDataAmount); LWPROBE(DropConfirmed, Proxy->PeerNodeId, droppedDataAmount, InflightDataAmount); LOG_DEBUG_IC_SESSION("ICS24", "exit InflightDataAmount: %" PRIu64 " bytes droppedDataAmount: %" PRIu64 " bytes" @@ -870,9 +870,9 @@ namespace NActors { Y_VERIFY_DEBUG(netAfter <= netBefore); // net amount should shrink const ui64 net = netBefore - netAfter; // number of net bytes serialized - // adjust metrics for local and global queue size + // adjust metrics for local and global queue size TotalOutputQueueSize -= net; - Proxy->Metrics->SubOutputBuffersTotalSize(net); + Proxy->Metrics->SubOutputBuffersTotalSize(net); bytesGenerated += gross; Y_VERIFY_DEBUG(!!net == !!gross && gross >= net, "net# %" PRIu64 " gross# %" PRIu64, net, gross); @@ -944,7 +944,7 @@ namespace NActors { } while (false); } - callback(Proxy->Metrics->GetHumanFriendlyPeerHostName(), + callback(Proxy->Metrics->GetHumanFriendlyPeerHostName(), connected, flagState == EFlag::GREEN, flagState == EFlag::YELLOW, |