diff options
author | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-03-31 10:54:08 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-03-31 12:28:07 +0300 |
commit | fc1cffcfa7f0497a1f97b384a24bcbf23362f3be (patch) | |
tree | c15f7ab5b9e9b20fd0ef8fc07d598d28e8b32004 /library/cpp/actors | |
parent | 8a749596d40e91c896a1907afcd108d9221fbde1 (diff) | |
download | ydb-fc1cffcfa7f0497a1f97b384a24bcbf23362f3be.tar.gz |
Ydb stable 23-1-1923.1.19
x-stable-origin-commit: c5d5a396e89d0a72e0267a55e93d8404d4fb54fe
Diffstat (limited to 'library/cpp/actors')
22 files changed, 187 insertions, 256 deletions
diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h index 8051f5ee57..cd2cfda1bb 100644 --- a/library/cpp/actors/core/actorsystem.h +++ b/library/cpp/actors/core/actorsystem.h @@ -122,7 +122,21 @@ namespace NActors { } ui32 GetThreads(ui32 poolId) const { - return Executors ? Executors[poolId]->GetThreads() : CpuManager.GetThreads(poolId); + auto result = GetThreadsOptional(poolId); + Y_VERIFY(result, "undefined pool id: %" PRIu32, (ui32)poolId); + return *result; + } + + std::optional<ui32> GetThreadsOptional(const ui32 poolId) const { + if (Y_LIKELY(Executors)) { + if (Y_LIKELY(poolId < ExecutorsCount)) { + return Executors[poolId]->GetDefaultThreadCount(); + } else { + return {}; + } + } else { + return CpuManager.GetThreadsOptional(poolId); + } } }; diff --git a/library/cpp/actors/core/config.h b/library/cpp/actors/core/config.h index 0bf4b871d7..650b1f39f5 100644 --- a/library/cpp/actors/core/config.h +++ b/library/cpp/actors/core/config.h @@ -128,10 +128,10 @@ namespace NActors { Y_FAIL("undefined pool id: %" PRIu32, (ui32)poolId); } - ui32 GetThreads(ui32 poolId) const { + std::optional<ui32> GetThreadsOptional(ui32 poolId) const { for (const auto& p : Basic) { if (p.PoolId == poolId) { - return p.Threads; + return p.DefaultThreadCount; } } for (const auto& p : IO) { @@ -144,7 +144,13 @@ namespace NActors { return p.Concurrency ? p.Concurrency : UnitedWorkers.CpuCount; } } - Y_FAIL("undefined pool id: %" PRIu32, (ui32)poolId); + return {}; + } + + ui32 GetThreads(ui32 poolId) const { + auto result = GetThreadsOptional(poolId); + Y_VERIFY(result, "undefined pool id: %" PRIu32, (ui32)poolId); + return *result; } }; diff --git a/library/cpp/actors/core/executor_pool.h b/library/cpp/actors/core/executor_pool.h index f39415c7e2..c7c85e61fd 100644 --- a/library/cpp/actors/core/executor_pool.h +++ b/library/cpp/actors/core/executor_pool.h @@ -10,6 +10,11 @@ namespace NActors { struct TWorkerContext; class ISchedulerCookie; + struct TCpuConsumption { + double ConsumedUs = 0; + double BookedUs = 0; + }; + class IExecutorPool : TNonCopyable { public: const ui32 PoolId; @@ -131,14 +136,9 @@ namespace NActors { return false; } - virtual double GetThreadConsumedUs(i16 threadIdx) { - Y_UNUSED(threadIdx); - return 0.0; - } - - virtual double GetThreadBookedUs(i16 threadIdx) { + virtual TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) { Y_UNUSED(threadIdx); - return 0.0; + return TCpuConsumption{0.0, 0.0}; } }; diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp index 0c984f8fb0..de04105991 100644 --- a/library/cpp/actors/core/executor_pool_basic.cpp +++ b/library/cpp/actors/core/executor_pool_basic.cpp @@ -334,6 +334,8 @@ namespace NActors { poolStats.MaxUtilizationTime = RelaxedLoad(&MaxUtilizationAccumulator) / (i64)(NHPTimer::GetCyclesPerSecond() / 1000); poolStats.WrongWakenedThreadCount = RelaxedLoad(&WrongWakenedThreadCount); poolStats.CurrentThreadCount = RelaxedLoad(&ThreadCount); + poolStats.DefaultThreadCount = DefaultThreadCount; + poolStats.MaxThreadCount = MaxThreadCount; if (Harmonizer) { TPoolHarmonizedStats stats = Harmonizer->GetPoolStats(PoolId); poolStats.IsNeedy = stats.IsNeedy; @@ -342,6 +344,7 @@ namespace NActors { poolStats.IncreasingThreadsByNeedyState = stats.IncreasingThreadsByNeedyState; poolStats.DecreasingThreadsByStarvedState = stats.DecreasingThreadsByStarvedState; poolStats.DecreasingThreadsByHoggishState = stats.DecreasingThreadsByHoggishState; + poolStats.PotentialMaxThreadCount = stats.PotentialMaxThreadCount; } statsCopy.resize(PoolThreads + 1); @@ -490,24 +493,14 @@ namespace NActors { return false; } - double TBasicExecutorPool::GetThreadConsumedUs(i16 threadIdx) { + TCpuConsumption TBasicExecutorPool::GetThreadCpuConsumption(i16 threadIdx) { if ((ui32)threadIdx >= PoolThreads) { - return 0; + return {0.0, 0.0}; } TThreadCtx& threadCtx = Threads[threadIdx]; TExecutorThreadStats stats; threadCtx.Thread->GetCurrentStats(stats); - return Ts2Us(stats.ElapsedTicks); - } - - double TBasicExecutorPool::GetThreadBookedUs(i16 threadIdx) { - if ((ui32)threadIdx >= PoolThreads) { - return 0; - } - TThreadCtx& threadCtx = Threads[threadIdx]; - TExecutorThreadStats stats; - threadCtx.Thread->GetCurrentStats(stats); - return stats.CpuNs / 1000.0; + return {Ts2Us(stats.SafeElapsedTicks), static_cast<double>(stats.CpuUs)}; } i16 TBasicExecutorPool::GetBlockingThreadCount() const { diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h index cd94a998f1..813f91dc9a 100644 --- a/library/cpp/actors/core/executor_pool_basic.h +++ b/library/cpp/actors/core/executor_pool_basic.h @@ -153,8 +153,7 @@ namespace NActors { i16 GetMinThreadCount() const override; i16 GetMaxThreadCount() const override; bool IsThreadBeingStopped(i16 threadIdx) const override; - double GetThreadConsumedUs(i16 threadIdx) override; - double GetThreadBookedUs(i16 threadIdx) override; + TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) override; i16 GetBlockingThreadCount() const override; i16 GetPriority() const override; diff --git a/library/cpp/actors/core/executor_pool_basic_ut.cpp b/library/cpp/actors/core/executor_pool_basic_ut.cpp index 6361bc6662..f96f65931a 100644 --- a/library/cpp/actors/core/executor_pool_basic_ut.cpp +++ b/library/cpp/actors/core/executor_pool_basic_ut.cpp @@ -339,7 +339,7 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) { UNIT_ASSERT_VALUES_EQUAL(stats[0].PreemptedEvents, 0); UNIT_ASSERT_VALUES_EQUAL(stats[0].NonDeliveredEvents, 0); UNIT_ASSERT_VALUES_EQUAL(stats[0].EmptyMailboxActivation, 0); - //UNIT_ASSERT_VALUES_EQUAL(stats[0].CpuNs, 0); // depends on total duration of test, so undefined + //UNIT_ASSERT_VALUES_EQUAL(stats[0].CpuUs, 0); // depends on total duration of test, so undefined UNIT_ASSERT(stats[0].ElapsedTicks > 0); UNIT_ASSERT(stats[0].ParkedTicks > 0); UNIT_ASSERT_VALUES_EQUAL(stats[0].BlockedTicks, 0); diff --git a/library/cpp/actors/core/executor_pool_united_ut.cpp b/library/cpp/actors/core/executor_pool_united_ut.cpp index 133e9c5f2a..a7c7399d73 100644 --- a/library/cpp/actors/core/executor_pool_united_ut.cpp +++ b/library/cpp/actors/core/executor_pool_united_ut.cpp @@ -171,7 +171,7 @@ Y_UNIT_TEST_SUITE(UnitedExecutorPool) { //UNIT_ASSERT_VALUES_EQUAL(stats[0].PreemptedEvents, 0); // depends on execution time and system load, so may be non-zero UNIT_ASSERT_VALUES_EQUAL(stats[0].NonDeliveredEvents, 0); UNIT_ASSERT_VALUES_EQUAL(stats[0].EmptyMailboxActivation, 0); - //UNIT_ASSERT_VALUES_EQUAL(stats[0].CpuNs, 0); // depends on total duration of test, so undefined + //UNIT_ASSERT_VALUES_EQUAL(stats[0].CpuUs, 0); // depends on total duration of test, so undefined UNIT_ASSERT(stats[0].ElapsedTicks > 0); //UNIT_ASSERT(stats[0].ParkedTicks == 0); // per-pool parked time does not make sense for united pools UNIT_ASSERT_VALUES_EQUAL(stats[0].BlockedTicks, 0); diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp index f318d8909c..e2fd0c5f24 100644 --- a/library/cpp/actors/core/harmonizer.cpp +++ b/library/cpp/actors/core/harmonizer.cpp @@ -121,6 +121,7 @@ struct TPoolInfo { TAtomic IncreasingThreadsByNeedyState = 0; TAtomic DecreasingThreadsByStarvedState = 0; TAtomic DecreasingThreadsByHoggishState = 0; + TAtomic PotentialMaxThreadCount = 0; bool IsBeingStopped(i16 threadIdx); double GetBooked(i16 threadIdx); @@ -169,9 +170,10 @@ double TPoolInfo::GetlastSecondPoolConsumed(i16 threadIdx) { void TPoolInfo::PullStats(ui64 ts) { for (i16 threadIdx = 0; threadIdx < MaxThreadCount; ++threadIdx) { TThreadInfo &threadInfo = ThreadInfo[threadIdx]; - threadInfo.Consumed.Register(ts, Pool->GetThreadConsumedUs(threadIdx)); + TCpuConsumption cpuConsumption = Pool->GetThreadCpuConsumption(threadIdx); + threadInfo.Consumed.Register(ts, cpuConsumption.ConsumedUs); LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "consumed", UNROLL_HISTORY(threadInfo.Consumed.History)); - threadInfo.Booked.Register(ts, Pool->GetThreadBookedUs(threadIdx)); + threadInfo.Booked.Register(ts, cpuConsumption.BookedUs); LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "booked", UNROLL_HISTORY(threadInfo.Booked.History)); } } @@ -236,7 +238,7 @@ void THarmonizer::PullStats(ui64 ts) { } Y_FORCE_INLINE bool IsStarved(double consumed, double booked) { - return Max(consumed, booked) > 0.1 && consumed < booked * 0.7; + return consumed < booked * 0.7; } Y_FORCE_INLINE bool IsHoggish(double booked, ui16 currentThreadCount) { @@ -293,35 +295,43 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, lastSecondPoolBooked, lastSecondPoolConsumed, pool.GetThreadCount(), pool.MaxThreadCount, isStarved, isNeedy, isHoggish); } double budget = total - Max(booked, lastSecondBooked); + i16 budgetInt = static_cast<i16>(Max(budget, 0.0)); if (budget < -0.1) { isStarvedPresent = true; } + for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) { + TPoolInfo& pool = Pools[poolIdx]; + AtomicSet(pool.PotentialMaxThreadCount, Min(pool.MaxThreadCount, budgetInt)); + } double overbooked = consumed - booked; if (isStarvedPresent) { - // last_starved_at_consumed_value = сумма по всем пулам consumed; - // TODO(cthulhu): использовать как лимит планвно устремлять этот лимит к total, - // использовать вместо total - if (beingStopped && beingStopped >= overbooked) { - // do nothing - } else { - TStackVec<size_t> reorder; - for (size_t i = 0; i < Pools.size(); ++i) { - reorder.push_back(i); - } - for (ui16 poolIdx : PriorityOrder) { - TPoolInfo &pool = Pools[poolIdx]; - i64 threadCount = pool.GetThreadCount(); - if (threadCount > pool.DefaultThreadCount) { - pool.SetThreadCount(threadCount - 1); - AtomicIncrement(pool.DecreasingThreadsByStarvedState); - overbooked--; - LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount); - if (overbooked < 1) { - break; - } - } - } - } + // last_starved_at_consumed_value = сумма по всем пулам consumed; + // TODO(cthulhu): использовать как лимит планвно устремлять этот лимит к total, + // использовать вместо total + if (beingStopped && beingStopped >= overbooked) { + // do nothing + } else { + TStackVec<size_t> reorder; + for (size_t i = 0; i < Pools.size(); ++i) { + reorder.push_back(i); + } + for (ui16 poolIdx : PriorityOrder) { + TPoolInfo &pool = Pools[poolIdx]; + i64 threadCount = pool.GetThreadCount(); + while (threadCount > pool.DefaultThreadCount) { + pool.SetThreadCount(threadCount - 1); + AtomicIncrement(pool.DecreasingThreadsByStarvedState); + overbooked--; + LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount); + if (overbooked < 1) { + break; + } + } + if (overbooked < 1) { + break; + } + } + } } else { for (size_t needyPoolIdx : needyPools) { TPoolInfo &pool = Pools[needyPoolIdx]; @@ -422,6 +432,7 @@ TPoolHarmonizedStats THarmonizer::GetPoolStats(i16 poolId) const { .IncreasingThreadsByNeedyState = static_cast<ui64>(RelaxedLoad(&pool.IncreasingThreadsByNeedyState)), .DecreasingThreadsByStarvedState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByStarvedState)), .DecreasingThreadsByHoggishState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByHoggishState)), + .PotentialMaxThreadCount = static_cast<i16>(RelaxedLoad(&pool.PotentialMaxThreadCount)), .IsNeedy = static_cast<bool>(flags & 1), .IsStarved = static_cast<bool>(flags & 2), .IsHoggish = static_cast<bool>(flags & 4), diff --git a/library/cpp/actors/core/harmonizer.h b/library/cpp/actors/core/harmonizer.h index 61f13e43ac..bc6b938fe8 100644 --- a/library/cpp/actors/core/harmonizer.h +++ b/library/cpp/actors/core/harmonizer.h @@ -10,6 +10,7 @@ namespace NActors { ui64 IncreasingThreadsByNeedyState = 0; ui64 DecreasingThreadsByStarvedState = 0; ui64 DecreasingThreadsByHoggishState = 0; + i16 PotentialMaxThreadCount = 0; bool IsNeedy = false; bool IsStarved = false; bool IsHoggish = false; diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h index 38629e2aa1..4c664a964a 100644 --- a/library/cpp/actors/core/mon_stats.h +++ b/library/cpp/actors/core/mon_stats.h @@ -65,6 +65,9 @@ namespace NActors { ui64 DecreasingThreadsByHoggishState = 0; i16 WrongWakenedThreadCount = 0; i16 CurrentThreadCount = 0; + i16 PotentialMaxThreadCount = 0; + i16 DefaultThreadCount = 0; + i16 MaxThreadCount = 0; bool IsNeedy = false; bool IsStarved = false; bool IsHoggish = false; @@ -76,7 +79,8 @@ namespace NActors { ui64 PreemptedEvents = 0; // Number of events experienced hard preemption ui64 NonDeliveredEvents = 0; ui64 EmptyMailboxActivation = 0; - ui64 CpuNs = 0; // nanoseconds thread was executing on CPU (accounts for preemtion) + ui64 CpuUs = 0; // microseconds thread was executing on CPU (accounts for preemtion) + ui64 SafeElapsedTicks = 0; ui64 WorstActivationTimeUs = 0; NHPTimer::STime ElapsedTicks = 0; NHPTimer::STime ParkedTicks = 0; @@ -120,7 +124,8 @@ namespace NActors { PreemptedEvents += RelaxedLoad(&other.PreemptedEvents); NonDeliveredEvents += RelaxedLoad(&other.NonDeliveredEvents); EmptyMailboxActivation += RelaxedLoad(&other.EmptyMailboxActivation); - CpuNs += RelaxedLoad(&other.CpuNs); + CpuUs += RelaxedLoad(&other.CpuUs); + SafeElapsedTicks += RelaxedLoad(&other.SafeElapsedTicks); RelaxedStore( &WorstActivationTimeUs, std::max(RelaxedLoad(&WorstActivationTimeUs), RelaxedLoad(&other.WorstActivationTimeUs))); diff --git a/library/cpp/actors/core/worker_context.h b/library/cpp/actors/core/worker_context.h index 2179771fb6..c3a2947df1 100644 --- a/library/cpp/actors/core/worker_context.h +++ b/library/cpp/actors/core/worker_context.h @@ -137,7 +137,8 @@ namespace NActors { } void UpdateThreadTime() { - RelaxedStore(&WorkerStats.CpuNs, ThreadCPUTime() * 1000); + RelaxedStore(&WorkerStats.SafeElapsedTicks, (ui64)RelaxedLoad(&WorkerStats.ElapsedTicks)); + RelaxedStore(&WorkerStats.CpuUs, ThreadCPUTime()); } #else void GetCurrentStats(TExecutorThreadStats&) const {} diff --git a/library/cpp/actors/helpers/pool_stats_collector.h b/library/cpp/actors/helpers/pool_stats_collector.h index b1217b1d63..d80951827d 100644 --- a/library/cpp/actors/helpers/pool_stats_collector.h +++ b/library/cpp/actors/helpers/pool_stats_collector.h @@ -126,6 +126,9 @@ private: NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByEventCount; NMonitoring::TDynamicCounters::TCounterPtr WrongWakenedThreadCount; NMonitoring::TDynamicCounters::TCounterPtr CurrentThreadCount; + NMonitoring::TDynamicCounters::TCounterPtr PotentialMaxThreadCount; + NMonitoring::TDynamicCounters::TCounterPtr DefaultThreadCount; + NMonitoring::TDynamicCounters::TCounterPtr MaxThreadCount; NMonitoring::TDynamicCounters::TCounterPtr IsNeedy; NMonitoring::TDynamicCounters::TCounterPtr IsStarved; NMonitoring::TDynamicCounters::TCounterPtr IsHoggish; @@ -178,6 +181,9 @@ private: MailboxPushedOutByEventCount = PoolGroup->GetCounter("MailboxPushedOutByEventCount", true); WrongWakenedThreadCount = PoolGroup->GetCounter("WrongWakenedThreadCount", true); CurrentThreadCount = PoolGroup->GetCounter("CurrentThreadCount", false); + PotentialMaxThreadCount = PoolGroup->GetCounter("PotentialMaxThreadCount", false); + DefaultThreadCount = PoolGroup->GetCounter("DefaultThreadCount", false); + MaxThreadCount = PoolGroup->GetCounter("MaxThreadCount", false); IsNeedy = PoolGroup->GetCounter("IsNeedy", false); IsStarved = PoolGroup->GetCounter("IsStarved", false); IsHoggish = PoolGroup->GetCounter("IsHoggish", false); @@ -211,7 +217,7 @@ private: *NonDeliveredEvents = stats.NonDeliveredEvents; *DestroyedActors = stats.PoolDestroyedActors; *EmptyMailboxActivation = stats.EmptyMailboxActivation; - *CpuMicrosec = stats.CpuNs / 1000; + *CpuMicrosec = stats.CpuUs; *ElapsedMicrosec = ::NHPTimer::GetSeconds(stats.ElapsedTicks)*1000000; *ParkedMicrosec = ::NHPTimer::GetSeconds(stats.ParkedTicks)*1000000; *ActorRegistrations = stats.PoolActorRegistrations; @@ -222,6 +228,9 @@ private: *MailboxPushedOutByEventCount = stats.MailboxPushedOutByEventCount; *WrongWakenedThreadCount = poolStats.WrongWakenedThreadCount; *CurrentThreadCount = poolStats.CurrentThreadCount; + *PotentialMaxThreadCount = poolStats.PotentialMaxThreadCount; + *DefaultThreadCount = poolStats.DefaultThreadCount; + *MaxThreadCount = poolStats.MaxThreadCount; *IsNeedy = poolStats.IsNeedy; *IsStarved = poolStats.IsStarved; *IsHoggish = poolStats.IsHoggish; diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h index 43f376038b..b1b8ae0c75 100644 --- a/library/cpp/actors/interconnect/events_local.h +++ b/library/cpp/actors/interconnect/events_local.h @@ -52,9 +52,6 @@ namespace NActors { EvProcessPingRequest, EvGetSecureSocket, EvSecureSocket, - HandshakeBrokerTake, - HandshakeBrokerFree, - HandshakeBrokerPermit, //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // nonlocal messages; their indices must be preserved in order to work properly while doing rolling update @@ -101,18 +98,6 @@ namespace NActors { } }; - struct TEvHandshakeBrokerTake: public TEventLocal<TEvHandshakeBrokerTake, ui32(ENetwork::HandshakeBrokerTake)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerTake, "Network: TEvHandshakeBrokerTake") - }; - - struct TEvHandshakeBrokerFree: public TEventLocal<TEvHandshakeBrokerFree, ui32(ENetwork::HandshakeBrokerFree)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerFree, "Network: TEvHandshakeBrokerFree") - }; - - struct TEvHandshakeBrokerPermit: public TEventLocal<TEvHandshakeBrokerPermit, ui32(ENetwork::HandshakeBrokerPermit)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerPermit, "Network: TEvHandshakeBrokerPermit") - }; - struct TEvHandshakeAsk: public TEventLocal<TEvHandshakeAsk, ui32(ENetwork::HandshakeAsk)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAsk, "Network: TEvHandshakeAsk") TEvHandshakeAsk(const TActorId& self, diff --git a/library/cpp/actors/interconnect/handshake_broker.h b/library/cpp/actors/interconnect/handshake_broker.h deleted file mode 100644 index 70a7cb91dc..0000000000 --- a/library/cpp/actors/interconnect/handshake_broker.h +++ /dev/null @@ -1,78 +0,0 @@ -#pragma once - -#include <library/cpp/actors/core/actor.h> - -#include <deque> - -namespace NActors { - static constexpr ui32 DEFAULT_INFLIGHT = 100; - - class THandshakeBroker : public TActor<THandshakeBroker> { - private: - std::deque<TActorId> Waiting; - ui32 Capacity; - - void Handle(TEvHandshakeBrokerTake::TPtr &ev) { - if (Capacity > 0) { - Capacity -= 1; - Send(ev->Sender, new TEvHandshakeBrokerPermit()); - } else { - Waiting.push_back(ev->Sender); - } - } - - void Handle(TEvHandshakeBrokerFree::TPtr& ev) { - Y_UNUSED(ev); - if (Capacity == 0 && !Waiting.empty()) { - Send(Waiting.front(), new TEvHandshakeBrokerPermit()); - Waiting.pop_front(); - } else { - Capacity += 1; - } - } - - void PassAway() override { - while (!Waiting.empty()) { - Send(Waiting.front(), new TEvHandshakeBrokerPermit()); - Waiting.pop_front(); - } - TActor::PassAway(); - } - - public: - THandshakeBroker(ui32 inflightLimit = DEFAULT_INFLIGHT) - : TActor(&TThis::StateFunc) - , Capacity(inflightLimit) - { - } - - static constexpr char ActorName[] = "HANDSHAKE_BROKER_ACTOR"; - - STFUNC(StateFunc) { - Y_UNUSED(ctx); - switch(ev->GetTypeRewrite()) { - hFunc(TEvHandshakeBrokerTake, Handle); - hFunc(TEvHandshakeBrokerFree, Handle); - cFunc(TEvents::TSystem::Poison, PassAway); - } - } - - void Bootstrap() { - Become(&TThis::StateFunc); - }; - }; - - inline IActor* CreateHandshakeBroker() { - return new THandshakeBroker(); - } - - inline TActorId MakeHandshakeBrokerOutId() { - char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'O', 'u', 't'}; - return TActorId(0, TStringBuf(std::begin(x), std::end(x))); - } - - inline TActorId MakeHandshakeBrokerInId() { - char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'r', 'I', 'n'}; - return TActorId(0, TStringBuf(std::begin(x), std::end(x))); - } -}; diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index a9c6b1dd11..dc651f3762 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -1,5 +1,4 @@ #include "interconnect_handshake.h" -#include "handshake_broker.h" #include "interconnect_tcp_proxy.h" #include <library/cpp/actors/core/actor_coroutine.h> @@ -97,13 +96,8 @@ namespace NActors { THashMap<ui32, TInstant> LastLogNotice; const TDuration MuteDuration = TDuration::Seconds(15); TInstant Deadline; - TActorId HandshakeBroker; public: - static constexpr IActor::EActivityType ActorActivityType() { - return IActor::INTERCONNECT_HANDSHAKE; - } - THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName, TSessionParams params) : TActorCoroImpl(StackSize, true, true) // allow unhandled poison pills and dtors @@ -119,7 +113,6 @@ namespace NActors { Y_VERIFY(SelfVirtualId); Y_VERIFY(SelfVirtualId.NodeId()); Y_VERIFY(PeerNodeId); - HandshakeBroker = MakeHandshakeBrokerOutId(); } THandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket) @@ -135,7 +128,6 @@ namespace NActors { } else { PeerAddr.clear(); } - HandshakeBroker = MakeHandshakeBrokerInId(); } void UpdatePrefix() { @@ -145,64 +137,45 @@ namespace NActors { void Run() override { UpdatePrefix(); - bool isBrokerActive = false; - - if (Send(HandshakeBroker, new TEvHandshakeBrokerTake())) { - isBrokerActive = true; - WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit"); + // set up overall handshake process timer + TDuration timeout = Common->Settings.Handshake; + if (timeout == TDuration::Zero()) { + timeout = DEFAULT_HANDSHAKE_TIMEOUT; } + timeout += ResolveTimeout * 2; + Deadline = Now() + timeout; + Schedule(Deadline, new TEvents::TEvWakeup); try { - // set up overall handshake process timer - TDuration timeout = Common->Settings.Handshake; - if (timeout == TDuration::Zero()) { - timeout = DEFAULT_HANDSHAKE_TIMEOUT; - } - timeout += ResolveTimeout * 2; - Deadline = Now() + timeout; - Schedule(Deadline, new TEvents::TEvWakeup); - - try { - if (Socket) { - PerformIncomingHandshake(); - } else { - PerformOutgoingHandshake(); - } - - // establish encrypted channel, or, in case when encryption is disabled, check if it matches settings - if (ProgramInfo) { - if (Params.Encryption) { - EstablishSecureConnection(); - } else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) { - Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required"); - } - } - } catch (const TExHandshakeFailed&) { - ProgramInfo.Clear(); + if (Socket) { + PerformIncomingHandshake(); + } else { + PerformOutgoingHandshake(); } + // establish encrypted channel, or, in case when encryption is disabled, check if it matches settings if (ProgramInfo) { - LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded"); - Y_VERIFY(NextPacketFromPeer); - if (PollerToken) { - Y_VERIFY(PollerToken->RefCount() == 1); - PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor + if (Params.Encryption) { + EstablishSecureConnection(); + } else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) { + Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required"); } - SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(Socket), PeerVirtualId, SelfVirtualId, - *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params))); - } - } catch (const TDtorException&) { - throw; // we can't use actor system when handling this exception - } catch (...) { - if (isBrokerActive) { - Send(HandshakeBroker, new TEvHandshakeBrokerFree()); } - throw; + } catch (const TExHandshakeFailed&) { + ProgramInfo.Clear(); } - if (isBrokerActive) { - Send(HandshakeBroker, new TEvHandshakeBrokerFree()); + if (ProgramInfo) { + LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded"); + Y_VERIFY(NextPacketFromPeer); + if (PollerToken) { + Y_VERIFY(PollerToken->RefCount() == 1); + PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor + } + SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(Socket), PeerVirtualId, SelfVirtualId, + *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params))); } + Socket.Reset(); } @@ -1022,11 +995,12 @@ namespace NActors { const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName, TSessionParams params) { return new TActorCoro(MakeHolder<THandshakeActor>(std::move(common), self, peer, nodeId, nextPacket, - std::move(peerHostName), std::move(params))); + std::move(peerHostName), std::move(params)), IActor::INTERCONNECT_HANDSHAKE); } IActor* CreateIncomingHandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket) { - return new TActorCoro(MakeHolder<THandshakeActor>(std::move(common), std::move(socket))); + return new TActorCoro(MakeHolder<THandshakeActor>(std::move(common), std::move(socket)), + IActor::INTERCONNECT_HANDSHAKE); } } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index a8c505d94d..fdf035499f 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -39,7 +39,7 @@ namespace NActors { SetPrefix(Sprintf("InputSession %s [node %" PRIu32 "]", SelfId().ToString().data(), NodeId)); Become(&TThis::WorkingState, DeadPeerTimeout, new TEvCheckDeadPeer); LOG_DEBUG_IC_SESSION("ICIS01", "InputSession created"); - LastReceiveTimestamp = TActivationContext::Now(); + LastReceiveTimestamp = TActivationContext::Monotonic(); ReceiveData(); } @@ -437,7 +437,7 @@ namespace NActors { } } - LastReceiveTimestamp = TActivationContext::Now(); + LastReceiveTimestamp = TActivationContext::Monotonic(); return true; } @@ -473,7 +473,7 @@ namespace NActors { } void TInputSessionTCP::HandleCheckDeadPeer() { - const TInstant now = TActivationContext::Now(); + const TMonotonic now = TActivationContext::Monotonic(); if (now >= LastReceiveTimestamp + DeadPeerTimeout) { ReceiveData(); if (Socket && now >= LastReceiveTimestamp + DeadPeerTimeout) { @@ -481,7 +481,7 @@ namespace NActors { DestroySession(TDisconnectReason::DeadPeer()); } } - Schedule(LastReceiveTimestamp + DeadPeerTimeout - now, new TEvCheckDeadPeer); + Schedule(LastReceiveTimestamp + DeadPeerTimeout, new TEvCheckDeadPeer); } void TInputSessionTCP::HandlePingResponse(TDuration passed) { diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp index 7e2d8ccb94..b4cc263a4c 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp @@ -40,7 +40,6 @@ namespace NActors { SetPrefix(Sprintf("Proxy %s [node %" PRIu32 "]", SelfId().ToString().data(), PeerNodeId)); SwitchToInitialState(); - PassAwayTimestamp = TActivationContext::Now() + TDuration::Seconds(15); LOG_INFO_IC("ICP01", "ready to work"); } @@ -563,7 +562,7 @@ namespace NActors { ValidateEvent(ev, "EnqueueSessionEvent"); const ui32 size = ev->GetSize(); PendingSessionEventsSize += size; - PendingSessionEvents.emplace_back(TActivationContext::Now() + Common->Settings.MessagePendingTimeout, size, ev); + PendingSessionEvents.emplace_back(TActivationContext::Monotonic() + Common->Settings.MessagePendingTimeout, size, ev); ScheduleCleanupEventQueue(); CleanupEventQueue(); } @@ -810,7 +809,7 @@ namespace NActors { if (!CleanupEventQueueScheduled && PendingSessionEvents) { // apply batching at 50 ms granularity - Schedule(Max(TDuration::MilliSeconds(50), PendingSessionEvents.front().Deadline - TActivationContext::Now()), new TEvCleanupEventQueue); + Schedule(Max(TDuration::MilliSeconds(50), PendingSessionEvents.front().Deadline - TActivationContext::Monotonic()), new TEvCleanupEventQueue); CleanupEventQueueScheduled = true; } } @@ -827,7 +826,7 @@ namespace NActors { void TInterconnectProxyTCP::CleanupEventQueue() { ICPROXY_PROFILED; - const TInstant now = TActivationContext::Now(); + const TMonotonic now = TActivationContext::Monotonic(); while (PendingSessionEvents) { TPendingSessionEvent& ev = PendingSessionEvents.front(); if (now >= ev.Deadline || PendingSessionEventsSize > Common->Settings.MessagePendingSize) { diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h index 023e5bd1ee..b750e278e1 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h @@ -175,11 +175,18 @@ namespace NActors { Become(std::forward<TArgs>(args)...); Y_VERIFY(!Terminated || CurrentStateFunc() == &TThis::HoldByError); // ensure we never escape this state if (CurrentStateFunc() != &TThis::PendingActivation) { - PassAwayTimestamp = TInstant::Max(); + PassAwayTimestamp = TMonotonic::Max(); + } else if (DynamicPtr) { + PassAwayTimestamp = TActivationContext::Monotonic() + TDuration::Seconds(15); + if (!PassAwayScheduled) { + TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(), + {}, nullptr, 0)); + PassAwayScheduled = true; + } } } - TInstant PassAwayTimestamp; + TMonotonic PassAwayTimestamp; bool PassAwayScheduled = false; void SwitchToInitialState() { @@ -189,17 +196,18 @@ namespace NActors { " PendingIncomingHandshakeEvents# %zu State# %s", LogPrefix.data(), PendingSessionEvents.size(), PendingIncomingHandshakeEvents.size(), State); SwitchToState(__LINE__, "PendingActivation", &TThis::PendingActivation); - if (DynamicPtr && !PassAwayScheduled && PassAwayTimestamp != TInstant::Max()) { - TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(), - {}, nullptr, 0)); - PassAwayScheduled = true; - } } void HandlePassAwayIfNeeded() { Y_VERIFY(PassAwayScheduled); - if (PassAwayTimestamp != TInstant::Max()) { + const TMonotonic now = TActivationContext::Monotonic(); + if (now >= PassAwayTimestamp) { PassAway(); + } else if (PassAwayTimestamp != TMonotonic::Max()) { + TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(), + {}, nullptr, 0)); + } else { + PassAwayScheduled = false; } } @@ -387,11 +395,11 @@ namespace NActors { // hold all events before connection is established struct TPendingSessionEvent { - TInstant Deadline; + TMonotonic Deadline; ui32 Size; THolder<IEventHandle> Event; - TPendingSessionEvent(TInstant deadline, ui32 size, TAutoPtr<IEventHandle> event) + TPendingSessionEvent(TMonotonic deadline, ui32 size, TAutoPtr<IEventHandle> event) : Deadline(deadline) , Size(size) , Event(event) diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index feb55a16ad..18df8e42ff 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -232,7 +232,7 @@ namespace NActors { CloseOnIdleWatchdog.Arm(SelfId()); // reset activity timestamps - LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Now(); + LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Monotonic(); LOG_INFO_IC_SESSION("ICS10", "traffic start"); @@ -315,7 +315,7 @@ namespace NActors { bool needConfirm = false; // update activity timer for dead peer checker - LastInputActivityTimestamp = TActivationContext::Now(); + LastInputActivityTimestamp = TActivationContext::Monotonic(); if (msg.NumDataBytes) { UnconfirmedBytes += msg.NumDataBytes; @@ -326,7 +326,7 @@ namespace NActors { } // reset payload watchdog that controls close-on-idle behaviour - LastPayloadActivityTimestamp = TActivationContext::Now(); + LastPayloadActivityTimestamp = TActivationContext::Monotonic(); CloseOnIdleWatchdog.Reset(); } @@ -654,7 +654,7 @@ namespace NActors { void TInterconnectSessionTCP::SetForcePacketTimestamp(TDuration period) { if (period != TDuration::Max()) { - const TInstant when = TActivationContext::Now() + period; + const TMonotonic when = TActivationContext::Monotonic() + period; if (when < ForcePacketTimestamp) { ForcePacketTimestamp = when; ScheduleFlush(); @@ -664,7 +664,7 @@ namespace NActors { void TInterconnectSessionTCP::ScheduleFlush() { if (FlushSchedule.empty() || ForcePacketTimestamp < FlushSchedule.top()) { - Schedule(ForcePacketTimestamp - TActivationContext::Now(), new TEvFlush); + Schedule(ForcePacketTimestamp, new TEvFlush); FlushSchedule.push(ForcePacketTimestamp); MaxFlushSchedule = Max(MaxFlushSchedule, FlushSchedule.size()); ++FlushEventsScheduled; @@ -672,7 +672,7 @@ namespace NActors { } void TInterconnectSessionTCP::HandleFlush() { - const TInstant now = TActivationContext::Now(); + const TMonotonic now = TActivationContext::Monotonic(); while (FlushSchedule && now >= FlushSchedule.top()) { FlushSchedule.pop(); } @@ -682,14 +682,14 @@ namespace NActors { ++ConfirmPacketsForcedByTimeout; ++FlushEventsProcessed; MakePacket(false); // just generate confirmation packet if we have preconditions for this - } else if (ForcePacketTimestamp != TInstant::Max()) { + } else if (ForcePacketTimestamp != TMonotonic::Max()) { ScheduleFlush(); } } } void TInterconnectSessionTCP::ResetFlushLogic() { - ForcePacketTimestamp = TInstant::Max(); + ForcePacketTimestamp = TMonotonic::Max(); UnconfirmedBytes = 0; const TDuration ping = Proxy->Common->Settings.PingPeriod; if (ping != TDuration::Zero() && !NumEventsInReadyChannels) { @@ -761,7 +761,7 @@ namespace NActors { } // update payload activity timer - LastPayloadActivityTimestamp = TActivationContext::Now(); + LastPayloadActivityTimestamp = TActivationContext::Monotonic(); } else if (pingMask) { serial = *pingMask; @@ -923,7 +923,7 @@ namespace NActors { flagState = EFlag::GREEN; do { - auto lastInputDelay = TActivationContext::Now() - LastInputActivityTimestamp; + auto lastInputDelay = TActivationContext::Monotonic() - LastInputActivityTimestamp; if (lastInputDelay * 4 >= GetDeadPeerTimeout() * 3) { flagState = EFlag::ORANGE; break; @@ -1006,7 +1006,7 @@ namespace NActors { } void TInterconnectSessionTCP::IssuePingRequest() { - const TInstant now = TActivationContext::Now(); + const TMonotonic now = TActivationContext::Monotonic(); if (now >= LastPingTimestamp + PingPeriodicity) { LOG_DEBUG_IC_SESSION("ICS22", "Issuing ping request"); if (Socket) { @@ -1175,6 +1175,8 @@ namespace NActors { ui32 unsentQueueSize = Socket ? Socket->GetUnsentQueueSize() : 0; + const TMonotonic now = TActivationContext::Monotonic(); + MON_VAR(OutputStuckFlag) MON_VAR(SendQueue.size()) MON_VAR(SendQueueCache.size()) @@ -1184,8 +1186,8 @@ namespace NActors { MON_VAR(InflightDataAmount) MON_VAR(unsentQueueSize) MON_VAR(SendBufferSize) - MON_VAR(LastInputActivityTimestamp) - MON_VAR(LastPayloadActivityTimestamp) + MON_VAR(now - LastInputActivityTimestamp) + MON_VAR(now - LastPayloadActivityTimestamp) MON_VAR(LastHandshakeDone) MON_VAR(OutputCounter) MON_VAR(LastSentSerial) @@ -1204,7 +1206,7 @@ namespace NActors { clockSkew = Sprintf("+%s", TDuration::MicroSeconds(x).ToString().data()); } - MON_VAR(LastPingTimestamp) + MON_VAR(now - LastPingTimestamp) MON_VAR(GetPingRTT()) MON_VAR(clockSkew) diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index 51c5bfa453..598a5c9220 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -266,7 +266,7 @@ namespace NActors { } const TDuration DeadPeerTimeout; - TInstant LastReceiveTimestamp; + TMonotonic LastReceiveTimestamp; void HandleCheckDeadPeer(); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -413,15 +413,15 @@ namespace NActors { //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // pinger - TInstant LastPingTimestamp; + TMonotonic LastPingTimestamp; static constexpr TDuration PingPeriodicity = TDuration::Seconds(1); void IssuePingRequest(); void Handle(TEvProcessPingRequest::TPtr ev); //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - TInstant LastInputActivityTimestamp; - TInstant LastPayloadActivityTimestamp; + TMonotonic LastInputActivityTimestamp; + TMonotonic LastPayloadActivityTimestamp; TWatchdogTimer<TEvCheckCloseOnIdle> CloseOnIdleWatchdog; TWatchdogTimer<TEvCheckLostConnection> LostConnectionWatchdog; @@ -481,8 +481,8 @@ namespace NActors { // time at which we want to send confirmation packet even if there was no outgoing data ui64 UnconfirmedBytes = 0; - TInstant ForcePacketTimestamp = TInstant::Max(); - TPriorityQueue<TInstant, TVector<TInstant>, std::greater<TInstant>> FlushSchedule; + TMonotonic ForcePacketTimestamp = TMonotonic::Max(); + TPriorityQueue<TMonotonic, TVector<TMonotonic>, std::greater<TMonotonic>> FlushSchedule; size_t MaxFlushSchedule = 0; ui64 FlushEventsScheduled = 0; ui64 FlushEventsProcessed = 0; diff --git a/library/cpp/actors/interconnect/watchdog_timer.h b/library/cpp/actors/interconnect/watchdog_timer.h index c190105a59..fe62006e3b 100644 --- a/library/cpp/actors/interconnect/watchdog_timer.h +++ b/library/cpp/actors/interconnect/watchdog_timer.h @@ -8,7 +8,7 @@ namespace NActors { const TDuration Timeout; const TCallback Callback; - TInstant LastResetTimestamp; + TMonotonic LastResetTimestamp; TEvent* ExpectedEvent = nullptr; ui32 Iteration = 0; @@ -29,7 +29,7 @@ namespace NActors { } void Reset() { - LastResetTimestamp = TActivationContext::Now(); + LastResetTimestamp = TActivationContext::Monotonic(); } void Disarm() { @@ -38,11 +38,11 @@ namespace NActors { void operator()(typename TEvent::TPtr& ev) { if (ev->Get() == ExpectedEvent) { - const TInstant now = TActivationContext::Now(); - const TInstant barrier = LastResetTimestamp + Timeout; + const TMonotonic now = TActivationContext::Monotonic(); + const TMonotonic barrier = LastResetTimestamp + Timeout; if (now < barrier) { // the time hasn't come yet - Schedule(barrier - now, TActorIdentity(ev->Recipient)); + Schedule(barrier, TActorIdentity(ev->Recipient)); } else if (Iteration < NumIterationsBeforeFiring) { // time has come, but we will still give actor a chance to process some messages and rearm timer ++Iteration; @@ -57,7 +57,8 @@ namespace NActors { } private: - void Schedule(TDuration timeout, const TActorIdentity& actor) { + template<typename T> + void Schedule(T&& timeout, const TActorIdentity& actor) { auto ev = MakeHolder<TEvent>(); ExpectedEvent = ev.Get(); Iteration = 0; diff --git a/library/cpp/actors/util/rc_buf.h b/library/cpp/actors/util/rc_buf.h index 1a492064ee..a2bce33fba 100644 --- a/library/cpp/actors/util/rc_buf.h +++ b/library/cpp/actors/util/rc_buf.h @@ -113,10 +113,11 @@ public: private: static int Compare(const TContiguousSpan& x, const TContiguousSpan& y) { - if (int res = std::memcmp(x.data(), y.data(), std::min(x.size(), y.size())); res) { - return res; + int res = 0; + if (const size_t common = std::min(x.size(), y.size())) { + res = std::memcmp(x.data(), y.data(), common); } - return x.size() - y.size(); + return res ? res : x.size() - y.size(); } }; |