diff options
author | kruall <kruall@ydb.tech> | 2022-11-15 13:32:41 +0300 |
---|---|---|
committer | kruall <kruall@ydb.tech> | 2022-11-15 13:32:41 +0300 |
commit | b77b7e9f09f1e17a832b2879f3a5e8cb2e2c072f (patch) | |
tree | 92d43929bb664ade3f2c75b474881a7694967b56 /library | |
parent | 660fa18bb981ed3fa5b10b6ec2d9248d80e33f87 (diff) | |
download | ydb-b77b7e9f09f1e17a832b2879f3a5e8cb2e2c072f.tar.gz |
Improve semaphore,
Diffstat (limited to 'library')
-rw-r--r-- | library/cpp/actors/core/executor_pool_basic.cpp | 365 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_basic.h | 37 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_basic_ut.cpp | 353 |
3 files changed, 438 insertions, 317 deletions
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp index 4dce16939a..7dff052d3e 100644 --- a/library/cpp/actors/core/executor_pool_basic.cpp +++ b/library/cpp/actors/core/executor_pool_basic.cpp @@ -36,6 +36,8 @@ namespace NActors { , MaxUtilizationAccumulator(0) , ThreadCount(threads) { + auto semaphore = TSemaphore(); + Semaphore = semaphore.ConverToI64(); } TBasicExecutorPool::TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg) @@ -56,126 +58,161 @@ namespace NActors { Threads.Destroy(); } + bool TBasicExecutorPool::GoToBeBlocked(TThreadCtx& threadCtx, TTimers &timers) { + do { + if (AtomicCas(&threadCtx.BlockedFlag, TThreadCtx::BS_BLOCKED, TThreadCtx::BS_BLOCKING)) { + timers.HPNow = GetCycleCountFast(); + timers.Elapsed += timers.HPNow - timers.HPStart; + if (threadCtx.BlockedPad.Park()) // interrupted + return true; + timers.HPStart = GetCycleCountFast(); + timers.Blocked += timers.HPStart - timers.HPNow; + } + } while (AtomicGet(threadCtx.BlockedFlag) != TThreadCtx::BS_NONE && !RelaxedLoad(&StopFlag)); + return false; + } + + bool TBasicExecutorPool::GoToSleep(TThreadCtx& threadCtx, TTimers &timers) { + do { + timers.HPNow = GetCycleCountFast(); + timers.Elapsed += timers.HPNow - timers.HPStart; + if (threadCtx.Pad.Park()) // interrupted + return true; + timers.HPStart = GetCycleCountFast(); + timers.Parked += timers.HPStart - timers.HPNow; + } while (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_BLOCKED && !RelaxedLoad(&StopFlag)); + return false; + } + + void TBasicExecutorPool::GoToSpin(TThreadCtx& threadCtx) { + ui64 start = GetCycleCountFast(); + bool doSpin = true; + while (true) { + for (ui32 j = 0; doSpin && j < 12; ++j) { + if (GetCycleCountFast() >= (start + SpinThresholdCycles)) { + doSpin = false; + break; + } + for (ui32 i = 0; i < 12; ++i) { + if (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_ACTIVE) { + SpinLockPause(); + } else { + doSpin = false; + break; + } + } + } + if (!doSpin) { + break; + } + if (RelaxedLoad(&StopFlag)) { + break; + } + } + } + + bool TBasicExecutorPool::GoToWaiting(TThreadCtx& threadCtx, TTimers &timers, bool needToBlock) { +#if defined ACTORSLIB_COLLECT_EXEC_STATS + if (AtomicGetAndIncrement(ThreadUtilization) == 0) { + // Initially counter contains -t0, the pool start timestamp + // When the first thread goes to sleep we add t1, so the counter + // becomes t1-t0 >= 0, or the duration of max utilization so far. + // If the counter was negative and becomes positive, that means + // counter just turned into a duration and we should store that + // duration. Otherwise another thread raced with us and + // subtracted some other timestamp t2. + const i64 t = GetCycleCountFast(); + const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, t); + if (x < 0 && x + t > 0) + AtomicStore(&MaxUtilizationAccumulator, x + t); + } +#endif + + Y_VERIFY(AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_NONE); + + if (SpinThreshold > 0 && !needToBlock) { + // spin configured period + AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_ACTIVE); + GoToSpin(threadCtx); + // then - sleep + if (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_ACTIVE) { + if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED, TThreadCtx::WS_ACTIVE)) { + if (GoToSleep(threadCtx, timers)) { // interrupted + return true; + } + } + } + } else { + AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED); + if (GoToSleep(threadCtx, timers)) { // interrupted + return true; + } + } + + Y_VERIFY_DEBUG(AtomicLoad(&StopFlag) || AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_RUNNING); + +#if defined ACTORSLIB_COLLECT_EXEC_STATS + if (AtomicDecrement(ThreadUtilization) == 0) { + // When we started sleeping counter contained t1-t0, or the + // last duration of max utilization. Now we subtract t2 >= t1, + // which turns counter negative again, and the next sleep cycle + // at timestamp t3 would be adding some new duration t3-t2. + // If the counter was positive and becomes negative that means + // there are no current races with other threads and we should + // store the last positive duration we observed. Multiple + // threads may be adding and subtracting values in potentially + // arbitrary order, which would cause counter to oscillate + // around zero. When it crosses zero is a good indication of a + // correct value. + const i64 t = GetCycleCountFast(); + const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, -t); + if (x > 0 && x - t < 0) + AtomicStore(&MaxUtilizationAccumulator, x); + } +#endif + return false; + } + ui32 TBasicExecutorPool::GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) { ui32 workerId = wctx.WorkerId; Y_VERIFY_DEBUG(workerId < PoolThreads); - NHPTimer::STime elapsed = 0; - NHPTimer::STime parked = 0; - NHPTimer::STime blocked = 0; - NHPTimer::STime hpstart = GetCycleCountFast(); - NHPTimer::STime hpnow; + TTimers timers; TThreadCtx& threadCtx = Threads[workerId]; AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_NONE); if (Y_UNLIKELY(AtomicGet(threadCtx.BlockedFlag) != TThreadCtx::BS_NONE)) { - do { - if (AtomicCas(&threadCtx.BlockedFlag, TThreadCtx::BS_BLOCKED, TThreadCtx::BS_BLOCKING)) { - hpnow = GetCycleCountFast(); - elapsed += hpnow - hpstart; - if (threadCtx.BlockedPad.Park()) // interrupted - return 0; - hpstart = GetCycleCountFast(); - blocked += hpstart - hpnow; - } - } while (AtomicGet(threadCtx.BlockedFlag) != TThreadCtx::BS_NONE && !AtomicLoad(&StopFlag)); + if (GoToBeBlocked(threadCtx, timers)) { // interrupted + return 0; + } } - const TAtomic x = AtomicDecrement(Semaphore); + bool needToWait = false; + bool needToBlock = false; - if (x < 0) { -#if defined ACTORSLIB_COLLECT_EXEC_STATS - if (AtomicGetAndIncrement(ThreadUtilization) == 0) { - // Initially counter contains -t0, the pool start timestamp - // When the first thread goes to sleep we add t1, so the counter - // becomes t1-t0 >= 0, or the duration of max utilization so far. - // If the counter was negative and becomes positive, that means - // counter just turned into a duration and we should store that - // duration. Otherwise another thread raced with us and - // subtracted some other timestamp t2. - const i64 t = GetCycleCountFast(); - const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, t); - if (x < 0 && x + t > 0) - AtomicStore(&MaxUtilizationAccumulator, x + t); - } -#endif + TAtomic x = AtomicGet(Semaphore); + do { + i64 oldX = x; + TSemaphore semaphore = TSemaphore::GetSemaphore(x); + needToBlock = semaphore.CurrentSleepThreadCount < 0; + needToWait = needToBlock || semaphore.OldSemaphore <= -semaphore.CurrentSleepThreadCount; - Y_VERIFY(AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_NONE); - - if (SpinThreshold > 0) { - // spin configured period - AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_ACTIVE); - ui64 start = GetCycleCountFast(); - bool doSpin = true; - while (true) { - for (ui32 j = 0; doSpin && j < 12; ++j) { - if (GetCycleCountFast() >= (start + SpinThresholdCycles)) { - doSpin = false; - break; - } - for (ui32 i = 0; i < 12; ++i) { - if (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_ACTIVE) { - SpinLockPause(); - } else { - doSpin = false; - break; - } - } - } - if (!doSpin) { - break; - } - if (RelaxedLoad(&StopFlag)) { - break; - } - } - // then - sleep - if (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_ACTIVE) { - if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED, TThreadCtx::WS_ACTIVE)) { - do { - hpnow = GetCycleCountFast(); - elapsed += hpnow - hpstart; - if (threadCtx.Pad.Park()) // interrupted - return 0; - hpstart = GetCycleCountFast(); - parked += hpstart - hpnow; - } while (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_BLOCKED); - } - } - } else { - AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED); - do { - hpnow = GetCycleCountFast(); - elapsed += hpnow - hpstart; - if (threadCtx.Pad.Park()) // interrupted - return 0; - hpstart = GetCycleCountFast(); - parked += hpstart - hpnow; - } while (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_BLOCKED); + semaphore.OldSemaphore--; + if (needToWait) { + semaphore.CurrentSleepThreadCount++; } - Y_VERIFY_DEBUG(AtomicLoad(&StopFlag) || AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_RUNNING); + x = AtomicGetAndCas(&Semaphore, semaphore.ConverToI64(), x); + if (x == oldX) { + break; + } + } while (!StopFlag); -#if defined ACTORSLIB_COLLECT_EXEC_STATS - if (AtomicDecrement(ThreadUtilization) == 0) { - // When we started sleeping counter contained t1-t0, or the - // last duration of max utilization. Now we subtract t2 >= t1, - // which turns counter negative again, and the next sleep cycle - // at timestamp t3 would be adding some new duration t3-t2. - // If the counter was positive and becomes negative that means - // there are no current races with other threads and we should - // store the last positive duration we observed. Multiple - // threads may be adding and subtracting values in potentially - // arbitrary order, which would cause counter to oscillate - // around zero. When it crosses zero is a good indication of a - // correct value. - const i64 t = GetCycleCountFast(); - const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, -t); - if (x > 0 && x - t < 0) - AtomicStore(&MaxUtilizationAccumulator, x); + if (needToWait) { + if (GoToWaiting(threadCtx, timers, needToBlock)) { // interrupted + return 0; } -#endif } else { AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING); } @@ -183,14 +220,14 @@ namespace NActors { // ok, has work suggested, must dequeue while (!RelaxedLoad(&StopFlag)) { if (const ui32 activation = Activations.Pop(++revolvingCounter)) { - hpnow = GetCycleCountFast(); - elapsed += hpnow - hpstart; - wctx.AddElapsedCycles(IActor::ACTOR_SYSTEM, elapsed); - if (parked > 0) { - wctx.AddParkedCycles(parked); + timers.HPNow = GetCycleCountFast(); + timers.Elapsed += timers.HPNow - timers.HPStart; + wctx.AddElapsedCycles(IActor::ACTOR_SYSTEM, timers.Elapsed); + if (timers.Parked > 0) { + wctx.AddParkedCycles(timers.Parked); } - if (blocked > 0) { - wctx.AddBlockedCycles(blocked); + if (timers.Blocked > 0) { + wctx.AddBlockedCycles(timers.Blocked); } return activation; } @@ -228,8 +265,25 @@ namespace NActors { void TBasicExecutorPool::ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) { Activations.Push(activation, revolvingCounter); - const TAtomic x = AtomicIncrement(Semaphore); - if (x <= 0) { // we must find someone to wake-up + bool needToWakeUp = false; + + TAtomic x = AtomicGet(Semaphore); + TSemaphore semaphore = TSemaphore::GetSemaphore(x); + do { + needToWakeUp = semaphore.CurrentSleepThreadCount > 0; + i64 oldX = semaphore.ConverToI64(); + semaphore.OldSemaphore++; + if (needToWakeUp) { + semaphore.CurrentSleepThreadCount--; + } + x = AtomicGetAndCas(&Semaphore, semaphore.ConverToI64(), oldX); + if (x == oldX) { + break; + } + semaphore = TSemaphore::GetSemaphore(x); + } while (true); + + if (needToWakeUp) { // we must find someone to wake-up WakeUpLoop(); } } @@ -345,87 +399,12 @@ namespace NActors { with_lock (ChangeThreadsLock) { size_t prevCount = GetThreadCount(); AtomicSet(ThreadCount, threads); - if (prevCount < threads) { - for (size_t i = prevCount; i < threads; ++i) { - bool repeat = true; - while (repeat) { - switch (AtomicGet(Threads[i].BlockedFlag)) { - case TThreadCtx::BS_BLOCKING: - if (AtomicCas(&Threads[i].BlockedFlag, TThreadCtx::BS_NONE, TThreadCtx::BS_BLOCKING)) { - // thread not entry to blocked loop - repeat = false; - } - break; - case TThreadCtx::BS_BLOCKED: - // thread entry to blocked loop and we wake it - AtomicSet(Threads[i].BlockedFlag, TThreadCtx::BS_NONE); - Threads[i].BlockedPad.Unpark(); - repeat = false; - break; - default: - // thread mustn't has TThreadCtx::BS_NONE because last time it was started to block - Y_FAIL("BlockedFlag is not TThreadCtx::BS_BLOCKING and TThreadCtx::BS_BLOCKED when thread was waked up"); - } - } - } - } else if (prevCount > threads) { - // at first, start to block - for (size_t i = threads; i < prevCount; ++i) { - Y_VERIFY(AtomicGet(Threads[i].BlockedFlag) == TThreadCtx::BS_NONE); - AtomicSet(Threads[i].BlockedFlag, TThreadCtx::BS_BLOCKING); - } - // after check need to wake up threads - for (size_t idx = threads; idx < prevCount; ++idx) { - TThreadCtx& threadCtx = Threads[idx]; - auto waitingFlag = AtomicGet(threadCtx.WaitingFlag); - auto blockedFlag = AtomicGet(threadCtx.BlockedFlag); - // while thread has this states (WS_NONE and BS_BLOCKING) we can't guess which way thread will go. - // Either go to sleep and it will have to wake up, - // or go to execute task and after completion will be blocked. - while (waitingFlag == TThreadCtx::WS_NONE && blockedFlag == TThreadCtx::BS_BLOCKING) { - waitingFlag = AtomicGet(threadCtx.WaitingFlag); - blockedFlag = AtomicGet(threadCtx.BlockedFlag); - } - // next states: - // 1) WS_ACTIVE BS_BLOCKING - waiting and start spinig | need wake up to block - // 2) WS_BLOCKED BS_BLOCKING - waiting and start sleep | need wake up to block - // 3) WS_RUNNING BS_BLOCKING - start execute | not need wake up, will block after executing - // 4) WS_NONE BS_BLOCKED - blocked | not need wake up, already blocked - - if (waitingFlag == TThreadCtx::WS_ACTIVE || waitingFlag == TThreadCtx::WS_BLOCKED) { - // need wake up - Y_VERIFY(blockedFlag == TThreadCtx::BS_BLOCKING); - - // creaty empty mailBoxHint, where LineIndex == 1 and LineHint == 0, and activations will be ignored - constexpr auto emptyMailBoxHint = TMailboxTable::LineIndexMask & -TMailboxTable::LineIndexMask; - ui64 revolvingCounter = AtomicGet(ActivationsRevolvingCounter); - - Activations.Push(emptyMailBoxHint, revolvingCounter); - - auto x = AtomicIncrement(Semaphore); - if (x <= 0) { - // try wake up. if success then go to next thread - switch (waitingFlag){ - case TThreadCtx::WS_ACTIVE: // in active spin-lock, just set flag - if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_ACTIVE)) { - continue; - } - break; - case TThreadCtx::WS_BLOCKED: - if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_BLOCKED)) { - threadCtx.Pad.Unpark(); - continue; - } - break; - default: - ; // other thread woke this sleeping thread - } - // if thread has already been awakened then we must awaken the other - WakeUpLoop(); - } - } - } - } + + TSemaphore semaphore = TSemaphore::GetSemaphore(AtomicGet(Semaphore)); + i64 oldX = semaphore.ConverToI64(); + semaphore.CurrentSleepThreadCount += threads - prevCount; + semaphore.OldSemaphore -= threads - prevCount; + AtomicAdd(Semaphore, semaphore.ConverToI64() - oldX); } } } diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h index 023190f7fe..e65ad20a48 100644 --- a/library/cpp/actors/core/executor_pool_basic.h +++ b/library/cpp/actors/core/executor_pool_basic.h @@ -45,6 +45,14 @@ namespace NActors { } }; + struct TTimers { + NHPTimer::STime Elapsed = 0; + NHPTimer::STime Parked = 0; + NHPTimer::STime Blocked = 0; + NHPTimer::STime HPStart = GetCycleCountFast(); + NHPTimer::STime HPNow; + }; + const ui64 SpinThreshold; const ui64 SpinThresholdCycles; @@ -67,6 +75,31 @@ namespace NActors { TMutex ChangeThreadsLock; public: + struct TSemaphore { + i64 OldSemaphore = 0; // 34 bits + // Sign bit + i8 Reserved1 = 0; // 5 bits + i16 CurrentSleepThreadCount = 0; // 16 bits + i8 Reserved2 = 0; // 8 bits + + inline i64 ConverToI64() { + i64 value = (1ll << 34) + OldSemaphore; + return value + | ((i64)Reserved1 << 35) + | ((i64)CurrentSleepThreadCount << 40) + | ((i64)Reserved2 << 56); + } + + static inline TSemaphore GetSemaphore(i64 value) { + TSemaphore semaphore; + semaphore.OldSemaphore = (value & 0x7ffffffffll) - (1ll << 34); + semaphore.Reserved1 = (value >> 35) & 0x1f; + semaphore.CurrentSleepThreadCount = (value >> 40) & 0xffff; + semaphore.Reserved2 = (value >> 56) & 0xff; + return semaphore; + } + }; + static constexpr TDuration DEFAULT_TIME_PER_MAILBOX = TBasicExecutorPoolConfig::DEFAULT_TIME_PER_MAILBOX; static constexpr ui32 DEFAULT_EVENTS_PER_MAILBOX = TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX; @@ -107,5 +140,9 @@ namespace NActors { private: void WakeUpLoop(); + bool GoToWaiting(TThreadCtx& threadCtx, TTimers &timers, bool needToBlock); + void GoToSpin(TThreadCtx& threadCtx); + bool GoToSleep(TThreadCtx& threadCtx, TTimers &timers); + bool GoToBeBlocked(TThreadCtx& threadCtx, TTimers &timers); }; } diff --git a/library/cpp/actors/core/executor_pool_basic_ut.cpp b/library/cpp/actors/core/executor_pool_basic_ut.cpp index 3ef5808d72..574b0b0759 100644 --- a/library/cpp/actors/core/executor_pool_basic_ut.cpp +++ b/library/cpp/actors/core/executor_pool_basic_ut.cpp @@ -6,10 +6,15 @@ #include <library/cpp/actors/util/should_continue.h> #include <library/cpp/testing/unittest/registar.h> -#include <library/cpp/actors/protos/unittests.pb.h> using namespace NActors; +#define VALUES_EQUAL(a, b, ...) \ + UNIT_ASSERT_VALUES_EQUAL_C((a), (b), (i64)semaphore.OldSemaphore \ + << ' ' << (i64)semaphore.Reserved1 \ + << ' ' << (i64)semaphore.CurrentSleepThreadCount \ + << ' ' << (i64)semaphore.Reserved2 __VA_ARGS__); + //////////////////////////////////////////////////////////////////////////////// struct TEvMsg : public NActors::TEventBase<TEvMsg, 10347> { @@ -90,138 +95,59 @@ THolder<TActorSystemSetup> GetActorSystemSetup(TBasicExecutorPool* pool) Y_UNIT_TEST_SUITE(BasicExecutorPool) { - Y_UNIT_TEST(DecreaseIncreaseThreadsCount) { - const size_t msgCount = 1e4; - const size_t size = 4; - const size_t halfSize = size / 2; - TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50); + Y_UNIT_TEST(Semaphore) { + TBasicExecutorPool::TSemaphore semaphore; + semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(0); - auto setup = GetActorSystemSetup(executorPool); - TActorSystem actorSystem(setup); - actorSystem.Start(); + VALUES_EQUAL(0, semaphore.ConverToI64()); + semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(-1); + VALUES_EQUAL(-1, semaphore.ConverToI64()); + semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(1); + VALUES_EQUAL(1, semaphore.ConverToI64()); - executorPool->SetThreadCount(halfSize); - TTestSenderActor* actors[size]; - TActorId actorIds[size]; - for (size_t i = 0; i < size; ++i) { - actors[i] = new TTestSenderActor(); - actorIds[i] = actorSystem.Register(actors[i]); + for (i64 value = -1'000'000; value <= 1'000'000; ++value) { + VALUES_EQUAL(TBasicExecutorPool::TSemaphore::GetSemaphore(value).ConverToI64(), value); } - const int testCount = 2; - - TExecutorPoolStats poolStats[testCount]; - TVector<TExecutorThreadStats> statsCopy[testCount]; - - for (size_t testIdx = 0; testIdx < testCount; ++testIdx) { - for (size_t i = 0; i < size; ++i) { - actors[i]->Start(actors[i]->SelfId(), msgCount); + for (i8 sleepThreads = -10; sleepThreads <= 10; ++sleepThreads) { + + semaphore = TBasicExecutorPool::TSemaphore(); + semaphore.CurrentSleepThreadCount = sleepThreads; + i64 initialValue = semaphore.ConverToI64(); + + semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(initialValue - 1); + VALUES_EQUAL(-1, semaphore.OldSemaphore); + + i64 value = initialValue; + value -= 100; + for (i32 expected = -100; expected <= 100; ++expected) { + semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(value); + UNIT_ASSERT_VALUES_EQUAL_C(expected, semaphore.OldSemaphore, (i64)semaphore.OldSemaphore + << ' ' << (i64)semaphore.CurrentSleepThreadCount + << ' ' << (i64)semaphore.Reserved2); + UNIT_ASSERT_VALUES_EQUAL_C(sleepThreads, semaphore.CurrentSleepThreadCount, (i64)semaphore.OldSemaphore + << ' ' << (i64)semaphore.CurrentSleepThreadCount + << ' ' << (i64)semaphore.Reserved2); + semaphore = TBasicExecutorPool::TSemaphore(); + semaphore.OldSemaphore = expected; + semaphore.CurrentSleepThreadCount = sleepThreads; + UNIT_ASSERT_VALUES_EQUAL(semaphore.ConverToI64(), value); + value++; } - for (size_t i = 0; i < size; ++i) { - actorSystem.Send(actorIds[i], new TEvMsg()); - } - - Sleep(TDuration::MilliSeconds(100)); - for (size_t i = 0; i < size; ++i) { - actors[i]->Stop(); + for (i32 expected = 101; expected >= -101; --expected) { + semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(value); + UNIT_ASSERT_VALUES_EQUAL_C(expected, semaphore.OldSemaphore, (i64)semaphore.OldSemaphore + << ' ' << (i64)semaphore.CurrentSleepThreadCount + << ' ' << (i64)semaphore.Reserved2); + UNIT_ASSERT_VALUES_EQUAL_C(sleepThreads, semaphore.CurrentSleepThreadCount, (i64)semaphore.OldSemaphore + << ' ' << (i64)semaphore.CurrentSleepThreadCount + << ' ' << (i64)semaphore.Reserved2); + value--; } - - executorPool->GetCurrentStats(poolStats[testIdx], statsCopy[testIdx]); } - for (size_t i = 1; i <= halfSize; ++i) { - UNIT_ASSERT_UNEQUAL(statsCopy[0][i].ReceivedEvents, statsCopy[1][i].ReceivedEvents); - } - - for (size_t i = halfSize + 1; i <= size; ++i) { - UNIT_ASSERT_EQUAL(statsCopy[0][i].ReceivedEvents, statsCopy[1][i].ReceivedEvents); - } - - executorPool->SetThreadCount(size); - - for (size_t testIdx = 0; testIdx < testCount; ++testIdx) { - for (size_t i = 0; i < size; ++i) { - actors[i]->Start(actors[i]->SelfId(), msgCount); - } - for (size_t i = 0; i < size; ++i) { - actorSystem.Send(actorIds[i], new TEvMsg()); - } - - Sleep(TDuration::MilliSeconds(100)); - - for (size_t i = 0; i < size; ++i) { - actors[i]->Stop(); - } - - executorPool->GetCurrentStats(poolStats[testIdx], statsCopy[testIdx]); - } - - for (size_t i = 1; i <= size; ++i) { - UNIT_ASSERT_UNEQUAL(statsCopy[0][i].ReceivedEvents, statsCopy[1][i].ReceivedEvents); - } - } - - Y_UNIT_TEST(ChangeCount) { - const size_t msgCount = 1e3; - const size_t size = 4; - const size_t halfSize = size / 2; - TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50); - - auto begin = TInstant::Now(); - - auto setup = GetActorSystemSetup(executorPool); - TActorSystem actorSystem(setup); - actorSystem.Start(); - executorPool->SetThreadCount(halfSize); - - TTestSenderActor* actors[size]; - TActorId actorIds[size]; - for (size_t i = 0; i < size; ++i) { - actors[i] = new TTestSenderActor(); - actorIds[i] = actorSystem.Register(actors[i]); - } - - for (size_t i = 0; i < size; ++i) { - actors[i]->Start(actorIds[i], msgCount); - } - for (size_t i = 0; i < size; ++i) { - actorSystem.Send(actorIds[i], new TEvMsg()); - } - - const i32 N = 6; - const i32 threadsCouns[N] = { 1, 3, 2, 3, 1, 4 }; - - ui64 counter = 0; - - TTestSenderActor* changerActor = new TTestSenderActor([&]{ - executorPool->SetThreadCount(threadsCouns[counter]); - counter++; - if (counter == N) { - counter = 0; - } - }); - TActorId changerActorId = actorSystem.Register(changerActor); - changerActor->Start(changerActorId, msgCount); - actorSystem.Send(changerActorId, new TEvMsg()); - - while (true) { - size_t maxCounter = 0; - for (size_t i = 0; i < size; ++i) { - maxCounter = Max(maxCounter, actors[i]->GetCounter()); - } - - if (maxCounter == 0) { - break; - } - - auto now = TInstant::Now(); - UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter); - - Sleep(TDuration::MilliSeconds(1)); - } - - changerActor->Stop(); + //UNIT_ASSERT_VALUES_EQUAL_C(-1, TBasicExecutorPool::TSemaphore::GetSemaphore(value-1).OldSemaphore); } Y_UNIT_TEST(CheckCompleteOne) { @@ -433,3 +359,182 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) { UNIT_ASSERT_VALUES_EQUAL(stats[0].MailboxPushedOutBySoftPreemption, 0); } } + +Y_UNIT_TEST_SUITE(ChangingThreadsCountInBasicExecutorPool) { + + struct TMockState { + void ActorDo() {} + }; + + struct TTestActors { + const size_t Count; + TArrayHolder<TTestSenderActor*> Actors; + TArrayHolder<TActorId> ActorIds; + + TTestActors(size_t count) + : Count(count) + , Actors(new TTestSenderActor*[count]) + , ActorIds(new TActorId[count]) + { } + + void Start(TActorSystem &actorSystem, size_t msgCount) { + for (size_t i = 0; i < Count; ++i) { + Actors[i]->Start(Actors[i]->SelfId(), msgCount); + } + for (size_t i = 0; i < Count; ++i) { + actorSystem.Send(ActorIds[i], new TEvMsg()); + } + } + + void Stop() { + for (size_t i = 0; i < Count; ++i) { + Actors[i]->Stop(); + } + } + }; + + template <typename TState = TMockState> + struct TTestCtx { + const size_t MaxThreadCount; + const size_t SendingMessageCount; + std::unique_ptr<TBasicExecutorPool> ExecutorPool; + THolder<TActorSystemSetup> Setup; + TActorSystem ActorSystem; + + TState State; + + TTestCtx(size_t maxThreadCount, size_t sendingMessageCount) + : MaxThreadCount(maxThreadCount) + , SendingMessageCount(sendingMessageCount) + , ExecutorPool(new TBasicExecutorPool(0, MaxThreadCount, 50)) + , Setup(GetActorSystemSetup(ExecutorPool.get())) + , ActorSystem(Setup) + { + } + + TTestCtx(size_t maxThreadCount, size_t sendingMessageCount, const TState &state) + : MaxThreadCount(maxThreadCount) + , SendingMessageCount(sendingMessageCount) + , ExecutorPool(new TBasicExecutorPool(0, MaxThreadCount, 50)) + , Setup(GetActorSystemSetup(ExecutorPool.get())) + , ActorSystem(Setup) + , State(state) + { + } + + ~TTestCtx() { + ExecutorPool.release(); + } + + TTestActors RegisterCheckActors(size_t actorCount) { + TTestActors res(actorCount); + for (size_t i = 0; i < actorCount; ++i) { + res.Actors[i] = new TTestSenderActor([&] { + State.ActorDo(); + }); + res.ActorIds[i] = ActorSystem.Register(res.Actors[i]); + } + return res; + } + }; + + struct TCheckingInFlightState { + TAtomic ExpectedMaximum = 0; + TAtomic CurrentInFlight = 0; + + void ActorStartProcessing() { + ui32 inFlight = AtomicIncrement(CurrentInFlight); + ui32 maximum = AtomicGet(ExpectedMaximum); + if (maximum) { + UNIT_ASSERT_C(inFlight <= maximum, "inFlight# " << inFlight << " maximum# " << maximum); + } + } + + void ActorStopProcessing() { + AtomicDecrement(CurrentInFlight); + } + + void ActorDo() { + ActorStartProcessing(); + NanoSleep(1'000'000); + ActorStopProcessing(); + } + }; + + Y_UNIT_TEST(DecreaseIncreaseThreadCount) { + const size_t msgCount = 1e2; + const size_t size = 4; + const size_t testCount = 2; + TTestCtx<TCheckingInFlightState> ctx(size, msgCount); + ctx.ActorSystem.Start(); + + TVector<TExecutorThreadStats> statsCopy[testCount]; + + TTestActors testActors = ctx.RegisterCheckActors(size); + + const size_t N = 6; + const size_t threadsCounts[N] = { 1, 3, 2, 3, 1, 4 }; + for (ui32 idx = 0; idx < 4 * N; ++idx) { + size_t currentThreadCount = threadsCounts[idx]; + ctx.ExecutorPool->SetThreadCount(currentThreadCount); + AtomicSet(ctx.State.ExpectedMaximum, currentThreadCount); + + for (size_t testIdx = 0; testIdx < testCount; ++testIdx) { + testActors.Start(ctx.ActorSystem, msgCount); + Sleep(TDuration::MilliSeconds(100)); + testActors.Stop(); + } + Sleep(TDuration::MilliSeconds(10)); + } + ctx.ActorSystem.Stop(); + } + + Y_UNIT_TEST(ContiniousChangingThreadCount) { + const size_t msgCount = 1e2; + const size_t size = 4; + + auto begin = TInstant::Now(); + TTestCtx<TCheckingInFlightState> ctx(size, msgCount, TCheckingInFlightState{msgCount}); + ctx.ActorSystem.Start(); + TTestActors testActors = ctx.RegisterCheckActors(size); + + testActors.Start(ctx.ActorSystem, msgCount); + + const size_t N = 6; + const size_t threadsCouns[N] = { 1, 3, 2, 3, 1, 4 }; + + ui64 counter = 0; + + TTestSenderActor* changerActor = new TTestSenderActor([&]{ + ctx.State.ActorStartProcessing(); + AtomicSet(ctx.State.ExpectedMaximum, 0); + ctx.ExecutorPool->SetThreadCount(threadsCouns[counter]); + NanoSleep(10'000'000); + AtomicSet(ctx.State.ExpectedMaximum, threadsCouns[counter]); + counter++; + if (counter == N) { + counter = 0; + } + ctx.State.ActorStopProcessing(); + }); + TActorId changerActorId = ctx.ActorSystem.Register(changerActor); + changerActor->Start(changerActorId, msgCount); + ctx.ActorSystem.Send(changerActorId, new TEvMsg()); + + while (true) { + size_t maxCounter = 0; + for (size_t i = 0; i < size; ++i) { + maxCounter = Max(maxCounter, testActors.Actors[i]->GetCounter()); + } + if (maxCounter == 0) { + break; + } + auto now = TInstant::Now(); + UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter); + Sleep(TDuration::MilliSeconds(1)); + } + + changerActor->Stop(); + ctx.ActorSystem.Stop(); + } +} |