aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/interconnect_tcp_session.h
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:15 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:15 +0300
commit72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch)
treeda2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/actors/interconnect/interconnect_tcp_session.h
parent778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff)
downloadydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_tcp_session.h')
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h280
1 files changed, 140 insertions, 140 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index 7fc00dbcc5..1ecd829132 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -30,61 +30,61 @@
#include <unordered_map>
namespace NActors {
- class TSlowPathChecker {
- using TTraceCallback = std::function<void(double)>;
- TTraceCallback Callback;
- const NHPTimer::STime Start;
-
- public:
- TSlowPathChecker(TTraceCallback&& callback)
- : Callback(std::move(callback))
+ class TSlowPathChecker {
+ using TTraceCallback = std::function<void(double)>;
+ TTraceCallback Callback;
+ const NHPTimer::STime Start;
+
+ public:
+ TSlowPathChecker(TTraceCallback&& callback)
+ : Callback(std::move(callback))
, Start(GetCycleCountFast())
- {
- }
+ {
+ }
- ~TSlowPathChecker() {
+ ~TSlowPathChecker() {
const NHPTimer::STime end = GetCycleCountFast();
- const NHPTimer::STime elapsed = end - Start;
- if (elapsed > 1000000) {
- Callback(NHPTimer::GetSeconds(elapsed) * 1000);
- }
+ const NHPTimer::STime elapsed = end - Start;
+ if (elapsed > 1000000) {
+ Callback(NHPTimer::GetSeconds(elapsed) * 1000);
+ }
+ }
+
+ operator bool() const {
+ return false;
}
+ };
- operator bool() const {
- return false;
- }
- };
+#define LWPROBE_IF_TOO_LONG(...) \
+ if (auto __x = TSlowPathChecker{[&](double ms) { LWPROBE(__VA_ARGS__); }}) \
+ ; \
+ else
-#define LWPROBE_IF_TOO_LONG(...) \
- if (auto __x = TSlowPathChecker{[&](double ms) { LWPROBE(__VA_ARGS__); }}) \
- ; \
- else
-
- class TTimeLimit {
- public:
- TTimeLimit(ui64 limitInCycles)
+ class TTimeLimit {
+ public:
+ TTimeLimit(ui64 limitInCycles)
: UpperLimit(limitInCycles == 0 ? 0 : GetCycleCountFast() + limitInCycles)
- {
- }
+ {
+ }
- TTimeLimit(ui64 startTS, ui64 limitInCycles)
- : UpperLimit(limitInCycles == 0 ? 0 : startTS + limitInCycles)
- {
- }
+ TTimeLimit(ui64 startTS, ui64 limitInCycles)
+ : UpperLimit(limitInCycles == 0 ? 0 : startTS + limitInCycles)
+ {
+ }
- bool CheckExceeded() {
+ bool CheckExceeded() {
return UpperLimit != 0 && GetCycleCountFast() > UpperLimit;
- }
+ }
- const ui64 UpperLimit;
- };
+ const ui64 UpperLimit;
+ };
- static constexpr TDuration DEFAULT_DEADPEER_TIMEOUT = TDuration::Seconds(10);
+ static constexpr TDuration DEFAULT_DEADPEER_TIMEOUT = TDuration::Seconds(10);
static constexpr TDuration DEFAULT_LOST_CONNECTION_TIMEOUT = TDuration::Seconds(10);
- static constexpr ui32 DEFAULT_MAX_INFLIGHT_DATA = 10240 * 1024;
- static constexpr ui32 DEFAULT_TOTAL_INFLIGHT_DATA = 4 * 10240 * 1024;
+ static constexpr ui32 DEFAULT_MAX_INFLIGHT_DATA = 10240 * 1024;
+ static constexpr ui32 DEFAULT_TOTAL_INFLIGHT_DATA = 4 * 10240 * 1024;
- class TInterconnectProxyTCP;
+ class TInterconnectProxyTCP;
enum class EUpdateState : ui8 {
NONE, // no updates generated by input session yet
@@ -93,22 +93,22 @@ namespace NActors {
CONFIRMING, // confirmation inflight
};
- struct TReceiveContext: public TAtomicRefCount<TReceiveContext> {
- /* All invokations to these fields should be thread-safe */
+ struct TReceiveContext: public TAtomicRefCount<TReceiveContext> {
+ /* All invokations to these fields should be thread-safe */
- ui64 ControlPacketSendTimer = 0;
- ui64 ControlPacketId = 0;
+ ui64 ControlPacketSendTimer = 0;
+ ui64 ControlPacketId = 0;
- // number of packets received by input session
- TAtomic PacketsReadFromSocket = 0;
- TAtomic DataPacketsReadFromSocket = 0;
+ // number of packets received by input session
+ TAtomic PacketsReadFromSocket = 0;
+ TAtomic DataPacketsReadFromSocket = 0;
// last processed packet by input session
std::atomic_uint64_t LastProcessedPacketSerial = 0;
static constexpr uint64_t LastProcessedPacketSerialLockBit = uint64_t(1) << 63;
- // for hardened checks
- TAtomic NumInputSessions = 0;
+ // for hardened checks
+ TAtomic NumInputSessions = 0;
NHPTimer::STime StartTime;
@@ -160,9 +160,9 @@ namespace NActors {
ui64 GetLastProcessedPacketSerial() {
return LastProcessedPacketSerial.load() & ~LastProcessedPacketSerialLockBit;
}
- };
+ };
- class TInputSessionTCP
+ class TInputSessionTCP
: public TActorBootstrapped<TInputSessionTCP>
, public TInterconnectLoggingBase
{
@@ -174,14 +174,14 @@ namespace NActors {
struct TEvCheckDeadPeer : TEventLocal<TEvCheckDeadPeer, EvCheckDeadPeer> {};
struct TEvResumeReceiveData : TEventLocal<TEvResumeReceiveData, EvResumeReceiveData> {};
- public:
- static constexpr EActivityType ActorActivityType() {
+ public:
+ static constexpr EActivityType ActorActivityType() {
return INTERCONNECT_SESSION_TCP;
- }
+ }
TInputSessionTCP(const TActorId& sessionId,
- TIntrusivePtr<NInterconnect::TStreamSocket> socket,
- TIntrusivePtr<TReceiveContext> context,
+ TIntrusivePtr<NInterconnect::TStreamSocket> socket,
+ TIntrusivePtr<TReceiveContext> context,
TInterconnectProxyCommon::TPtr common,
std::shared_ptr<IInterconnectMetrics> metrics,
ui32 nodeId,
@@ -189,8 +189,8 @@ namespace NActors {
TDuration deadPeerTimeout,
TSessionParams params);
- private:
- friend class TActorBootstrapped<TInputSessionTCP>;
+ private:
+ friend class TActorBootstrapped<TInputSessionTCP>;
void Bootstrap();
@@ -204,13 +204,13 @@ namespace NActors {
cFunc(TEvConfirmUpdate::EventType, HandleConfirmUpdate)
)
- private:
+ private:
TRope IncomingData;
const TActorId SessionId;
- TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
+ TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
TPollerToken::TPtr PollerToken;
- TIntrusivePtr<TReceiveContext> Context;
+ TIntrusivePtr<TReceiveContext> Context;
TInterconnectProxyCommon::TPtr Common;
const ui32 NodeId;
const TSessionParams Params;
@@ -235,11 +235,11 @@ namespace NActors {
THolder<TEvUpdateFromInputSession> UpdateFromInputSession;
- ui64 ConfirmedByInput;
+ ui64 ConfirmedByInput;
std::shared_ptr<IInterconnectMetrics> Metrics;
- bool CloseInputSessionRequested = false;
+ bool CloseInputSessionRequested = false;
void CloseInputSession();
@@ -258,12 +258,12 @@ namespace NActors {
TDeque<TIntrusivePtr<TRopeAlignedBuffer>> Buffers;
- static constexpr size_t NumPreallocatedBuffers = 16;
- void PreallocateBuffers();
+ static constexpr size_t NumPreallocatedBuffers = 16;
+ void PreallocateBuffers();
- inline ui64 GetMaxCyclesPerEvent() const {
+ inline ui64 GetMaxCyclesPerEvent() const {
return DurationToCycles(TDuration::MicroSeconds(500));
- }
+ }
const TDuration DeadPeerTimeout;
TInstant LastReceiveTimestamp;
@@ -278,21 +278,21 @@ namespace NActors {
void HandlePingResponse(TDuration passed);
void HandleClock(TInstant clock);
- };
+ };
- class TInterconnectSessionTCP
+ class TInterconnectSessionTCP
: public TActor<TInterconnectSessionTCP>
, public TInterconnectLoggingBase
{
- enum {
+ enum {
EvCheckCloseOnIdle = EventSpaceBegin(TEvents::ES_PRIVATE),
EvCheckLostConnection,
EvRam,
EvTerminate,
EvFreeItems,
- };
+ };
- struct TEvCheckCloseOnIdle : TEventLocal<TEvCheckCloseOnIdle, EvCheckCloseOnIdle> {};
+ struct TEvCheckCloseOnIdle : TEventLocal<TEvCheckCloseOnIdle, EvCheckCloseOnIdle> {};
struct TEvCheckLostConnection : TEventLocal<TEvCheckLostConnection, EvCheckLostConnection> {};
struct TEvRam : TEventLocal<TEvRam, EvRam> {
@@ -308,18 +308,18 @@ namespace NActors {
{}
};
- const TInstant Created;
- TInstant NewConnectionSet;
- ui64 MessagesGot = 0;
- ui64 MessagesWrittenToBuffer = 0;
- ui64 PacketsGenerated = 0;
- ui64 PacketsWrittenToSocket = 0;
- ui64 PacketsConfirmed = 0;
+ const TInstant Created;
+ TInstant NewConnectionSet;
+ ui64 MessagesGot = 0;
+ ui64 MessagesWrittenToBuffer = 0;
+ ui64 PacketsGenerated = 0;
+ ui64 PacketsWrittenToSocket = 0;
+ ui64 PacketsConfirmed = 0;
- public:
- static constexpr EActivityType ActorActivityType() {
- return INTERCONNECT_SESSION_TCP;
- }
+ public:
+ static constexpr EActivityType ActorActivityType() {
+ return INTERCONNECT_SESSION_TCP;
+ }
TInterconnectSessionTCP(TInterconnectProxyTCP* const proxy, TSessionParams params);
~TInterconnectSessionTCP();
@@ -339,8 +339,8 @@ namespace NActors {
return ReceiveContext->ClockSkew_us;
}
- private:
- friend class TInterconnectProxyTCP;
+ private:
+ friend class TInterconnectProxyTCP;
void Handle(TEvTerminate::TPtr& ev);
void HandlePoison();
@@ -400,7 +400,7 @@ namespace NActors {
void ReestablishConnectionWithHandshake(TDisconnectReason reason);
void ReestablishConnectionExecute();
- TInterconnectProxyTCP* const Proxy;
+ TInterconnectProxyTCP* const Proxy;
// various connection settings access
TDuration GetDeadPeerTimeout() const;
@@ -418,11 +418,11 @@ namespace NActors {
void IssuePingRequest();
void Handle(TEvProcessPingRequest::TPtr ev);
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- TInstant LastInputActivityTimestamp;
- TInstant LastPayloadActivityTimestamp;
- TWatchdogTimer<TEvCheckCloseOnIdle> CloseOnIdleWatchdog;
+ TInstant LastInputActivityTimestamp;
+ TInstant LastPayloadActivityTimestamp;
+ TWatchdogTimer<TEvCheckCloseOnIdle> CloseOnIdleWatchdog;
TWatchdogTimer<TEvCheckLostConnection> LostConnectionWatchdog;
void OnCloseOnIdleTimerHit() {
@@ -435,26 +435,26 @@ namespace NActors {
Terminate(TDisconnectReason::LostConnection());
}
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
const TSessionParams Params;
TMaybe<TEventHolderPool> Pool;
TMaybe<TChannelScheduler> ChannelScheduler;
- ui64 TotalOutputQueueSize;
- bool OutputStuckFlag;
- TRecentWnd<std::pair<ui64, ui64>> OutputQueueUtilization;
+ ui64 TotalOutputQueueSize;
+ bool OutputStuckFlag;
+ TRecentWnd<std::pair<ui64, ui64>> OutputQueueUtilization;
size_t NumEventsInReadyChannels = 0;
- void SetOutputStuckFlag(bool state);
- void SwitchStuckPeriod();
+ void SetOutputStuckFlag(bool state);
+ void SwitchStuckPeriod();
- using TSendQueue = TList<TTcpPacketOutTask>;
- TSendQueue SendQueue;
- TSendQueue SendQueueCache;
- TSendQueue::iterator SendQueuePos;
+ using TSendQueue = TList<TTcpPacketOutTask>;
+ TSendQueue SendQueue;
+ TSendQueue SendQueueCache;
+ TSendQueue::iterator SendQueuePos;
ui64 WriteBlockedCycles = 0; // start of current block period
TDuration WriteBlockedTotal; // total incremental duration that session has been blocked
- ui64 BytesUnwritten = 0;
+ ui64 BytesUnwritten = 0;
void TrimSendQueueCache();
@@ -467,21 +467,21 @@ namespace NActors {
}
}
- ui64 OutputCounter;
+ ui64 OutputCounter;
ui64 LastSentSerial = 0;
- TInstant LastHandshakeDone;
+ TInstant LastHandshakeDone;
- TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
+ TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
TPollerToken::TPtr PollerToken;
ui32 SendBufferSize;
ui64 InflightDataAmount = 0;
std::unordered_map<TActorId, ui64, TActorId::THash> Subscribers;
- // time at which we want to send confirmation packet even if there was no outgoing data
- ui64 UnconfirmedBytes = 0;
- TInstant ForcePacketTimestamp = TInstant::Max();
+ // time at which we want to send confirmation packet even if there was no outgoing data
+ ui64 UnconfirmedBytes = 0;
+ TInstant ForcePacketTimestamp = TInstant::Max();
TPriorityQueue<TInstant, TVector<TInstant>, std::greater<TInstant>> FlushSchedule;
size_t MaxFlushSchedule = 0;
ui64 FlushEventsScheduled = 0;
@@ -494,71 +494,71 @@ namespace NActors {
void GenerateHttpInfo(TStringStream& str);
- TIntrusivePtr<TReceiveContext> ReceiveContext;
+ TIntrusivePtr<TReceiveContext> ReceiveContext;
TActorId ReceiverId;
TDuration Ping;
- ui64 ConfirmPacketsForcedBySize = 0;
- ui64 ConfirmPacketsForcedByTimeout = 0;
+ ui64 ConfirmPacketsForcedBySize = 0;
+ ui64 ConfirmPacketsForcedByTimeout = 0;
- ui64 LastConfirmed = 0;
+ ui64 LastConfirmed = 0;
TEvHandshakeDone::TPtr PendingHandshakeDoneEvent;
bool StartHandshakeOnSessionClose = false;
ui64 EqualizeCounter = 0;
- };
+ };
- class TInterconnectSessionKiller
- : public TActorBootstrapped<TInterconnectSessionKiller> {
- ui32 RepliesReceived = 0;
- ui32 RepliesNumber = 0;
+ class TInterconnectSessionKiller
+ : public TActorBootstrapped<TInterconnectSessionKiller> {
+ ui32 RepliesReceived = 0;
+ ui32 RepliesNumber = 0;
TActorId LargestSession = TActorId();
- ui64 MaxBufferSize = 0;
+ ui64 MaxBufferSize = 0;
TInterconnectProxyCommon::TPtr Common;
- public:
+ public:
static constexpr EActivityType ActorActivityType() {
return INTERCONNECT_SESSION_KILLER;
}
TInterconnectSessionKiller(TInterconnectProxyCommon::TPtr common)
- : Common(common)
- {
- }
+ : Common(common)
+ {
+ }
void Bootstrap() {
- auto sender = SelfId();
+ auto sender = SelfId();
const auto eventFabric = [&sender](const TActorId& recp) -> IEventHandle* {
- auto ev = new TEvSessionBufferSizeRequest();
- return new IEventHandle(recp, sender, ev, IEventHandle::FlagTrackDelivery);
- };
+ auto ev = new TEvSessionBufferSizeRequest();
+ return new IEventHandle(recp, sender, ev, IEventHandle::FlagTrackDelivery);
+ };
RepliesNumber = TlsActivationContext->ExecutorThread.ActorSystem->BroadcastToProxies(eventFabric);
- Become(&TInterconnectSessionKiller::StateFunc);
- }
+ Become(&TInterconnectSessionKiller::StateFunc);
+ }
STRICT_STFUNC(StateFunc,
hFunc(TEvSessionBufferSizeResponse, ProcessResponse)
cFunc(TEvents::TEvUndelivered::EventType, ProcessUndelivered)
)
-
+
void ProcessResponse(TEvSessionBufferSizeResponse::TPtr& ev) {
- RepliesReceived++;
- if (MaxBufferSize < ev->Get()->BufferSize) {
- MaxBufferSize = ev->Get()->BufferSize;
- LargestSession = ev->Get()->SessionID;
- }
- if (RepliesReceived == RepliesNumber) {
+ RepliesReceived++;
+ if (MaxBufferSize < ev->Get()->BufferSize) {
+ MaxBufferSize = ev->Get()->BufferSize;
+ LargestSession = ev->Get()->SessionID;
+ }
+ if (RepliesReceived == RepliesNumber) {
Send(LargestSession, new TEvents::TEvPoisonPill);
- AtomicUnlock(&Common->StartedSessionKiller);
+ AtomicUnlock(&Common->StartedSessionKiller);
PassAway();
- }
+ }
}
-
+
void ProcessUndelivered() {
- RepliesReceived++;
+ RepliesReceived++;
}
- };
+ };
void CreateSessionKillingActor(TInterconnectProxyCommon::TPtr common);