aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors
diff options
context:
space:
mode:
authorDaniil Cherednik <dan.cherednik@gmail.com>2023-03-31 10:54:08 +0300
committerDaniil Cherednik <dan.cherednik@gmail.com>2023-03-31 12:28:07 +0300
commitfc1cffcfa7f0497a1f97b384a24bcbf23362f3be (patch)
treec15f7ab5b9e9b20fd0ef8fc07d598d28e8b32004 /library/cpp/actors
parent8a749596d40e91c896a1907afcd108d9221fbde1 (diff)
downloadydb-fc1cffcfa7f0497a1f97b384a24bcbf23362f3be.tar.gz
Ydb stable 23-1-1923.1.19
x-stable-origin-commit: c5d5a396e89d0a72e0267a55e93d8404d4fb54fe
Diffstat (limited to 'library/cpp/actors')
-rw-r--r--library/cpp/actors/core/actorsystem.h16
-rw-r--r--library/cpp/actors/core/config.h12
-rw-r--r--library/cpp/actors/core/executor_pool.h14
-rw-r--r--library/cpp/actors/core/executor_pool_basic.cpp19
-rw-r--r--library/cpp/actors/core/executor_pool_basic.h3
-rw-r--r--library/cpp/actors/core/executor_pool_basic_ut.cpp2
-rw-r--r--library/cpp/actors/core/executor_pool_united_ut.cpp2
-rw-r--r--library/cpp/actors/core/harmonizer.cpp65
-rw-r--r--library/cpp/actors/core/harmonizer.h1
-rw-r--r--library/cpp/actors/core/mon_stats.h9
-rw-r--r--library/cpp/actors/core/worker_context.h3
-rw-r--r--library/cpp/actors/helpers/pool_stats_collector.h11
-rw-r--r--library/cpp/actors/interconnect/events_local.h15
-rw-r--r--library/cpp/actors/interconnect/handshake_broker.h78
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp88
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp8
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp7
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_proxy.h28
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp30
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h12
-rw-r--r--library/cpp/actors/interconnect/watchdog_timer.h13
-rw-r--r--library/cpp/actors/util/rc_buf.h7
22 files changed, 187 insertions, 256 deletions
diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h
index 8051f5ee57..cd2cfda1bb 100644
--- a/library/cpp/actors/core/actorsystem.h
+++ b/library/cpp/actors/core/actorsystem.h
@@ -122,7 +122,21 @@ namespace NActors {
}
ui32 GetThreads(ui32 poolId) const {
- return Executors ? Executors[poolId]->GetThreads() : CpuManager.GetThreads(poolId);
+ auto result = GetThreadsOptional(poolId);
+ Y_VERIFY(result, "undefined pool id: %" PRIu32, (ui32)poolId);
+ return *result;
+ }
+
+ std::optional<ui32> GetThreadsOptional(const ui32 poolId) const {
+ if (Y_LIKELY(Executors)) {
+ if (Y_LIKELY(poolId < ExecutorsCount)) {
+ return Executors[poolId]->GetDefaultThreadCount();
+ } else {
+ return {};
+ }
+ } else {
+ return CpuManager.GetThreadsOptional(poolId);
+ }
}
};
diff --git a/library/cpp/actors/core/config.h b/library/cpp/actors/core/config.h
index 0bf4b871d7..650b1f39f5 100644
--- a/library/cpp/actors/core/config.h
+++ b/library/cpp/actors/core/config.h
@@ -128,10 +128,10 @@ namespace NActors {
Y_FAIL("undefined pool id: %" PRIu32, (ui32)poolId);
}
- ui32 GetThreads(ui32 poolId) const {
+ std::optional<ui32> GetThreadsOptional(ui32 poolId) const {
for (const auto& p : Basic) {
if (p.PoolId == poolId) {
- return p.Threads;
+ return p.DefaultThreadCount;
}
}
for (const auto& p : IO) {
@@ -144,7 +144,13 @@ namespace NActors {
return p.Concurrency ? p.Concurrency : UnitedWorkers.CpuCount;
}
}
- Y_FAIL("undefined pool id: %" PRIu32, (ui32)poolId);
+ return {};
+ }
+
+ ui32 GetThreads(ui32 poolId) const {
+ auto result = GetThreadsOptional(poolId);
+ Y_VERIFY(result, "undefined pool id: %" PRIu32, (ui32)poolId);
+ return *result;
}
};
diff --git a/library/cpp/actors/core/executor_pool.h b/library/cpp/actors/core/executor_pool.h
index f39415c7e2..c7c85e61fd 100644
--- a/library/cpp/actors/core/executor_pool.h
+++ b/library/cpp/actors/core/executor_pool.h
@@ -10,6 +10,11 @@ namespace NActors {
struct TWorkerContext;
class ISchedulerCookie;
+ struct TCpuConsumption {
+ double ConsumedUs = 0;
+ double BookedUs = 0;
+ };
+
class IExecutorPool : TNonCopyable {
public:
const ui32 PoolId;
@@ -131,14 +136,9 @@ namespace NActors {
return false;
}
- virtual double GetThreadConsumedUs(i16 threadIdx) {
- Y_UNUSED(threadIdx);
- return 0.0;
- }
-
- virtual double GetThreadBookedUs(i16 threadIdx) {
+ virtual TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) {
Y_UNUSED(threadIdx);
- return 0.0;
+ return TCpuConsumption{0.0, 0.0};
}
};
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp
index 0c984f8fb0..de04105991 100644
--- a/library/cpp/actors/core/executor_pool_basic.cpp
+++ b/library/cpp/actors/core/executor_pool_basic.cpp
@@ -334,6 +334,8 @@ namespace NActors {
poolStats.MaxUtilizationTime = RelaxedLoad(&MaxUtilizationAccumulator) / (i64)(NHPTimer::GetCyclesPerSecond() / 1000);
poolStats.WrongWakenedThreadCount = RelaxedLoad(&WrongWakenedThreadCount);
poolStats.CurrentThreadCount = RelaxedLoad(&ThreadCount);
+ poolStats.DefaultThreadCount = DefaultThreadCount;
+ poolStats.MaxThreadCount = MaxThreadCount;
if (Harmonizer) {
TPoolHarmonizedStats stats = Harmonizer->GetPoolStats(PoolId);
poolStats.IsNeedy = stats.IsNeedy;
@@ -342,6 +344,7 @@ namespace NActors {
poolStats.IncreasingThreadsByNeedyState = stats.IncreasingThreadsByNeedyState;
poolStats.DecreasingThreadsByStarvedState = stats.DecreasingThreadsByStarvedState;
poolStats.DecreasingThreadsByHoggishState = stats.DecreasingThreadsByHoggishState;
+ poolStats.PotentialMaxThreadCount = stats.PotentialMaxThreadCount;
}
statsCopy.resize(PoolThreads + 1);
@@ -490,24 +493,14 @@ namespace NActors {
return false;
}
- double TBasicExecutorPool::GetThreadConsumedUs(i16 threadIdx) {
+ TCpuConsumption TBasicExecutorPool::GetThreadCpuConsumption(i16 threadIdx) {
if ((ui32)threadIdx >= PoolThreads) {
- return 0;
+ return {0.0, 0.0};
}
TThreadCtx& threadCtx = Threads[threadIdx];
TExecutorThreadStats stats;
threadCtx.Thread->GetCurrentStats(stats);
- return Ts2Us(stats.ElapsedTicks);
- }
-
- double TBasicExecutorPool::GetThreadBookedUs(i16 threadIdx) {
- if ((ui32)threadIdx >= PoolThreads) {
- return 0;
- }
- TThreadCtx& threadCtx = Threads[threadIdx];
- TExecutorThreadStats stats;
- threadCtx.Thread->GetCurrentStats(stats);
- return stats.CpuNs / 1000.0;
+ return {Ts2Us(stats.SafeElapsedTicks), static_cast<double>(stats.CpuUs)};
}
i16 TBasicExecutorPool::GetBlockingThreadCount() const {
diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h
index cd94a998f1..813f91dc9a 100644
--- a/library/cpp/actors/core/executor_pool_basic.h
+++ b/library/cpp/actors/core/executor_pool_basic.h
@@ -153,8 +153,7 @@ namespace NActors {
i16 GetMinThreadCount() const override;
i16 GetMaxThreadCount() const override;
bool IsThreadBeingStopped(i16 threadIdx) const override;
- double GetThreadConsumedUs(i16 threadIdx) override;
- double GetThreadBookedUs(i16 threadIdx) override;
+ TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) override;
i16 GetBlockingThreadCount() const override;
i16 GetPriority() const override;
diff --git a/library/cpp/actors/core/executor_pool_basic_ut.cpp b/library/cpp/actors/core/executor_pool_basic_ut.cpp
index 6361bc6662..f96f65931a 100644
--- a/library/cpp/actors/core/executor_pool_basic_ut.cpp
+++ b/library/cpp/actors/core/executor_pool_basic_ut.cpp
@@ -339,7 +339,7 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) {
UNIT_ASSERT_VALUES_EQUAL(stats[0].PreemptedEvents, 0);
UNIT_ASSERT_VALUES_EQUAL(stats[0].NonDeliveredEvents, 0);
UNIT_ASSERT_VALUES_EQUAL(stats[0].EmptyMailboxActivation, 0);
- //UNIT_ASSERT_VALUES_EQUAL(stats[0].CpuNs, 0); // depends on total duration of test, so undefined
+ //UNIT_ASSERT_VALUES_EQUAL(stats[0].CpuUs, 0); // depends on total duration of test, so undefined
UNIT_ASSERT(stats[0].ElapsedTicks > 0);
UNIT_ASSERT(stats[0].ParkedTicks > 0);
UNIT_ASSERT_VALUES_EQUAL(stats[0].BlockedTicks, 0);
diff --git a/library/cpp/actors/core/executor_pool_united_ut.cpp b/library/cpp/actors/core/executor_pool_united_ut.cpp
index 133e9c5f2a..a7c7399d73 100644
--- a/library/cpp/actors/core/executor_pool_united_ut.cpp
+++ b/library/cpp/actors/core/executor_pool_united_ut.cpp
@@ -171,7 +171,7 @@ Y_UNIT_TEST_SUITE(UnitedExecutorPool) {
//UNIT_ASSERT_VALUES_EQUAL(stats[0].PreemptedEvents, 0); // depends on execution time and system load, so may be non-zero
UNIT_ASSERT_VALUES_EQUAL(stats[0].NonDeliveredEvents, 0);
UNIT_ASSERT_VALUES_EQUAL(stats[0].EmptyMailboxActivation, 0);
- //UNIT_ASSERT_VALUES_EQUAL(stats[0].CpuNs, 0); // depends on total duration of test, so undefined
+ //UNIT_ASSERT_VALUES_EQUAL(stats[0].CpuUs, 0); // depends on total duration of test, so undefined
UNIT_ASSERT(stats[0].ElapsedTicks > 0);
//UNIT_ASSERT(stats[0].ParkedTicks == 0); // per-pool parked time does not make sense for united pools
UNIT_ASSERT_VALUES_EQUAL(stats[0].BlockedTicks, 0);
diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp
index f318d8909c..e2fd0c5f24 100644
--- a/library/cpp/actors/core/harmonizer.cpp
+++ b/library/cpp/actors/core/harmonizer.cpp
@@ -121,6 +121,7 @@ struct TPoolInfo {
TAtomic IncreasingThreadsByNeedyState = 0;
TAtomic DecreasingThreadsByStarvedState = 0;
TAtomic DecreasingThreadsByHoggishState = 0;
+ TAtomic PotentialMaxThreadCount = 0;
bool IsBeingStopped(i16 threadIdx);
double GetBooked(i16 threadIdx);
@@ -169,9 +170,10 @@ double TPoolInfo::GetlastSecondPoolConsumed(i16 threadIdx) {
void TPoolInfo::PullStats(ui64 ts) {
for (i16 threadIdx = 0; threadIdx < MaxThreadCount; ++threadIdx) {
TThreadInfo &threadInfo = ThreadInfo[threadIdx];
- threadInfo.Consumed.Register(ts, Pool->GetThreadConsumedUs(threadIdx));
+ TCpuConsumption cpuConsumption = Pool->GetThreadCpuConsumption(threadIdx);
+ threadInfo.Consumed.Register(ts, cpuConsumption.ConsumedUs);
LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "consumed", UNROLL_HISTORY(threadInfo.Consumed.History));
- threadInfo.Booked.Register(ts, Pool->GetThreadBookedUs(threadIdx));
+ threadInfo.Booked.Register(ts, cpuConsumption.BookedUs);
LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "booked", UNROLL_HISTORY(threadInfo.Booked.History));
}
}
@@ -236,7 +238,7 @@ void THarmonizer::PullStats(ui64 ts) {
}
Y_FORCE_INLINE bool IsStarved(double consumed, double booked) {
- return Max(consumed, booked) > 0.1 && consumed < booked * 0.7;
+ return consumed < booked * 0.7;
}
Y_FORCE_INLINE bool IsHoggish(double booked, ui16 currentThreadCount) {
@@ -293,35 +295,43 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, lastSecondPoolBooked, lastSecondPoolConsumed, pool.GetThreadCount(), pool.MaxThreadCount, isStarved, isNeedy, isHoggish);
}
double budget = total - Max(booked, lastSecondBooked);
+ i16 budgetInt = static_cast<i16>(Max(budget, 0.0));
if (budget < -0.1) {
isStarvedPresent = true;
}
+ for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
+ TPoolInfo& pool = Pools[poolIdx];
+ AtomicSet(pool.PotentialMaxThreadCount, Min(pool.MaxThreadCount, budgetInt));
+ }
double overbooked = consumed - booked;
if (isStarvedPresent) {
- // last_starved_at_consumed_value = сумма по всем пулам consumed;
- // TODO(cthulhu): использовать как лимит планвно устремлять этот лимит к total,
- // использовать вместо total
- if (beingStopped && beingStopped >= overbooked) {
- // do nothing
- } else {
- TStackVec<size_t> reorder;
- for (size_t i = 0; i < Pools.size(); ++i) {
- reorder.push_back(i);
- }
- for (ui16 poolIdx : PriorityOrder) {
- TPoolInfo &pool = Pools[poolIdx];
- i64 threadCount = pool.GetThreadCount();
- if (threadCount > pool.DefaultThreadCount) {
- pool.SetThreadCount(threadCount - 1);
- AtomicIncrement(pool.DecreasingThreadsByStarvedState);
- overbooked--;
- LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount);
- if (overbooked < 1) {
- break;
- }
- }
- }
- }
+ // last_starved_at_consumed_value = сумма по всем пулам consumed;
+ // TODO(cthulhu): использовать как лимит планвно устремлять этот лимит к total,
+ // использовать вместо total
+ if (beingStopped && beingStopped >= overbooked) {
+ // do nothing
+ } else {
+ TStackVec<size_t> reorder;
+ for (size_t i = 0; i < Pools.size(); ++i) {
+ reorder.push_back(i);
+ }
+ for (ui16 poolIdx : PriorityOrder) {
+ TPoolInfo &pool = Pools[poolIdx];
+ i64 threadCount = pool.GetThreadCount();
+ while (threadCount > pool.DefaultThreadCount) {
+ pool.SetThreadCount(threadCount - 1);
+ AtomicIncrement(pool.DecreasingThreadsByStarvedState);
+ overbooked--;
+ LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount);
+ if (overbooked < 1) {
+ break;
+ }
+ }
+ if (overbooked < 1) {
+ break;
+ }
+ }
+ }
} else {
for (size_t needyPoolIdx : needyPools) {
TPoolInfo &pool = Pools[needyPoolIdx];
@@ -422,6 +432,7 @@ TPoolHarmonizedStats THarmonizer::GetPoolStats(i16 poolId) const {
.IncreasingThreadsByNeedyState = static_cast<ui64>(RelaxedLoad(&pool.IncreasingThreadsByNeedyState)),
.DecreasingThreadsByStarvedState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByStarvedState)),
.DecreasingThreadsByHoggishState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByHoggishState)),
+ .PotentialMaxThreadCount = static_cast<i16>(RelaxedLoad(&pool.PotentialMaxThreadCount)),
.IsNeedy = static_cast<bool>(flags & 1),
.IsStarved = static_cast<bool>(flags & 2),
.IsHoggish = static_cast<bool>(flags & 4),
diff --git a/library/cpp/actors/core/harmonizer.h b/library/cpp/actors/core/harmonizer.h
index 61f13e43ac..bc6b938fe8 100644
--- a/library/cpp/actors/core/harmonizer.h
+++ b/library/cpp/actors/core/harmonizer.h
@@ -10,6 +10,7 @@ namespace NActors {
ui64 IncreasingThreadsByNeedyState = 0;
ui64 DecreasingThreadsByStarvedState = 0;
ui64 DecreasingThreadsByHoggishState = 0;
+ i16 PotentialMaxThreadCount = 0;
bool IsNeedy = false;
bool IsStarved = false;
bool IsHoggish = false;
diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h
index 38629e2aa1..4c664a964a 100644
--- a/library/cpp/actors/core/mon_stats.h
+++ b/library/cpp/actors/core/mon_stats.h
@@ -65,6 +65,9 @@ namespace NActors {
ui64 DecreasingThreadsByHoggishState = 0;
i16 WrongWakenedThreadCount = 0;
i16 CurrentThreadCount = 0;
+ i16 PotentialMaxThreadCount = 0;
+ i16 DefaultThreadCount = 0;
+ i16 MaxThreadCount = 0;
bool IsNeedy = false;
bool IsStarved = false;
bool IsHoggish = false;
@@ -76,7 +79,8 @@ namespace NActors {
ui64 PreemptedEvents = 0; // Number of events experienced hard preemption
ui64 NonDeliveredEvents = 0;
ui64 EmptyMailboxActivation = 0;
- ui64 CpuNs = 0; // nanoseconds thread was executing on CPU (accounts for preemtion)
+ ui64 CpuUs = 0; // microseconds thread was executing on CPU (accounts for preemtion)
+ ui64 SafeElapsedTicks = 0;
ui64 WorstActivationTimeUs = 0;
NHPTimer::STime ElapsedTicks = 0;
NHPTimer::STime ParkedTicks = 0;
@@ -120,7 +124,8 @@ namespace NActors {
PreemptedEvents += RelaxedLoad(&other.PreemptedEvents);
NonDeliveredEvents += RelaxedLoad(&other.NonDeliveredEvents);
EmptyMailboxActivation += RelaxedLoad(&other.EmptyMailboxActivation);
- CpuNs += RelaxedLoad(&other.CpuNs);
+ CpuUs += RelaxedLoad(&other.CpuUs);
+ SafeElapsedTicks += RelaxedLoad(&other.SafeElapsedTicks);
RelaxedStore(
&WorstActivationTimeUs,
std::max(RelaxedLoad(&WorstActivationTimeUs), RelaxedLoad(&other.WorstActivationTimeUs)));
diff --git a/library/cpp/actors/core/worker_context.h b/library/cpp/actors/core/worker_context.h
index 2179771fb6..c3a2947df1 100644
--- a/library/cpp/actors/core/worker_context.h
+++ b/library/cpp/actors/core/worker_context.h
@@ -137,7 +137,8 @@ namespace NActors {
}
void UpdateThreadTime() {
- RelaxedStore(&WorkerStats.CpuNs, ThreadCPUTime() * 1000);
+ RelaxedStore(&WorkerStats.SafeElapsedTicks, (ui64)RelaxedLoad(&WorkerStats.ElapsedTicks));
+ RelaxedStore(&WorkerStats.CpuUs, ThreadCPUTime());
}
#else
void GetCurrentStats(TExecutorThreadStats&) const {}
diff --git a/library/cpp/actors/helpers/pool_stats_collector.h b/library/cpp/actors/helpers/pool_stats_collector.h
index b1217b1d63..d80951827d 100644
--- a/library/cpp/actors/helpers/pool_stats_collector.h
+++ b/library/cpp/actors/helpers/pool_stats_collector.h
@@ -126,6 +126,9 @@ private:
NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByEventCount;
NMonitoring::TDynamicCounters::TCounterPtr WrongWakenedThreadCount;
NMonitoring::TDynamicCounters::TCounterPtr CurrentThreadCount;
+ NMonitoring::TDynamicCounters::TCounterPtr PotentialMaxThreadCount;
+ NMonitoring::TDynamicCounters::TCounterPtr DefaultThreadCount;
+ NMonitoring::TDynamicCounters::TCounterPtr MaxThreadCount;
NMonitoring::TDynamicCounters::TCounterPtr IsNeedy;
NMonitoring::TDynamicCounters::TCounterPtr IsStarved;
NMonitoring::TDynamicCounters::TCounterPtr IsHoggish;
@@ -178,6 +181,9 @@ private:
MailboxPushedOutByEventCount = PoolGroup->GetCounter("MailboxPushedOutByEventCount", true);
WrongWakenedThreadCount = PoolGroup->GetCounter("WrongWakenedThreadCount", true);
CurrentThreadCount = PoolGroup->GetCounter("CurrentThreadCount", false);
+ PotentialMaxThreadCount = PoolGroup->GetCounter("PotentialMaxThreadCount", false);
+ DefaultThreadCount = PoolGroup->GetCounter("DefaultThreadCount", false);
+ MaxThreadCount = PoolGroup->GetCounter("MaxThreadCount", false);
IsNeedy = PoolGroup->GetCounter("IsNeedy", false);
IsStarved = PoolGroup->GetCounter("IsStarved", false);
IsHoggish = PoolGroup->GetCounter("IsHoggish", false);
@@ -211,7 +217,7 @@ private:
*NonDeliveredEvents = stats.NonDeliveredEvents;
*DestroyedActors = stats.PoolDestroyedActors;
*EmptyMailboxActivation = stats.EmptyMailboxActivation;
- *CpuMicrosec = stats.CpuNs / 1000;
+ *CpuMicrosec = stats.CpuUs;
*ElapsedMicrosec = ::NHPTimer::GetSeconds(stats.ElapsedTicks)*1000000;
*ParkedMicrosec = ::NHPTimer::GetSeconds(stats.ParkedTicks)*1000000;
*ActorRegistrations = stats.PoolActorRegistrations;
@@ -222,6 +228,9 @@ private:
*MailboxPushedOutByEventCount = stats.MailboxPushedOutByEventCount;
*WrongWakenedThreadCount = poolStats.WrongWakenedThreadCount;
*CurrentThreadCount = poolStats.CurrentThreadCount;
+ *PotentialMaxThreadCount = poolStats.PotentialMaxThreadCount;
+ *DefaultThreadCount = poolStats.DefaultThreadCount;
+ *MaxThreadCount = poolStats.MaxThreadCount;
*IsNeedy = poolStats.IsNeedy;
*IsStarved = poolStats.IsStarved;
*IsHoggish = poolStats.IsHoggish;
diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h
index 43f376038b..b1b8ae0c75 100644
--- a/library/cpp/actors/interconnect/events_local.h
+++ b/library/cpp/actors/interconnect/events_local.h
@@ -52,9 +52,6 @@ namespace NActors {
EvProcessPingRequest,
EvGetSecureSocket,
EvSecureSocket,
- HandshakeBrokerTake,
- HandshakeBrokerFree,
- HandshakeBrokerPermit,
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// nonlocal messages; their indices must be preserved in order to work properly while doing rolling update
@@ -101,18 +98,6 @@ namespace NActors {
}
};
- struct TEvHandshakeBrokerTake: public TEventLocal<TEvHandshakeBrokerTake, ui32(ENetwork::HandshakeBrokerTake)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerTake, "Network: TEvHandshakeBrokerTake")
- };
-
- struct TEvHandshakeBrokerFree: public TEventLocal<TEvHandshakeBrokerFree, ui32(ENetwork::HandshakeBrokerFree)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerFree, "Network: TEvHandshakeBrokerFree")
- };
-
- struct TEvHandshakeBrokerPermit: public TEventLocal<TEvHandshakeBrokerPermit, ui32(ENetwork::HandshakeBrokerPermit)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerPermit, "Network: TEvHandshakeBrokerPermit")
- };
-
struct TEvHandshakeAsk: public TEventLocal<TEvHandshakeAsk, ui32(ENetwork::HandshakeAsk)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAsk, "Network: TEvHandshakeAsk")
TEvHandshakeAsk(const TActorId& self,
diff --git a/library/cpp/actors/interconnect/handshake_broker.h b/library/cpp/actors/interconnect/handshake_broker.h
deleted file mode 100644
index 70a7cb91dc..0000000000
--- a/library/cpp/actors/interconnect/handshake_broker.h
+++ /dev/null
@@ -1,78 +0,0 @@
-#pragma once
-
-#include <library/cpp/actors/core/actor.h>
-
-#include <deque>
-
-namespace NActors {
- static constexpr ui32 DEFAULT_INFLIGHT = 100;
-
- class THandshakeBroker : public TActor<THandshakeBroker> {
- private:
- std::deque<TActorId> Waiting;
- ui32 Capacity;
-
- void Handle(TEvHandshakeBrokerTake::TPtr &ev) {
- if (Capacity > 0) {
- Capacity -= 1;
- Send(ev->Sender, new TEvHandshakeBrokerPermit());
- } else {
- Waiting.push_back(ev->Sender);
- }
- }
-
- void Handle(TEvHandshakeBrokerFree::TPtr& ev) {
- Y_UNUSED(ev);
- if (Capacity == 0 && !Waiting.empty()) {
- Send(Waiting.front(), new TEvHandshakeBrokerPermit());
- Waiting.pop_front();
- } else {
- Capacity += 1;
- }
- }
-
- void PassAway() override {
- while (!Waiting.empty()) {
- Send(Waiting.front(), new TEvHandshakeBrokerPermit());
- Waiting.pop_front();
- }
- TActor::PassAway();
- }
-
- public:
- THandshakeBroker(ui32 inflightLimit = DEFAULT_INFLIGHT)
- : TActor(&TThis::StateFunc)
- , Capacity(inflightLimit)
- {
- }
-
- static constexpr char ActorName[] = "HANDSHAKE_BROKER_ACTOR";
-
- STFUNC(StateFunc) {
- Y_UNUSED(ctx);
- switch(ev->GetTypeRewrite()) {
- hFunc(TEvHandshakeBrokerTake, Handle);
- hFunc(TEvHandshakeBrokerFree, Handle);
- cFunc(TEvents::TSystem::Poison, PassAway);
- }
- }
-
- void Bootstrap() {
- Become(&TThis::StateFunc);
- };
- };
-
- inline IActor* CreateHandshakeBroker() {
- return new THandshakeBroker();
- }
-
- inline TActorId MakeHandshakeBrokerOutId() {
- char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'O', 'u', 't'};
- return TActorId(0, TStringBuf(std::begin(x), std::end(x)));
- }
-
- inline TActorId MakeHandshakeBrokerInId() {
- char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'r', 'I', 'n'};
- return TActorId(0, TStringBuf(std::begin(x), std::end(x)));
- }
-};
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index a9c6b1dd11..dc651f3762 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -1,5 +1,4 @@
#include "interconnect_handshake.h"
-#include "handshake_broker.h"
#include "interconnect_tcp_proxy.h"
#include <library/cpp/actors/core/actor_coroutine.h>
@@ -97,13 +96,8 @@ namespace NActors {
THashMap<ui32, TInstant> LastLogNotice;
const TDuration MuteDuration = TDuration::Seconds(15);
TInstant Deadline;
- TActorId HandshakeBroker;
public:
- static constexpr IActor::EActivityType ActorActivityType() {
- return IActor::INTERCONNECT_HANDSHAKE;
- }
-
THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer,
ui32 nodeId, ui64 nextPacket, TString peerHostName, TSessionParams params)
: TActorCoroImpl(StackSize, true, true) // allow unhandled poison pills and dtors
@@ -119,7 +113,6 @@ namespace NActors {
Y_VERIFY(SelfVirtualId);
Y_VERIFY(SelfVirtualId.NodeId());
Y_VERIFY(PeerNodeId);
- HandshakeBroker = MakeHandshakeBrokerOutId();
}
THandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket)
@@ -135,7 +128,6 @@ namespace NActors {
} else {
PeerAddr.clear();
}
- HandshakeBroker = MakeHandshakeBrokerInId();
}
void UpdatePrefix() {
@@ -145,64 +137,45 @@ namespace NActors {
void Run() override {
UpdatePrefix();
- bool isBrokerActive = false;
-
- if (Send(HandshakeBroker, new TEvHandshakeBrokerTake())) {
- isBrokerActive = true;
- WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit");
+ // set up overall handshake process timer
+ TDuration timeout = Common->Settings.Handshake;
+ if (timeout == TDuration::Zero()) {
+ timeout = DEFAULT_HANDSHAKE_TIMEOUT;
}
+ timeout += ResolveTimeout * 2;
+ Deadline = Now() + timeout;
+ Schedule(Deadline, new TEvents::TEvWakeup);
try {
- // set up overall handshake process timer
- TDuration timeout = Common->Settings.Handshake;
- if (timeout == TDuration::Zero()) {
- timeout = DEFAULT_HANDSHAKE_TIMEOUT;
- }
- timeout += ResolveTimeout * 2;
- Deadline = Now() + timeout;
- Schedule(Deadline, new TEvents::TEvWakeup);
-
- try {
- if (Socket) {
- PerformIncomingHandshake();
- } else {
- PerformOutgoingHandshake();
- }
-
- // establish encrypted channel, or, in case when encryption is disabled, check if it matches settings
- if (ProgramInfo) {
- if (Params.Encryption) {
- EstablishSecureConnection();
- } else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) {
- Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required");
- }
- }
- } catch (const TExHandshakeFailed&) {
- ProgramInfo.Clear();
+ if (Socket) {
+ PerformIncomingHandshake();
+ } else {
+ PerformOutgoingHandshake();
}
+ // establish encrypted channel, or, in case when encryption is disabled, check if it matches settings
if (ProgramInfo) {
- LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded");
- Y_VERIFY(NextPacketFromPeer);
- if (PollerToken) {
- Y_VERIFY(PollerToken->RefCount() == 1);
- PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor
+ if (Params.Encryption) {
+ EstablishSecureConnection();
+ } else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) {
+ Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required");
}
- SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(Socket), PeerVirtualId, SelfVirtualId,
- *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params)));
- }
- } catch (const TDtorException&) {
- throw; // we can't use actor system when handling this exception
- } catch (...) {
- if (isBrokerActive) {
- Send(HandshakeBroker, new TEvHandshakeBrokerFree());
}
- throw;
+ } catch (const TExHandshakeFailed&) {
+ ProgramInfo.Clear();
}
- if (isBrokerActive) {
- Send(HandshakeBroker, new TEvHandshakeBrokerFree());
+ if (ProgramInfo) {
+ LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded");
+ Y_VERIFY(NextPacketFromPeer);
+ if (PollerToken) {
+ Y_VERIFY(PollerToken->RefCount() == 1);
+ PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor
+ }
+ SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(Socket), PeerVirtualId, SelfVirtualId,
+ *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params)));
}
+
Socket.Reset();
}
@@ -1022,11 +995,12 @@ namespace NActors {
const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName,
TSessionParams params) {
return new TActorCoro(MakeHolder<THandshakeActor>(std::move(common), self, peer, nodeId, nextPacket,
- std::move(peerHostName), std::move(params)));
+ std::move(peerHostName), std::move(params)), IActor::INTERCONNECT_HANDSHAKE);
}
IActor* CreateIncomingHandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket) {
- return new TActorCoro(MakeHolder<THandshakeActor>(std::move(common), std::move(socket)));
+ return new TActorCoro(MakeHolder<THandshakeActor>(std::move(common), std::move(socket)),
+ IActor::INTERCONNECT_HANDSHAKE);
}
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index a8c505d94d..fdf035499f 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -39,7 +39,7 @@ namespace NActors {
SetPrefix(Sprintf("InputSession %s [node %" PRIu32 "]", SelfId().ToString().data(), NodeId));
Become(&TThis::WorkingState, DeadPeerTimeout, new TEvCheckDeadPeer);
LOG_DEBUG_IC_SESSION("ICIS01", "InputSession created");
- LastReceiveTimestamp = TActivationContext::Now();
+ LastReceiveTimestamp = TActivationContext::Monotonic();
ReceiveData();
}
@@ -437,7 +437,7 @@ namespace NActors {
}
}
- LastReceiveTimestamp = TActivationContext::Now();
+ LastReceiveTimestamp = TActivationContext::Monotonic();
return true;
}
@@ -473,7 +473,7 @@ namespace NActors {
}
void TInputSessionTCP::HandleCheckDeadPeer() {
- const TInstant now = TActivationContext::Now();
+ const TMonotonic now = TActivationContext::Monotonic();
if (now >= LastReceiveTimestamp + DeadPeerTimeout) {
ReceiveData();
if (Socket && now >= LastReceiveTimestamp + DeadPeerTimeout) {
@@ -481,7 +481,7 @@ namespace NActors {
DestroySession(TDisconnectReason::DeadPeer());
}
}
- Schedule(LastReceiveTimestamp + DeadPeerTimeout - now, new TEvCheckDeadPeer);
+ Schedule(LastReceiveTimestamp + DeadPeerTimeout, new TEvCheckDeadPeer);
}
void TInputSessionTCP::HandlePingResponse(TDuration passed) {
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
index 7e2d8ccb94..b4cc263a4c 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
@@ -40,7 +40,6 @@ namespace NActors {
SetPrefix(Sprintf("Proxy %s [node %" PRIu32 "]", SelfId().ToString().data(), PeerNodeId));
SwitchToInitialState();
- PassAwayTimestamp = TActivationContext::Now() + TDuration::Seconds(15);
LOG_INFO_IC("ICP01", "ready to work");
}
@@ -563,7 +562,7 @@ namespace NActors {
ValidateEvent(ev, "EnqueueSessionEvent");
const ui32 size = ev->GetSize();
PendingSessionEventsSize += size;
- PendingSessionEvents.emplace_back(TActivationContext::Now() + Common->Settings.MessagePendingTimeout, size, ev);
+ PendingSessionEvents.emplace_back(TActivationContext::Monotonic() + Common->Settings.MessagePendingTimeout, size, ev);
ScheduleCleanupEventQueue();
CleanupEventQueue();
}
@@ -810,7 +809,7 @@ namespace NActors {
if (!CleanupEventQueueScheduled && PendingSessionEvents) {
// apply batching at 50 ms granularity
- Schedule(Max(TDuration::MilliSeconds(50), PendingSessionEvents.front().Deadline - TActivationContext::Now()), new TEvCleanupEventQueue);
+ Schedule(Max(TDuration::MilliSeconds(50), PendingSessionEvents.front().Deadline - TActivationContext::Monotonic()), new TEvCleanupEventQueue);
CleanupEventQueueScheduled = true;
}
}
@@ -827,7 +826,7 @@ namespace NActors {
void TInterconnectProxyTCP::CleanupEventQueue() {
ICPROXY_PROFILED;
- const TInstant now = TActivationContext::Now();
+ const TMonotonic now = TActivationContext::Monotonic();
while (PendingSessionEvents) {
TPendingSessionEvent& ev = PendingSessionEvents.front();
if (now >= ev.Deadline || PendingSessionEventsSize > Common->Settings.MessagePendingSize) {
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
index 023e5bd1ee..b750e278e1 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
@@ -175,11 +175,18 @@ namespace NActors {
Become(std::forward<TArgs>(args)...);
Y_VERIFY(!Terminated || CurrentStateFunc() == &TThis::HoldByError); // ensure we never escape this state
if (CurrentStateFunc() != &TThis::PendingActivation) {
- PassAwayTimestamp = TInstant::Max();
+ PassAwayTimestamp = TMonotonic::Max();
+ } else if (DynamicPtr) {
+ PassAwayTimestamp = TActivationContext::Monotonic() + TDuration::Seconds(15);
+ if (!PassAwayScheduled) {
+ TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(),
+ {}, nullptr, 0));
+ PassAwayScheduled = true;
+ }
}
}
- TInstant PassAwayTimestamp;
+ TMonotonic PassAwayTimestamp;
bool PassAwayScheduled = false;
void SwitchToInitialState() {
@@ -189,17 +196,18 @@ namespace NActors {
" PendingIncomingHandshakeEvents# %zu State# %s", LogPrefix.data(), PendingSessionEvents.size(),
PendingIncomingHandshakeEvents.size(), State);
SwitchToState(__LINE__, "PendingActivation", &TThis::PendingActivation);
- if (DynamicPtr && !PassAwayScheduled && PassAwayTimestamp != TInstant::Max()) {
- TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(),
- {}, nullptr, 0));
- PassAwayScheduled = true;
- }
}
void HandlePassAwayIfNeeded() {
Y_VERIFY(PassAwayScheduled);
- if (PassAwayTimestamp != TInstant::Max()) {
+ const TMonotonic now = TActivationContext::Monotonic();
+ if (now >= PassAwayTimestamp) {
PassAway();
+ } else if (PassAwayTimestamp != TMonotonic::Max()) {
+ TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(),
+ {}, nullptr, 0));
+ } else {
+ PassAwayScheduled = false;
}
}
@@ -387,11 +395,11 @@ namespace NActors {
// hold all events before connection is established
struct TPendingSessionEvent {
- TInstant Deadline;
+ TMonotonic Deadline;
ui32 Size;
THolder<IEventHandle> Event;
- TPendingSessionEvent(TInstant deadline, ui32 size, TAutoPtr<IEventHandle> event)
+ TPendingSessionEvent(TMonotonic deadline, ui32 size, TAutoPtr<IEventHandle> event)
: Deadline(deadline)
, Size(size)
, Event(event)
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index feb55a16ad..18df8e42ff 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -232,7 +232,7 @@ namespace NActors {
CloseOnIdleWatchdog.Arm(SelfId());
// reset activity timestamps
- LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Now();
+ LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Monotonic();
LOG_INFO_IC_SESSION("ICS10", "traffic start");
@@ -315,7 +315,7 @@ namespace NActors {
bool needConfirm = false;
// update activity timer for dead peer checker
- LastInputActivityTimestamp = TActivationContext::Now();
+ LastInputActivityTimestamp = TActivationContext::Monotonic();
if (msg.NumDataBytes) {
UnconfirmedBytes += msg.NumDataBytes;
@@ -326,7 +326,7 @@ namespace NActors {
}
// reset payload watchdog that controls close-on-idle behaviour
- LastPayloadActivityTimestamp = TActivationContext::Now();
+ LastPayloadActivityTimestamp = TActivationContext::Monotonic();
CloseOnIdleWatchdog.Reset();
}
@@ -654,7 +654,7 @@ namespace NActors {
void TInterconnectSessionTCP::SetForcePacketTimestamp(TDuration period) {
if (period != TDuration::Max()) {
- const TInstant when = TActivationContext::Now() + period;
+ const TMonotonic when = TActivationContext::Monotonic() + period;
if (when < ForcePacketTimestamp) {
ForcePacketTimestamp = when;
ScheduleFlush();
@@ -664,7 +664,7 @@ namespace NActors {
void TInterconnectSessionTCP::ScheduleFlush() {
if (FlushSchedule.empty() || ForcePacketTimestamp < FlushSchedule.top()) {
- Schedule(ForcePacketTimestamp - TActivationContext::Now(), new TEvFlush);
+ Schedule(ForcePacketTimestamp, new TEvFlush);
FlushSchedule.push(ForcePacketTimestamp);
MaxFlushSchedule = Max(MaxFlushSchedule, FlushSchedule.size());
++FlushEventsScheduled;
@@ -672,7 +672,7 @@ namespace NActors {
}
void TInterconnectSessionTCP::HandleFlush() {
- const TInstant now = TActivationContext::Now();
+ const TMonotonic now = TActivationContext::Monotonic();
while (FlushSchedule && now >= FlushSchedule.top()) {
FlushSchedule.pop();
}
@@ -682,14 +682,14 @@ namespace NActors {
++ConfirmPacketsForcedByTimeout;
++FlushEventsProcessed;
MakePacket(false); // just generate confirmation packet if we have preconditions for this
- } else if (ForcePacketTimestamp != TInstant::Max()) {
+ } else if (ForcePacketTimestamp != TMonotonic::Max()) {
ScheduleFlush();
}
}
}
void TInterconnectSessionTCP::ResetFlushLogic() {
- ForcePacketTimestamp = TInstant::Max();
+ ForcePacketTimestamp = TMonotonic::Max();
UnconfirmedBytes = 0;
const TDuration ping = Proxy->Common->Settings.PingPeriod;
if (ping != TDuration::Zero() && !NumEventsInReadyChannels) {
@@ -761,7 +761,7 @@ namespace NActors {
}
// update payload activity timer
- LastPayloadActivityTimestamp = TActivationContext::Now();
+ LastPayloadActivityTimestamp = TActivationContext::Monotonic();
} else if (pingMask) {
serial = *pingMask;
@@ -923,7 +923,7 @@ namespace NActors {
flagState = EFlag::GREEN;
do {
- auto lastInputDelay = TActivationContext::Now() - LastInputActivityTimestamp;
+ auto lastInputDelay = TActivationContext::Monotonic() - LastInputActivityTimestamp;
if (lastInputDelay * 4 >= GetDeadPeerTimeout() * 3) {
flagState = EFlag::ORANGE;
break;
@@ -1006,7 +1006,7 @@ namespace NActors {
}
void TInterconnectSessionTCP::IssuePingRequest() {
- const TInstant now = TActivationContext::Now();
+ const TMonotonic now = TActivationContext::Monotonic();
if (now >= LastPingTimestamp + PingPeriodicity) {
LOG_DEBUG_IC_SESSION("ICS22", "Issuing ping request");
if (Socket) {
@@ -1175,6 +1175,8 @@ namespace NActors {
ui32 unsentQueueSize = Socket ? Socket->GetUnsentQueueSize() : 0;
+ const TMonotonic now = TActivationContext::Monotonic();
+
MON_VAR(OutputStuckFlag)
MON_VAR(SendQueue.size())
MON_VAR(SendQueueCache.size())
@@ -1184,8 +1186,8 @@ namespace NActors {
MON_VAR(InflightDataAmount)
MON_VAR(unsentQueueSize)
MON_VAR(SendBufferSize)
- MON_VAR(LastInputActivityTimestamp)
- MON_VAR(LastPayloadActivityTimestamp)
+ MON_VAR(now - LastInputActivityTimestamp)
+ MON_VAR(now - LastPayloadActivityTimestamp)
MON_VAR(LastHandshakeDone)
MON_VAR(OutputCounter)
MON_VAR(LastSentSerial)
@@ -1204,7 +1206,7 @@ namespace NActors {
clockSkew = Sprintf("+%s", TDuration::MicroSeconds(x).ToString().data());
}
- MON_VAR(LastPingTimestamp)
+ MON_VAR(now - LastPingTimestamp)
MON_VAR(GetPingRTT())
MON_VAR(clockSkew)
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index 51c5bfa453..598a5c9220 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -266,7 +266,7 @@ namespace NActors {
}
const TDuration DeadPeerTimeout;
- TInstant LastReceiveTimestamp;
+ TMonotonic LastReceiveTimestamp;
void HandleCheckDeadPeer();
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -413,15 +413,15 @@ namespace NActors {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// pinger
- TInstant LastPingTimestamp;
+ TMonotonic LastPingTimestamp;
static constexpr TDuration PingPeriodicity = TDuration::Seconds(1);
void IssuePingRequest();
void Handle(TEvProcessPingRequest::TPtr ev);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- TInstant LastInputActivityTimestamp;
- TInstant LastPayloadActivityTimestamp;
+ TMonotonic LastInputActivityTimestamp;
+ TMonotonic LastPayloadActivityTimestamp;
TWatchdogTimer<TEvCheckCloseOnIdle> CloseOnIdleWatchdog;
TWatchdogTimer<TEvCheckLostConnection> LostConnectionWatchdog;
@@ -481,8 +481,8 @@ namespace NActors {
// time at which we want to send confirmation packet even if there was no outgoing data
ui64 UnconfirmedBytes = 0;
- TInstant ForcePacketTimestamp = TInstant::Max();
- TPriorityQueue<TInstant, TVector<TInstant>, std::greater<TInstant>> FlushSchedule;
+ TMonotonic ForcePacketTimestamp = TMonotonic::Max();
+ TPriorityQueue<TMonotonic, TVector<TMonotonic>, std::greater<TMonotonic>> FlushSchedule;
size_t MaxFlushSchedule = 0;
ui64 FlushEventsScheduled = 0;
ui64 FlushEventsProcessed = 0;
diff --git a/library/cpp/actors/interconnect/watchdog_timer.h b/library/cpp/actors/interconnect/watchdog_timer.h
index c190105a59..fe62006e3b 100644
--- a/library/cpp/actors/interconnect/watchdog_timer.h
+++ b/library/cpp/actors/interconnect/watchdog_timer.h
@@ -8,7 +8,7 @@ namespace NActors {
const TDuration Timeout;
const TCallback Callback;
- TInstant LastResetTimestamp;
+ TMonotonic LastResetTimestamp;
TEvent* ExpectedEvent = nullptr;
ui32 Iteration = 0;
@@ -29,7 +29,7 @@ namespace NActors {
}
void Reset() {
- LastResetTimestamp = TActivationContext::Now();
+ LastResetTimestamp = TActivationContext::Monotonic();
}
void Disarm() {
@@ -38,11 +38,11 @@ namespace NActors {
void operator()(typename TEvent::TPtr& ev) {
if (ev->Get() == ExpectedEvent) {
- const TInstant now = TActivationContext::Now();
- const TInstant barrier = LastResetTimestamp + Timeout;
+ const TMonotonic now = TActivationContext::Monotonic();
+ const TMonotonic barrier = LastResetTimestamp + Timeout;
if (now < barrier) {
// the time hasn't come yet
- Schedule(barrier - now, TActorIdentity(ev->Recipient));
+ Schedule(barrier, TActorIdentity(ev->Recipient));
} else if (Iteration < NumIterationsBeforeFiring) {
// time has come, but we will still give actor a chance to process some messages and rearm timer
++Iteration;
@@ -57,7 +57,8 @@ namespace NActors {
}
private:
- void Schedule(TDuration timeout, const TActorIdentity& actor) {
+ template<typename T>
+ void Schedule(T&& timeout, const TActorIdentity& actor) {
auto ev = MakeHolder<TEvent>();
ExpectedEvent = ev.Get();
Iteration = 0;
diff --git a/library/cpp/actors/util/rc_buf.h b/library/cpp/actors/util/rc_buf.h
index 1a492064ee..a2bce33fba 100644
--- a/library/cpp/actors/util/rc_buf.h
+++ b/library/cpp/actors/util/rc_buf.h
@@ -113,10 +113,11 @@ public:
private:
static int Compare(const TContiguousSpan& x, const TContiguousSpan& y) {
- if (int res = std::memcmp(x.data(), y.data(), std::min(x.size(), y.size())); res) {
- return res;
+ int res = 0;
+ if (const size_t common = std::min(x.size(), y.size())) {
+ res = std::memcmp(x.data(), y.data(), common);
}
- return x.size() - y.size();
+ return res ? res : x.size() - y.size();
}
};