diff options
author | d-mokhnatkin <d-mokhnatkin@ydb.tech> | 2022-08-19 19:42:27 +0300 |
---|---|---|
committer | d-mokhnatkin <d-mokhnatkin@ydb.tech> | 2022-08-19 19:42:27 +0300 |
commit | 9c9f16353b5f81ac978a1773139cc5769d126135 (patch) | |
tree | 432629b176605ce362739db320c246fe4df415f7 /library | |
parent | ab9bc5ca72f57e972d0927aae30fcff37f1d5f7b (diff) | |
download | ydb-9c9f16353b5f81ac978a1773139cc5769d126135.tar.gz |
move parser_ut to YQ
Diffstat (limited to 'library')
-rw-r--r-- | library/cpp/actors/util/CMakeLists.txt | 1 | ||||
-rw-r--r-- | library/cpp/actors/util/cpu_load_log.h | 227 | ||||
-rw-r--r-- | library/cpp/actors/util/cpu_load_log_ut.cpp | 275 |
3 files changed, 503 insertions, 0 deletions
diff --git a/library/cpp/actors/util/CMakeLists.txt b/library/cpp/actors/util/CMakeLists.txt index 40d958d75e..233e1fe0fc 100644 --- a/library/cpp/actors/util/CMakeLists.txt +++ b/library/cpp/actors/util/CMakeLists.txt @@ -12,6 +12,7 @@ target_link_libraries(cpp-actors-util PUBLIC contrib-libs-cxxsupp yutil cpp-deprecated-atomic + library-cpp-pop_count ) target_sources(cpp-actors-util PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/affinity.cpp diff --git a/library/cpp/actors/util/cpu_load_log.h b/library/cpp/actors/util/cpu_load_log.h new file mode 100644 index 0000000000..e4ae612246 --- /dev/null +++ b/library/cpp/actors/util/cpu_load_log.h @@ -0,0 +1,227 @@ +#pragma once + +#include "defs.h" +#include <library/cpp/deprecated/atomic/atomic.h> +#include <library/cpp/pop_count/popcount.h> + +static constexpr ui64 BitDurationNs = 131'072; // A power of 2 + +template <ui64 DataSize> +struct TCpuLoadLog { + static constexpr ui64 BitsSize = DataSize * 64; + TAtomic LastTimeNs = 0; + ui64 Data[DataSize]; + + TCpuLoadLog() { + LastTimeNs = 0; + for (size_t i = 0; i < DataSize; ++i) { + Data[i] = 0; + } + } + + TCpuLoadLog(ui64 timeNs) { + LastTimeNs = timeNs; + for (size_t i = 0; i < DataSize; ++i) { + Data[i] = 0; + } + } + + void RegisterBusyPeriod(ui64 timeNs) { + RegisterBusyPeriod<true>(timeNs, AtomicGet(LastTimeNs)); + } + + template <bool ModifyLastTime> + void RegisterBusyPeriod(ui64 timeNs, ui64 lastTimeNs) { + timeNs |= 1ull; + if (timeNs < lastTimeNs) { + for (ui64 i = 0; i < DataSize; ++i) { + AtomicSet(Data[i], ~0ull); + } + if (ModifyLastTime) { + AtomicSet(LastTimeNs, timeNs); + } + return; + } + const ui64 lastIdx = timeNs / BitDurationNs; + const ui64 curIdx = lastTimeNs / BitDurationNs; + ui64 firstElementIdx = curIdx / 64; + const ui64 firstBitIdx = curIdx % 64; + const ui64 lastElementIdx = lastIdx / 64; + const ui64 lastBitIdx = lastIdx % 64; + if (firstElementIdx == lastElementIdx) { + ui64 prevValue = 0; + if (firstBitIdx != 0) { + prevValue = AtomicGet(Data[firstElementIdx % DataSize]); + } + const ui64 bits = (((~0ull) << (firstBitIdx + (63-lastBitIdx))) >> (63-lastBitIdx)); + const ui64 newValue = prevValue | bits; + AtomicSet(Data[firstElementIdx % DataSize], newValue); + if (ModifyLastTime) { + AtomicSet(LastTimeNs, timeNs); + } + return; + } + // process the first element + ui64 prevValue = 0; + if (firstBitIdx != 0) { + prevValue = AtomicGet(Data[firstElementIdx % DataSize]); + } + const ui64 bits = ((~0ull) << firstBitIdx); + const ui64 newValue = (prevValue | bits); + AtomicSet(Data[firstElementIdx % DataSize], newValue); + ++firstElementIdx; + // process the fully filled elements + const ui64 firstLoop = firstElementIdx / DataSize; + const ui64 lastLoop = lastElementIdx / DataSize; + const ui64 lastOffset = lastElementIdx % DataSize; + if (firstLoop < lastLoop) { + for (ui64 i = firstElementIdx % DataSize; i < DataSize; ++i) { + AtomicSet(Data[i], ~0ull); + } + for (ui64 i = 0; i < lastOffset; ++i) { + AtomicSet(Data[i], ~0ull); + } + } else { + for (ui64 i = firstElementIdx % DataSize; i < lastOffset; ++i) { + AtomicSet(Data[i], ~0ull); + } + } + // process the last element + const ui64 newValue2 = ((~0ull) >> (63-lastBitIdx)); + AtomicSet(Data[lastOffset], newValue2); + if (ModifyLastTime) { + AtomicSet(LastTimeNs, timeNs); + } + } + + void RegisterIdlePeriod(ui64 timeNs) { + timeNs &= ~1ull; + ui64 lastTimeNs = AtomicGet(LastTimeNs); + if (timeNs < lastTimeNs) { + // Fast check first, slower chec later + if ((timeNs | 1ull) < lastTimeNs) { + // Time goes back, dont panic, just mark the whole array 'busy' + for (ui64 i = 0; i < DataSize; ++i) { + AtomicSet(Data[i], ~0ull); + } + AtomicSet(LastTimeNs, timeNs); + return; + } + } + const ui64 curIdx = lastTimeNs / BitDurationNs; + const ui64 lastIdx = timeNs / BitDurationNs; + ui64 firstElementIdx = curIdx / 64; + const ui64 lastElementIdx = lastIdx / 64; + if (firstElementIdx >= lastElementIdx) { + AtomicSet(LastTimeNs, timeNs); + return; + } + // process the first partially filled element + ++firstElementIdx; + // process all other elements + const ui64 firstLoop = firstElementIdx / DataSize; + const ui64 lastLoop = lastElementIdx / DataSize; + const ui64 lastOffset = lastElementIdx % DataSize; + if (firstLoop < lastLoop) { + for (ui64 i = firstElementIdx % DataSize; i < DataSize; ++i) { + AtomicSet(Data[i], 0); + } + for (ui64 i = 0; i <= lastOffset; ++i) { + AtomicSet(Data[i], 0); + } + } else { + for (ui64 i = firstElementIdx % DataSize; i <= lastOffset; ++i) { + AtomicSet(Data[i], 0); + } + } + AtomicSet(LastTimeNs, timeNs); + } +}; + +template <ui64 DataSize> +struct TMinusOneCpuEstimator { + static constexpr ui64 BitsSize = DataSize * 64; + ui64 BeginDelayIdx; + ui64 EndDelayIdx; + ui64 Idle; + ui64 Delay[BitsSize]; + + ui64 MaxLatencyIncreaseWithOneLessCpu(TCpuLoadLog<DataSize>** logs, i64 logCount, ui64 timeNs, ui64 periodNs) { + Y_VERIFY(logCount > 0); + ui64 endTimeNs = timeNs; + + ui64 lastTimeNs = timeNs; + for (i64 log_idx = 0; log_idx < logCount; ++log_idx) { + ui64 x = AtomicGet(logs[log_idx]->LastTimeNs); + if ((x & 1) == 1) { + lastTimeNs = Min(lastTimeNs, x); + } else { + logs[log_idx]->template RegisterBusyPeriod<false>(endTimeNs, x); + } + } + const ui64 beginTimeNs = periodNs < timeNs ? timeNs - periodNs : 0; + + ui64 beginIdx = beginTimeNs / BitDurationNs; + ui64 lastIdx = lastTimeNs / BitDurationNs; + ui64 beginElementIdx = beginIdx / 64; + ui64 lastElementIdx = lastIdx / 64; + + BeginDelayIdx = 0; + EndDelayIdx = 0; + Idle = 0; + ui64 maxDelay = 0; + ui64 bucket = 0; + for (ui64 idx = beginElementIdx; idx <= lastElementIdx; ++idx) { + ui64 i = idx % DataSize; + ui64 input = AtomicGet(logs[0]->Data[i]); + ui64 all_busy = ~0ull; + for (i64 log_idx = 1; log_idx < logCount; ++log_idx) { + ui64 x = AtomicGet(logs[log_idx]->Data[i]); + all_busy &= x; + } + if (!input) { + if (!bucket) { + Idle += 64 - PopCount(all_busy); + continue; + } + } + for (i64 bit_idx = 0; bit_idx < 64; ++bit_idx) { + ui64 x = (1ull << bit_idx); + if (all_busy & x) { + if (input & x) { + // Push into the queue + bucket++; + Delay[EndDelayIdx] = EndDelayIdx; + ++EndDelayIdx; + } else { + // All busy + } + } else { + if (input & x) { + // Move success + } else { + if (bucket) { + // Remove from the queue + bucket--; + ui64 stored = Delay[BeginDelayIdx]; + ++BeginDelayIdx; + ui64 delay = EndDelayIdx - stored; + maxDelay = Max(maxDelay, delay); + //Cerr << "bit_idx: " << bit_idx << " stored: " << stored << " delay: " << delay << Endl; + } else { + Idle++; + } + } + } + } + } + if (bucket) { + ui64 stored = Delay[BeginDelayIdx]; + ui64 delay = EndDelayIdx - stored; + maxDelay = Max(maxDelay, delay); + //Cerr << "last stored: " << stored << " delay: " << delay << Endl; + } + return maxDelay * BitDurationNs; + } +}; + diff --git a/library/cpp/actors/util/cpu_load_log_ut.cpp b/library/cpp/actors/util/cpu_load_log_ut.cpp new file mode 100644 index 0000000000..7109123c6e --- /dev/null +++ b/library/cpp/actors/util/cpu_load_log_ut.cpp @@ -0,0 +1,275 @@ +#include "cpu_load_log.h" + +#include <library/cpp/testing/unittest/registar.h> +#include <util/random/random.h> +#include <util/system/hp_timer.h> +#include <util/system/sanitizers.h> +#include <util/system/thread.h> + +Y_UNIT_TEST_SUITE(CpuLoadLog) { + + TString PrintBits(ui64 x) { + TStringStream str; + for (ui64 i = 0; i < 64; ++i) { + if (x & (1ull << i)) { + str << "1"; + } else { + str << "0"; + } + } + return str.Str(); + } + + Y_UNIT_TEST(FillAll) { + TCpuLoadLog<5> log(100*BitDurationNs); + log.RegisterBusyPeriod(101*BitDurationNs); + log.RegisterBusyPeriod(163*BitDurationNs); + log.RegisterBusyPeriod(164*BitDurationNs); + log.RegisterBusyPeriod(165*BitDurationNs); + log.RegisterBusyPeriod(331*BitDurationNs); + log.RegisterBusyPeriod(340*BitDurationNs); + log.RegisterBusyPeriod(420*BitDurationNs); + log.RegisterBusyPeriod(511*BitDurationNs); + //for (ui64 i = 0; i < 5; ++i) { + // Cerr << "i: " << i << " bits: " << PrintBits(log.Data[i]) << Endl; + //} + for (ui64 i = 0; i < 5; ++i) { + UNIT_ASSERT_C((ui64(log.Data[i]) == ~ui64(0)), "Unequal at " << i << "\n got: " << PrintBits(log.Data[i]) + << "\n expected: " << PrintBits(~ui64(0))); + } + } + + Y_UNIT_TEST(PartialFill) { + TCpuLoadLog<5> log(0*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b0ull)); + log.RegisterBusyPeriod(0*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b1ull)); + log.RegisterBusyPeriod(0*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b1ull)); + log.RegisterBusyPeriod(1*BitDurationNs/2); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b1ull)); + log.RegisterBusyPeriod(1*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b11ull)); + log.RegisterIdlePeriod(3*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b11ull)); + log.RegisterBusyPeriod(3*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b1011ull)); + log.RegisterBusyPeriod(63*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits((~0ull)^0b0100ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0b0ull)); + log.RegisterBusyPeriod(128*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits((~0ull)^0b0100ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0b1ull)); + log.RegisterBusyPeriod(1*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull)); + log.RegisterBusyPeriod(2*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull)); + log.RegisterBusyPeriod(64*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0b1ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull)); + log.RegisterIdlePeriod(128*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0b1ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull)); + log.RegisterIdlePeriod(192*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0b1ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull)); + log.RegisterBusyPeriod(192*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0b1ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(0b1ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull)); + log.RegisterIdlePeriod((192+5*64-1)*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(0b1ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(0ull)); + log.RegisterIdlePeriod((192+15*64)*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(0ull)); + } + + Y_UNIT_TEST(Estimator) { + TCpuLoadLog<5> *log[10]; + log[0] = new TCpuLoadLog<5>(0*BitDurationNs); + log[1] = new TCpuLoadLog<5>(0*BitDurationNs); + TMinusOneCpuEstimator<5> estimator; + + + for (ui64 i = 0; i < 5*64; i+=2) { + log[0]->RegisterIdlePeriod(i*BitDurationNs); + log[0]->RegisterBusyPeriod(i*BitDurationNs); + } + log[0]->RegisterIdlePeriod((5*64-2)*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[0]->Data[0]), + PrintBits(0b0101010101010101010101010101010101010101010101010101010101010101ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[0]->Data[4]), + PrintBits(0b0101010101010101010101010101010101010101010101010101010101010101ull)); + for (ui64 i = 0; i < 5*64-1; i+=2) { + log[1]->RegisterIdlePeriod((i+1)*BitDurationNs); + log[1]->RegisterBusyPeriod((i+1)*BitDurationNs); + } + log[1]->RegisterIdlePeriod((5*64-2+1)*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[1]->Data[0]), + PrintBits(0b1010101010101010101010101010101010101010101010101010101010101010ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[1]->Data[4]), + PrintBits(0b1010101010101010101010101010101010101010101010101010101010101010ull)); + + ui64 value = estimator.MaxLatencyIncreaseWithOneLessCpu(log, 2, (5*64)*BitDurationNs-1, 3*64*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(value/BitDurationNs, 1); + + value = estimator.MaxLatencyIncreaseWithOneLessCpu(log, 2, (5*64+10)*BitDurationNs, 3*64*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(value/BitDurationNs, 12); + + delete log[0]; + delete log[1]; + } + + Y_UNIT_TEST(Estimator2) { + TCpuLoadLog<5> *log[2]; + log[0] = new TCpuLoadLog<5>(0*BitDurationNs); + log[1] = new TCpuLoadLog<5>(0*BitDurationNs); + TMinusOneCpuEstimator<5> estimator; + + for (ui64 i = 0; i < 5*64; i+=2) { + log[0]->RegisterIdlePeriod(i*BitDurationNs); + log[0]->RegisterBusyPeriod(i*BitDurationNs); + } + for (ui64 i = 0; i < 5; ++i) { + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[0]->Data[i]), + PrintBits(0b0101010101010101010101010101010101010101010101010101010101010101ull)); + } + for (ui64 i = 0; i < 5*64-1; i+=2) { + log[1]->RegisterIdlePeriod((i+1)*BitDurationNs); + log[1]->RegisterBusyPeriod((i+1)*BitDurationNs); + } + for (ui64 i = 0; i < 5; ++i) { + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[1]->Data[i]), + PrintBits(0b1010101010101010101010101010101010101010101010101010101010101010ull)); + } + + log[0]->Data[2] = ~0ull; + ui64 value = estimator.MaxLatencyIncreaseWithOneLessCpu(log, 2, (5*64-1)*BitDurationNs, 3*64*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(value/BitDurationNs, 32); + + delete log[0]; + delete log[1]; + } + + Y_UNIT_TEST(Estimator3) { + TCpuLoadLog<5> *log[3]; + log[0] = new TCpuLoadLog<5>(0*BitDurationNs); + log[1] = new TCpuLoadLog<5>(0*BitDurationNs); + log[2] = new TCpuLoadLog<5>(0*BitDurationNs); + TMinusOneCpuEstimator<5> estimator; + + for (ui64 i = 0; i < 5*64; i+=8) { + log[0]->RegisterIdlePeriod(i*BitDurationNs); + log[0]->RegisterBusyPeriod((i+3)*BitDurationNs); + log[1]->RegisterIdlePeriod(i*BitDurationNs); + log[1]->RegisterBusyPeriod((i+3)*BitDurationNs); + log[2]->RegisterIdlePeriod(i*BitDurationNs); + log[2]->RegisterBusyPeriod((i+3)*BitDurationNs); + } + for (ui64 i = 0; i < 5; ++i) { + for (ui64 n = 0; n < 3; ++n) { + UNIT_ASSERT_VALUES_EQUAL_C(PrintBits(log[n]->Data[i]), + PrintBits(0b0000111100001111000011110000111100001111000011110000111100001111ull), + " i: " << i << " n: " << n); + } + } + + ui64 value = estimator.MaxLatencyIncreaseWithOneLessCpu(log, 3, (5*64-5)*BitDurationNs, 3*64*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(value/BitDurationNs, 4); + + delete log[0]; + delete log[1]; + delete log[2]; + } + /* + class TWorkerThread : public ISimpleThread { + private: + std::function<void()> Func; + double Time = 0.0; + + public: + TWorkerThread(std::function<void()> func) + : Func(std::move(func)) + { } + + double GetTime() const { + return Time; + } + + static THolder<TWorkerThread> Spawn(std::function<void()> func) { + THolder<TWorkerThread> thread = MakeHolder<TWorkerThread>(std::move(func)); + thread->Start(); + return thread; + } + + private: + void* ThreadProc() noexcept override { + THPTimer timer; + Func(); + Time = timer.Passed(); + return nullptr; + } + }; + + void DoConcurrentPushPop(size_t threads, ui64 perThreadCount) { + // Concurrency factor 4 is up to 16 threads + + auto workerFunc = [&](size_t threadIndex) { + }; + + TVector<THolder<TWorkerThread>> workers(threads); + for (size_t i = 0; i < threads; ++i) { + workers[i] = TWorkerThread::Spawn([workerFunc, i]() { + workerFunc(i); + }); + } + + double maxTime = 0; + for (size_t i = 0; i < threads; ++i) { + workers[i]->Join(); + maxTime = Max(maxTime, workers[i]->GetTime()); + } + + UNIT_ASSERT_VALUES_EQUAL(popped, 0u); + + Cerr << "Concurrent with " << threads << " threads: " << maxTime << " seconds" << Endl; + } + + void DoConcurrentPushPop_3times(size_t threads, ui64 perThreadCount) { + for (size_t i = 0; i < 3; ++i) { + DoConcurrentPushPop(threads, perThreadCount); + } + } + + static constexpr ui64 PER_THREAD_COUNT = NSan::PlainOrUnderSanitizer(1000000, 100000); + + Y_UNIT_TEST(ConcurrentPushPop_1thread) { DoConcurrentPushPop_3times(1, PER_THREAD_COUNT); } + */ +} |