diff options
author | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-02-09 11:44:35 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-02-09 11:46:17 +0300 |
commit | b0967c30d3706b650b679fe119b6bd7b0924d328 (patch) | |
tree | 25579dfda238c2cc5b00324878303b3a05d09f45 /library/cpp/actors/util | |
parent | 9b78acb9998e4a817a21fe60443c7c5d6a06b947 (diff) | |
download | ydb-470a39568672a2a0d7a4476a228e3ca3bfe4fd8a.tar.gz |
Ydb stable 22-5-10 (refs: 22.5.10, stable-22-5)
x-stable-origin-commit: f696baac1a4b8d48eb52b52b35930eef6d0eab42
Diffstat (limited to 'library/cpp/actors/util')
-rw-r--r-- | library/cpp/actors/util/CMakeLists.txt | 1 | ||||
-rw-r--r-- | library/cpp/actors/util/cpu_load_log.h | 227 | ||||
-rw-r--r-- | library/cpp/actors/util/cpu_load_log_ut.cpp | 275 | ||||
-rw-r--r-- | library/cpp/actors/util/thread_load_log.h | 363 | ||||
-rw-r--r-- | library/cpp/actors/util/thread_load_log_ut.cpp | 966 |
5 files changed, 1832 insertions, 0 deletions
diff --git a/library/cpp/actors/util/CMakeLists.txt b/library/cpp/actors/util/CMakeLists.txt index 40d958d75e..233e1fe0fc 100644 --- a/library/cpp/actors/util/CMakeLists.txt +++ b/library/cpp/actors/util/CMakeLists.txt @@ -12,6 +12,7 @@ target_link_libraries(cpp-actors-util PUBLIC contrib-libs-cxxsupp yutil cpp-deprecated-atomic + library-cpp-pop_count ) target_sources(cpp-actors-util PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/affinity.cpp diff --git a/library/cpp/actors/util/cpu_load_log.h b/library/cpp/actors/util/cpu_load_log.h new file mode 100644 index 0000000000..e4ae612246 --- /dev/null +++ b/library/cpp/actors/util/cpu_load_log.h @@ -0,0 +1,227 @@ +#pragma once + +#include "defs.h" +#include <library/cpp/deprecated/atomic/atomic.h> +#include <library/cpp/pop_count/popcount.h> + +static constexpr ui64 BitDurationNs = 131'072; // A power of 2 + +template <ui64 DataSize> +struct TCpuLoadLog { + static constexpr ui64 BitsSize = DataSize * 64; + TAtomic LastTimeNs = 0; + ui64 Data[DataSize]; + + TCpuLoadLog() { + LastTimeNs = 0; + for (size_t i = 0; i < DataSize; ++i) { + Data[i] = 0; + } + } + + TCpuLoadLog(ui64 timeNs) { + LastTimeNs = timeNs; + for (size_t i = 0; i < DataSize; ++i) { + Data[i] = 0; + } + } + + void RegisterBusyPeriod(ui64 timeNs) { + RegisterBusyPeriod<true>(timeNs, AtomicGet(LastTimeNs)); + } + + template <bool ModifyLastTime> + void RegisterBusyPeriod(ui64 timeNs, ui64 lastTimeNs) { + timeNs |= 1ull; + if (timeNs < lastTimeNs) { + for (ui64 i = 0; i < DataSize; ++i) { + AtomicSet(Data[i], ~0ull); + } + if (ModifyLastTime) { + AtomicSet(LastTimeNs, timeNs); + } + return; + } + const ui64 lastIdx = timeNs / BitDurationNs; + const ui64 curIdx = lastTimeNs / BitDurationNs; + ui64 firstElementIdx = curIdx / 64; + const ui64 firstBitIdx = curIdx % 64; + const ui64 lastElementIdx = lastIdx / 64; + const ui64 lastBitIdx = lastIdx % 64; + if (firstElementIdx == lastElementIdx) { + ui64 prevValue = 0; + if (firstBitIdx != 0) { + prevValue = 
AtomicGet(Data[firstElementIdx % DataSize]); + } + const ui64 bits = (((~0ull) << (firstBitIdx + (63-lastBitIdx))) >> (63-lastBitIdx)); + const ui64 newValue = prevValue | bits; + AtomicSet(Data[firstElementIdx % DataSize], newValue); + if (ModifyLastTime) { + AtomicSet(LastTimeNs, timeNs); + } + return; + } + // process the first element + ui64 prevValue = 0; + if (firstBitIdx != 0) { + prevValue = AtomicGet(Data[firstElementIdx % DataSize]); + } + const ui64 bits = ((~0ull) << firstBitIdx); + const ui64 newValue = (prevValue | bits); + AtomicSet(Data[firstElementIdx % DataSize], newValue); + ++firstElementIdx; + // process the fully filled elements + const ui64 firstLoop = firstElementIdx / DataSize; + const ui64 lastLoop = lastElementIdx / DataSize; + const ui64 lastOffset = lastElementIdx % DataSize; + if (firstLoop < lastLoop) { + for (ui64 i = firstElementIdx % DataSize; i < DataSize; ++i) { + AtomicSet(Data[i], ~0ull); + } + for (ui64 i = 0; i < lastOffset; ++i) { + AtomicSet(Data[i], ~0ull); + } + } else { + for (ui64 i = firstElementIdx % DataSize; i < lastOffset; ++i) { + AtomicSet(Data[i], ~0ull); + } + } + // process the last element + const ui64 newValue2 = ((~0ull) >> (63-lastBitIdx)); + AtomicSet(Data[lastOffset], newValue2); + if (ModifyLastTime) { + AtomicSet(LastTimeNs, timeNs); + } + } + + void RegisterIdlePeriod(ui64 timeNs) { + timeNs &= ~1ull; + ui64 lastTimeNs = AtomicGet(LastTimeNs); + if (timeNs < lastTimeNs) { + // Fast check first, slower chec later + if ((timeNs | 1ull) < lastTimeNs) { + // Time goes back, dont panic, just mark the whole array 'busy' + for (ui64 i = 0; i < DataSize; ++i) { + AtomicSet(Data[i], ~0ull); + } + AtomicSet(LastTimeNs, timeNs); + return; + } + } + const ui64 curIdx = lastTimeNs / BitDurationNs; + const ui64 lastIdx = timeNs / BitDurationNs; + ui64 firstElementIdx = curIdx / 64; + const ui64 lastElementIdx = lastIdx / 64; + if (firstElementIdx >= lastElementIdx) { + AtomicSet(LastTimeNs, timeNs); + return; + } + 
// process the first partially filled element + ++firstElementIdx; + // process all other elements + const ui64 firstLoop = firstElementIdx / DataSize; + const ui64 lastLoop = lastElementIdx / DataSize; + const ui64 lastOffset = lastElementIdx % DataSize; + if (firstLoop < lastLoop) { + for (ui64 i = firstElementIdx % DataSize; i < DataSize; ++i) { + AtomicSet(Data[i], 0); + } + for (ui64 i = 0; i <= lastOffset; ++i) { + AtomicSet(Data[i], 0); + } + } else { + for (ui64 i = firstElementIdx % DataSize; i <= lastOffset; ++i) { + AtomicSet(Data[i], 0); + } + } + AtomicSet(LastTimeNs, timeNs); + } +}; + +template <ui64 DataSize> +struct TMinusOneCpuEstimator { + static constexpr ui64 BitsSize = DataSize * 64; + ui64 BeginDelayIdx; + ui64 EndDelayIdx; + ui64 Idle; + ui64 Delay[BitsSize]; + + ui64 MaxLatencyIncreaseWithOneLessCpu(TCpuLoadLog<DataSize>** logs, i64 logCount, ui64 timeNs, ui64 periodNs) { + Y_VERIFY(logCount > 0); + ui64 endTimeNs = timeNs; + + ui64 lastTimeNs = timeNs; + for (i64 log_idx = 0; log_idx < logCount; ++log_idx) { + ui64 x = AtomicGet(logs[log_idx]->LastTimeNs); + if ((x & 1) == 1) { + lastTimeNs = Min(lastTimeNs, x); + } else { + logs[log_idx]->template RegisterBusyPeriod<false>(endTimeNs, x); + } + } + const ui64 beginTimeNs = periodNs < timeNs ? 
timeNs - periodNs : 0; + + ui64 beginIdx = beginTimeNs / BitDurationNs; + ui64 lastIdx = lastTimeNs / BitDurationNs; + ui64 beginElementIdx = beginIdx / 64; + ui64 lastElementIdx = lastIdx / 64; + + BeginDelayIdx = 0; + EndDelayIdx = 0; + Idle = 0; + ui64 maxDelay = 0; + ui64 bucket = 0; + for (ui64 idx = beginElementIdx; idx <= lastElementIdx; ++idx) { + ui64 i = idx % DataSize; + ui64 input = AtomicGet(logs[0]->Data[i]); + ui64 all_busy = ~0ull; + for (i64 log_idx = 1; log_idx < logCount; ++log_idx) { + ui64 x = AtomicGet(logs[log_idx]->Data[i]); + all_busy &= x; + } + if (!input) { + if (!bucket) { + Idle += 64 - PopCount(all_busy); + continue; + } + } + for (i64 bit_idx = 0; bit_idx < 64; ++bit_idx) { + ui64 x = (1ull << bit_idx); + if (all_busy & x) { + if (input & x) { + // Push into the queue + bucket++; + Delay[EndDelayIdx] = EndDelayIdx; + ++EndDelayIdx; + } else { + // All busy + } + } else { + if (input & x) { + // Move success + } else { + if (bucket) { + // Remove from the queue + bucket--; + ui64 stored = Delay[BeginDelayIdx]; + ++BeginDelayIdx; + ui64 delay = EndDelayIdx - stored; + maxDelay = Max(maxDelay, delay); + //Cerr << "bit_idx: " << bit_idx << " stored: " << stored << " delay: " << delay << Endl; + } else { + Idle++; + } + } + } + } + } + if (bucket) { + ui64 stored = Delay[BeginDelayIdx]; + ui64 delay = EndDelayIdx - stored; + maxDelay = Max(maxDelay, delay); + //Cerr << "last stored: " << stored << " delay: " << delay << Endl; + } + return maxDelay * BitDurationNs; + } +}; + diff --git a/library/cpp/actors/util/cpu_load_log_ut.cpp b/library/cpp/actors/util/cpu_load_log_ut.cpp new file mode 100644 index 0000000000..7109123c6e --- /dev/null +++ b/library/cpp/actors/util/cpu_load_log_ut.cpp @@ -0,0 +1,275 @@ +#include "cpu_load_log.h" + +#include <library/cpp/testing/unittest/registar.h> +#include <util/random/random.h> +#include <util/system/hp_timer.h> +#include <util/system/sanitizers.h> +#include <util/system/thread.h> + 
+Y_UNIT_TEST_SUITE(CpuLoadLog) { + + TString PrintBits(ui64 x) { + TStringStream str; + for (ui64 i = 0; i < 64; ++i) { + if (x & (1ull << i)) { + str << "1"; + } else { + str << "0"; + } + } + return str.Str(); + } + + Y_UNIT_TEST(FillAll) { + TCpuLoadLog<5> log(100*BitDurationNs); + log.RegisterBusyPeriod(101*BitDurationNs); + log.RegisterBusyPeriod(163*BitDurationNs); + log.RegisterBusyPeriod(164*BitDurationNs); + log.RegisterBusyPeriod(165*BitDurationNs); + log.RegisterBusyPeriod(331*BitDurationNs); + log.RegisterBusyPeriod(340*BitDurationNs); + log.RegisterBusyPeriod(420*BitDurationNs); + log.RegisterBusyPeriod(511*BitDurationNs); + //for (ui64 i = 0; i < 5; ++i) { + // Cerr << "i: " << i << " bits: " << PrintBits(log.Data[i]) << Endl; + //} + for (ui64 i = 0; i < 5; ++i) { + UNIT_ASSERT_C((ui64(log.Data[i]) == ~ui64(0)), "Unequal at " << i << "\n got: " << PrintBits(log.Data[i]) + << "\n expected: " << PrintBits(~ui64(0))); + } + } + + Y_UNIT_TEST(PartialFill) { + TCpuLoadLog<5> log(0*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b0ull)); + log.RegisterBusyPeriod(0*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b1ull)); + log.RegisterBusyPeriod(0*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b1ull)); + log.RegisterBusyPeriod(1*BitDurationNs/2); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b1ull)); + log.RegisterBusyPeriod(1*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b11ull)); + log.RegisterIdlePeriod(3*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b11ull)); + log.RegisterBusyPeriod(3*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b1011ull)); + log.RegisterBusyPeriod(63*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits((~0ull)^0b0100ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0b0ull)); + 
log.RegisterBusyPeriod(128*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits((~0ull)^0b0100ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0b1ull)); + log.RegisterBusyPeriod(1*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull)); + log.RegisterBusyPeriod(2*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull)); + log.RegisterBusyPeriod(64*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0b1ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull)); + log.RegisterIdlePeriod(128*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0b1ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull)); + log.RegisterIdlePeriod(192*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull)); + 
UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0b1ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull)); + log.RegisterBusyPeriod(192*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0b1ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(0b1ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull)); + log.RegisterIdlePeriod((192+5*64-1)*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(0b1ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(0ull)); + log.RegisterIdlePeriod((192+15*64)*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(0ull)); + } + + Y_UNIT_TEST(Estimator) { + TCpuLoadLog<5> *log[10]; + log[0] = new TCpuLoadLog<5>(0*BitDurationNs); + log[1] = new TCpuLoadLog<5>(0*BitDurationNs); + TMinusOneCpuEstimator<5> estimator; + + + for (ui64 i = 0; i < 5*64; i+=2) { + log[0]->RegisterIdlePeriod(i*BitDurationNs); + log[0]->RegisterBusyPeriod(i*BitDurationNs); + } + log[0]->RegisterIdlePeriod((5*64-2)*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[0]->Data[0]), + PrintBits(0b0101010101010101010101010101010101010101010101010101010101010101ull)); 
+ UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[0]->Data[4]), + PrintBits(0b0101010101010101010101010101010101010101010101010101010101010101ull)); + for (ui64 i = 0; i < 5*64-1; i+=2) { + log[1]->RegisterIdlePeriod((i+1)*BitDurationNs); + log[1]->RegisterBusyPeriod((i+1)*BitDurationNs); + } + log[1]->RegisterIdlePeriod((5*64-2+1)*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[1]->Data[0]), + PrintBits(0b1010101010101010101010101010101010101010101010101010101010101010ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[1]->Data[4]), + PrintBits(0b1010101010101010101010101010101010101010101010101010101010101010ull)); + + ui64 value = estimator.MaxLatencyIncreaseWithOneLessCpu(log, 2, (5*64)*BitDurationNs-1, 3*64*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(value/BitDurationNs, 1); + + value = estimator.MaxLatencyIncreaseWithOneLessCpu(log, 2, (5*64+10)*BitDurationNs, 3*64*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(value/BitDurationNs, 12); + + delete log[0]; + delete log[1]; + } + + Y_UNIT_TEST(Estimator2) { + TCpuLoadLog<5> *log[2]; + log[0] = new TCpuLoadLog<5>(0*BitDurationNs); + log[1] = new TCpuLoadLog<5>(0*BitDurationNs); + TMinusOneCpuEstimator<5> estimator; + + for (ui64 i = 0; i < 5*64; i+=2) { + log[0]->RegisterIdlePeriod(i*BitDurationNs); + log[0]->RegisterBusyPeriod(i*BitDurationNs); + } + for (ui64 i = 0; i < 5; ++i) { + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[0]->Data[i]), + PrintBits(0b0101010101010101010101010101010101010101010101010101010101010101ull)); + } + for (ui64 i = 0; i < 5*64-1; i+=2) { + log[1]->RegisterIdlePeriod((i+1)*BitDurationNs); + log[1]->RegisterBusyPeriod((i+1)*BitDurationNs); + } + for (ui64 i = 0; i < 5; ++i) { + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[1]->Data[i]), + PrintBits(0b1010101010101010101010101010101010101010101010101010101010101010ull)); + } + + log[0]->Data[2] = ~0ull; + ui64 value = estimator.MaxLatencyIncreaseWithOneLessCpu(log, 2, (5*64-1)*BitDurationNs, 3*64*BitDurationNs); + 
UNIT_ASSERT_VALUES_EQUAL(value/BitDurationNs, 32); + + delete log[0]; + delete log[1]; + } + + Y_UNIT_TEST(Estimator3) { + TCpuLoadLog<5> *log[3]; + log[0] = new TCpuLoadLog<5>(0*BitDurationNs); + log[1] = new TCpuLoadLog<5>(0*BitDurationNs); + log[2] = new TCpuLoadLog<5>(0*BitDurationNs); + TMinusOneCpuEstimator<5> estimator; + + for (ui64 i = 0; i < 5*64; i+=8) { + log[0]->RegisterIdlePeriod(i*BitDurationNs); + log[0]->RegisterBusyPeriod((i+3)*BitDurationNs); + log[1]->RegisterIdlePeriod(i*BitDurationNs); + log[1]->RegisterBusyPeriod((i+3)*BitDurationNs); + log[2]->RegisterIdlePeriod(i*BitDurationNs); + log[2]->RegisterBusyPeriod((i+3)*BitDurationNs); + } + for (ui64 i = 0; i < 5; ++i) { + for (ui64 n = 0; n < 3; ++n) { + UNIT_ASSERT_VALUES_EQUAL_C(PrintBits(log[n]->Data[i]), + PrintBits(0b0000111100001111000011110000111100001111000011110000111100001111ull), + " i: " << i << " n: " << n); + } + } + + ui64 value = estimator.MaxLatencyIncreaseWithOneLessCpu(log, 3, (5*64-5)*BitDurationNs, 3*64*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(value/BitDurationNs, 4); + + delete log[0]; + delete log[1]; + delete log[2]; + } + /* + class TWorkerThread : public ISimpleThread { + private: + std::function<void()> Func; + double Time = 0.0; + + public: + TWorkerThread(std::function<void()> func) + : Func(std::move(func)) + { } + + double GetTime() const { + return Time; + } + + static THolder<TWorkerThread> Spawn(std::function<void()> func) { + THolder<TWorkerThread> thread = MakeHolder<TWorkerThread>(std::move(func)); + thread->Start(); + return thread; + } + + private: + void* ThreadProc() noexcept override { + THPTimer timer; + Func(); + Time = timer.Passed(); + return nullptr; + } + }; + + void DoConcurrentPushPop(size_t threads, ui64 perThreadCount) { + // Concurrency factor 4 is up to 16 threads + + auto workerFunc = [&](size_t threadIndex) { + }; + + TVector<THolder<TWorkerThread>> workers(threads); + for (size_t i = 0; i < threads; ++i) { + workers[i] = 
TWorkerThread::Spawn([workerFunc, i]() { + workerFunc(i); + }); + } + + double maxTime = 0; + for (size_t i = 0; i < threads; ++i) { + workers[i]->Join(); + maxTime = Max(maxTime, workers[i]->GetTime()); + } + + UNIT_ASSERT_VALUES_EQUAL(popped, 0u); + + Cerr << "Concurrent with " << threads << " threads: " << maxTime << " seconds" << Endl; + } + + void DoConcurrentPushPop_3times(size_t threads, ui64 perThreadCount) { + for (size_t i = 0; i < 3; ++i) { + DoConcurrentPushPop(threads, perThreadCount); + } + } + + static constexpr ui64 PER_THREAD_COUNT = NSan::PlainOrUnderSanitizer(1000000, 100000); + + Y_UNIT_TEST(ConcurrentPushPop_1thread) { DoConcurrentPushPop_3times(1, PER_THREAD_COUNT); } + */ +} diff --git a/library/cpp/actors/util/thread_load_log.h b/library/cpp/actors/util/thread_load_log.h new file mode 100644 index 0000000000..b4b34d47bb --- /dev/null +++ b/library/cpp/actors/util/thread_load_log.h @@ -0,0 +1,363 @@ +#pragma once + +#include "defs.h" + +#include <util/system/types.h> + +#include <type_traits> +#include <algorithm> +#include <atomic> +#include <limits> +#include <queue> + +template <ui64 TIME_SLOT_COUNT, ui64 TIME_SLOT_LENGTH_NS = 131'072, typename Type = std::uint8_t> +class TThreadLoad { +public: + using TimeSlotType = Type; + +private: + static constexpr auto TIME_SLOT_MAX_VALUE = std::numeric_limits<TimeSlotType>::max(); + static constexpr ui64 TIME_SLOT_PART_COUNT = TIME_SLOT_MAX_VALUE + 1; + static constexpr auto TIME_SLOT_PART_LENGTH_NS = TIME_SLOT_LENGTH_NS / TIME_SLOT_PART_COUNT; + + template <typename T> + static void AtomicAddBound(std::atomic<T>& val, i64 inc) { + if (inc == 0) { + return; + } + + auto newVal = val.load(); + auto oldVal = newVal; + + do { + static constexpr auto MAX_VALUE = std::numeric_limits<T>::max(); + + if (oldVal >= MAX_VALUE) { + return; + } + newVal = std::min<i64>(MAX_VALUE, static_cast<i64>(oldVal) + inc); + } while (!val.compare_exchange_weak(oldVal, newVal)); + } + + template <typename T> + static void 
AtomicSubBound(std::atomic<T>& val, i64 sub) { + if (sub == 0) { + return; + } + + auto newVal = val.load(); + auto oldVal = newVal; + + do { + if (oldVal == 0) { + return; + } + newVal = std::max<i64>(0, static_cast<i64>(oldVal) - sub); + } while (!val.compare_exchange_weak(oldVal, newVal)); + } + + void UpdateCompleteTimeSlots(ui64 firstSlotNumber, ui64 lastSlotNumber, TimeSlotType timeSlotValue) { + ui32 firstSlotIndex = firstSlotNumber % TIME_SLOT_COUNT; + ui32 lastSlotIndex = lastSlotNumber % TIME_SLOT_COUNT; + + const ui64 firstTimeSlotsPass = firstSlotNumber / TIME_SLOT_COUNT; + const ui64 lastTimeSlotsPass = lastSlotNumber / TIME_SLOT_COUNT; + + if (firstTimeSlotsPass == lastTimeSlotsPass) { + // first and last time slots are in the same pass + for (auto slotNumber = firstSlotNumber + 1; slotNumber < lastSlotNumber; ++slotNumber) { + auto slotIndex = slotNumber % TIME_SLOT_COUNT; + TimeSlots[slotIndex] = timeSlotValue; + } + } else if (firstTimeSlotsPass + 1 == lastTimeSlotsPass) { + for (auto slotIndex = (firstSlotNumber + 1) % TIME_SLOT_COUNT; firstSlotIndex < slotIndex && slotIndex < TIME_SLOT_COUNT; ++slotIndex) { + TimeSlots[slotIndex] = timeSlotValue; + } + for (auto slotIndex = 0u; slotIndex < lastSlotIndex; ++slotIndex) { + TimeSlots[slotIndex] = timeSlotValue; + } + } else { + for (auto slotIndex = 0u; slotIndex < TIME_SLOT_COUNT; ++slotIndex) { + TimeSlots[slotIndex] = timeSlotValue; + } + } + } + +public: + std::atomic<ui64> LastTimeNs; + std::atomic<TimeSlotType> TimeSlots[TIME_SLOT_COUNT]; + std::atomic<bool> LastRegisteredPeriodIsBusy = false; + + explicit TThreadLoad(ui64 timeNs = 0) { + static_assert(std::is_unsigned<TimeSlotType>::value); + + LastTimeNs = timeNs; + for (size_t i = 0; i < TIME_SLOT_COUNT; ++i) { + TimeSlots[i] = 0; + } + } + + static constexpr auto GetTimeSlotCount() { + return TIME_SLOT_COUNT; + } + + static constexpr auto GetTimeSlotLengthNs() { + return TIME_SLOT_LENGTH_NS; + } + + static constexpr auto 
GetTimeSlotPartLengthNs() { + return TIME_SLOT_PART_LENGTH_NS; + } + + static constexpr auto GetTimeSlotPartCount() { + return TIME_SLOT_PART_COUNT; + } + + static constexpr auto GetTimeSlotMaxValue() { + return TIME_SLOT_MAX_VALUE; + } + + static constexpr auto GetTimeWindowLengthNs() { + return TIME_SLOT_COUNT * TIME_SLOT_LENGTH_NS; + } + + void RegisterBusyPeriod(ui64 timeNs) { + RegisterBusyPeriod<true>(timeNs, LastTimeNs.load()); + } + + template <bool ModifyLastTime> + void RegisterBusyPeriod(ui64 timeNs, ui64 lastTimeNs) { + LastRegisteredPeriodIsBusy = true; + + if (timeNs < lastTimeNs) { + // when time goes back, mark all time slots as 'free' + for (size_t i = 0u; i < TIME_SLOT_COUNT; ++i) { + TimeSlots[i] = 0; + } + + if (ModifyLastTime) { + LastTimeNs = timeNs; + } + + return; + } + + // lastTimeNs <= timeNs + ui64 firstSlotNumber = lastTimeNs / TIME_SLOT_LENGTH_NS; + ui32 firstSlotIndex = firstSlotNumber % TIME_SLOT_COUNT; + ui64 lastSlotNumber = timeNs / TIME_SLOT_LENGTH_NS; + ui32 lastSlotIndex = lastSlotNumber % TIME_SLOT_COUNT; + + if (firstSlotNumber == lastSlotNumber) { + ui32 slotLengthNs = timeNs - lastTimeNs; + ui32 slotPartsCount = (slotLengthNs + TIME_SLOT_PART_LENGTH_NS - 1) / TIME_SLOT_PART_LENGTH_NS; + AtomicAddBound(TimeSlots[firstSlotIndex], slotPartsCount); + + if (ModifyLastTime) { + LastTimeNs = timeNs; + } + return; + } + + ui32 firstSlotLengthNs = TIME_SLOT_LENGTH_NS - (lastTimeNs % TIME_SLOT_LENGTH_NS); + ui32 firstSlotPartsCount = (firstSlotLengthNs + TIME_SLOT_PART_LENGTH_NS - 1) / TIME_SLOT_PART_LENGTH_NS; + ui32 lastSlotLengthNs = timeNs % TIME_SLOT_LENGTH_NS; + ui32 lastSlotPartsCount = (lastSlotLengthNs + TIME_SLOT_PART_LENGTH_NS - 1) / TIME_SLOT_PART_LENGTH_NS; + + // process first time slot + AtomicAddBound(TimeSlots[firstSlotIndex], firstSlotPartsCount); + + // process complete time slots + UpdateCompleteTimeSlots(firstSlotNumber, lastSlotNumber, TIME_SLOT_MAX_VALUE); + + // process last time slot + 
AtomicAddBound(TimeSlots[lastSlotIndex], lastSlotPartsCount); + + if (ModifyLastTime) { + LastTimeNs = timeNs; + } + } + + void RegisterIdlePeriod(ui64 timeNs) { + LastRegisteredPeriodIsBusy = false; + + ui64 lastTimeNs = LastTimeNs.load(); + if (timeNs < lastTimeNs) { + // when time goes back, mark all time slots as 'busy' + for (size_t i = 0u; i < TIME_SLOT_COUNT; ++i) { + TimeSlots[i] = TIME_SLOT_MAX_VALUE; + } + LastTimeNs = timeNs; + return; + } + + // lastTimeNs <= timeNs + ui64 firstSlotNumber = lastTimeNs / TIME_SLOT_LENGTH_NS; + ui32 firstSlotIndex = firstSlotNumber % TIME_SLOT_COUNT; + ui64 lastSlotNumber = timeNs / TIME_SLOT_LENGTH_NS; + ui32 lastSlotIndex = lastSlotNumber % TIME_SLOT_COUNT; + + if (firstSlotNumber == lastSlotNumber) { + ui32 slotLengthNs = timeNs - lastTimeNs; + ui32 slotPartsCount = slotLengthNs / TIME_SLOT_PART_LENGTH_NS; + + AtomicSubBound(TimeSlots[firstSlotIndex], slotPartsCount); + + LastTimeNs = timeNs; + return; + } + + ui32 firstSlotLengthNs = TIME_SLOT_LENGTH_NS - (lastTimeNs % TIME_SLOT_LENGTH_NS); + ui32 firstSlotPartsCount = (firstSlotLengthNs + TIME_SLOT_PART_LENGTH_NS - 1) / TIME_SLOT_PART_LENGTH_NS; + ui32 lastSlotLengthNs = timeNs % TIME_SLOT_LENGTH_NS; + ui32 lastSlotPartsCount = (lastSlotLengthNs + TIME_SLOT_PART_LENGTH_NS - 1) / TIME_SLOT_PART_LENGTH_NS; + + // process first time slot + AtomicSubBound(TimeSlots[firstSlotIndex], firstSlotPartsCount); + + // process complete time slots + UpdateCompleteTimeSlots(firstSlotNumber, lastSlotNumber, 0); + + // process last time slot + AtomicSubBound(TimeSlots[lastSlotIndex], lastSlotPartsCount); + + LastTimeNs = timeNs; + } +}; + +class TMinusOneThreadEstimator { +private: + template <typename T, int MaxSize> + class TArrayQueue { + public: + bool empty() const { + return FrontIndex == -1; + } + + bool full() const { + return (RearIndex + 1) % MaxSize == FrontIndex; + } + + T& front() { + return Data[FrontIndex]; + } + + bool push(T &&t) { + if (full()) { + return false; + } 
+ + if (FrontIndex == -1) { + FrontIndex = 0; + } + + RearIndex = (RearIndex + 1) % MaxSize; + Data[RearIndex] = std::move(t); + return true; + } + + bool pop() { + if (empty()) { + return false; + } + + if (FrontIndex == RearIndex) { + FrontIndex = RearIndex = -1; + } else { + FrontIndex = (FrontIndex + 1) % MaxSize; + } + + return true; + } + + private: + int FrontIndex = -1; + int RearIndex = -1; + T Data[MaxSize]; + }; + +public: + template <typename T> + ui64 MaxLatencyIncreaseWithOneLessCpu(T **threadLoads, ui32 threadCount, ui64 timeNs, ui64 periodNs) { + Y_VERIFY(threadCount > 0); + + struct TTimeSlotData { + typename T::TimeSlotType Load; + ui64 Index; + }; + + ui64 lastTimeNs = timeNs; + for (auto threadIndex = 0u; threadIndex < threadCount; ++threadIndex) { + if (threadLoads[threadIndex]->LastRegisteredPeriodIsBusy.load()) { + lastTimeNs = std::min(lastTimeNs, threadLoads[threadIndex]->LastTimeNs.load()); + } else { + // make interval [lastTimeNs, timeNs] 'busy' + threadLoads[threadIndex]->template RegisterBusyPeriod<false>(timeNs, threadLoads[threadIndex]->LastTimeNs.load()); + } + } + + periodNs = std::min(T::GetTimeWindowLengthNs(), periodNs); + + ui64 beginTimeNs = periodNs < timeNs ? 
timeNs - periodNs : 0; + + ui64 firstSlotNumber = beginTimeNs / T::GetTimeSlotLengthNs(); + ui64 lastSlotNumber = (lastTimeNs + T::GetTimeSlotLengthNs() - 1) / T::GetTimeSlotLengthNs(); + + ui64 maxTimeSlotShiftCount = 0u; + TArrayQueue<TTimeSlotData, T::GetTimeSlotCount()> firstThreadLoadDataQueue; + + for (auto slotNumber = firstSlotNumber; slotNumber < lastSlotNumber; ++slotNumber) { + ui64 slotIndex = slotNumber % T::GetTimeSlotCount(); + + typename T::TimeSlotType firstThreadTimeSlotValue = threadLoads[0]->TimeSlots[slotIndex].load(); + + // distribute previous load of the first thread by other threads + auto foundIdleThread = false; + + for (auto threadIndex = 1u; threadIndex < threadCount; ++threadIndex) { + typename T::TimeSlotType thisThreadAvailableTimeSlotLoad = threadLoads[threadIndex]->GetTimeSlotMaxValue() - threadLoads[threadIndex]->TimeSlots[slotIndex].load(); + + while (!firstThreadLoadDataQueue.empty() && thisThreadAvailableTimeSlotLoad > 0) { + auto& firstThreadLoadData = firstThreadLoadDataQueue.front(); + + auto distributedLoad = std::min(thisThreadAvailableTimeSlotLoad, firstThreadLoadData.Load); + + thisThreadAvailableTimeSlotLoad -= distributedLoad; + firstThreadLoadData.Load -= distributedLoad; + + if (firstThreadLoadData.Load == 0) { + auto timeSlotShiftCount = slotIndex - firstThreadLoadData.Index; + maxTimeSlotShiftCount = std::max(maxTimeSlotShiftCount, timeSlotShiftCount); + auto res = firstThreadLoadDataQueue.pop(); + Y_VERIFY(res); + } + } + + if (thisThreadAvailableTimeSlotLoad == threadLoads[threadIndex]->GetTimeSlotMaxValue()) { + foundIdleThread = true; + } + } + + // distribute current load of the first thread by other threads + if (firstThreadTimeSlotValue > 0) { + if (foundIdleThread) { + // The current load of the first thead can be + // moved to the idle thread so there is nothing to do + } else { + // The current load of the first thread can be later + // processed by the following time slots of other threads + auto res = 
firstThreadLoadDataQueue.push({firstThreadTimeSlotValue, slotIndex}); + Y_VERIFY(res); + } + } + } + + if (!firstThreadLoadDataQueue.empty()) { + const auto& timeSlotData = firstThreadLoadDataQueue.front(); + auto timeSlotShiftCount = T::GetTimeSlotCount() - timeSlotData.Index; + maxTimeSlotShiftCount = std::max(maxTimeSlotShiftCount, timeSlotShiftCount); + } + + return maxTimeSlotShiftCount * T::GetTimeSlotLengthNs(); + } +}; diff --git a/library/cpp/actors/util/thread_load_log_ut.cpp b/library/cpp/actors/util/thread_load_log_ut.cpp new file mode 100644 index 0000000000..20e776cff6 --- /dev/null +++ b/library/cpp/actors/util/thread_load_log_ut.cpp @@ -0,0 +1,966 @@ +#include "thread_load_log.h" + +#include <library/cpp/testing/unittest/registar.h> + +#include <util/random/random.h> +#include <util/system/hp_timer.h> +#include <util/system/thread.h> +#include <util/system/types.h> +#include <util/system/sanitizers.h> + +#include <limits> + +Y_UNIT_TEST_SUITE(ThreadLoadLog) { + + Y_UNIT_TEST(TThreadLoad8BitSlotType) { + constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec + constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec + constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs; + + using TSlotType = std::uint8_t; + using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, TSlotType>; + + UNIT_ASSERT_VALUES_EQUAL(T::GetTimeWindowLengthNs(), timeWindowLengthNs); + UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotLengthNs(), timeSlotLengthNs); + UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotCount(), timeSlotCount); + UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotMaxValue(), std::numeric_limits<TSlotType>::max()); + UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotPartCount(), (ui64)std::numeric_limits<TSlotType>::max() + 1); + UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotPartLengthNs(), T::GetTimeSlotLengthNs() / T::GetTimeSlotPartCount()); + } + + Y_UNIT_TEST(TThreadLoad16BitSlotType) { + constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 
sec + constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec + constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs; + + using TSlotType = std::uint16_t; + using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, TSlotType>; + + UNIT_ASSERT_VALUES_EQUAL(T::GetTimeWindowLengthNs(), timeWindowLengthNs); + UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotLengthNs(), timeSlotLengthNs); + UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotCount(), timeSlotCount); + UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotMaxValue(), std::numeric_limits<TSlotType>::max()); + UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotPartCount(), (ui64)std::numeric_limits<TSlotType>::max() + 1); + UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotPartLengthNs(), T::GetTimeSlotLengthNs() / T::GetTimeSlotPartCount()); + } + + Y_UNIT_TEST(TThreadLoad8BitSlotTypeWindowBusy) { + constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec + constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec + constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs; + + using TSlotType = std::uint8_t; + using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, TSlotType>; + + T threadLoad; + threadLoad.RegisterBusyPeriod(T::GetTimeWindowLengthNs()); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), T::GetTimeWindowLengthNs()); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), T::GetTimeSlotMaxValue()); + } + } + + Y_UNIT_TEST(TThreadLoad16BitSlotTypeWindowBusy) { + constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec + constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec + constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs; + + using TSlotType = std::uint16_t; + using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, TSlotType>; + + T threadLoad; + threadLoad.RegisterBusyPeriod(T::GetTimeWindowLengthNs()); + + 
UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), T::GetTimeWindowLengthNs()); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), T::GetTimeSlotMaxValue()); + } + } + + Y_UNIT_TEST(TThreadLoadRegisterBusyPeriodFirstTimeSlot1) { + TThreadLoad<38400> threadLoad; + + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = threadLoad.GetTimeSlotLengthNs() - 1; + threadLoad.RegisterBusyPeriod(timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue()); + + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + } + + Y_UNIT_TEST(TThreadLoadRegisterBusyPeriodFirstTimeSlot2) { + using T = TThreadLoad<38400>; + + ui32 startNs = 2 * T::GetTimeSlotPartLengthNs(); + T threadLoad(startNs); + + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = 3 * T::GetTimeSlotPartLengthNs() - 1; + threadLoad.RegisterBusyPeriod(timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1); + + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + } + + Y_UNIT_TEST(TThreadLoadRegisterBusyPeriodFirstTimeSlot3) { + TThreadLoad<38400> threadLoad; + + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = threadLoad.GetTimeSlotLengthNs(); 
+ threadLoad.RegisterBusyPeriod(timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue()); + + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + } + + Y_UNIT_TEST(TThreadLoadRegisterBusyPeriodFirstTimeSlot4) { + using T = TThreadLoad<38400>; + + ui32 startNs = 2 * T::GetTimeSlotPartLengthNs(); + T threadLoad(startNs); + + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = 3 * T::GetTimeSlotPartLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), (timeNs - startNs) / T::GetTimeSlotPartLengthNs()); + + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + } + + Y_UNIT_TEST(TThreadLoadRegisterBusyPeriodFirstTwoTimeSlots1) { + TThreadLoad<38400> threadLoad; + + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = 2 * threadLoad.GetTimeSlotLengthNs() - 1; + threadLoad.RegisterBusyPeriod(timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue()); + + for (auto slotIndex = 2u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + } + + Y_UNIT_TEST(TThreadLoadRegisterBusyPeriodFirstTwoTimeSlots2) { 
+ TThreadLoad<38400> threadLoad; + + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = 2 * threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + // The busy period spans slots 0 and 1 completely, so both must be saturated. + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue()); + + for (auto slotIndex = 2u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + } + + Y_UNIT_TEST(TThreadLoadRegisterBusyPeriodFirstThreeTimeSlots1) { + TThreadLoad<38400> threadLoad; + + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = 3 * threadLoad.GetTimeSlotLengthNs() - 1; + threadLoad.RegisterBusyPeriod(timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[2].load(), threadLoad.GetTimeSlotMaxValue()); + + for (auto slotIndex = 3u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + } + + Y_UNIT_TEST(TThreadLoadRegisterBusyPeriodFirstThreeTimeSlots2) { + TThreadLoad<38400> threadLoad; + + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = 3 * threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); +
UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue()); + // Slot 1 lies entirely inside the busy period, so it must be saturated as well. + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[2].load(), threadLoad.GetTimeSlotMaxValue()); + + for (auto slotIndex = 3u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + } + + Y_UNIT_TEST(TThreadLoadRegisterBusyPeriodFirstThreeTimeSlots3) { + using T = TThreadLoad<38400>; + + ui32 startNs = 3 * T::GetTimeSlotPartLengthNs(); + T threadLoad(startNs); + + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = 0; + threadLoad.RegisterBusyPeriod(timeNs); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + } + + Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstTimeSlot1) { + using T = TThreadLoad<38400>; + + ui64 timeNs = T::GetTimeSlotPartLengthNs(); + T threadLoad(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = 2 * T::GetTimeSlotPartLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1); + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = 3 * T::GetTimeSlotPartLengthNs(); +
threadLoad.RegisterIdlePeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 0); + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = 4 * T::GetTimeSlotPartLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1); + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + } + + Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstTimeSlot2) { + using T = TThreadLoad<38400>; + + ui64 timeNs = T::GetTimeSlotPartLengthNs(); + T threadLoad(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = 2 * T::GetTimeSlotPartLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1); + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = 3 * T::GetTimeSlotPartLengthNs() - 1; + threadLoad.RegisterIdlePeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1); + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = 4 * T::GetTimeSlotPartLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + 
UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 3); + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + } + + Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstTimeSlot3) { + using T = TThreadLoad<38400>; + + ui64 timeNs = T::GetTimeSlotPartLengthNs(); + T threadLoad(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = 2 * T::GetTimeSlotPartLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1); + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = 3 * T::GetTimeSlotPartLengthNs() - 1; + threadLoad.RegisterIdlePeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1); + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = 4 * T::GetTimeSlotPartLengthNs() - 2; + threadLoad.RegisterIdlePeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1); + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = 5 * T::GetTimeSlotPartLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + 
UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 3); + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + } + + Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstTwoTimeSlots1) { + using T = TThreadLoad<38400>; + + T threadLoad; + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), 0); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterIdlePeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = 2 * threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 0); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue()); + for (auto slotIndex = 2u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + } + + Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstTwoTimeSlots2) { + using T = TThreadLoad<38400>; + + T threadLoad; + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), 0); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = threadLoad.GetTimeSlotLengthNs() - 1; + threadLoad.RegisterIdlePeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = 2 * 
threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue()); + for (auto slotIndex = 2u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + } + + Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstTwoTimeSlots3) { + using T = TThreadLoad<38400>; + + T threadLoad; + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), 0); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = threadLoad.GetTimeSlotLengthNs() - 1; + threadLoad.RegisterIdlePeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = 2 * threadLoad.GetTimeSlotLengthNs() - 1; + threadLoad.RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 1); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue()); + for (auto slotIndex = 2u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + } + + Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstThreeTimeSlots1) { + using T = TThreadLoad<38400>; + + T threadLoad; + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), 0); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = 
threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + timeNs = 2 * threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterIdlePeriod(timeNs); + + timeNs = 3 * threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), 0); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[2].load(), threadLoad.GetTimeSlotMaxValue()); + for (auto slotIndex = 3u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + } + + Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstThreeTimeSlots2) { + using T = TThreadLoad<38400>; + + T threadLoad; + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), 0); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + timeNs = 3 * threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterIdlePeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue()); + for (auto slotIndex = 1u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + } + + Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstThreeTimeSlots3) { + using T = TThreadLoad<38400>; + + T threadLoad; + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), 0); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = threadLoad.GetTimeSlotLengthNs(); + 
threadLoad.RegisterIdlePeriod(timeNs); + + timeNs = 3 * threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 0); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[2].load(), threadLoad.GetTimeSlotMaxValue()); + for (auto slotIndex = 3u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + } + + Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstThreeTimeSlots4) { + using T = TThreadLoad<38400>; + + T threadLoad; + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), 0); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = threadLoad.GetTimeSlotLengthNs() + 2 * threadLoad.GetTimeSlotPartLengthNs(); + threadLoad.RegisterIdlePeriod(timeNs); + + timeNs = 3 * threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 0); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotPartCount() - 2); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[2].load(), threadLoad.GetTimeSlotMaxValue()); + for (auto slotIndex = 3u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + } + + Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodFirstThreeTimeSlots5) { + using T = TThreadLoad<38400>; + + T threadLoad; + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), 0); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + 
UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = 2 * threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue()); + for (auto slotIndex = 2u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = timeNs + threadLoad.GetTimeWindowLengthNs() + threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterIdlePeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + } + + Y_UNIT_TEST(TThreadLoadRegisterIdlePeriodOverTimeWindow) { + constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec + constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec + constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs; + + using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, std::uint8_t>; + + T threadLoad; + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), 0); + for (auto slotIndex = 0u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + ui64 timeNs = 5 * threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[2].load(), threadLoad.GetTimeSlotMaxValue()); + 
UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[3].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[4].load(), threadLoad.GetTimeSlotMaxValue()); + for (auto slotIndex = 5u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + + timeNs = timeNs + threadLoad.GetTimeWindowLengthNs() - 3 * threadLoad.GetTimeSlotLengthNs(); + threadLoad.RegisterIdlePeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoad.LastTimeNs.load(), timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[0].load(), 0); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[1].load(), 0); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[2].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[3].load(), threadLoad.GetTimeSlotMaxValue()); + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[4].load(), threadLoad.GetTimeSlotMaxValue()); + for (auto slotIndex = 5u; slotIndex < threadLoad.GetTimeSlotCount(); ++slotIndex) { + UNIT_ASSERT_VALUES_EQUAL(threadLoad.TimeSlots[slotIndex].load(), 0); + } + } + + Y_UNIT_TEST(MinusOneThreadEstimatorTwoThreadLoadsZeroShiftNs) { + constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec + constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec + constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs; + + using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, std::uint16_t>; + + UNIT_ASSERT_VALUES_EQUAL(T::GetTimeSlotPartCount(), (ui64)std::numeric_limits<std::uint16_t>::max() + 1); + + T *threadLoads[2]; + threadLoads[0] = new T; + threadLoads[1] = new T; + + for (ui64 i = 1; i < timeSlotCount; i += 2) { + threadLoads[0]->RegisterIdlePeriod(i * T::GetTimeSlotLengthNs()); + threadLoads[0]->RegisterBusyPeriod((i + 1) * T::GetTimeSlotLengthNs()); + } + + for (ui64 i = 1; i < timeSlotCount; i += 2) { + threadLoads[1]->RegisterBusyPeriod(i * T::GetTimeSlotLengthNs()); + 
threadLoads[1]->RegisterIdlePeriod((i + 1) * T::GetTimeSlotLengthNs()); + } + + TMinusOneThreadEstimator estimator; + ui64 value = estimator.MaxLatencyIncreaseWithOneLessCpu(threadLoads, 2, T::GetTimeWindowLengthNs(), T::GetTimeWindowLengthNs()); + UNIT_ASSERT_VALUES_EQUAL(value, 0); + + delete threadLoads[0]; + delete threadLoads[1]; + } + + Y_UNIT_TEST(MinusOneThreadEstimatorTwoThreadLoadsOneTimeSlotShift1) { + constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec + constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec + constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs; + constexpr auto threadCount = 2; + + using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, std::uint16_t>; + + T *threadLoads[threadCount]; + + for (auto t = 0u; t < threadCount; ++t) { + threadLoads[t] = new T; + + for (ui64 i = 2; i < threadLoads[t]->GetTimeSlotCount(); i += 2) { + threadLoads[t]->RegisterIdlePeriod((i - 1) * T::GetTimeSlotLengthNs()); + threadLoads[t]->RegisterBusyPeriod(i * T::GetTimeSlotLengthNs()); + } + + threadLoads[t]->RegisterIdlePeriod((threadLoads[t]->GetTimeSlotCount() - 1) * T::GetTimeSlotLengthNs()); + threadLoads[t]->RegisterBusyPeriod(threadLoads[t]->GetTimeSlotCount() * T::GetTimeSlotLengthNs()); + + for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) { + if (s % 2 == 1) { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str()); + } else { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str()); + } + } + } + + TMinusOneThreadEstimator estimator; + auto result = estimator.MaxLatencyIncreaseWithOneLessCpu(threadLoads, threadCount, T::GetTimeWindowLengthNs(), T::GetTimeWindowLengthNs()); + + for (ui64 t = 0; t < threadCount; ++t) { + for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) { + if (s % 2 == 1) { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), 
ToString(s).c_str()); + } else { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str()); + } + } + } + + UNIT_ASSERT_VALUES_EQUAL(result, T::GetTimeSlotLengthNs()); + + for (auto t = 0u; t < threadCount; ++t) { + delete threadLoads[t]; + } + } + + Y_UNIT_TEST(MinusOneThreadEstimatorTwoThreadLoadsOneTimeSlotShift2) { + constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec + constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec + constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs; + constexpr auto threadCount = 2; + + using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, std::uint16_t>; + + T *threadLoads[threadCount]; + + for (auto t = 0u; t < threadCount; ++t) { + threadLoads[t] = new T; + + for (ui64 i = 2; i < threadLoads[t]->GetTimeSlotCount(); i += 2) { + threadLoads[t]->RegisterBusyPeriod((i - 1) * T::GetTimeSlotLengthNs()); + threadLoads[t]->RegisterIdlePeriod(i * T::GetTimeSlotLengthNs()); + } + + threadLoads[t]->RegisterBusyPeriod((threadLoads[t]->GetTimeSlotCount() - 1) * T::GetTimeSlotLengthNs()); + threadLoads[t]->RegisterIdlePeriod(threadLoads[t]->GetTimeSlotCount() * T::GetTimeSlotLengthNs()); + + for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) { + if (s % 2 == 0) { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str()); + } else { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str()); + } + } + } + + TMinusOneThreadEstimator estimator; + auto result = estimator.MaxLatencyIncreaseWithOneLessCpu(threadLoads, threadCount, T::GetTimeWindowLengthNs(), T::GetTimeWindowLengthNs()); + + for (ui64 t = 0; t < threadCount; ++t) { + for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) { + if (s % 2 == 0) { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str()); + } else { + 
UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str()); + } + } + } + + UNIT_ASSERT_VALUES_EQUAL(result, T::GetTimeSlotLengthNs()); + + for (auto t = 0u; t < threadCount; ++t) { + delete threadLoads[t]; + } + } + + Y_UNIT_TEST(MinusOneThreadEstimatorTwoThreadLoadsTwoTimeSlotsShift1) { + constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec + constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec + constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs; + constexpr auto threadCount = 2; + + using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, std::uint16_t>; + + T *threadLoads[threadCount]; + + for (auto t = 0u; t < threadCount; ++t) { + threadLoads[t] = new T; + + for (ui64 i = 4; i < threadLoads[t]->GetTimeSlotCount(); i += 4) { + threadLoads[t]->RegisterIdlePeriod((i - 2) * T::GetTimeSlotLengthNs()); + threadLoads[t]->RegisterBusyPeriod(i * T::GetTimeSlotLengthNs()); + } + + threadLoads[t]->RegisterIdlePeriod((threadLoads[t]->GetTimeSlotCount() - 2) * T::GetTimeSlotLengthNs()); + threadLoads[t]->RegisterBusyPeriod(threadLoads[t]->GetTimeSlotCount() * T::GetTimeSlotLengthNs()); + + for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) { + if (s % 4 == 2 || s % 4 == 3) { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str()); + } else { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str()); + } + } + } + + TMinusOneThreadEstimator estimator; + auto result = estimator.MaxLatencyIncreaseWithOneLessCpu(threadLoads, threadCount, T::GetTimeWindowLengthNs(), T::GetTimeWindowLengthNs()); + + for (ui64 t = 0; t < threadCount; ++t) { + for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) { + if (s % 4 == 2 || s % 4 == 3) { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str()); + } else { + 
UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str()); + } + } + } + + UNIT_ASSERT_VALUES_EQUAL(result, 2 * T::GetTimeSlotLengthNs()); + + for (auto t = 0u; t < threadCount; ++t) { + delete threadLoads[t]; + } + } + + Y_UNIT_TEST(MinusOneThreadEstimatorTwoThreadLoadsTwoTimeSlotsShift2) { + constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec + constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec + constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs; + constexpr auto threadCount = 2; + + using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, std::uint16_t>; + + T *threadLoads[threadCount]; + + for (auto t = 0u; t < threadCount; ++t) { + threadLoads[t] = new T; + + for (ui64 i = 4; i < threadLoads[t]->GetTimeSlotCount(); i += 4) { + threadLoads[t]->RegisterBusyPeriod((i - 2) * T::GetTimeSlotLengthNs()); + threadLoads[t]->RegisterIdlePeriod(i * T::GetTimeSlotLengthNs()); + } + + threadLoads[t]->RegisterBusyPeriod((threadLoads[t]->GetTimeSlotCount() - 2) * T::GetTimeSlotLengthNs()); + threadLoads[t]->RegisterIdlePeriod(threadLoads[t]->GetTimeSlotCount() * T::GetTimeSlotLengthNs()); + + for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) { + if (s % 4 == 0 || s % 4 == 1) { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str()); + } else { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str()); + } + } + } + + TMinusOneThreadEstimator estimator; + auto result = estimator.MaxLatencyIncreaseWithOneLessCpu(threadLoads, threadCount, T::GetTimeWindowLengthNs(), T::GetTimeWindowLengthNs()); + + for (ui64 t = 0; t < threadCount; ++t) { + for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) { + if (s % 4 == 0 || s % 4 == 1) { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str()); + } else { +
UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str()); + } + } + } + + UNIT_ASSERT_VALUES_EQUAL(result, 2 * T::GetTimeSlotLengthNs()); + + for (auto t = 0u; t < threadCount; ++t) { + delete threadLoads[t]; + } + } + + Y_UNIT_TEST(MinusOneThreadEstimatorTwoThreadLoadsTwoTimeSlotsShift3) { + constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec + constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec + constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs; + constexpr auto threadCount = 2; + + using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, std::uint16_t>; + + T *threadLoads[threadCount]; + + for (auto t = 0u; t < threadCount; ++t) { + threadLoads[t] = new T; + + // Idle until 1.5 slots before the window end; integer arithmetic keeps the timestamp a ui64 instead of a double. + ui64 timeNs = T::GetTimeWindowLengthNs() - 3 * T::GetTimeSlotLengthNs() / 2; + threadLoads[t]->RegisterIdlePeriod(timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoads[t]->LastTimeNs.load(), timeNs); + + timeNs = T::GetTimeWindowLengthNs(); + threadLoads[t]->RegisterBusyPeriod(timeNs); + UNIT_ASSERT_VALUES_EQUAL(threadLoads[t]->LastTimeNs.load(), timeNs); + + for (ui64 s = 0; s + 2 < threadLoads[t]->GetTimeSlotCount(); ++s) { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str()); + } + + UNIT_ASSERT_VALUES_EQUAL(threadLoads[t]->TimeSlots[timeSlotCount - 2].load(), T::GetTimeSlotPartCount() / 2); + UNIT_ASSERT_VALUES_EQUAL(threadLoads[t]->TimeSlots[timeSlotCount - 1].load(), T::GetTimeSlotMaxValue()); + } + + TMinusOneThreadEstimator estimator; + auto result = estimator.MaxLatencyIncreaseWithOneLessCpu(threadLoads, threadCount, T::GetTimeWindowLengthNs(), T::GetTimeWindowLengthNs()); + + for (auto t = 0u; t < threadCount; ++t) { + for (ui64 s = 0; s + 2 < threadLoads[t]->GetTimeSlotCount(); ++s) { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), 0, ToString(s).c_str()); + } + + UNIT_ASSERT_VALUES_EQUAL(threadLoads[t]->TimeSlots[timeSlotCount - 2].load(),
T::GetTimeSlotPartCount() / 2); + UNIT_ASSERT_VALUES_EQUAL(threadLoads[t]->TimeSlots[timeSlotCount - 1].load(), T::GetTimeSlotMaxValue()); + } + + UNIT_ASSERT_VALUES_EQUAL(result, 2 * T::GetTimeSlotLengthNs()); + + for (auto t = 0u; t < threadCount; ++t) { + delete threadLoads[t]; + } + } + + Y_UNIT_TEST(MinusOneThreadEstimator16ThreadLoadsAllTimeSlots) { + constexpr auto timeWindowLengthNs = 5368709120ull; // 5 * 2 ^ 30 ~5 sec + constexpr auto timeSlotLengthNs = 524288ull; // 2 ^ 19 ns ~ 512 usec + constexpr auto timeSlotCount = timeWindowLengthNs / timeSlotLengthNs; + constexpr auto threadCount = 16; + constexpr auto estimatesCount = 16; + + using T = TThreadLoad<timeSlotCount, timeSlotLengthNs, std::uint16_t>; + + for (auto e = 0u; e < estimatesCount; ++e) { + T *threadLoads[threadCount]; + + for (auto t = 0u; t < threadCount; ++t) { + threadLoads[t] = new T; + auto timeNs = threadLoads[t]->GetTimeWindowLengthNs(); + threadLoads[t]->RegisterBusyPeriod(timeNs); + + UNIT_ASSERT_VALUES_EQUAL(threadLoads[t]->LastTimeNs.load(), timeNs); + for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str()); + } + } + + ui64 result = 0; + { + THPTimer timer; + TMinusOneThreadEstimator estimator; + result = estimator.MaxLatencyIncreaseWithOneLessCpu(threadLoads, threadCount, T::GetTimeWindowLengthNs(), T::GetTimeWindowLengthNs()); + // output in microseconds + auto passed = timer.Passed() * 1000000; + Y_UNUSED(passed); + // Cerr << "timer : " << passed << " " << __LINE__ << Endl; + } + + for (ui64 t = 0; t < threadCount; ++t) { + UNIT_ASSERT_VALUES_EQUAL(threadLoads[t]->LastTimeNs.load(), T::GetTimeWindowLengthNs()); + for (ui64 s = 0; s < threadLoads[t]->GetTimeSlotCount(); ++s) { + UNIT_ASSERT_VALUES_EQUAL_C(threadLoads[t]->TimeSlots[s].load(), T::GetTimeSlotMaxValue(), ToString(s).c_str()); + } + } + + UNIT_ASSERT_VALUES_EQUAL(result, 
T::GetTimeWindowLengthNs()); + + for (auto t = 0u; t < threadCount; ++t) { + delete threadLoads[t]; + } + } + } +} |