aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:17 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:17 +0300
commitd3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch)
treedd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/actors/interconnect/interconnect_tcp_session.cpp
parent72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff)
downloadydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_tcp_session.cpp')
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp598
1 files changed, 299 insertions, 299 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 0e18a20072..2ded7f9f53 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -10,14 +10,14 @@
#include <library/cpp/monlib/service/pages/templates.h>
namespace NActors {
- LWTRACE_USING(ACTORLIB_PROVIDER);
+ LWTRACE_USING(ACTORLIB_PROVIDER);
- DECLARE_WILSON_EVENT(OutputQueuePush, (ui32, QueueSizeInEvents), (ui64, QueueSizeInBytes));
+ DECLARE_WILSON_EVENT(OutputQueuePush, (ui32, QueueSizeInEvents), (ui64, QueueSizeInBytes));
template<typename T>
T Coalesce(T&& x) {
return x;
- }
+ }
template<typename T, typename T2, typename... TRest>
typename std::common_type<T, T2, TRest...>::type Coalesce(T&& first, T2&& mid, TRest&&... rest) {
@@ -26,22 +26,22 @@ namespace NActors {
} else {
return Coalesce(std::forward<T2>(mid), std::forward<TRest>(rest)...);
}
- }
+ }
TInterconnectSessionTCP::TInterconnectSessionTCP(TInterconnectProxyTCP* const proxy, TSessionParams params)
- : TActor(&TInterconnectSessionTCP::StateFunc)
+ : TActor(&TInterconnectSessionTCP::StateFunc)
, Created(TInstant::Now())
- , Proxy(proxy)
+ , Proxy(proxy)
, CloseOnIdleWatchdog(GetCloseOnIdleTimeout(), std::bind(&TThis::OnCloseOnIdleTimerHit, this))
, LostConnectionWatchdog(GetLostConnectionTimeout(), std::bind(&TThis::OnLostConnectionTimerHit, this))
, Params(std::move(params))
- , TotalOutputQueueSize(0)
- , OutputStuckFlag(false)
- , OutputQueueUtilization(16)
- , OutputCounter(0ULL)
- {
+ , TotalOutputQueueSize(0)
+ , OutputStuckFlag(false)
+ , OutputQueueUtilization(16)
+ , OutputCounter(0ULL)
+ {
Proxy->Metrics->SetConnected(0);
- ReceiveContext.Reset(new TReceiveContext);
+ ReceiveContext.Reset(new TReceiveContext);
}
TInterconnectSessionTCP::~TInterconnectSessionTCP() {
@@ -62,7 +62,7 @@ namespace NActors {
LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session created", Proxy->PeerNodeId);
SetPrefix(Sprintf("Session %s [node %" PRIu32 "]", SelfId().ToString().data(), Proxy->PeerNodeId));
SendUpdateToWhiteboard();
- }
+ }
void TInterconnectSessionTCP::CloseInputSession() {
Send(ReceiverId, new TEvInterconnect::TEvCloseInputSession);
@@ -84,7 +84,7 @@ namespace NActors {
for (const auto& kv : Subscribers) {
Send(kv.first, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, kv.second);
- }
+ }
Proxy->Metrics->SubSubscribersCount(Subscribers.size());
Subscribers.clear();
@@ -108,7 +108,7 @@ namespace NActors {
}
TActor::PassAway();
- }
+ }
void TInterconnectSessionTCP::PassAway() {
Y_FAIL("TInterconnectSessionTCP::PassAway() can't be called directly");
@@ -118,13 +118,13 @@ namespace NActors {
Proxy->ValidateEvent(ev, "Forward");
LOG_DEBUG_IC_SESSION("ICS02", "send event from: %s to: %s", ev->Sender.ToString().data(), ev->Recipient.ToString().data());
- ++MessagesGot;
+ ++MessagesGot;
- if (ev->Flags & IEventHandle::FlagSubscribeOnSession) {
+ if (ev->Flags & IEventHandle::FlagSubscribeOnSession) {
Subscribe(ev);
- }
+ }
- ui16 evChannel = ev->GetChannel();
+ ui16 evChannel = ev->GetChannel();
auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel);
const bool wasWorking = oChannel.IsWorking();
@@ -136,31 +136,31 @@ namespace NActors {
if (!wasWorking) {
// this channel has returned to work -- it was empty and this we have just put first event in the queue
ChannelScheduler->AddToHeap(oChannel, EqualizeCounter);
- }
+ }
- SetOutputStuckFlag(true);
+ SetOutputStuckFlag(true);
++NumEventsInReadyChannels;
LWTRACK(EnqueueEvent, event->Orbit, Proxy->PeerNodeId, NumEventsInReadyChannels, GetWriteBlockedTotal(), evChannel, oChannel.GetQueueSize(), oChannel.GetBufferedAmountOfData());
WILSON_TRACE(*TlsActivationContext, &ev->TraceId, OutputQueuePush,
- QueueSizeInEvents = oChannel.GetQueueSize(),
+ QueueSizeInEvents = oChannel.GetQueueSize(),
QueueSizeInBytes = oChannel.GetBufferedAmountOfData());
- // check for overloaded queues
+ // check for overloaded queues
ui64 sendBufferDieLimit = Proxy->Common->Settings.SendBufferDieLimitInMB * ui64(1 << 20);
- if (sendBufferDieLimit != 0 && TotalOutputQueueSize > sendBufferDieLimit) {
+ if (sendBufferDieLimit != 0 && TotalOutputQueueSize > sendBufferDieLimit) {
LOG_ERROR_IC_SESSION("ICS03", "socket: %" PRIi64 " output queue is overloaded, actual %" PRIu64 " bytes, limit is %" PRIu64,
- Socket ? i64(*Socket) : -1, TotalOutputQueueSize, sendBufferDieLimit);
+ Socket ? i64(*Socket) : -1, TotalOutputQueueSize, sendBufferDieLimit);
return Terminate(TDisconnectReason::QueueOverload());
- }
+ }
ui64 outputBuffersTotalSizeLimit = Proxy->Common->Settings.OutputBuffersTotalSizeLimitInMB * ui64(1 << 20);
if (outputBuffersTotalSizeLimit != 0 && static_cast<ui64>(Proxy->Metrics->GetOutputBuffersTotalSize()) > outputBuffersTotalSizeLimit) {
LOG_ERROR_IC_SESSION("ICS77", "Exceeded total limit on output buffers size");
if (AtomicTryLock(&Proxy->Common->StartedSessionKiller)) {
CreateSessionKillingActor(Proxy->Common);
- }
- }
+ }
+ }
if (RamInQueue && !RamInQueue->Batching) {
// we have pending TEvRam, so GenerateTraffic will be called no matter what
@@ -198,12 +198,12 @@ namespace NActors {
void TInterconnectSessionTCP::Unsubscribe(STATEFN_SIG) {
LOG_DEBUG_IC_SESSION("ICS05", "unsubscribe for session state for %s", ev->Sender.ToString().data());
Proxy->Metrics->SubSubscribersCount( Subscribers.erase(ev->Sender));
- }
+ }
THolder<TEvHandshakeAck> TInterconnectSessionTCP::ProcessHandshakeRequest(TEvHandshakeAsk::TPtr& ev) {
TEvHandshakeAsk *msg = ev->Get();
- // close existing input session, if any, and do nothing upon its destruction
+ // close existing input session, if any, and do nothing upon its destruction
ReestablishConnection({}, false, TDisconnectReason::NewSession());
const ui64 lastInputSerial = ReceiveContext->LockLastProcessedPacketSerial();
@@ -211,21 +211,21 @@ namespace NActors {
msg->Self.ToString().data(), msg->Peer.ToString().data(), msg->Counter, lastInputSerial);
return MakeHolder<TEvHandshakeAck>(msg->Peer, lastInputSerial, Params);
- }
+ }
void TInterconnectSessionTCP::SetNewConnection(TEvHandshakeDone::TPtr& ev) {
- if (ReceiverId) {
- // upon destruction of input session actor invoke this callback again
+ if (ReceiverId) {
+ // upon destruction of input session actor invoke this callback again
ReestablishConnection(std::move(ev), false, TDisconnectReason::NewSession());
- return;
- }
+ return;
+ }
LOG_INFO_IC_SESSION("ICS09", "handshake done sender: %s self: %s peer: %s socket: %" PRIi64,
ev->Sender.ToString().data(), ev->Get()->Self.ToString().data(), ev->Get()->Peer.ToString().data(),
i64(*ev->Get()->Socket));
NewConnectionSet = TActivationContext::Now();
- PacketsWrittenToSocket = 0;
+ PacketsWrittenToSocket = 0;
SendBufferSize = ev->Get()->Socket->GetSendBufferSize();
Socket = std::move(ev->Get()->Socket);
@@ -233,21 +233,21 @@ namespace NActors {
// there may be a race
const ui64 nextPacket = Max(LastConfirmed, ev->Get()->NextPacket);
- // arm watchdogs
+ // arm watchdogs
CloseOnIdleWatchdog.Arm(SelfId());
- // reset activity timestamps
+ // reset activity timestamps
LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Now();
LOG_INFO_IC_SESSION("ICS10", "traffic start");
- // create input session actor
+ // create input session actor
auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, ReceiveContext, Proxy->Common,
Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), Params);
ReceiveContext->UnlockLastProcessedPacketSerial();
ReceiverId = Params.Encryption ? RegisterWithSameMailbox(actor.Release()) : Register(actor.Release(), TMailboxType::ReadAsFilled);
- // register our socket in poller actor
+ // register our socket in poller actor
LOG_DEBUG_IC_SESSION("ICS11", "registering socket in PollerActor");
const bool success = Send(MakePollerActorId(), new TEvPollerRegister(Socket, ReceiverId, SelfId()));
Y_VERIFY(success);
@@ -257,31 +257,31 @@ namespace NActors {
Proxy->Metrics->SetConnected(1);
LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] connected", Proxy->PeerNodeId);
- // arm pinger timer
+ // arm pinger timer
ResetFlushLogic();
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // REINITIALIZE SEND QUEUE
- //
- // scan through send queue and leave only those packets who have data -- we will simply resend them; drop all other
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // REINITIALIZE SEND QUEUE
+ //
+ // scan through send queue and leave only those packets who have data -- we will simply resend them; drop all other
// auxiliary packets; also reset packet metrics to zero to start sending from the beginning
- // also reset SendQueuePos
+ // also reset SendQueuePos
- // drop confirmed packets first as we do not need unwanted retransmissions
- SendQueuePos = SendQueue.end();
+ // drop confirmed packets first as we do not need unwanted retransmissions
+ SendQueuePos = SendQueue.end();
DropConfirmed(nextPacket);
for (TSendQueue::iterator it = SendQueue.begin(); it != SendQueue.end(); ) {
- const TSendQueue::iterator next = std::next(it);
- if (it->IsEmpty()) {
+ const TSendQueue::iterator next = std::next(it);
+ if (it->IsEmpty()) {
SendQueueCache.splice(SendQueueCache.begin(), SendQueue, it);
- } else {
- it->ResetBufs();
- }
- it = next;
- }
+ } else {
+ it->ResetBufs();
+ }
+ it = next;
+ }
TrimSendQueueCache();
- SendQueuePos = SendQueue.begin();
+ SendQueuePos = SendQueue.begin();
TMaybe<ui64> s;
for (auto it = SendQueuePos; it != SendQueue.end(); ++it) {
@@ -295,13 +295,13 @@ namespace NActors {
LOG_DEBUG_IC_SESSION("ICS06", "rewind SendQueue size# %zu LastConfirmed# %" PRIu64 " SendQueuePos.Serial# %" PRIu64 "\n",
SendQueue.size(), LastConfirmed, serial);
- BytesUnwritten = 0;
- for (const auto& packet : SendQueue) {
+ BytesUnwritten = 0;
+ for (const auto& packet : SendQueue) {
BytesUnwritten += (Params.UseModernFrame ? sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1)) +
packet.GetDataSize();
- }
+ }
- SwitchStuckPeriod();
+ SwitchStuckPeriod();
LastHandshakeDone = TActivationContext::Now();
@@ -310,45 +310,45 @@ namespace NActors {
}
void TInterconnectSessionTCP::Handle(TEvUpdateFromInputSession::TPtr& ev) {
- if (ev->Sender == ReceiverId) {
- TEvUpdateFromInputSession& msg = *ev->Get();
+ if (ev->Sender == ReceiverId) {
+ TEvUpdateFromInputSession& msg = *ev->Get();
// update ping time
Ping = msg.Ping;
LWPROBE(UpdateFromInputSession, Proxy->PeerNodeId, Ping.MillisecondsFloat());
- bool needConfirm = false;
+ bool needConfirm = false;
- // update activity timer for dead peer checker
+ // update activity timer for dead peer checker
LastInputActivityTimestamp = TActivationContext::Now();
if (msg.NumDataBytes) {
- UnconfirmedBytes += msg.NumDataBytes;
+ UnconfirmedBytes += msg.NumDataBytes;
if (UnconfirmedBytes >= GetTotalInflightAmountOfData() / 4) {
- needConfirm = true;
- } else {
+ needConfirm = true;
+ } else {
SetForcePacketTimestamp(Proxy->Common->Settings.ForceConfirmPeriod);
- }
+ }
- // reset payload watchdog that controls close-on-idle behaviour
+ // reset payload watchdog that controls close-on-idle behaviour
LastPayloadActivityTimestamp = TActivationContext::Now();
CloseOnIdleWatchdog.Reset();
- }
+ }
bool unblockedSomething = false;
LWPROBE_IF_TOO_LONG(SlowICDropConfirmed, Proxy->PeerNodeId, ms) {
unblockedSomething = DropConfirmed(msg.ConfirmedByInput);
- }
+ }
// generate more traffic if we have unblocked state now
if (unblockedSomething) {
LWPROBE(UnblockByDropConfirmed, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - ev->SendTime) * 1000.0);
GenerateTraffic();
- }
+ }
- // if we haven't generated any packets, then make a lone Flush packet without any data
- if (needConfirm && Socket) {
- ++ConfirmPacketsForcedBySize;
+ // if we haven't generated any packets, then make a lone Flush packet without any data
+ if (needConfirm && Socket) {
+ ++ConfirmPacketsForcedBySize;
MakePacket(false);
}
@@ -376,7 +376,7 @@ namespace NActors {
}
}
}
- }
+ }
void TInterconnectSessionTCP::HandleRam(TEvRam::TPtr& ev) {
if (ev->Get() == RamInQueue) {
@@ -451,57 +451,57 @@ namespace NActors {
void TInterconnectSessionTCP::StartHandshake() {
LOG_INFO_IC_SESSION("ICS15", "start handshake");
IActor::InvokeOtherActor(*Proxy, &TInterconnectProxyTCP::StartResumeHandshake, ReceiveContext->LockLastProcessedPacketSerial());
- }
+ }
void TInterconnectSessionTCP::ReestablishConnectionWithHandshake(TDisconnectReason reason) {
ReestablishConnection({}, true, std::move(reason));
- }
+ }
void TInterconnectSessionTCP::ReestablishConnection(TEvHandshakeDone::TPtr&& ev, bool startHandshakeOnSessionClose,
TDisconnectReason reason) {
- if (Socket) {
+ if (Socket) {
LOG_INFO_IC_SESSION("ICS13", "reestablish connection");
ShutdownSocket(std::move(reason)); // stop sending/receiving on socket
PendingHandshakeDoneEvent = std::move(ev);
StartHandshakeOnSessionClose = startHandshakeOnSessionClose;
- if (!ReceiverId) {
+ if (!ReceiverId) {
ReestablishConnectionExecute();
- }
+ }
}
}
void TInterconnectSessionTCP::OnDisconnect(TEvSocketDisconnect::TPtr& ev) {
- if (ev->Sender == ReceiverId) {
- const bool wasConnected(Socket);
+ if (ev->Sender == ReceiverId) {
+ const bool wasConnected(Socket);
LOG_INFO_IC_SESSION("ICS07", "socket disconnect %" PRIi64 " reason# %s", Socket ? i64(*Socket) : -1, ev->Get()->Reason.ToString().data());
ReceiverId = TActorId(); // reset receiver actor id as we have no more receiver yet
- if (wasConnected) {
- // we were sucessfully connected and did not expect failure, so it arrived from the input side; we should
- // restart handshake process, closing our part of socket first
+ if (wasConnected) {
+ // we were sucessfully connected and did not expect failure, so it arrived from the input side; we should
+ // restart handshake process, closing our part of socket first
ShutdownSocket(ev->Get()->Reason);
StartHandshake();
- } else {
+ } else {
ReestablishConnectionExecute();
- }
+ }
}
}
void TInterconnectSessionTCP::ShutdownSocket(TDisconnectReason reason) {
- if (Socket) {
+ if (Socket) {
if (const TString& s = reason.ToString()) {
Proxy->Metrics->IncDisconnectByReason(s);
}
LOG_INFO_IC_SESSION("ICS25", "shutdown socket, reason# %s", reason.ToString().data());
Proxy->UpdateErrorStateLog(TActivationContext::Now(), "close_socket", reason.ToString().data());
- Socket->Shutdown(SHUT_RDWR);
- Socket.Reset();
+ Socket->Shutdown(SHUT_RDWR);
+ Socket.Reset();
Proxy->Metrics->IncDisconnections();
- CloseOnIdleWatchdog.Disarm();
+ CloseOnIdleWatchdog.Disarm();
LostConnectionWatchdog.Arm(SelfId());
Proxy->Metrics->SetConnected(0);
LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] disconnected", Proxy->PeerNodeId);
- }
+ }
}
void TInterconnectSessionTCP::ReestablishConnectionExecute() {
@@ -512,7 +512,7 @@ namespace NActors {
StartHandshake();
} else if (ev) {
SetNewConnection(ev);
- }
+ }
}
void TInterconnectSessionTCP::Handle(TEvPollerReady::TPtr& ev) {
@@ -527,7 +527,7 @@ namespace NActors {
GenerateTraffic();
} else if (!ev->Cookie) {
Proxy->Metrics->IncSpuriousWriteWakeups();
- }
+ }
if (Params.Encryption && ReceiveContext->ReadPending && !ev->Cookie) {
Send(ReceiverId, ev->Release().Release(), 0, 1);
}
@@ -548,10 +548,10 @@ namespace NActors {
void TInterconnectSessionTCP::WriteData() {
ui64 written = 0;
- Y_VERIFY(Socket); // ensure that socket wasn't closed
+ Y_VERIFY(Socket); // ensure that socket wasn't closed
LWPROBE_IF_TOO_LONG(SlowICWriteData, Proxy->PeerNodeId, ms) {
- constexpr ui32 iovLimit = 256;
+ constexpr ui32 iovLimit = 256;
#ifdef _linux_
ui32 maxElementsInIOV = Min<ui32>(iovLimit, sysconf(_SC_IOV_MAX));
#else
@@ -561,8 +561,8 @@ namespace NActors {
maxElementsInIOV = 1;
}
- // vector of write buffers with preallocated stack space
- TStackVec<TConstIoVec, iovLimit> wbuffers;
+ // vector of write buffers with preallocated stack space
+ TStackVec<TConstIoVec, iovLimit> wbuffers;
LOG_DEBUG_IC_SESSION("ICS30", "WriteData WriteBlockedByFullSendBuffer# %s SendQueue.size# %zu",
ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false", SendQueue.size());
@@ -574,14 +574,14 @@ namespace NActors {
while (SendQueuePos != SendQueue.end() && !ReceiveContext->WriteBlockedByFullSendBuffer) {
for (auto it = SendQueuePos; it != SendQueue.end() && wbuffers.size() < maxElementsInIOV; ++it) {
- it->AppendToIoVector(wbuffers, maxElementsInIOV);
+ it->AppendToIoVector(wbuffers, maxElementsInIOV);
}
- const struct iovec* iovec = reinterpret_cast<const struct iovec*>(wbuffers.data());
- int iovcnt = wbuffers.size();
+ const struct iovec* iovec = reinterpret_cast<const struct iovec*>(wbuffers.data());
+ int iovcnt = wbuffers.size();
- Y_VERIFY(iovcnt > 0);
- Y_VERIFY(iovec->iov_len > 0);
+ Y_VERIFY(iovcnt > 0);
+ Y_VERIFY(iovec->iov_len > 0);
TString err;
ssize_t r = 0;
@@ -596,23 +596,23 @@ namespace NActors {
LOG_DEBUG_IC_SESSION("ICS16", "written# %zd iovcnt# %d err# %s", r, iovcnt, err.data());
- wbuffers.clear();
+ wbuffers.clear();
- if (r > 0) {
- Y_VERIFY(static_cast<size_t>(r) <= BytesUnwritten);
- BytesUnwritten -= r;
+ if (r > 0) {
+ Y_VERIFY(static_cast<size_t>(r) <= BytesUnwritten);
+ BytesUnwritten -= r;
written += r;
ui64 packets = 0;
- // advance SendQueuePos to eat all processed items
- for (size_t amount = r; amount && SendQueuePos->DropBufs(amount); ++SendQueuePos) {
+ // advance SendQueuePos to eat all processed items
+ for (size_t amount = r; amount && SendQueuePos->DropBufs(amount); ++SendQueuePos) {
if (!SendQueuePos->IsEmpty()) {
LastSentSerial = Max(LastSentSerial, SendQueuePos->GetSerial());
}
- ++PacketsWrittenToSocket;
+ ++PacketsWrittenToSocket;
++packets;
LWTRACK(PacketWrittenToSocket, SendQueuePos->Orbit, Proxy->PeerNodeId, PacketsWrittenToSocket, SendQueuePos->TriedWriting, SendQueuePos->GetDataSize(), BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket);
- }
+ }
LWPROBE(WriteToSocket, Proxy->PeerNodeId, r, packets, PacketsWrittenToSocket, BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket);
} else if (-r != EAGAIN && -r != EWOULDBLOCK) {
@@ -624,15 +624,15 @@ namespace NActors {
Proxy->Metrics->AddTotalBytesWritten(written);
}
return ReestablishConnectionWithHandshake(r == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-r));
- } else {
+ } else {
// we have to do some hack for secure socket -- mark the packet as 'tried writing'
if (Params.Encryption) {
Y_VERIFY(SendQueuePos != SendQueue.end());
SendQueuePos->MarkTriedWriting(); // do not try to replace buffer under SSL
}
- // we have received EAGAIN error code, this means that we can't issue more data until we have received
- // TEvPollerReadyWrite event from poller; set up flag meaning this and wait for that event
+ // we have received EAGAIN error code, this means that we can't issue more data until we have received
+ // TEvPollerReadyWrite event from poller; set up flag meaning this and wait for that event
Y_VERIFY(!ReceiveContext->WriteBlockedByFullSendBuffer);
ReceiveContext->WriteBlockedByFullSendBuffer = true;
WriteBlockedCycles = GetCycleCountFast();
@@ -646,7 +646,7 @@ namespace NActors {
} else {
PollerToken->Request(false, true);
}
- }
+ }
}
}
}
@@ -656,12 +656,12 @@ namespace NActors {
}
void TInterconnectSessionTCP::SetForcePacketTimestamp(TDuration period) {
- if (period != TDuration::Max()) {
+ if (period != TDuration::Max()) {
const TInstant when = TActivationContext::Now() + period;
- if (when < ForcePacketTimestamp) {
- ForcePacketTimestamp = when;
+ if (when < ForcePacketTimestamp) {
+ ForcePacketTimestamp = when;
ScheduleFlush();
- }
+ }
}
}
@@ -671,7 +671,7 @@ namespace NActors {
FlushSchedule.push(ForcePacketTimestamp);
MaxFlushSchedule = Max(MaxFlushSchedule, FlushSchedule.size());
++FlushEventsScheduled;
- }
+ }
}
void TInterconnectSessionTCP::HandleFlush() {
@@ -692,12 +692,12 @@ namespace NActors {
}
void TInterconnectSessionTCP::ResetFlushLogic() {
- ForcePacketTimestamp = TInstant::Max();
- UnconfirmedBytes = 0;
+ ForcePacketTimestamp = TInstant::Max();
+ UnconfirmedBytes = 0;
const TDuration ping = Proxy->Common->Settings.PingPeriod;
if (ping != TDuration::Zero() && !NumEventsInReadyChannels) {
SetForcePacketTimestamp(ping);
- }
+ }
}
void TInterconnectSessionTCP::TrimSendQueueCache() {
@@ -719,19 +719,19 @@ namespace NActors {
}
ui64 TInterconnectSessionTCP::MakePacket(bool data, TMaybe<ui64> pingMask) {
- Y_VERIFY(Socket);
-
- TSendQueue::iterator packet;
- if (SendQueueCache) {
- // we have entries in cache, take one and move it to the end of SendQueue
- packet = SendQueueCache.begin();
- SendQueue.splice(SendQueue.end(), SendQueueCache, packet);
- packet->Reuse(); // reset packet to initial state
- } else {
- // we have to allocate new packet, so just do it
+ Y_VERIFY(Socket);
+
+ TSendQueue::iterator packet;
+ if (SendQueueCache) {
+ // we have entries in cache, take one and move it to the end of SendQueue
+ packet = SendQueueCache.begin();
+ SendQueue.splice(SendQueue.end(), SendQueueCache, packet);
+ packet->Reuse(); // reset packet to initial state
+ } else {
+ // we have to allocate new packet, so just do it
LWPROBE_IF_TOO_LONG(SlowICAllocPacketBuffer, Proxy->PeerNodeId, ms) {
packet = SendQueue.emplace(SendQueue.end(), Params);
- }
+ }
}
// update send queue position
@@ -741,7 +741,7 @@ namespace NActors {
ui64 serial = 0;
- if (data) {
+ if (data) {
// generate serial for this data packet
serial = ++OutputCounter;
@@ -749,21 +749,21 @@ namespace NActors {
Y_VERIFY(NumEventsInReadyChannels);
LWPROBE_IF_TOO_LONG(SlowICFillSendingBuffer, Proxy->PeerNodeId, ms) {
FillSendingBuffer(*packet, serial);
- }
+ }
Y_VERIFY(!packet->IsEmpty());
InflightDataAmount += packet->GetDataSize();
Proxy->Metrics->AddInflightDataAmount(packet->GetDataSize());
if (InflightDataAmount > GetTotalInflightAmountOfData()) {
Proxy->Metrics->IncInflyLimitReach();
- }
+ }
- if (AtomicGet(ReceiveContext->ControlPacketId) == 0) {
+ if (AtomicGet(ReceiveContext->ControlPacketId) == 0) {
AtomicSet(ReceiveContext->ControlPacketSendTimer, GetCycleCountFast());
- AtomicSet(ReceiveContext->ControlPacketId, OutputCounter);
- }
-
- // update payload activity timer
+ AtomicSet(ReceiveContext->ControlPacketId, OutputCounter);
+ }
+
+ // update payload activity timer
LastPayloadActivityTimestamp = TActivationContext::Now();
} else if (pingMask) {
serial = *pingMask;
@@ -781,13 +781,13 @@ namespace NActors {
SendQueue.splice(std::next(SendQueuePos), SendQueue, packet);
}
}
- }
+ }
const ui64 lastInputSerial = ReceiveContext->GetLastProcessedPacketSerial();
packet->SetMetadata(serial, lastInputSerial);
- packet->Sign();
+ packet->Sign();
- // count number of bytes pending for write
+ // count number of bytes pending for write
ui64 packetSize = (Params.UseModernFrame ? sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1)) + packet->GetDataSize();
BytesUnwritten += packetSize;
@@ -795,15 +795,15 @@ namespace NActors {
" InflightDataAmount# %" PRIu64 " BytesUnwritten# %" PRIu64, serial, lastInputSerial, packet->GetDataSize(),
InflightDataAmount, BytesUnwritten);
- // reset forced packet sending timestamp as we have confirmed all received data
+ // reset forced packet sending timestamp as we have confirmed all received data
ResetFlushLogic();
- ++PacketsGenerated;
+ ++PacketsGenerated;
LWTRACK(PacketGenerated, packet->Orbit, Proxy->PeerNodeId, BytesUnwritten, InflightDataAmount, PacketsGenerated, packetSize);
- if (!data) {
+ if (!data) {
WriteData();
- }
+ }
return packetSize;
}
@@ -814,21 +814,21 @@ namespace NActors {
Y_VERIFY(LastConfirmed <= confirm && confirm <= LastSentSerial && LastSentSerial <= OutputCounter,
"%s confirm# %" PRIu64 " LastConfirmed# %" PRIu64 " OutputCounter# %" PRIu64 " LastSentSerial# %" PRIu64,
LogPrefix.data(), confirm, LastConfirmed, OutputCounter, LastSentSerial);
- LastConfirmed = confirm;
+ LastConfirmed = confirm;
- ui64 droppedDataAmount = 0;
- ui32 numDropped = 0;
+ ui64 droppedDataAmount = 0;
+ ui32 numDropped = 0;
- // drop confirmed packets; this also includes any auxiliary packets as their serial is set to zero, effectively
- // making Serial <= confirm true
- TSendQueue::iterator it;
+ // drop confirmed packets; this also includes any auxiliary packets as their serial is set to zero, effectively
+ // making Serial <= confirm true
+ TSendQueue::iterator it;
ui64 lastDroppedSerial = 0;
for (it = SendQueue.begin(); it != SendQueuePos && it->Confirmed(confirm); ++it) {
if (!it->IsEmpty()) {
lastDroppedSerial = it->GetSerial();
}
droppedDataAmount += it->GetDataSize();
- ++numDropped;
+ ++numDropped;
}
SendQueueCache.splice(SendQueueCache.begin(), SendQueue, SendQueue.begin(), it);
TrimSendQueueCache();
@@ -840,7 +840,7 @@ namespace NActors {
const ui64 limit = GetTotalInflightAmountOfData();
const bool unblockedSomething = current >= limit && current < limit + droppedDataAmount;
- PacketsConfirmed += numDropped;
+ PacketsConfirmed += numDropped;
InflightDataAmount -= droppedDataAmount;
Proxy->Metrics->SubInflightDataAmount(droppedDataAmount);
LWPROBE(DropConfirmed, Proxy->PeerNodeId, droppedDataAmount, InflightDataAmount);
@@ -851,7 +851,7 @@ namespace NActors {
Pool->Trim(); // send any unsent free requests
return unblockedSomething;
- }
+ }
void TInterconnectSessionTCP::FillSendingBuffer(TTcpPacketOutTask& task, ui64 serial) {
ui32 bytesGenerated = 0;
@@ -914,42 +914,42 @@ namespace NActors {
const ui32 utilization = Socket ? CalculateQueueUtilization() : 0;
if (const auto& callback = Proxy->Common->UpdateWhiteboard) {
- enum class EFlag {
- GREEN,
- YELLOW,
- ORANGE,
- RED,
- };
- EFlag flagState = EFlag::RED;
-
- if (Socket) {
- flagState = EFlag::GREEN;
-
- do {
+ enum class EFlag {
+ GREEN,
+ YELLOW,
+ ORANGE,
+ RED,
+ };
+ EFlag flagState = EFlag::RED;
+
+ if (Socket) {
+ flagState = EFlag::GREEN;
+
+ do {
auto lastInputDelay = TActivationContext::Now() - LastInputActivityTimestamp;
- if (lastInputDelay * 4 >= GetDeadPeerTimeout() * 3) {
- flagState = EFlag::ORANGE;
- break;
- } else if (lastInputDelay * 2 >= GetDeadPeerTimeout()) {
- flagState = EFlag::YELLOW;
- }
-
- // check utilization
+ if (lastInputDelay * 4 >= GetDeadPeerTimeout() * 3) {
+ flagState = EFlag::ORANGE;
+ break;
+ } else if (lastInputDelay * 2 >= GetDeadPeerTimeout()) {
+ flagState = EFlag::YELLOW;
+ }
+
+ // check utilization
if (utilization > 875000) { // 7/8
- flagState = EFlag::ORANGE;
- break;
+ flagState = EFlag::ORANGE;
+ break;
} else if (utilization > 500000) { // 1/2
- flagState = EFlag::YELLOW;
- }
- } while (false);
- }
+ flagState = EFlag::YELLOW;
+ }
+ } while (false);
+ }
callback(Proxy->Metrics->GetHumanFriendlyPeerHostName(),
- connected,
- flagState == EFlag::GREEN,
- flagState == EFlag::YELLOW,
- flagState == EFlag::ORANGE,
- flagState == EFlag::RED,
+ connected,
+ flagState == EFlag::GREEN,
+ flagState == EFlag::YELLOW,
+ flagState == EFlag::ORANGE,
+ flagState == EFlag::RED,
TlsActivationContext->ExecutorThread.ActorSystem);
}
@@ -958,34 +958,34 @@ namespace NActors {
}
}
- void TInterconnectSessionTCP::SetOutputStuckFlag(bool state) {
- if (OutputStuckFlag == state)
- return;
+ void TInterconnectSessionTCP::SetOutputStuckFlag(bool state) {
+ if (OutputStuckFlag == state)
+ return;
- if (OutputQueueUtilization.Size() == 0)
- return;
+ if (OutputQueueUtilization.Size() == 0)
+ return;
- auto& lastpair = OutputQueueUtilization.Last();
- if (state)
+ auto& lastpair = OutputQueueUtilization.Last();
+ if (state)
lastpair.first -= GetCycleCountFast();
- else
+ else
lastpair.first += GetCycleCountFast();
- OutputStuckFlag = state;
- }
+ OutputStuckFlag = state;
+ }
- void TInterconnectSessionTCP::SwitchStuckPeriod() {
+ void TInterconnectSessionTCP::SwitchStuckPeriod() {
auto now = GetCycleCountFast();
- if (OutputQueueUtilization.Size() != 0) {
- auto& lastpair = OutputQueueUtilization.Last();
- lastpair.second = now - lastpair.second;
- if (OutputStuckFlag)
- lastpair.first += now;
- }
-
- OutputQueueUtilization.Push(std::pair<ui64, ui64>(0, now));
+ if (OutputQueueUtilization.Size() != 0) {
+ auto& lastpair = OutputQueueUtilization.Last();
+ lastpair.second = now - lastpair.second;
+ if (OutputStuckFlag)
+ lastpair.first += now;
+ }
+
+ OutputQueueUtilization.Push(std::pair<ui64, ui64>(0, now));
if (OutputStuckFlag)
- OutputQueueUtilization.Last().first -= now;
+ OutputQueueUtilization.Last().first -= now;
}
TDuration TInterconnectSessionTCP::GetDeadPeerTimeout() const {
@@ -1029,26 +1029,26 @@ namespace NActors {
}
void TInterconnectSessionTCP::GenerateHttpInfo(TStringStream& str) {
- HTML(str) {
- DIV_CLASS("panel panel-info") {
- DIV_CLASS("panel-heading") {
- str << "Session";
- }
- DIV_CLASS("panel-body") {
- TABLE_CLASS("table") {
- TABLEHEAD() {
- TABLER() {
- TABLEH() {
- str << "Sensor";
- }
- TABLEH() {
- str << "Value";
- }
- }
+ HTML(str) {
+ DIV_CLASS("panel panel-info") {
+ DIV_CLASS("panel-heading") {
+ str << "Session";
+ }
+ DIV_CLASS("panel-body") {
+ TABLE_CLASS("table") {
+ TABLEHEAD() {
+ TABLER() {
+ TABLEH() {
+ str << "Sensor";
+ }
+ TABLEH() {
+ str << "Value";
+ }
+ }
}
- TABLEBODY() {
- TABLER() {
- TABLED() {
+ TABLEBODY() {
+ TABLER() {
+ TABLED() {
str << "Encryption";
}
TABLED() {
@@ -1111,89 +1111,89 @@ namespace NActors {
}
TABLER() {
TABLED() {
- str << "This page generated at";
- }
- TABLED() {
+ str << "This page generated at";
+ }
+ TABLED() {
str << TActivationContext::Now() << " / " << Now();
- }
- }
- TABLER() {
- TABLED() {
- str << "SelfID";
- }
- TABLED() {
+ }
+ }
+ TABLER() {
+ TABLED() {
+ str << "SelfID";
+ }
+ TABLED() {
str << SelfId().ToString();
- }
- }
+ }
+ }
TABLER() {
TABLED() { str << "Frame version/Checksum"; }
TABLED() { str << (!Params.UseModernFrame ? "v1/crc32c" : Params.Encryption ? "v2/none" : "v2/crc32c"); }
}
-#define MON_VAR(NAME) \
- TABLER() { \
- TABLED() { \
- str << #NAME; \
- } \
- TABLED() { \
- str << NAME; \
- } \
- }
-
- MON_VAR(Created)
- MON_VAR(NewConnectionSet)
- MON_VAR(ReceiverId)
- MON_VAR(MessagesGot)
- MON_VAR(MessagesWrittenToBuffer)
- MON_VAR(PacketsGenerated)
- MON_VAR(PacketsWrittenToSocket)
- MON_VAR(PacketsConfirmed)
- MON_VAR(AtomicGet(ReceiveContext->PacketsReadFromSocket))
- MON_VAR(ConfirmPacketsForcedBySize)
- MON_VAR(ConfirmPacketsForcedByTimeout)
-
- TABLER() {
- TABLED() {
- str << "Virtual self ID";
- }
- TABLED() {
- str << Proxy->SessionVirtualId.ToString();
- }
+#define MON_VAR(NAME) \
+ TABLER() { \
+ TABLED() { \
+ str << #NAME; \
+ } \
+ TABLED() { \
+ str << NAME; \
+ } \
+ }
+
+ MON_VAR(Created)
+ MON_VAR(NewConnectionSet)
+ MON_VAR(ReceiverId)
+ MON_VAR(MessagesGot)
+ MON_VAR(MessagesWrittenToBuffer)
+ MON_VAR(PacketsGenerated)
+ MON_VAR(PacketsWrittenToSocket)
+ MON_VAR(PacketsConfirmed)
+ MON_VAR(AtomicGet(ReceiveContext->PacketsReadFromSocket))
+ MON_VAR(ConfirmPacketsForcedBySize)
+ MON_VAR(ConfirmPacketsForcedByTimeout)
+
+ TABLER() {
+ TABLED() {
+ str << "Virtual self ID";
+ }
+ TABLED() {
+ str << Proxy->SessionVirtualId.ToString();
+ }
+ }
+ TABLER() {
+ TABLED() {
+ str << "Virtual peer ID";
+ }
+ TABLED() {
+ str << Proxy->RemoteSessionVirtualId.ToString();
+ }
+ }
+ TABLER() {
+ TABLED() {
+ str << "Socket";
+ }
+ TABLED() {
+ str << (Socket ? i64(*Socket) : -1);
+ }
}
- TABLER() {
- TABLED() {
- str << "Virtual peer ID";
- }
- TABLED() {
- str << Proxy->RemoteSessionVirtualId.ToString();
- }
- }
- TABLER() {
- TABLED() {
- str << "Socket";
- }
- TABLED() {
- str << (Socket ? i64(*Socket) : -1);
- }
- }
ui32 unsentQueueSize = Socket ? Socket->GetUnsentQueueSize() : 0;
- MON_VAR(OutputStuckFlag)
- MON_VAR(SendQueue.size())
- MON_VAR(SendQueueCache.size())
+ MON_VAR(OutputStuckFlag)
+ MON_VAR(SendQueue.size())
+ MON_VAR(SendQueueCache.size())
MON_VAR(NumEventsInReadyChannels)
- MON_VAR(TotalOutputQueueSize)
- MON_VAR(BytesUnwritten)
+ MON_VAR(TotalOutputQueueSize)
+ MON_VAR(BytesUnwritten)
MON_VAR(InflightDataAmount)
MON_VAR(unsentQueueSize)
MON_VAR(SendBufferSize)
- MON_VAR(LastInputActivityTimestamp)
- MON_VAR(LastPayloadActivityTimestamp)
- MON_VAR(LastHandshakeDone)
- MON_VAR(OutputCounter)
+ MON_VAR(LastInputActivityTimestamp)
+ MON_VAR(LastPayloadActivityTimestamp)
+ MON_VAR(LastHandshakeDone)
+ MON_VAR(OutputCounter)
MON_VAR(LastSentSerial)
MON_VAR(ReceiveContext->GetLastProcessedPacketSerial())
- MON_VAR(LastConfirmed)
+ MON_VAR(LastConfirmed)
MON_VAR(FlushSchedule.size())
MON_VAR(MaxFlushSchedule)
MON_VAR(FlushEventsScheduled)
@@ -1211,11 +1211,11 @@ namespace NActors {
MON_VAR(GetPingRTT())
MON_VAR(clockSkew)
- MON_VAR(GetDeadPeerTimeout())
+ MON_VAR(GetDeadPeerTimeout())
MON_VAR(GetTotalInflightAmountOfData())
MON_VAR(GetCloseOnIdleTimeout())
MON_VAR(Subscribers.size())
- }
+ }
}
}
}
@@ -1224,5 +1224,5 @@ namespace NActors {
void CreateSessionKillingActor(TInterconnectProxyCommon::TPtr common) {
TlsActivationContext->ExecutorThread.ActorSystem->Register(new TInterconnectSessionKiller(common));
- }
+ }
}