aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
diff options
context:
space:
mode:
authorAlexander Rutkovsky <alexvru@mail.ru>2022-02-10 16:47:40 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:47:40 +0300
commit667a4ee7da2e004784b9c3cfab824a81e96f4d66 (patch)
treec0748b5dcbade83af788c0abfa89c0383d6b779c /library/cpp/actors/interconnect/interconnect_tcp_session.cpp
parentf3646f91e0de459836a7800b9ce3e8dc57a2ab3a (diff)
downloadydb-667a4ee7da2e004784b9c3cfab824a81e96f4d66.tar.gz
Restoring authorship annotation for Alexander Rutkovsky <alexvru@mail.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_tcp_session.cpp')
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp1620
1 files changed, 810 insertions, 810 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 8f13d89f35..2ded7f9f53 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -1,40 +1,40 @@
-#include "interconnect_tcp_proxy.h"
-#include "interconnect_tcp_session.h"
-#include "interconnect_handshake.h"
-
+#include "interconnect_tcp_proxy.h"
+#include "interconnect_tcp_session.h"
+#include "interconnect_handshake.h"
+
#include <library/cpp/actors/core/probes.h>
#include <library/cpp/actors/core/log.h>
#include <library/cpp/actors/core/interconnect.h>
#include <library/cpp/actors/util/datetime.h>
#include <library/cpp/actors/protos/services_common.pb.h>
#include <library/cpp/monlib/service/pages/templates.h>
-
-namespace NActors {
+
+namespace NActors {
LWTRACE_USING(ACTORLIB_PROVIDER);
-
+
DECLARE_WILSON_EVENT(OutputQueuePush, (ui32, QueueSizeInEvents), (ui64, QueueSizeInBytes));
-
- template<typename T>
- T Coalesce(T&& x) {
- return x;
+
+ template<typename T>
+ T Coalesce(T&& x) {
+ return x;
}
-
- template<typename T, typename T2, typename... TRest>
- typename std::common_type<T, T2, TRest...>::type Coalesce(T&& first, T2&& mid, TRest&&... rest) {
- if (first != typename std::remove_reference<T>::type()) {
- return first;
- } else {
- return Coalesce(std::forward<T2>(mid), std::forward<TRest>(rest)...);
- }
+
+ template<typename T, typename T2, typename... TRest>
+ typename std::common_type<T, T2, TRest...>::type Coalesce(T&& first, T2&& mid, TRest&&... rest) {
+ if (first != typename std::remove_reference<T>::type()) {
+ return first;
+ } else {
+ return Coalesce(std::forward<T2>(mid), std::forward<TRest>(rest)...);
+ }
}
-
- TInterconnectSessionTCP::TInterconnectSessionTCP(TInterconnectProxyTCP* const proxy, TSessionParams params)
+
+ TInterconnectSessionTCP::TInterconnectSessionTCP(TInterconnectProxyTCP* const proxy, TSessionParams params)
: TActor(&TInterconnectSessionTCP::StateFunc)
- , Created(TInstant::Now())
+ , Created(TInstant::Now())
, Proxy(proxy)
- , CloseOnIdleWatchdog(GetCloseOnIdleTimeout(), std::bind(&TThis::OnCloseOnIdleTimerHit, this))
- , LostConnectionWatchdog(GetLostConnectionTimeout(), std::bind(&TThis::OnLostConnectionTimerHit, this))
- , Params(std::move(params))
+ , CloseOnIdleWatchdog(GetCloseOnIdleTimeout(), std::bind(&TThis::OnCloseOnIdleTimerHit, this))
+ , LostConnectionWatchdog(GetLostConnectionTimeout(), std::bind(&TThis::OnLostConnectionTimerHit, this))
+ , Params(std::move(params))
, TotalOutputQueueSize(0)
, OutputStuckFlag(false)
, OutputQueueUtilization(16)
@@ -42,92 +42,92 @@ namespace NActors {
{
Proxy->Metrics->SetConnected(0);
ReceiveContext.Reset(new TReceiveContext);
- }
-
- TInterconnectSessionTCP::~TInterconnectSessionTCP() {
- // close socket ASAP when actor system is being shut down
- if (Socket) {
- Socket->Shutdown(SHUT_RDWR);
- }
- }
-
- void TInterconnectSessionTCP::Init() {
- auto destroyCallback = [as = TlsActivationContext->ExecutorThread.ActorSystem, id = Proxy->Common->DestructorId](THolder<IEventBase> event) {
- as->Send(id, event.Release());
- };
- Pool.ConstructInPlace(Proxy->Common, std::move(destroyCallback));
+ }
+
+ TInterconnectSessionTCP::~TInterconnectSessionTCP() {
+ // close socket ASAP when actor system is being shut down
+ if (Socket) {
+ Socket->Shutdown(SHUT_RDWR);
+ }
+ }
+
+ void TInterconnectSessionTCP::Init() {
+ auto destroyCallback = [as = TlsActivationContext->ExecutorThread.ActorSystem, id = Proxy->Common->DestructorId](THolder<IEventBase> event) {
+ as->Send(id, event.Release());
+ };
+ Pool.ConstructInPlace(Proxy->Common, std::move(destroyCallback));
ChannelScheduler.ConstructInPlace(Proxy->PeerNodeId, Proxy->Common->ChannelsConfig, Proxy->Metrics, *Pool,
- Proxy->Common->Settings.MaxSerializedEventSize, Params);
-
- LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session created", Proxy->PeerNodeId);
+ Proxy->Common->Settings.MaxSerializedEventSize, Params);
+
+ LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session created", Proxy->PeerNodeId);
SetPrefix(Sprintf("Session %s [node %" PRIu32 "]", SelfId().ToString().data(), Proxy->PeerNodeId));
- SendUpdateToWhiteboard();
+ SendUpdateToWhiteboard();
}
-
- void TInterconnectSessionTCP::CloseInputSession() {
- Send(ReceiverId, new TEvInterconnect::TEvCloseInputSession);
- }
-
- void TInterconnectSessionTCP::Handle(TEvTerminate::TPtr& ev) {
- Terminate(ev->Get()->Reason);
- }
-
- void TInterconnectSessionTCP::HandlePoison() {
- Terminate(TDisconnectReason());
- }
-
- void TInterconnectSessionTCP::Terminate(TDisconnectReason reason) {
- LOG_INFO_IC_SESSION("ICS01", "socket: %" PRIi64, (Socket ? i64(*Socket) : -1));
-
- IActor::InvokeOtherActor(*Proxy, &TInterconnectProxyTCP::UnregisterSession, this);
- ShutdownSocket(std::move(reason));
-
+
+ void TInterconnectSessionTCP::CloseInputSession() {
+ Send(ReceiverId, new TEvInterconnect::TEvCloseInputSession);
+ }
+
+ void TInterconnectSessionTCP::Handle(TEvTerminate::TPtr& ev) {
+ Terminate(ev->Get()->Reason);
+ }
+
+ void TInterconnectSessionTCP::HandlePoison() {
+ Terminate(TDisconnectReason());
+ }
+
+ void TInterconnectSessionTCP::Terminate(TDisconnectReason reason) {
+ LOG_INFO_IC_SESSION("ICS01", "socket: %" PRIi64, (Socket ? i64(*Socket) : -1));
+
+ IActor::InvokeOtherActor(*Proxy, &TInterconnectProxyTCP::UnregisterSession, this);
+ ShutdownSocket(std::move(reason));
+
for (const auto& kv : Subscribers) {
Send(kv.first, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, kv.second);
}
Proxy->Metrics->SubSubscribersCount(Subscribers.size());
- Subscribers.clear();
-
- ChannelScheduler->ForEach([&](TEventOutputChannel& channel) {
- channel.NotifyUndelivered();
- });
-
- if (ReceiverId) {
- Send(ReceiverId, new TEvents::TEvPoisonPill);
- }
-
- SendUpdateToWhiteboard(false);
-
+ Subscribers.clear();
+
+ ChannelScheduler->ForEach([&](TEventOutputChannel& channel) {
+ channel.NotifyUndelivered();
+ });
+
+ if (ReceiverId) {
+ Send(ReceiverId, new TEvents::TEvPoisonPill);
+ }
+
+ SendUpdateToWhiteboard(false);
+
Proxy->Metrics->SubOutputBuffersTotalSize(TotalOutputQueueSize);
Proxy->Metrics->SubInflightDataAmount(InflightDataAmount);
-
- LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session destroyed", Proxy->PeerNodeId);
- if (!Subscribers.empty()) {
+ LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session destroyed", Proxy->PeerNodeId);
+
+ if (!Subscribers.empty()) {
Proxy->Metrics->SubSubscribersCount(Subscribers.size());
- }
-
- TActor::PassAway();
+ }
+
+ TActor::PassAway();
+ }
+
+ void TInterconnectSessionTCP::PassAway() {
+ Y_FAIL("TInterconnectSessionTCP::PassAway() can't be called directly");
}
-
- void TInterconnectSessionTCP::PassAway() {
- Y_FAIL("TInterconnectSessionTCP::PassAway() can't be called directly");
- }
-
- void TInterconnectSessionTCP::Forward(STATEFN_SIG) {
- Proxy->ValidateEvent(ev, "Forward");
-
+
+ void TInterconnectSessionTCP::Forward(STATEFN_SIG) {
+ Proxy->ValidateEvent(ev, "Forward");
+
LOG_DEBUG_IC_SESSION("ICS02", "send event from: %s to: %s", ev->Sender.ToString().data(), ev->Recipient.ToString().data());
++MessagesGot;
-
+
if (ev->Flags & IEventHandle::FlagSubscribeOnSession) {
- Subscribe(ev);
+ Subscribe(ev);
}
-
+
ui16 evChannel = ev->GetChannel();
- auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel);
- const bool wasWorking = oChannel.IsWorking();
-
+ auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel);
+ const bool wasWorking = oChannel.IsWorking();
+
const auto [dataSize, event] = oChannel.Push(*ev);
LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize);
@@ -137,268 +137,268 @@ namespace NActors {
// this channel has returned to work -- it was empty and this we have just put first event in the queue
ChannelScheduler->AddToHeap(oChannel, EqualizeCounter);
}
-
+
SetOutputStuckFlag(true);
- ++NumEventsInReadyChannels;
-
+ ++NumEventsInReadyChannels;
+
LWTRACK(EnqueueEvent, event->Orbit, Proxy->PeerNodeId, NumEventsInReadyChannels, GetWriteBlockedTotal(), evChannel, oChannel.GetQueueSize(), oChannel.GetBufferedAmountOfData());
- WILSON_TRACE(*TlsActivationContext, &ev->TraceId, OutputQueuePush,
+ WILSON_TRACE(*TlsActivationContext, &ev->TraceId, OutputQueuePush,
QueueSizeInEvents = oChannel.GetQueueSize(),
- QueueSizeInBytes = oChannel.GetBufferedAmountOfData());
-
+ QueueSizeInBytes = oChannel.GetBufferedAmountOfData());
+
// check for overloaded queues
- ui64 sendBufferDieLimit = Proxy->Common->Settings.SendBufferDieLimitInMB * ui64(1 << 20);
+ ui64 sendBufferDieLimit = Proxy->Common->Settings.SendBufferDieLimitInMB * ui64(1 << 20);
if (sendBufferDieLimit != 0 && TotalOutputQueueSize > sendBufferDieLimit) {
- LOG_ERROR_IC_SESSION("ICS03", "socket: %" PRIi64 " output queue is overloaded, actual %" PRIu64 " bytes, limit is %" PRIu64,
+ LOG_ERROR_IC_SESSION("ICS03", "socket: %" PRIi64 " output queue is overloaded, actual %" PRIu64 " bytes, limit is %" PRIu64,
Socket ? i64(*Socket) : -1, TotalOutputQueueSize, sendBufferDieLimit);
- return Terminate(TDisconnectReason::QueueOverload());
+ return Terminate(TDisconnectReason::QueueOverload());
}
-
- ui64 outputBuffersTotalSizeLimit = Proxy->Common->Settings.OutputBuffersTotalSizeLimitInMB * ui64(1 << 20);
+
+ ui64 outputBuffersTotalSizeLimit = Proxy->Common->Settings.OutputBuffersTotalSizeLimitInMB * ui64(1 << 20);
if (outputBuffersTotalSizeLimit != 0 && static_cast<ui64>(Proxy->Metrics->GetOutputBuffersTotalSize()) > outputBuffersTotalSizeLimit) {
- LOG_ERROR_IC_SESSION("ICS77", "Exceeded total limit on output buffers size");
- if (AtomicTryLock(&Proxy->Common->StartedSessionKiller)) {
- CreateSessionKillingActor(Proxy->Common);
+ LOG_ERROR_IC_SESSION("ICS77", "Exceeded total limit on output buffers size");
+ if (AtomicTryLock(&Proxy->Common->StartedSessionKiller)) {
+ CreateSessionKillingActor(Proxy->Common);
}
}
-
- if (RamInQueue && !RamInQueue->Batching) {
- // we have pending TEvRam, so GenerateTraffic will be called no matter what
- } else if (InflightDataAmount >= GetTotalInflightAmountOfData() || !Socket || ReceiveContext->WriteBlockedByFullSendBuffer) {
- // we can't issue more traffic now; GenerateTraffic will be called upon unblocking
- } else if (TotalOutputQueueSize >= 64 * 1024) {
- // output queue size is quite big to issue some traffic
- GenerateTraffic();
- } else if (!RamInQueue) {
- Y_VERIFY_DEBUG(NumEventsInReadyChannels == 1);
- RamInQueue = new TEvRam(true);
- auto *ev = new IEventHandle(SelfId(), {}, RamInQueue);
- const TDuration batchPeriod = Proxy->Common->Settings.BatchPeriod;
- if (batchPeriod != TDuration()) {
- TActivationContext::Schedule(batchPeriod, ev);
- } else {
- TActivationContext::Send(ev);
- }
+
+ if (RamInQueue && !RamInQueue->Batching) {
+ // we have pending TEvRam, so GenerateTraffic will be called no matter what
+ } else if (InflightDataAmount >= GetTotalInflightAmountOfData() || !Socket || ReceiveContext->WriteBlockedByFullSendBuffer) {
+ // we can't issue more traffic now; GenerateTraffic will be called upon unblocking
+ } else if (TotalOutputQueueSize >= 64 * 1024) {
+ // output queue size is quite big to issue some traffic
+ GenerateTraffic();
+ } else if (!RamInQueue) {
+ Y_VERIFY_DEBUG(NumEventsInReadyChannels == 1);
+ RamInQueue = new TEvRam(true);
+ auto *ev = new IEventHandle(SelfId(), {}, RamInQueue);
+ const TDuration batchPeriod = Proxy->Common->Settings.BatchPeriod;
+ if (batchPeriod != TDuration()) {
+ TActivationContext::Schedule(batchPeriod, ev);
+ } else {
+ TActivationContext::Send(ev);
+ }
LWPROBE(StartBatching, Proxy->PeerNodeId, batchPeriod.MillisecondsFloat());
- LOG_DEBUG_IC_SESSION("ICS17", "batching started");
- }
+ LOG_DEBUG_IC_SESSION("ICS17", "batching started");
+ }
}
- void TInterconnectSessionTCP::Subscribe(STATEFN_SIG) {
+ void TInterconnectSessionTCP::Subscribe(STATEFN_SIG) {
LOG_DEBUG_IC_SESSION("ICS04", "subscribe for session state for %s", ev->Sender.ToString().data());
const auto [it, inserted] = Subscribers.emplace(ev->Sender, ev->Cookie);
- if (inserted) {
+ if (inserted) {
Proxy->Metrics->IncSubscribersCount();
} else {
it->second = ev->Cookie;
- }
+ }
Send(ev->Sender, new TEvInterconnect::TEvNodeConnected(Proxy->PeerNodeId), 0, ev->Cookie);
- }
-
- void TInterconnectSessionTCP::Unsubscribe(STATEFN_SIG) {
+ }
+
+ void TInterconnectSessionTCP::Unsubscribe(STATEFN_SIG) {
LOG_DEBUG_IC_SESSION("ICS05", "unsubscribe for session state for %s", ev->Sender.ToString().data());
Proxy->Metrics->SubSubscribersCount( Subscribers.erase(ev->Sender));
}
-
- THolder<TEvHandshakeAck> TInterconnectSessionTCP::ProcessHandshakeRequest(TEvHandshakeAsk::TPtr& ev) {
- TEvHandshakeAsk *msg = ev->Get();
-
+
+ THolder<TEvHandshakeAck> TInterconnectSessionTCP::ProcessHandshakeRequest(TEvHandshakeAsk::TPtr& ev) {
+ TEvHandshakeAsk *msg = ev->Get();
+
// close existing input session, if any, and do nothing upon its destruction
- ReestablishConnection({}, false, TDisconnectReason::NewSession());
- const ui64 lastInputSerial = ReceiveContext->LockLastProcessedPacketSerial();
-
- LOG_INFO_IC_SESSION("ICS08", "incoming handshake Self# %s Peer# %s Counter# %" PRIu64 " LastInputSerial# %" PRIu64,
- msg->Self.ToString().data(), msg->Peer.ToString().data(), msg->Counter, lastInputSerial);
-
- return MakeHolder<TEvHandshakeAck>(msg->Peer, lastInputSerial, Params);
+ ReestablishConnection({}, false, TDisconnectReason::NewSession());
+ const ui64 lastInputSerial = ReceiveContext->LockLastProcessedPacketSerial();
+
+ LOG_INFO_IC_SESSION("ICS08", "incoming handshake Self# %s Peer# %s Counter# %" PRIu64 " LastInputSerial# %" PRIu64,
+ msg->Self.ToString().data(), msg->Peer.ToString().data(), msg->Counter, lastInputSerial);
+
+ return MakeHolder<TEvHandshakeAck>(msg->Peer, lastInputSerial, Params);
}
-
- void TInterconnectSessionTCP::SetNewConnection(TEvHandshakeDone::TPtr& ev) {
+
+ void TInterconnectSessionTCP::SetNewConnection(TEvHandshakeDone::TPtr& ev) {
if (ReceiverId) {
// upon destruction of input session actor invoke this callback again
- ReestablishConnection(std::move(ev), false, TDisconnectReason::NewSession());
+ ReestablishConnection(std::move(ev), false, TDisconnectReason::NewSession());
return;
}
-
- LOG_INFO_IC_SESSION("ICS09", "handshake done sender: %s self: %s peer: %s socket: %" PRIi64,
- ev->Sender.ToString().data(), ev->Get()->Self.ToString().data(), ev->Get()->Peer.ToString().data(),
- i64(*ev->Get()->Socket));
-
- NewConnectionSet = TActivationContext::Now();
+
+ LOG_INFO_IC_SESSION("ICS09", "handshake done sender: %s self: %s peer: %s socket: %" PRIi64,
+ ev->Sender.ToString().data(), ev->Get()->Self.ToString().data(), ev->Get()->Peer.ToString().data(),
+ i64(*ev->Get()->Socket));
+
+ NewConnectionSet = TActivationContext::Now();
PacketsWrittenToSocket = 0;
-
- SendBufferSize = ev->Get()->Socket->GetSendBufferSize();
- Socket = std::move(ev->Get()->Socket);
-
- // there may be a race
- const ui64 nextPacket = Max(LastConfirmed, ev->Get()->NextPacket);
-
+
+ SendBufferSize = ev->Get()->Socket->GetSendBufferSize();
+ Socket = std::move(ev->Get()->Socket);
+
+ // there may be a race
+ const ui64 nextPacket = Max(LastConfirmed, ev->Get()->NextPacket);
+
// arm watchdogs
- CloseOnIdleWatchdog.Arm(SelfId());
-
+ CloseOnIdleWatchdog.Arm(SelfId());
+
// reset activity timestamps
- LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Now();
-
- LOG_INFO_IC_SESSION("ICS10", "traffic start");
-
+ LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Now();
+
+ LOG_INFO_IC_SESSION("ICS10", "traffic start");
+
// create input session actor
- auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, ReceiveContext, Proxy->Common,
+ auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, ReceiveContext, Proxy->Common,
Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), Params);
- ReceiveContext->UnlockLastProcessedPacketSerial();
+ ReceiveContext->UnlockLastProcessedPacketSerial();
ReceiverId = Params.Encryption ? RegisterWithSameMailbox(actor.Release()) : Register(actor.Release(), TMailboxType::ReadAsFilled);
-
+
// register our socket in poller actor
- LOG_DEBUG_IC_SESSION("ICS11", "registering socket in PollerActor");
- const bool success = Send(MakePollerActorId(), new TEvPollerRegister(Socket, ReceiverId, SelfId()));
- Y_VERIFY(success);
- ReceiveContext->WriteBlockedByFullSendBuffer = false;
-
- LostConnectionWatchdog.Disarm();
+ LOG_DEBUG_IC_SESSION("ICS11", "registering socket in PollerActor");
+ const bool success = Send(MakePollerActorId(), new TEvPollerRegister(Socket, ReceiverId, SelfId()));
+ Y_VERIFY(success);
+ ReceiveContext->WriteBlockedByFullSendBuffer = false;
+
+ LostConnectionWatchdog.Disarm();
Proxy->Metrics->SetConnected(1);
- LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] connected", Proxy->PeerNodeId);
-
+ LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] connected", Proxy->PeerNodeId);
+
// arm pinger timer
- ResetFlushLogic();
-
+ ResetFlushLogic();
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// REINITIALIZE SEND QUEUE
//
// scan through send queue and leave only those packets who have data -- we will simply resend them; drop all other
// auxiliary packets; also reset packet metrics to zero to start sending from the beginning
// also reset SendQueuePos
-
+
// drop confirmed packets first as we do not need unwanted retransmissions
SendQueuePos = SendQueue.end();
- DropConfirmed(nextPacket);
-
- for (TSendQueue::iterator it = SendQueue.begin(); it != SendQueue.end(); ) {
+ DropConfirmed(nextPacket);
+
+ for (TSendQueue::iterator it = SendQueue.begin(); it != SendQueue.end(); ) {
const TSendQueue::iterator next = std::next(it);
if (it->IsEmpty()) {
- SendQueueCache.splice(SendQueueCache.begin(), SendQueue, it);
+ SendQueueCache.splice(SendQueueCache.begin(), SendQueue, it);
} else {
it->ResetBufs();
}
it = next;
}
- TrimSendQueueCache();
+ TrimSendQueueCache();
SendQueuePos = SendQueue.begin();
-
- TMaybe<ui64> s;
- for (auto it = SendQueuePos; it != SendQueue.end(); ++it) {
- if (!it->IsEmpty()) {
- s = it->GetSerial();
- }
- }
- const ui64 serial = s.GetOrElse(Max<ui64>());
-
+
+ TMaybe<ui64> s;
+ for (auto it = SendQueuePos; it != SendQueue.end(); ++it) {
+ if (!it->IsEmpty()) {
+ s = it->GetSerial();
+ }
+ }
+ const ui64 serial = s.GetOrElse(Max<ui64>());
+
Y_VERIFY(serial > LastConfirmed, "%s serial# %" PRIu64 " LastConfirmed# %" PRIu64, LogPrefix.data(), serial, LastConfirmed);
- LOG_DEBUG_IC_SESSION("ICS06", "rewind SendQueue size# %zu LastConfirmed# %" PRIu64 " SendQueuePos.Serial# %" PRIu64 "\n",
- SendQueue.size(), LastConfirmed, serial);
-
+ LOG_DEBUG_IC_SESSION("ICS06", "rewind SendQueue size# %zu LastConfirmed# %" PRIu64 " SendQueuePos.Serial# %" PRIu64 "\n",
+ SendQueue.size(), LastConfirmed, serial);
+
BytesUnwritten = 0;
for (const auto& packet : SendQueue) {
- BytesUnwritten += (Params.UseModernFrame ? sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1)) +
- packet.GetDataSize();
+ BytesUnwritten += (Params.UseModernFrame ? sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1)) +
+ packet.GetDataSize();
}
-
+
SwitchStuckPeriod();
-
- LastHandshakeDone = TActivationContext::Now();
-
- RamInQueue = nullptr;
- GenerateTraffic();
- }
-
- void TInterconnectSessionTCP::Handle(TEvUpdateFromInputSession::TPtr& ev) {
+
+ LastHandshakeDone = TActivationContext::Now();
+
+ RamInQueue = nullptr;
+ GenerateTraffic();
+ }
+
+ void TInterconnectSessionTCP::Handle(TEvUpdateFromInputSession::TPtr& ev) {
if (ev->Sender == ReceiverId) {
TEvUpdateFromInputSession& msg = *ev->Get();
-
- // update ping time
- Ping = msg.Ping;
+
+ // update ping time
+ Ping = msg.Ping;
LWPROBE(UpdateFromInputSession, Proxy->PeerNodeId, Ping.MillisecondsFloat());
-
+
bool needConfirm = false;
-
+
// update activity timer for dead peer checker
- LastInputActivityTimestamp = TActivationContext::Now();
-
- if (msg.NumDataBytes) {
+ LastInputActivityTimestamp = TActivationContext::Now();
+
+ if (msg.NumDataBytes) {
UnconfirmedBytes += msg.NumDataBytes;
- if (UnconfirmedBytes >= GetTotalInflightAmountOfData() / 4) {
+ if (UnconfirmedBytes >= GetTotalInflightAmountOfData() / 4) {
needConfirm = true;
} else {
- SetForcePacketTimestamp(Proxy->Common->Settings.ForceConfirmPeriod);
+ SetForcePacketTimestamp(Proxy->Common->Settings.ForceConfirmPeriod);
}
-
+
// reset payload watchdog that controls close-on-idle behaviour
- LastPayloadActivityTimestamp = TActivationContext::Now();
- CloseOnIdleWatchdog.Reset();
+ LastPayloadActivityTimestamp = TActivationContext::Now();
+ CloseOnIdleWatchdog.Reset();
}
-
- bool unblockedSomething = false;
- LWPROBE_IF_TOO_LONG(SlowICDropConfirmed, Proxy->PeerNodeId, ms) {
- unblockedSomething = DropConfirmed(msg.ConfirmedByInput);
+
+ bool unblockedSomething = false;
+ LWPROBE_IF_TOO_LONG(SlowICDropConfirmed, Proxy->PeerNodeId, ms) {
+ unblockedSomething = DropConfirmed(msg.ConfirmedByInput);
}
-
- // generate more traffic if we have unblocked state now
- if (unblockedSomething) {
+
+ // generate more traffic if we have unblocked state now
+ if (unblockedSomething) {
LWPROBE(UnblockByDropConfirmed, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - ev->SendTime) * 1000.0);
- GenerateTraffic();
+ GenerateTraffic();
}
-
+
// if we haven't generated any packets, then make a lone Flush packet without any data
if (needConfirm && Socket) {
++ConfirmPacketsForcedBySize;
- MakePacket(false);
- }
-
- for (;;) {
- switch (EUpdateState state = ReceiveContext->UpdateState) {
- case EUpdateState::NONE:
- case EUpdateState::CONFIRMING:
- Y_FAIL("unexpected state");
-
- case EUpdateState::INFLIGHT:
- // this message we are processing was the only one in flight, so we can reset state to NONE here
- if (ReceiveContext->UpdateState.compare_exchange_weak(state, EUpdateState::NONE)) {
- return;
- }
- break;
-
- case EUpdateState::INFLIGHT_AND_PENDING:
- // there is more messages pending from the input session actor, so we have to inform it to release
- // that message
- if (ReceiveContext->UpdateState.compare_exchange_weak(state, EUpdateState::CONFIRMING)) {
- Send(ev->Sender, new TEvConfirmUpdate);
- return;
- }
- break;
- }
- }
- }
+ MakePacket(false);
+ }
+
+ for (;;) {
+ switch (EUpdateState state = ReceiveContext->UpdateState) {
+ case EUpdateState::NONE:
+ case EUpdateState::CONFIRMING:
+ Y_FAIL("unexpected state");
+
+ case EUpdateState::INFLIGHT:
+ // this message we are processing was the only one in flight, so we can reset state to NONE here
+ if (ReceiveContext->UpdateState.compare_exchange_weak(state, EUpdateState::NONE)) {
+ return;
+ }
+ break;
+
+ case EUpdateState::INFLIGHT_AND_PENDING:
+ // there is more messages pending from the input session actor, so we have to inform it to release
+ // that message
+ if (ReceiveContext->UpdateState.compare_exchange_weak(state, EUpdateState::CONFIRMING)) {
+ Send(ev->Sender, new TEvConfirmUpdate);
+ return;
+ }
+ break;
+ }
+ }
+ }
}
-
- void TInterconnectSessionTCP::HandleRam(TEvRam::TPtr& ev) {
- if (ev->Get() == RamInQueue) {
+
+ void TInterconnectSessionTCP::HandleRam(TEvRam::TPtr& ev) {
+ if (ev->Get() == RamInQueue) {
LWPROBE(FinishRam, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - ev->SendTime) * 1000.0);
- RamInQueue = nullptr;
- GenerateTraffic();
- }
- }
-
- void TInterconnectSessionTCP::GenerateTraffic() {
- // generate ping request, if needed
- IssuePingRequest();
-
- if (RamInQueue && !RamInQueue->Batching) {
+ RamInQueue = nullptr;
+ GenerateTraffic();
+ }
+ }
+
+ void TInterconnectSessionTCP::GenerateTraffic() {
+ // generate ping request, if needed
+ IssuePingRequest();
+
+ if (RamInQueue && !RamInQueue->Batching) {
LWPROBE(SkipGenerateTraffic, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - RamStartedCycles) * 1000.0);
- return; // we'll do it a bit later
- } else {
- RamInQueue = nullptr;
- }
-
- LOG_DEBUG_IC_SESSION("ICS19", "GenerateTraffic");
-
+ return; // we'll do it a bit later
+ } else {
+ RamInQueue = nullptr;
+ }
+
+ LOG_DEBUG_IC_SESSION("ICS19", "GenerateTraffic");
+
// There is a tradeoff between fairness and efficiency.
// The less traffic is generated here, the less buffering is after fair scheduler,
// the more fair system is, the less latency is present.
@@ -406,27 +406,27 @@ namespace NActors {
// the less cpu is consumed.
static const ui64 generateLimit = 64 * 1024;
- const ui64 sizeBefore = TotalOutputQueueSize;
+ const ui64 sizeBefore = TotalOutputQueueSize;
ui32 generatedPackets = 0;
ui64 generatedBytes = 0;
ui64 generateStarted = GetCycleCountFast();
-
- // apply traffic changes
- auto accountTraffic = [&] { ChannelScheduler->ForEach([](TEventOutputChannel& channel) { channel.AccountTraffic(); }); };
-
+
+ // apply traffic changes
+ auto accountTraffic = [&] { ChannelScheduler->ForEach([](TEventOutputChannel& channel) { channel.AccountTraffic(); }); };
+
// first, we create as many data packets as we can generate under certain conditions; they include presence
// of events in channels queues and in flight fitting into requested limit; after we hit one of these conditions
// we exit cycle
while (Socket && NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData() && !ReceiveContext->WriteBlockedByFullSendBuffer) {
if (generatedBytes >= generateLimit) {
// resume later but ensure that we have issued at least one packet
- RamInQueue = new TEvRam(false);
- Send(SelfId(), RamInQueue);
+ RamInQueue = new TEvRam(false);
+ Send(SelfId(), RamInQueue);
RamStartedCycles = GetCycleCountFast();
LWPROBE(StartRam, Proxy->PeerNodeId);
break;
- }
-
+ }
+
try {
generatedBytes += MakePacket(true);
++generatedPackets;
@@ -435,292 +435,292 @@ namespace NActors {
accountTraffic();
LOG_CRIT_IC("ICS31", "serialized event Type# 0x%08" PRIx32 " is too large", ex.Type);
return Terminate(TDisconnectReason::EventTooLarge());
- }
- }
-
+ }
+ }
+
if (Socket) {
WriteData();
}
LWPROBE(GenerateTraffic, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - generateStarted) * 1000.0, sizeBefore - TotalOutputQueueSize, generatedPackets, generatedBytes);
- accountTraffic();
- EqualizeCounter += ChannelScheduler->Equalize();
- }
-
- void TInterconnectSessionTCP::StartHandshake() {
- LOG_INFO_IC_SESSION("ICS15", "start handshake");
- IActor::InvokeOtherActor(*Proxy, &TInterconnectProxyTCP::StartResumeHandshake, ReceiveContext->LockLastProcessedPacketSerial());
+ accountTraffic();
+ EqualizeCounter += ChannelScheduler->Equalize();
}
- void TInterconnectSessionTCP::ReestablishConnectionWithHandshake(TDisconnectReason reason) {
- ReestablishConnection({}, true, std::move(reason));
+ void TInterconnectSessionTCP::StartHandshake() {
+ LOG_INFO_IC_SESSION("ICS15", "start handshake");
+ IActor::InvokeOtherActor(*Proxy, &TInterconnectProxyTCP::StartResumeHandshake, ReceiveContext->LockLastProcessedPacketSerial());
}
-
- void TInterconnectSessionTCP::ReestablishConnection(TEvHandshakeDone::TPtr&& ev, bool startHandshakeOnSessionClose,
- TDisconnectReason reason) {
+
+ void TInterconnectSessionTCP::ReestablishConnectionWithHandshake(TDisconnectReason reason) {
+ ReestablishConnection({}, true, std::move(reason));
+ }
+
+ void TInterconnectSessionTCP::ReestablishConnection(TEvHandshakeDone::TPtr&& ev, bool startHandshakeOnSessionClose,
+ TDisconnectReason reason) {
if (Socket) {
- LOG_INFO_IC_SESSION("ICS13", "reestablish connection");
- ShutdownSocket(std::move(reason)); // stop sending/receiving on socket
- PendingHandshakeDoneEvent = std::move(ev);
- StartHandshakeOnSessionClose = startHandshakeOnSessionClose;
+ LOG_INFO_IC_SESSION("ICS13", "reestablish connection");
+ ShutdownSocket(std::move(reason)); // stop sending/receiving on socket
+ PendingHandshakeDoneEvent = std::move(ev);
+ StartHandshakeOnSessionClose = startHandshakeOnSessionClose;
if (!ReceiverId) {
- ReestablishConnectionExecute();
+ ReestablishConnectionExecute();
}
- }
- }
-
- void TInterconnectSessionTCP::OnDisconnect(TEvSocketDisconnect::TPtr& ev) {
+ }
+ }
+
+ void TInterconnectSessionTCP::OnDisconnect(TEvSocketDisconnect::TPtr& ev) {
if (ev->Sender == ReceiverId) {
const bool wasConnected(Socket);
- LOG_INFO_IC_SESSION("ICS07", "socket disconnect %" PRIi64 " reason# %s", Socket ? i64(*Socket) : -1, ev->Get()->Reason.ToString().data());
+ LOG_INFO_IC_SESSION("ICS07", "socket disconnect %" PRIi64 " reason# %s", Socket ? i64(*Socket) : -1, ev->Get()->Reason.ToString().data());
ReceiverId = TActorId(); // reset receiver actor id as we have no more receiver yet
if (wasConnected) {
// we were sucessfully connected and did not expect failure, so it arrived from the input side; we should
// restart handshake process, closing our part of socket first
- ShutdownSocket(ev->Get()->Reason);
- StartHandshake();
+ ShutdownSocket(ev->Get()->Reason);
+ StartHandshake();
} else {
- ReestablishConnectionExecute();
+ ReestablishConnectionExecute();
}
- }
- }
-
- void TInterconnectSessionTCP::ShutdownSocket(TDisconnectReason reason) {
+ }
+ }
+
+ void TInterconnectSessionTCP::ShutdownSocket(TDisconnectReason reason) {
if (Socket) {
- if (const TString& s = reason.ToString()) {
+ if (const TString& s = reason.ToString()) {
Proxy->Metrics->IncDisconnectByReason(s);
- }
-
- LOG_INFO_IC_SESSION("ICS25", "shutdown socket, reason# %s", reason.ToString().data());
- Proxy->UpdateErrorStateLog(TActivationContext::Now(), "close_socket", reason.ToString().data());
+ }
+
+ LOG_INFO_IC_SESSION("ICS25", "shutdown socket, reason# %s", reason.ToString().data());
+ Proxy->UpdateErrorStateLog(TActivationContext::Now(), "close_socket", reason.ToString().data());
Socket->Shutdown(SHUT_RDWR);
Socket.Reset();
Proxy->Metrics->IncDisconnections();
CloseOnIdleWatchdog.Disarm();
- LostConnectionWatchdog.Arm(SelfId());
+ LostConnectionWatchdog.Arm(SelfId());
Proxy->Metrics->SetConnected(0);
- LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] disconnected", Proxy->PeerNodeId);
+ LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] disconnected", Proxy->PeerNodeId);
}
- }
-
- void TInterconnectSessionTCP::ReestablishConnectionExecute() {
- bool startHandshakeOnSessionClose = std::exchange(StartHandshakeOnSessionClose, false);
- TEvHandshakeDone::TPtr ev = std::move(PendingHandshakeDoneEvent);
-
- if (startHandshakeOnSessionClose) {
- StartHandshake();
- } else if (ev) {
- SetNewConnection(ev);
+ }
+
+ void TInterconnectSessionTCP::ReestablishConnectionExecute() {
+ bool startHandshakeOnSessionClose = std::exchange(StartHandshakeOnSessionClose, false);
+ TEvHandshakeDone::TPtr ev = std::move(PendingHandshakeDoneEvent);
+
+ if (startHandshakeOnSessionClose) {
+ StartHandshake();
+ } else if (ev) {
+ SetNewConnection(ev);
}
- }
-
- void TInterconnectSessionTCP::Handle(TEvPollerReady::TPtr& ev) {
- LOG_DEBUG_IC_SESSION("ICS29", "HandleReadyWrite WriteBlockedByFullSendBuffer# %s",
- ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false");
- if (std::exchange(ReceiveContext->WriteBlockedByFullSendBuffer, false)) {
+ }
+
+ void TInterconnectSessionTCP::Handle(TEvPollerReady::TPtr& ev) {
+ LOG_DEBUG_IC_SESSION("ICS29", "HandleReadyWrite WriteBlockedByFullSendBuffer# %s",
+ ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false");
+ if (std::exchange(ReceiveContext->WriteBlockedByFullSendBuffer, false)) {
Proxy->Metrics->IncUsefulWriteWakeups();
ui64 nowCycles = GetCycleCountFast();
double blockedUs = NHPTimer::GetSeconds(nowCycles - WriteBlockedCycles) * 1000000.0;
LWPROBE(ReadyWrite, Proxy->PeerNodeId, NHPTimer::GetSeconds(nowCycles - ev->SendTime) * 1000.0, blockedUs / 1000.0);
WriteBlockedTotal += TDuration::MicroSeconds(blockedUs);
- GenerateTraffic();
- } else if (!ev->Cookie) {
+ GenerateTraffic();
+ } else if (!ev->Cookie) {
Proxy->Metrics->IncSpuriousWriteWakeups();
}
- if (Params.Encryption && ReceiveContext->ReadPending && !ev->Cookie) {
- Send(ReceiverId, ev->Release().Release(), 0, 1);
- }
- }
-
- void TInterconnectSessionTCP::Handle(TEvPollerRegisterResult::TPtr ev) {
- PollerToken = std::move(ev->Get()->PollerToken);
- if (ReceiveContext->WriteBlockedByFullSendBuffer) {
- if (Params.Encryption) {
- auto *secure = static_cast<NInterconnect::TSecureSocket*>(Socket.Get());
- PollerToken->Request(secure->WantRead(), secure->WantWrite());
- } else {
- PollerToken->Request(false, true);
- }
- }
- }
-
- void TInterconnectSessionTCP::WriteData() {
- ui64 written = 0;
-
+ if (Params.Encryption && ReceiveContext->ReadPending && !ev->Cookie) {
+ Send(ReceiverId, ev->Release().Release(), 0, 1);
+ }
+ }
+
+ void TInterconnectSessionTCP::Handle(TEvPollerRegisterResult::TPtr ev) {
+ PollerToken = std::move(ev->Get()->PollerToken);
+ if (ReceiveContext->WriteBlockedByFullSendBuffer) {
+ if (Params.Encryption) {
+ auto *secure = static_cast<NInterconnect::TSecureSocket*>(Socket.Get());
+ PollerToken->Request(secure->WantRead(), secure->WantWrite());
+ } else {
+ PollerToken->Request(false, true);
+ }
+ }
+ }
+
+ void TInterconnectSessionTCP::WriteData() {
+ ui64 written = 0;
+
Y_VERIFY(Socket); // ensure that socket wasn't closed
-
- LWPROBE_IF_TOO_LONG(SlowICWriteData, Proxy->PeerNodeId, ms) {
+
+ LWPROBE_IF_TOO_LONG(SlowICWriteData, Proxy->PeerNodeId, ms) {
constexpr ui32 iovLimit = 256;
-#ifdef _linux_
- ui32 maxElementsInIOV = Min<ui32>(iovLimit, sysconf(_SC_IOV_MAX));
-#else
- ui32 maxElementsInIOV = 64;
-#endif
- if (Params.Encryption) {
- maxElementsInIOV = 1;
- }
-
+#ifdef _linux_
+ ui32 maxElementsInIOV = Min<ui32>(iovLimit, sysconf(_SC_IOV_MAX));
+#else
+ ui32 maxElementsInIOV = 64;
+#endif
+ if (Params.Encryption) {
+ maxElementsInIOV = 1;
+ }
+
// vector of write buffers with preallocated stack space
TStackVec<TConstIoVec, iovLimit> wbuffers;
-
- LOG_DEBUG_IC_SESSION("ICS30", "WriteData WriteBlockedByFullSendBuffer# %s SendQueue.size# %zu",
- ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false", SendQueue.size());
-
- // update last confirmed packet number if it has changed
- if (SendQueuePos != SendQueue.end()) {
- SendQueuePos->UpdateConfirmIfPossible(ReceiveContext->GetLastProcessedPacketSerial());
- }
-
- while (SendQueuePos != SendQueue.end() && !ReceiveContext->WriteBlockedByFullSendBuffer) {
- for (auto it = SendQueuePos; it != SendQueue.end() && wbuffers.size() < maxElementsInIOV; ++it) {
+
+ LOG_DEBUG_IC_SESSION("ICS30", "WriteData WriteBlockedByFullSendBuffer# %s SendQueue.size# %zu",
+ ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false", SendQueue.size());
+
+ // update last confirmed packet number if it has changed
+ if (SendQueuePos != SendQueue.end()) {
+ SendQueuePos->UpdateConfirmIfPossible(ReceiveContext->GetLastProcessedPacketSerial());
+ }
+
+ while (SendQueuePos != SendQueue.end() && !ReceiveContext->WriteBlockedByFullSendBuffer) {
+ for (auto it = SendQueuePos; it != SendQueue.end() && wbuffers.size() < maxElementsInIOV; ++it) {
it->AppendToIoVector(wbuffers, maxElementsInIOV);
- }
-
+ }
+
const struct iovec* iovec = reinterpret_cast<const struct iovec*>(wbuffers.data());
int iovcnt = wbuffers.size();
-
+
Y_VERIFY(iovcnt > 0);
Y_VERIFY(iovec->iov_len > 0);
-
- TString err;
- ssize_t r = 0;
- do {
-#ifndef _win_
- r = iovcnt == 1 ? Socket->Send(iovec[0].iov_base, iovec[0].iov_len, &err) : Socket->WriteV(iovec, iovcnt);
-#else
- r = Socket->Send(iovec[0].iov_base, iovec[0].iov_len, &err);
-#endif
+
+ TString err;
+ ssize_t r = 0;
+ do {
+#ifndef _win_
+ r = iovcnt == 1 ? Socket->Send(iovec[0].iov_base, iovec[0].iov_len, &err) : Socket->WriteV(iovec, iovcnt);
+#else
+ r = Socket->Send(iovec[0].iov_base, iovec[0].iov_len, &err);
+#endif
Proxy->Metrics->IncSendSyscalls();
- } while (r == -EINTR);
-
- LOG_DEBUG_IC_SESSION("ICS16", "written# %zd iovcnt# %d err# %s", r, iovcnt, err.data());
-
+ } while (r == -EINTR);
+
+ LOG_DEBUG_IC_SESSION("ICS16", "written# %zd iovcnt# %d err# %s", r, iovcnt, err.data());
+
wbuffers.clear();
-
+
if (r > 0) {
Y_VERIFY(static_cast<size_t>(r) <= BytesUnwritten);
BytesUnwritten -= r;
- written += r;
+ written += r;
ui64 packets = 0;
-
+
// advance SendQueuePos to eat all processed items
for (size_t amount = r; amount && SendQueuePos->DropBufs(amount); ++SendQueuePos) {
- if (!SendQueuePos->IsEmpty()) {
- LastSentSerial = Max(LastSentSerial, SendQueuePos->GetSerial());
- }
+ if (!SendQueuePos->IsEmpty()) {
+ LastSentSerial = Max(LastSentSerial, SendQueuePos->GetSerial());
+ }
++PacketsWrittenToSocket;
++packets;
LWTRACK(PacketWrittenToSocket, SendQueuePos->Orbit, Proxy->PeerNodeId, PacketsWrittenToSocket, SendQueuePos->TriedWriting, SendQueuePos->GetDataSize(), BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket);
}
LWPROBE(WriteToSocket, Proxy->PeerNodeId, r, packets, PacketsWrittenToSocket, BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket);
- } else if (-r != EAGAIN && -r != EWOULDBLOCK) {
- const TString message = r == 0 ? "connection closed by peer"
- : err ? err
- : Sprintf("socket: %s", strerror(-r));
+ } else if (-r != EAGAIN && -r != EWOULDBLOCK) {
+ const TString message = r == 0 ? "connection closed by peer"
+ : err ? err
+ : Sprintf("socket: %s", strerror(-r));
LOG_NOTICE_NET(Proxy->PeerNodeId, "%s", message.data());
- if (written) {
+ if (written) {
Proxy->Metrics->AddTotalBytesWritten(written);
- }
- return ReestablishConnectionWithHandshake(r == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-r));
+ }
+ return ReestablishConnectionWithHandshake(r == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-r));
} else {
- // we have to do some hack for secure socket -- mark the packet as 'tried writing'
- if (Params.Encryption) {
- Y_VERIFY(SendQueuePos != SendQueue.end());
- SendQueuePos->MarkTriedWriting(); // do not try to replace buffer under SSL
- }
-
+ // we have to do some hack for secure socket -- mark the packet as 'tried writing'
+ if (Params.Encryption) {
+ Y_VERIFY(SendQueuePos != SendQueue.end());
+ SendQueuePos->MarkTriedWriting(); // do not try to replace buffer under SSL
+ }
+
// we have received EAGAIN error code, this means that we can't issue more data until we have received
// TEvPollerReadyWrite event from poller; set up flag meaning this and wait for that event
- Y_VERIFY(!ReceiveContext->WriteBlockedByFullSendBuffer);
- ReceiveContext->WriteBlockedByFullSendBuffer = true;
+ Y_VERIFY(!ReceiveContext->WriteBlockedByFullSendBuffer);
+ ReceiveContext->WriteBlockedByFullSendBuffer = true;
WriteBlockedCycles = GetCycleCountFast();
LWPROBE(BlockedWrite, Proxy->PeerNodeId, SendQueue.size(), written);
- LOG_DEBUG_IC_SESSION("ICS18", "hit send buffer limit");
-
- if (PollerToken) {
- if (Params.Encryption) {
- auto *secure = static_cast<NInterconnect::TSecureSocket*>(Socket.Get());
- PollerToken->Request(secure->WantRead(), secure->WantWrite());
- } else {
- PollerToken->Request(false, true);
- }
+ LOG_DEBUG_IC_SESSION("ICS18", "hit send buffer limit");
+
+ if (PollerToken) {
+ if (Params.Encryption) {
+ auto *secure = static_cast<NInterconnect::TSecureSocket*>(Socket.Get());
+ PollerToken->Request(secure->WantRead(), secure->WantWrite());
+ } else {
+ PollerToken->Request(false, true);
+ }
}
- }
- }
- }
- if (written) {
+ }
+ }
+ }
+ if (written) {
Proxy->Metrics->AddTotalBytesWritten(written);
- }
- }
-
- void TInterconnectSessionTCP::SetForcePacketTimestamp(TDuration period) {
+ }
+ }
+
+ void TInterconnectSessionTCP::SetForcePacketTimestamp(TDuration period) {
if (period != TDuration::Max()) {
- const TInstant when = TActivationContext::Now() + period;
+ const TInstant when = TActivationContext::Now() + period;
if (when < ForcePacketTimestamp) {
ForcePacketTimestamp = when;
- ScheduleFlush();
+ ScheduleFlush();
+ }
+ }
+ }
+
+ void TInterconnectSessionTCP::ScheduleFlush() {
+ if (FlushSchedule.empty() || ForcePacketTimestamp < FlushSchedule.top()) {
+ Schedule(ForcePacketTimestamp - TActivationContext::Now(), new TEvFlush);
+ FlushSchedule.push(ForcePacketTimestamp);
+ MaxFlushSchedule = Max(MaxFlushSchedule, FlushSchedule.size());
+ ++FlushEventsScheduled;
+ }
+ }
+
+ void TInterconnectSessionTCP::HandleFlush() {
+ const TInstant now = TActivationContext::Now();
+ while (FlushSchedule && now >= FlushSchedule.top()) {
+ FlushSchedule.pop();
+ }
+ IssuePingRequest();
+ if (Socket) {
+ if (now >= ForcePacketTimestamp) {
+ ++ConfirmPacketsForcedByTimeout;
+ ++FlushEventsProcessed;
+ MakePacket(false); // just generate confirmation packet if we have preconditions for this
+ } else if (ForcePacketTimestamp != TInstant::Max()) {
+ ScheduleFlush();
}
- }
- }
-
- void TInterconnectSessionTCP::ScheduleFlush() {
- if (FlushSchedule.empty() || ForcePacketTimestamp < FlushSchedule.top()) {
- Schedule(ForcePacketTimestamp - TActivationContext::Now(), new TEvFlush);
- FlushSchedule.push(ForcePacketTimestamp);
- MaxFlushSchedule = Max(MaxFlushSchedule, FlushSchedule.size());
- ++FlushEventsScheduled;
}
- }
-
- void TInterconnectSessionTCP::HandleFlush() {
- const TInstant now = TActivationContext::Now();
- while (FlushSchedule && now >= FlushSchedule.top()) {
- FlushSchedule.pop();
- }
- IssuePingRequest();
- if (Socket) {
- if (now >= ForcePacketTimestamp) {
- ++ConfirmPacketsForcedByTimeout;
- ++FlushEventsProcessed;
- MakePacket(false); // just generate confirmation packet if we have preconditions for this
- } else if (ForcePacketTimestamp != TInstant::Max()) {
- ScheduleFlush();
- }
- }
- }
-
- void TInterconnectSessionTCP::ResetFlushLogic() {
+ }
+
+ void TInterconnectSessionTCP::ResetFlushLogic() {
ForcePacketTimestamp = TInstant::Max();
UnconfirmedBytes = 0;
- const TDuration ping = Proxy->Common->Settings.PingPeriod;
- if (ping != TDuration::Zero() && !NumEventsInReadyChannels) {
- SetForcePacketTimestamp(ping);
+ const TDuration ping = Proxy->Common->Settings.PingPeriod;
+ if (ping != TDuration::Zero() && !NumEventsInReadyChannels) {
+ SetForcePacketTimestamp(ping);
+ }
+ }
+
+ void TInterconnectSessionTCP::TrimSendQueueCache() {
+ static constexpr size_t maxItems = 32;
+ static constexpr size_t trimThreshold = maxItems * 2;
+ if (SendQueueCache.size() >= trimThreshold) {
+ auto it = SendQueueCache.end();
+ for (size_t n = SendQueueCache.size() - maxItems; n; --n) {
+ --it;
+ }
+
+ auto ev = std::make_unique<TEvFreeItems>();
+ ev->Items.splice(ev->Items.end(), SendQueueCache, it, SendQueueCache.end());
+ ev->NumBytes = ev->Items.size() * sizeof(TTcpPacketOutTask);
+ if (ev->GetInLineForDestruction(Proxy->Common)) {
+ Send(Proxy->Common->DestructorId, ev.release());
+ }
}
- }
-
- void TInterconnectSessionTCP::TrimSendQueueCache() {
- static constexpr size_t maxItems = 32;
- static constexpr size_t trimThreshold = maxItems * 2;
- if (SendQueueCache.size() >= trimThreshold) {
- auto it = SendQueueCache.end();
- for (size_t n = SendQueueCache.size() - maxItems; n; --n) {
- --it;
- }
-
- auto ev = std::make_unique<TEvFreeItems>();
- ev->Items.splice(ev->Items.end(), SendQueueCache, it, SendQueueCache.end());
- ev->NumBytes = ev->Items.size() * sizeof(TTcpPacketOutTask);
- if (ev->GetInLineForDestruction(Proxy->Common)) {
- Send(Proxy->Common->DestructorId, ev.release());
- }
- }
- }
-
+ }
+
ui64 TInterconnectSessionTCP::MakePacket(bool data, TMaybe<ui64> pingMask) {
Y_VERIFY(Socket);
-
+
TSendQueue::iterator packet;
if (SendQueueCache) {
// we have entries in cache, take one and move it to the end of SendQueue
@@ -729,191 +729,191 @@ namespace NActors {
packet->Reuse(); // reset packet to initial state
} else {
// we have to allocate new packet, so just do it
- LWPROBE_IF_TOO_LONG(SlowICAllocPacketBuffer, Proxy->PeerNodeId, ms) {
- packet = SendQueue.emplace(SendQueue.end(), Params);
+ LWPROBE_IF_TOO_LONG(SlowICAllocPacketBuffer, Proxy->PeerNodeId, ms) {
+ packet = SendQueue.emplace(SendQueue.end(), Params);
}
- }
-
- // update send queue position
- if (SendQueuePos == SendQueue.end()) {
- SendQueuePos = packet; // start sending this packet if we are not sending anything for now
- }
-
- ui64 serial = 0;
-
+ }
+
+ // update send queue position
+ if (SendQueuePos == SendQueue.end()) {
+ SendQueuePos = packet; // start sending this packet if we are not sending anything for now
+ }
+
+ ui64 serial = 0;
+
if (data) {
- // generate serial for this data packet
- serial = ++OutputCounter;
-
- // fill the data packet
- Y_VERIFY(NumEventsInReadyChannels);
- LWPROBE_IF_TOO_LONG(SlowICFillSendingBuffer, Proxy->PeerNodeId, ms) {
- FillSendingBuffer(*packet, serial);
+ // generate serial for this data packet
+ serial = ++OutputCounter;
+
+ // fill the data packet
+ Y_VERIFY(NumEventsInReadyChannels);
+ LWPROBE_IF_TOO_LONG(SlowICFillSendingBuffer, Proxy->PeerNodeId, ms) {
+ FillSendingBuffer(*packet, serial);
}
- Y_VERIFY(!packet->IsEmpty());
-
- InflightDataAmount += packet->GetDataSize();
+ Y_VERIFY(!packet->IsEmpty());
+
+ InflightDataAmount += packet->GetDataSize();
Proxy->Metrics->AddInflightDataAmount(packet->GetDataSize());
- if (InflightDataAmount > GetTotalInflightAmountOfData()) {
+ if (InflightDataAmount > GetTotalInflightAmountOfData()) {
Proxy->Metrics->IncInflyLimitReach();
}
-
+
if (AtomicGet(ReceiveContext->ControlPacketId) == 0) {
AtomicSet(ReceiveContext->ControlPacketSendTimer, GetCycleCountFast());
AtomicSet(ReceiveContext->ControlPacketId, OutputCounter);
}
// update payload activity timer
- LastPayloadActivityTimestamp = TActivationContext::Now();
- } else if (pingMask) {
- serial = *pingMask;
-
- // make this packet a priority one
- if (SendQueuePos != packet) {
- Y_VERIFY(SendQueuePos != SendQueue.end());
- if (SendQueuePos->IsAtBegin()) {
- // insert this packet just before the next being sent and step back
- SendQueue.splice(SendQueuePos, SendQueue, packet);
- --SendQueuePos;
- Y_VERIFY(SendQueuePos == packet);
- } else {
- // current packet is already being sent, so move new packet just after it
- SendQueue.splice(std::next(SendQueuePos), SendQueue, packet);
- }
- }
+ LastPayloadActivityTimestamp = TActivationContext::Now();
+ } else if (pingMask) {
+ serial = *pingMask;
+
+ // make this packet a priority one
+ if (SendQueuePos != packet) {
+ Y_VERIFY(SendQueuePos != SendQueue.end());
+ if (SendQueuePos->IsAtBegin()) {
+ // insert this packet just before the next being sent and step back
+ SendQueue.splice(SendQueuePos, SendQueue, packet);
+ --SendQueuePos;
+ Y_VERIFY(SendQueuePos == packet);
+ } else {
+ // current packet is already being sent, so move new packet just after it
+ SendQueue.splice(std::next(SendQueuePos), SendQueue, packet);
+ }
+ }
}
-
- const ui64 lastInputSerial = ReceiveContext->GetLastProcessedPacketSerial();
- packet->SetMetadata(serial, lastInputSerial);
+
+ const ui64 lastInputSerial = ReceiveContext->GetLastProcessedPacketSerial();
+ packet->SetMetadata(serial, lastInputSerial);
packet->Sign();
-
+
// count number of bytes pending for write
ui64 packetSize = (Params.UseModernFrame ? sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1)) + packet->GetDataSize();
BytesUnwritten += packetSize;
-
- LOG_DEBUG_IC_SESSION("ICS22", "outgoing packet Serial# %" PRIu64 " Confirm# %" PRIu64 " DataSize# %zu"
- " InflightDataAmount# %" PRIu64 " BytesUnwritten# %" PRIu64, serial, lastInputSerial, packet->GetDataSize(),
- InflightDataAmount, BytesUnwritten);
-
+
+ LOG_DEBUG_IC_SESSION("ICS22", "outgoing packet Serial# %" PRIu64 " Confirm# %" PRIu64 " DataSize# %zu"
+ " InflightDataAmount# %" PRIu64 " BytesUnwritten# %" PRIu64, serial, lastInputSerial, packet->GetDataSize(),
+ InflightDataAmount, BytesUnwritten);
+
// reset forced packet sending timestamp as we have confirmed all received data
- ResetFlushLogic();
-
+ ResetFlushLogic();
+
++PacketsGenerated;
LWTRACK(PacketGenerated, packet->Orbit, Proxy->PeerNodeId, BytesUnwritten, InflightDataAmount, PacketsGenerated, packetSize);
-
+
if (!data) {
- WriteData();
+ WriteData();
}
return packetSize;
- }
-
- bool TInterconnectSessionTCP::DropConfirmed(ui64 confirm) {
- LOG_DEBUG_IC_SESSION("ICS23", "confirm count: %" PRIu64, confirm);
-
- Y_VERIFY(LastConfirmed <= confirm && confirm <= LastSentSerial && LastSentSerial <= OutputCounter,
- "%s confirm# %" PRIu64 " LastConfirmed# %" PRIu64 " OutputCounter# %" PRIu64 " LastSentSerial# %" PRIu64,
- LogPrefix.data(), confirm, LastConfirmed, OutputCounter, LastSentSerial);
+ }
+
+ bool TInterconnectSessionTCP::DropConfirmed(ui64 confirm) {
+ LOG_DEBUG_IC_SESSION("ICS23", "confirm count: %" PRIu64, confirm);
+
+ Y_VERIFY(LastConfirmed <= confirm && confirm <= LastSentSerial && LastSentSerial <= OutputCounter,
+ "%s confirm# %" PRIu64 " LastConfirmed# %" PRIu64 " OutputCounter# %" PRIu64 " LastSentSerial# %" PRIu64,
+ LogPrefix.data(), confirm, LastConfirmed, OutputCounter, LastSentSerial);
LastConfirmed = confirm;
-
+
ui64 droppedDataAmount = 0;
ui32 numDropped = 0;
-
+
// drop confirmed packets; this also includes any auxiliary packets as their serial is set to zero, effectively
// making Serial <= confirm true
TSendQueue::iterator it;
- ui64 lastDroppedSerial = 0;
- for (it = SendQueue.begin(); it != SendQueuePos && it->Confirmed(confirm); ++it) {
- if (!it->IsEmpty()) {
- lastDroppedSerial = it->GetSerial();
- }
- droppedDataAmount += it->GetDataSize();
+ ui64 lastDroppedSerial = 0;
+ for (it = SendQueue.begin(); it != SendQueuePos && it->Confirmed(confirm); ++it) {
+ if (!it->IsEmpty()) {
+ lastDroppedSerial = it->GetSerial();
+ }
+ droppedDataAmount += it->GetDataSize();
++numDropped;
- }
- SendQueueCache.splice(SendQueueCache.begin(), SendQueue, SendQueue.begin(), it);
- TrimSendQueueCache();
- ChannelScheduler->ForEach([&](TEventOutputChannel& channel) {
- channel.DropConfirmed(lastDroppedSerial);
- });
-
+ }
+ SendQueueCache.splice(SendQueueCache.begin(), SendQueue, SendQueue.begin(), it);
+ TrimSendQueueCache();
+ ChannelScheduler->ForEach([&](TEventOutputChannel& channel) {
+ channel.DropConfirmed(lastDroppedSerial);
+ });
+
const ui64 current = InflightDataAmount;
const ui64 limit = GetTotalInflightAmountOfData();
const bool unblockedSomething = current >= limit && current < limit + droppedDataAmount;
-
+
PacketsConfirmed += numDropped;
- InflightDataAmount -= droppedDataAmount;
+ InflightDataAmount -= droppedDataAmount;
Proxy->Metrics->SubInflightDataAmount(droppedDataAmount);
LWPROBE(DropConfirmed, Proxy->PeerNodeId, droppedDataAmount, InflightDataAmount);
-
- LOG_DEBUG_IC_SESSION("ICS24", "exit InflightDataAmount: %" PRIu64 " bytes droppedDataAmount: %" PRIu64 " bytes"
- " dropped %" PRIu32 " packets", InflightDataAmount, droppedDataAmount, numDropped);
-
- Pool->Trim(); // send any unsent free requests
-
- return unblockedSomething;
+
+ LOG_DEBUG_IC_SESSION("ICS24", "exit InflightDataAmount: %" PRIu64 " bytes droppedDataAmount: %" PRIu64 " bytes"
+ " dropped %" PRIu32 " packets", InflightDataAmount, droppedDataAmount, numDropped);
+
+ Pool->Trim(); // send any unsent free requests
+
+ return unblockedSomething;
}
-
- void TInterconnectSessionTCP::FillSendingBuffer(TTcpPacketOutTask& task, ui64 serial) {
- ui32 bytesGenerated = 0;
-
- Y_VERIFY(NumEventsInReadyChannels);
- while (NumEventsInReadyChannels) {
- TEventOutputChannel *channel = ChannelScheduler->PickChannelWithLeastConsumedWeight();
- Y_VERIFY_DEBUG(!channel->IsEmpty());
-
- // generate some data within this channel
- const ui64 netBefore = channel->GetBufferedAmountOfData();
- ui64 gross = 0;
- const bool eventDone = channel->FeedBuf(task, serial, &gross);
- channel->UnaccountedTraffic += gross;
- const ui64 netAfter = channel->GetBufferedAmountOfData();
- Y_VERIFY_DEBUG(netAfter <= netBefore); // net amount should shrink
- const ui64 net = netBefore - netAfter; // number of net bytes serialized
-
+
+ void TInterconnectSessionTCP::FillSendingBuffer(TTcpPacketOutTask& task, ui64 serial) {
+ ui32 bytesGenerated = 0;
+
+ Y_VERIFY(NumEventsInReadyChannels);
+ while (NumEventsInReadyChannels) {
+ TEventOutputChannel *channel = ChannelScheduler->PickChannelWithLeastConsumedWeight();
+ Y_VERIFY_DEBUG(!channel->IsEmpty());
+
+ // generate some data within this channel
+ const ui64 netBefore = channel->GetBufferedAmountOfData();
+ ui64 gross = 0;
+ const bool eventDone = channel->FeedBuf(task, serial, &gross);
+ channel->UnaccountedTraffic += gross;
+ const ui64 netAfter = channel->GetBufferedAmountOfData();
+ Y_VERIFY_DEBUG(netAfter <= netBefore); // net amount should shrink
+ const ui64 net = netBefore - netAfter; // number of net bytes serialized
+
// adjust metrics for local and global queue size
- TotalOutputQueueSize -= net;
+ TotalOutputQueueSize -= net;
Proxy->Metrics->SubOutputBuffersTotalSize(net);
- bytesGenerated += gross;
- Y_VERIFY_DEBUG(!!net == !!gross && gross >= net, "net# %" PRIu64 " gross# %" PRIu64, net, gross);
-
- // return it back to queue or delete, depending on whether this channel is still working or not
- ChannelScheduler->FinishPick(gross, EqualizeCounter);
-
- // update some stats if the packet was fully serialized
- if (eventDone) {
- ++MessagesWrittenToBuffer;
-
- Y_VERIFY(NumEventsInReadyChannels);
- --NumEventsInReadyChannels;
-
- if (!NumEventsInReadyChannels) {
- SetOutputStuckFlag(false);
- }
- }
-
- if (!gross) { // no progress -- almost full packet buffer
- break;
- }
- }
-
+ bytesGenerated += gross;
+ Y_VERIFY_DEBUG(!!net == !!gross && gross >= net, "net# %" PRIu64 " gross# %" PRIu64, net, gross);
+
+ // return it back to queue or delete, depending on whether this channel is still working or not
+ ChannelScheduler->FinishPick(gross, EqualizeCounter);
+
+ // update some stats if the packet was fully serialized
+ if (eventDone) {
+ ++MessagesWrittenToBuffer;
+
+ Y_VERIFY(NumEventsInReadyChannels);
+ --NumEventsInReadyChannels;
+
+ if (!NumEventsInReadyChannels) {
+ SetOutputStuckFlag(false);
+ }
+ }
+
+ if (!gross) { // no progress -- almost full packet buffer
+ break;
+ }
+ }
+
LWTRACK(FillSendingBuffer, task.Orbit, Proxy->PeerNodeId, bytesGenerated, NumEventsInReadyChannels, WriteBlockedTotal);
- Y_VERIFY(bytesGenerated); // ensure we are not stalled in serialization
- }
-
- ui32 TInterconnectSessionTCP::CalculateQueueUtilization() {
- SwitchStuckPeriod();
- ui64 sumBusy = 0, sumPeriod = 0;
- for (auto iter = OutputQueueUtilization.begin(); iter != OutputQueueUtilization.end() - 1; ++iter) {
- sumBusy += iter->first;
- sumPeriod += iter->second;
- }
- return sumBusy * 1000000 / sumPeriod;
- }
-
- void TInterconnectSessionTCP::SendUpdateToWhiteboard(bool connected) {
- const ui32 utilization = Socket ? CalculateQueueUtilization() : 0;
-
- if (const auto& callback = Proxy->Common->UpdateWhiteboard) {
+ Y_VERIFY(bytesGenerated); // ensure we are not stalled in serialization
+ }
+
+ ui32 TInterconnectSessionTCP::CalculateQueueUtilization() {
+ SwitchStuckPeriod();
+ ui64 sumBusy = 0, sumPeriod = 0;
+ for (auto iter = OutputQueueUtilization.begin(); iter != OutputQueueUtilization.end() - 1; ++iter) {
+ sumBusy += iter->first;
+ sumPeriod += iter->second;
+ }
+ return sumBusy * 1000000 / sumPeriod;
+ }
+
+ void TInterconnectSessionTCP::SendUpdateToWhiteboard(bool connected) {
+ const ui32 utilization = Socket ? CalculateQueueUtilization() : 0;
+
+ if (const auto& callback = Proxy->Common->UpdateWhiteboard) {
enum class EFlag {
GREEN,
YELLOW,
@@ -921,59 +921,59 @@ namespace NActors {
RED,
};
EFlag flagState = EFlag::RED;
-
+
if (Socket) {
flagState = EFlag::GREEN;
-
+
do {
- auto lastInputDelay = TActivationContext::Now() - LastInputActivityTimestamp;
+ auto lastInputDelay = TActivationContext::Now() - LastInputActivityTimestamp;
if (lastInputDelay * 4 >= GetDeadPeerTimeout() * 3) {
flagState = EFlag::ORANGE;
break;
} else if (lastInputDelay * 2 >= GetDeadPeerTimeout()) {
flagState = EFlag::YELLOW;
}
-
+
// check utilization
- if (utilization > 875000) { // 7/8
+ if (utilization > 875000) { // 7/8
flagState = EFlag::ORANGE;
break;
- } else if (utilization > 500000) { // 1/2
+ } else if (utilization > 500000) { // 1/2
flagState = EFlag::YELLOW;
}
} while (false);
}
-
+
callback(Proxy->Metrics->GetHumanFriendlyPeerHostName(),
connected,
flagState == EFlag::GREEN,
flagState == EFlag::YELLOW,
flagState == EFlag::ORANGE,
flagState == EFlag::RED,
- TlsActivationContext->ExecutorThread.ActorSystem);
- }
-
- if (connected) {
- Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup);
- }
- }
-
+ TlsActivationContext->ExecutorThread.ActorSystem);
+ }
+
+ if (connected) {
+ Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup);
+ }
+ }
+
void TInterconnectSessionTCP::SetOutputStuckFlag(bool state) {
if (OutputStuckFlag == state)
return;
-
+
if (OutputQueueUtilization.Size() == 0)
return;
-
+
auto& lastpair = OutputQueueUtilization.Last();
if (state)
lastpair.first -= GetCycleCountFast();
else
lastpair.first += GetCycleCountFast();
-
+
OutputStuckFlag = state;
}
-
+
void TInterconnectSessionTCP::SwitchStuckPeriod() {
auto now = GetCycleCountFast();
if (OutputQueueUtilization.Size() != 0) {
@@ -984,51 +984,51 @@ namespace NActors {
}
OutputQueueUtilization.Push(std::pair<ui64, ui64>(0, now));
- if (OutputStuckFlag)
+ if (OutputStuckFlag)
OutputQueueUtilization.Last().first -= now;
- }
-
- TDuration TInterconnectSessionTCP::GetDeadPeerTimeout() const {
- return Coalesce(Proxy->Common->Settings.DeadPeer, DEFAULT_DEADPEER_TIMEOUT);
- }
-
- TDuration TInterconnectSessionTCP::GetCloseOnIdleTimeout() const {
- return Proxy->Common->Settings.CloseOnIdle;
- }
-
- TDuration TInterconnectSessionTCP::GetLostConnectionTimeout() const {
- return Coalesce(Proxy->Common->Settings.LostConnection, DEFAULT_LOST_CONNECTION_TIMEOUT);
- }
-
- ui32 TInterconnectSessionTCP::GetTotalInflightAmountOfData() const {
- return Coalesce(Proxy->Common->Settings.TotalInflightAmountOfData, DEFAULT_TOTAL_INFLIGHT_DATA);
- }
-
- ui64 TInterconnectSessionTCP::GetMaxCyclesPerEvent() const {
+ }
+
+ TDuration TInterconnectSessionTCP::GetDeadPeerTimeout() const {
+ return Coalesce(Proxy->Common->Settings.DeadPeer, DEFAULT_DEADPEER_TIMEOUT);
+ }
+
+ TDuration TInterconnectSessionTCP::GetCloseOnIdleTimeout() const {
+ return Proxy->Common->Settings.CloseOnIdle;
+ }
+
+ TDuration TInterconnectSessionTCP::GetLostConnectionTimeout() const {
+ return Coalesce(Proxy->Common->Settings.LostConnection, DEFAULT_LOST_CONNECTION_TIMEOUT);
+ }
+
+ ui32 TInterconnectSessionTCP::GetTotalInflightAmountOfData() const {
+ return Coalesce(Proxy->Common->Settings.TotalInflightAmountOfData, DEFAULT_TOTAL_INFLIGHT_DATA);
+ }
+
+ ui64 TInterconnectSessionTCP::GetMaxCyclesPerEvent() const {
return DurationToCycles(TDuration::MicroSeconds(50));
- }
-
- void TInterconnectSessionTCP::IssuePingRequest() {
- const TInstant now = TActivationContext::Now();
- if (now >= LastPingTimestamp + PingPeriodicity) {
- LOG_DEBUG_IC_SESSION("ICS22", "Issuing ping request");
- if (Socket) {
+ }
+
+ void TInterconnectSessionTCP::IssuePingRequest() {
+ const TInstant now = TActivationContext::Now();
+ if (now >= LastPingTimestamp + PingPeriodicity) {
+ LOG_DEBUG_IC_SESSION("ICS22", "Issuing ping request");
+ if (Socket) {
MakePacket(false, GetCycleCountFast() | TTcpPacketBuf::PingRequestMask);
- }
- if (Socket) {
- MakePacket(false, TInstant::Now().MicroSeconds() | TTcpPacketBuf::ClockMask);
- }
- LastPingTimestamp = now;
- }
- }
-
- void TInterconnectSessionTCP::Handle(TEvProcessPingRequest::TPtr ev) {
- if (Socket) {
- MakePacket(false, ev->Get()->Payload | TTcpPacketBuf::PingResponseMask);
- }
- }
-
- void TInterconnectSessionTCP::GenerateHttpInfo(TStringStream& str) {
+ }
+ if (Socket) {
+ MakePacket(false, TInstant::Now().MicroSeconds() | TTcpPacketBuf::ClockMask);
+ }
+ LastPingTimestamp = now;
+ }
+ }
+
+ void TInterconnectSessionTCP::Handle(TEvProcessPingRequest::TPtr ev) {
+ if (Socket) {
+ MakePacket(false, ev->Get()->Payload | TTcpPacketBuf::PingResponseMask);
+ }
+ }
+
+ void TInterconnectSessionTCP::GenerateHttpInfo(TStringStream& str) {
HTML(str) {
DIV_CLASS("panel panel-info") {
DIV_CLASS("panel-heading") {
@@ -1045,76 +1045,76 @@ namespace NActors {
str << "Value";
}
}
- }
+ }
TABLEBODY() {
TABLER() {
TABLED() {
- str << "Encryption";
- }
- TABLED() {
- str << (Params.Encryption ? "<font color=green>Enabled</font>" : "<font color=red>Disabled</font>");
- }
- }
- if (auto *x = dynamic_cast<NInterconnect::TSecureSocket*>(Socket.Get())) {
- TABLER() {
- TABLED() {
- str << "Cipher name";
- }
- TABLED() {
- str << x->GetCipherName();
- }
- }
- TABLER() {
- TABLED() {
- str << "Cipher bits";
- }
- TABLED() {
- str << x->GetCipherBits();
- }
- }
- TABLER() {
- TABLED() {
- str << "Protocol";
- }
- TABLED() {
- str << x->GetProtocolName();
- }
- }
- TABLER() {
- TABLED() {
- str << "Peer CN";
- }
- TABLED() {
- str << x->GetPeerCommonName();
- }
- }
- }
- TABLER() {
- TABLED() { str << "AuthOnly CN"; }
- TABLED() { str << Params.AuthCN; }
- }
- TABLER() {
- TABLED() {
- str << "Local scope id";
- }
- TABLED() {
- str << ScopeIdToString(Proxy->Common->LocalScopeId);
- }
- }
- TABLER() {
- TABLED() {
- str << "Peer scope id";
- }
- TABLED() {
- str << ScopeIdToString(Params.PeerScopeId);
- }
- }
- TABLER() {
- TABLED() {
+ str << "Encryption";
+ }
+ TABLED() {
+ str << (Params.Encryption ? "<font color=green>Enabled</font>" : "<font color=red>Disabled</font>");
+ }
+ }
+ if (auto *x = dynamic_cast<NInterconnect::TSecureSocket*>(Socket.Get())) {
+ TABLER() {
+ TABLED() {
+ str << "Cipher name";
+ }
+ TABLED() {
+ str << x->GetCipherName();
+ }
+ }
+ TABLER() {
+ TABLED() {
+ str << "Cipher bits";
+ }
+ TABLED() {
+ str << x->GetCipherBits();
+ }
+ }
+ TABLER() {
+ TABLED() {
+ str << "Protocol";
+ }
+ TABLED() {
+ str << x->GetProtocolName();
+ }
+ }
+ TABLER() {
+ TABLED() {
+ str << "Peer CN";
+ }
+ TABLED() {
+ str << x->GetPeerCommonName();
+ }
+ }
+ }
+ TABLER() {
+ TABLED() { str << "AuthOnly CN"; }
+ TABLED() { str << Params.AuthCN; }
+ }
+ TABLER() {
+ TABLED() {
+ str << "Local scope id";
+ }
+ TABLED() {
+ str << ScopeIdToString(Proxy->Common->LocalScopeId);
+ }
+ }
+ TABLER() {
+ TABLED() {
+ str << "Peer scope id";
+ }
+ TABLED() {
+ str << ScopeIdToString(Params.PeerScopeId);
+ }
+ }
+ TABLER() {
+ TABLED() {
str << "This page generated at";
}
TABLED() {
- str << TActivationContext::Now() << " / " << Now();
+ str << TActivationContext::Now() << " / " << Now();
}
}
TABLER() {
@@ -1122,13 +1122,13 @@ namespace NActors {
str << "SelfID";
}
TABLED() {
- str << SelfId().ToString();
+ str << SelfId().ToString();
}
}
- TABLER() {
- TABLED() { str << "Frame version/Checksum"; }
- TABLED() { str << (!Params.UseModernFrame ? "v1/crc32c" : Params.Encryption ? "v2/none" : "v2/crc32c"); }
- }
+ TABLER() {
+ TABLED() { str << "Frame version/Checksum"; }
+ TABLED() { str << (!Params.UseModernFrame ? "v1/crc32c" : Params.Encryption ? "v2/none" : "v2/crc32c"); }
+ }
#define MON_VAR(NAME) \
TABLER() { \
TABLED() { \
@@ -1138,7 +1138,7 @@ namespace NActors {
str << NAME; \
} \
}
-
+
MON_VAR(Created)
MON_VAR(NewConnectionSet)
MON_VAR(ReceiverId)
@@ -1150,7 +1150,7 @@ namespace NActors {
MON_VAR(AtomicGet(ReceiveContext->PacketsReadFromSocket))
MON_VAR(ConfirmPacketsForcedBySize)
MON_VAR(ConfirmPacketsForcedByTimeout)
-
+
TABLER() {
TABLED() {
str << "Virtual self ID";
@@ -1158,7 +1158,7 @@ namespace NActors {
TABLED() {
str << Proxy->SessionVirtualId.ToString();
}
- }
+ }
TABLER() {
TABLED() {
str << "Virtual peer ID";
@@ -1175,54 +1175,54 @@ namespace NActors {
str << (Socket ? i64(*Socket) : -1);
}
}
-
- ui32 unsentQueueSize = Socket ? Socket->GetUnsentQueueSize() : 0;
-
+
+ ui32 unsentQueueSize = Socket ? Socket->GetUnsentQueueSize() : 0;
+
MON_VAR(OutputStuckFlag)
MON_VAR(SendQueue.size())
MON_VAR(SendQueueCache.size())
- MON_VAR(NumEventsInReadyChannels)
+ MON_VAR(NumEventsInReadyChannels)
MON_VAR(TotalOutputQueueSize)
MON_VAR(BytesUnwritten)
- MON_VAR(InflightDataAmount)
- MON_VAR(unsentQueueSize)
- MON_VAR(SendBufferSize)
+ MON_VAR(InflightDataAmount)
+ MON_VAR(unsentQueueSize)
+ MON_VAR(SendBufferSize)
MON_VAR(LastInputActivityTimestamp)
MON_VAR(LastPayloadActivityTimestamp)
MON_VAR(LastHandshakeDone)
MON_VAR(OutputCounter)
- MON_VAR(LastSentSerial)
- MON_VAR(ReceiveContext->GetLastProcessedPacketSerial())
+ MON_VAR(LastSentSerial)
+ MON_VAR(ReceiveContext->GetLastProcessedPacketSerial())
MON_VAR(LastConfirmed)
- MON_VAR(FlushSchedule.size())
- MON_VAR(MaxFlushSchedule)
- MON_VAR(FlushEventsScheduled)
- MON_VAR(FlushEventsProcessed)
-
- TString clockSkew;
- i64 x = GetClockSkew();
- if (x < 0) {
- clockSkew = Sprintf("-%s", TDuration::MicroSeconds(-x).ToString().data());
- } else {
- clockSkew = Sprintf("+%s", TDuration::MicroSeconds(x).ToString().data());
- }
-
- MON_VAR(LastPingTimestamp)
- MON_VAR(GetPingRTT())
- MON_VAR(clockSkew)
-
+ MON_VAR(FlushSchedule.size())
+ MON_VAR(MaxFlushSchedule)
+ MON_VAR(FlushEventsScheduled)
+ MON_VAR(FlushEventsProcessed)
+
+ TString clockSkew;
+ i64 x = GetClockSkew();
+ if (x < 0) {
+ clockSkew = Sprintf("-%s", TDuration::MicroSeconds(-x).ToString().data());
+ } else {
+ clockSkew = Sprintf("+%s", TDuration::MicroSeconds(x).ToString().data());
+ }
+
+ MON_VAR(LastPingTimestamp)
+ MON_VAR(GetPingRTT())
+ MON_VAR(clockSkew)
+
MON_VAR(GetDeadPeerTimeout())
- MON_VAR(GetTotalInflightAmountOfData())
- MON_VAR(GetCloseOnIdleTimeout())
- MON_VAR(Subscribers.size())
+ MON_VAR(GetTotalInflightAmountOfData())
+ MON_VAR(GetCloseOnIdleTimeout())
+ MON_VAR(Subscribers.size())
}
- }
- }
- }
- }
- }
-
- void CreateSessionKillingActor(TInterconnectProxyCommon::TPtr common) {
- TlsActivationContext->ExecutorThread.ActorSystem->Register(new TInterconnectSessionKiller(common));
+ }
+ }
+ }
+ }
+ }
+
+ void CreateSessionKillingActor(TInterconnectProxyCommon::TPtr common) {
+ TlsActivationContext->ExecutorThread.ActorSystem->Register(new TInterconnectSessionKiller(common));
}
-}
+}