| author | kruall <[email protected]> | 2022-11-15 13:32:41 +0300 | 
|---|---|---|
| committer | kruall <[email protected]> | 2022-11-15 13:32:41 +0300 | 
| commit | b77b7e9f09f1e17a832b2879f3a5e8cb2e2c072f (patch) | |
| tree | 92d43929bb664ade3f2c75b474881a7694967b56 /library | |
| parent | 660fa18bb981ed3fa5b10b6ec2d9248d80e33f87 (diff) | |
Improve semaphore.
Diffstat (limited to 'library')
| -rw-r--r-- | library/cpp/actors/core/executor_pool_basic.cpp | 365 |
| -rw-r--r-- | library/cpp/actors/core/executor_pool_basic.h | 37 |
| -rw-r--r-- | library/cpp/actors/core/executor_pool_basic_ut.cpp | 353 |
3 files changed, 438 insertions, 317 deletions
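
The central change is to the pool's semaphore: instead of a single counter driven by AtomicIncrement/AtomicDecrement, TBasicExecutorPool now packs two fields into one 64-bit atomic word via the new TBasicExecutorPool::TSemaphore struct — OldSemaphore (the work counter) and CurrentSleepThreadCount (how many threads are parked or about to park). Below is a minimal standalone sketch of that encoding; the struct name and the standard <cstdint> types are illustrative substitutions for the library's type aliases, while the field names, bit layout, and the ConverToI64/GetSemaphore methods mirror the header diff.

```cpp
#include <cassert>
#include <cstdint>

// Sketch of the packed semaphore layout from executor_pool_basic.h.
// OldSemaphore is stored with a +2^34 bias in bits 0..34, the sleeping
// thread count in bits 40..55; the reserved gaps are omitted here.
struct TPackedSemaphore {
    int64_t OldSemaphore = 0;            // may go negative when threads wait
    int16_t CurrentSleepThreadCount = 0; // threads parked (or about to park)

    int64_t ConverToI64() const {
        // Bias the counter so that negative values still fit in the low bits.
        return ((1ll << 34) + OldSemaphore)
            | (static_cast<int64_t>(CurrentSleepThreadCount) << 40);
    }

    static TPackedSemaphore GetSemaphore(int64_t value) {
        TPackedSemaphore s;
        s.OldSemaphore = (value & 0x7ffffffffll) - (1ll << 34); // undo the bias
        s.CurrentSleepThreadCount = static_cast<int16_t>((value >> 40) & 0xffff);
        return s;
    }
};

int main() {
    // Round trip: both fields survive packing into a single word,
    // including a negative work counter.
    TPackedSemaphore s;
    s.OldSemaphore = -3;           // three threads found the queue empty
    s.CurrentSleepThreadCount = 2; // two of them are parked
    TPackedSemaphore back = TPackedSemaphore::GetSemaphore(s.ConverToI64());
    assert(back.OldSemaphore == -3);
    assert(back.CurrentSleepThreadCount == 2);
    return 0;
}
```

Because both counters live in the same word, a single compare-and-swap can decrement the work counter and register the thread as sleeping (or the reverse) atomically, which is what the new CAS loops in the .cpp diff below rely on.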
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp index 4dce16939ae..7dff052d3e9 100644 --- a/library/cpp/actors/core/executor_pool_basic.cpp +++ b/library/cpp/actors/core/executor_pool_basic.cpp @@ -36,6 +36,8 @@ namespace NActors {          , MaxUtilizationAccumulator(0)          , ThreadCount(threads)      { +        auto semaphore = TSemaphore(); +        Semaphore = semaphore.ConverToI64();      }      TBasicExecutorPool::TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg) @@ -56,126 +58,161 @@ namespace NActors {          Threads.Destroy();      } +    bool TBasicExecutorPool::GoToBeBlocked(TThreadCtx& threadCtx, TTimers &timers) { +        do { +            if (AtomicCas(&threadCtx.BlockedFlag, TThreadCtx::BS_BLOCKED, TThreadCtx::BS_BLOCKING)) { +                timers.HPNow = GetCycleCountFast(); +                timers.Elapsed += timers.HPNow - timers.HPStart; +                if (threadCtx.BlockedPad.Park()) // interrupted +                    return true; +                timers.HPStart = GetCycleCountFast(); +                timers.Blocked += timers.HPStart - timers.HPNow; +            } +        } while (AtomicGet(threadCtx.BlockedFlag) != TThreadCtx::BS_NONE && !RelaxedLoad(&StopFlag)); +        return false; +    } + +    bool TBasicExecutorPool::GoToSleep(TThreadCtx& threadCtx, TTimers &timers) { +        do { +            timers.HPNow = GetCycleCountFast(); +            timers.Elapsed += timers.HPNow - timers.HPStart; +            if (threadCtx.Pad.Park()) // interrupted +                return true; +            timers.HPStart = GetCycleCountFast(); +            timers.Parked += timers.HPStart - timers.HPNow; +        } while (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_BLOCKED && !RelaxedLoad(&StopFlag)); +        return false; +    } + +    void TBasicExecutorPool::GoToSpin(TThreadCtx& threadCtx) { +        ui64 start = GetCycleCountFast(); +        bool doSpin = true; +        while (true) { +            for (ui32 j = 0; doSpin && j < 12; ++j) { +                if (GetCycleCountFast() >= (start + SpinThresholdCycles)) { +                    doSpin = false; +                    break; +                } +                for (ui32 i = 0; i < 12; ++i) { +                    if (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_ACTIVE) { +                        SpinLockPause(); +                    } else { +                        doSpin = false; +                        break; +                    } +                } +            } +            if (!doSpin) { +                break; +            } +            if (RelaxedLoad(&StopFlag)) { +                break; +            } +        } +    } + +    bool TBasicExecutorPool::GoToWaiting(TThreadCtx& threadCtx, TTimers &timers, bool needToBlock) { +#if defined ACTORSLIB_COLLECT_EXEC_STATS +        if (AtomicGetAndIncrement(ThreadUtilization) == 0) { +            // Initially counter contains -t0, the pool start timestamp +            // When the first thread goes to sleep we add t1, so the counter +            // becomes t1-t0 >= 0, or the duration of max utilization so far. +            // If the counter was negative and becomes positive, that means +            // counter just turned into a duration and we should store that +            // duration. Otherwise another thread raced with us and +            // subtracted some other timestamp t2. 
+            const i64 t = GetCycleCountFast(); +            const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, t); +            if (x < 0 && x + t > 0) +                AtomicStore(&MaxUtilizationAccumulator, x + t); +        } +#endif + +        Y_VERIFY(AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_NONE); + +        if (SpinThreshold > 0 && !needToBlock) { +            // spin configured period +            AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_ACTIVE); +            GoToSpin(threadCtx); +            // then - sleep +            if (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_ACTIVE) { +                if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED, TThreadCtx::WS_ACTIVE)) { +                    if (GoToSleep(threadCtx, timers)) {  // interrupted +                        return true; +                    } +                } +            } +        } else { +            AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED); +            if (GoToSleep(threadCtx, timers)) {  // interrupted +                return true; +            } +        } + +        Y_VERIFY_DEBUG(AtomicLoad(&StopFlag) || AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_RUNNING); + +#if defined ACTORSLIB_COLLECT_EXEC_STATS +        if (AtomicDecrement(ThreadUtilization) == 0) { +            // When we started sleeping counter contained t1-t0, or the +            // last duration of max utilization. Now we subtract t2 >= t1, +            // which turns counter negative again, and the next sleep cycle +            // at timestamp t3 would be adding some new duration t3-t2. +            // If the counter was positive and becomes negative that means +            // there are no current races with other threads and we should +            // store the last positive duration we observed. Multiple +            // threads may be adding and subtracting values in potentially +            // arbitrary order, which would cause counter to oscillate +            // around zero. When it crosses zero is a good indication of a +            // correct value. 
+            const i64 t = GetCycleCountFast(); +            const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, -t); +            if (x > 0 && x - t < 0) +                AtomicStore(&MaxUtilizationAccumulator, x); +        } +#endif +        return false; +    } +      ui32 TBasicExecutorPool::GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) {          ui32 workerId = wctx.WorkerId;          Y_VERIFY_DEBUG(workerId < PoolThreads); -        NHPTimer::STime elapsed = 0; -        NHPTimer::STime parked = 0; -        NHPTimer::STime blocked = 0; -        NHPTimer::STime hpstart = GetCycleCountFast(); -        NHPTimer::STime hpnow; +        TTimers timers;          TThreadCtx& threadCtx = Threads[workerId];          AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_NONE);          if (Y_UNLIKELY(AtomicGet(threadCtx.BlockedFlag) != TThreadCtx::BS_NONE)) { -            do { -                if (AtomicCas(&threadCtx.BlockedFlag, TThreadCtx::BS_BLOCKED, TThreadCtx::BS_BLOCKING)) { -                    hpnow = GetCycleCountFast(); -                    elapsed += hpnow - hpstart; -                    if (threadCtx.BlockedPad.Park()) // interrupted -                        return 0; -                    hpstart = GetCycleCountFast(); -                    blocked += hpstart - hpnow; -                } -            } while (AtomicGet(threadCtx.BlockedFlag) != TThreadCtx::BS_NONE && !AtomicLoad(&StopFlag)); +            if (GoToBeBlocked(threadCtx, timers)) { // interrupted +                return 0; +            }          } -        const TAtomic x = AtomicDecrement(Semaphore); +        bool needToWait = false; +        bool needToBlock = false; -        if (x < 0) { -#if defined ACTORSLIB_COLLECT_EXEC_STATS -            if (AtomicGetAndIncrement(ThreadUtilization) == 0) { -                // Initially counter contains -t0, the pool start timestamp -                // When the first thread goes to sleep we add t1, so the counter -                // becomes t1-t0 >= 0, or the duration of max utilization so far. -                // If the counter was negative and becomes positive, that means -                // counter just turned into a duration and we should store that -                // duration. Otherwise another thread raced with us and -                // subtracted some other timestamp t2. 
-                const i64 t = GetCycleCountFast(); -                const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, t); -                if (x < 0 && x + t > 0) -                    AtomicStore(&MaxUtilizationAccumulator, x + t); -            } -#endif +        TAtomic x = AtomicGet(Semaphore); +        do { +            i64 oldX = x; +            TSemaphore semaphore = TSemaphore::GetSemaphore(x); +            needToBlock = semaphore.CurrentSleepThreadCount < 0; +            needToWait = needToBlock || semaphore.OldSemaphore <= -semaphore.CurrentSleepThreadCount; -            Y_VERIFY(AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_NONE); - -            if (SpinThreshold > 0) { -                // spin configured period -                AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_ACTIVE); -                ui64 start = GetCycleCountFast(); -                bool doSpin = true; -                while (true) { -                    for (ui32 j = 0; doSpin && j < 12; ++j) { -                        if (GetCycleCountFast() >= (start + SpinThresholdCycles)) { -                            doSpin = false; -                            break; -                        } -                        for (ui32 i = 0; i < 12; ++i) { -                            if (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_ACTIVE) { -                                SpinLockPause(); -                            } else { -                                doSpin = false; -                                break; -                            } -                        } -                    } -                    if (!doSpin) { -                        break; -                    } -                    if (RelaxedLoad(&StopFlag)) { -                        break; -                    } -                } -                // then - sleep -                if (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_ACTIVE) { -                    if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED, TThreadCtx::WS_ACTIVE)) { -                        do { -                            hpnow = GetCycleCountFast(); -                            elapsed += hpnow - hpstart; -                            if (threadCtx.Pad.Park()) // interrupted -                                return 0; -                            hpstart = GetCycleCountFast(); -                            parked += hpstart - hpnow; -                        } while (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_BLOCKED); -                    } -                } -            } else { -                AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED); -                do { -                    hpnow = GetCycleCountFast(); -                    elapsed += hpnow - hpstart; -                    if (threadCtx.Pad.Park()) // interrupted -                        return 0; -                    hpstart = GetCycleCountFast(); -                    parked += hpstart - hpnow; -                } while (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_BLOCKED); +            semaphore.OldSemaphore--; +            if (needToWait) { +                semaphore.CurrentSleepThreadCount++;              } -            Y_VERIFY_DEBUG(AtomicLoad(&StopFlag) || AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_RUNNING); +            x = AtomicGetAndCas(&Semaphore, semaphore.ConverToI64(), x); +            if (x == oldX) { +                break; +            } +        } while (!StopFlag); -#if defined ACTORSLIB_COLLECT_EXEC_STATS -            if 
(AtomicDecrement(ThreadUtilization) == 0) { -                // When we started sleeping counter contained t1-t0, or the -                // last duration of max utilization. Now we subtract t2 >= t1, -                // which turns counter negative again, and the next sleep cycle -                // at timestamp t3 would be adding some new duration t3-t2. -                // If the counter was positive and becomes negative that means -                // there are no current races with other threads and we should -                // store the last positive duration we observed. Multiple -                // threads may be adding and subtracting values in potentially -                // arbitrary order, which would cause counter to oscillate -                // around zero. When it crosses zero is a good indication of a -                // correct value. -                const i64 t = GetCycleCountFast(); -                const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, -t); -                if (x > 0 && x - t < 0) -                    AtomicStore(&MaxUtilizationAccumulator, x); +        if (needToWait) { +            if (GoToWaiting(threadCtx, timers, needToBlock)) { // interrupted +                return 0;              } -#endif          } else {              AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING);          } @@ -183,14 +220,14 @@ namespace NActors {          // ok, has work suggested, must dequeue          while (!RelaxedLoad(&StopFlag)) {              if (const ui32 activation = Activations.Pop(++revolvingCounter)) { -                hpnow = GetCycleCountFast(); -                elapsed += hpnow - hpstart; -                wctx.AddElapsedCycles(IActor::ACTOR_SYSTEM, elapsed); -                if (parked > 0) { -                    wctx.AddParkedCycles(parked); +                timers.HPNow = GetCycleCountFast(); +                timers.Elapsed += timers.HPNow - timers.HPStart; +                wctx.AddElapsedCycles(IActor::ACTOR_SYSTEM, timers.Elapsed); +                if (timers.Parked > 0) { +                    wctx.AddParkedCycles(timers.Parked);                  } -                if (blocked > 0) { -                    wctx.AddBlockedCycles(blocked); +                if (timers.Blocked > 0) { +                    wctx.AddBlockedCycles(timers.Blocked);                  }                  return activation;              } @@ -228,8 +265,25 @@ namespace NActors {      void TBasicExecutorPool::ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) {          Activations.Push(activation, revolvingCounter); -        const TAtomic x = AtomicIncrement(Semaphore); -        if (x <= 0) { // we must find someone to wake-up +        bool needToWakeUp = false; + +        TAtomic x = AtomicGet(Semaphore); +        TSemaphore semaphore = TSemaphore::GetSemaphore(x); +        do { +            needToWakeUp = semaphore.CurrentSleepThreadCount > 0; +            i64 oldX = semaphore.ConverToI64(); +            semaphore.OldSemaphore++; +            if (needToWakeUp) { +                semaphore.CurrentSleepThreadCount--; +            } +            x = AtomicGetAndCas(&Semaphore, semaphore.ConverToI64(), oldX); +            if (x == oldX) { +                break; +            } +            semaphore = TSemaphore::GetSemaphore(x); +        } while (true); + +        if (needToWakeUp) { // we must find someone to wake-up              WakeUpLoop();          }      } @@ -345,87 +399,12 @@ namespace NActors {          with_lock (ChangeThreadsLock) {              size_t 
prevCount = GetThreadCount();              AtomicSet(ThreadCount, threads); -            if (prevCount < threads) { -                for (size_t i = prevCount; i < threads; ++i) { -                    bool repeat = true; -                    while (repeat) { -                        switch (AtomicGet(Threads[i].BlockedFlag)) { -                        case TThreadCtx::BS_BLOCKING: -                            if (AtomicCas(&Threads[i].BlockedFlag, TThreadCtx::BS_NONE, TThreadCtx::BS_BLOCKING)) { -                                // thread not entry to blocked loop -                                repeat = false; -                            } -                            break; -                        case TThreadCtx::BS_BLOCKED: -                            // thread entry to blocked loop and we wake it -                            AtomicSet(Threads[i].BlockedFlag, TThreadCtx::BS_NONE); -                            Threads[i].BlockedPad.Unpark(); -                            repeat = false; -                            break; -                        default: -                            // thread mustn't has TThreadCtx::BS_NONE because last time it was started to block -                            Y_FAIL("BlockedFlag is not TThreadCtx::BS_BLOCKING and TThreadCtx::BS_BLOCKED when thread was waked up"); -                        } -                    } -                } -            } else if (prevCount > threads) { -                // at first, start to block -                for (size_t i = threads; i < prevCount; ++i) { -                    Y_VERIFY(AtomicGet(Threads[i].BlockedFlag) == TThreadCtx::BS_NONE); -                    AtomicSet(Threads[i].BlockedFlag, TThreadCtx::BS_BLOCKING); -                } -                // after check need to wake up threads -                for (size_t idx = threads; idx < prevCount; ++idx) { -                    TThreadCtx& threadCtx = Threads[idx]; -                    auto waitingFlag = AtomicGet(threadCtx.WaitingFlag); -                    auto blockedFlag = AtomicGet(threadCtx.BlockedFlag); -                    // while thread has this states (WS_NONE and BS_BLOCKING) we can't guess which way thread will go. -                    // Either go to sleep and it will have to wake up, -                    // or go to execute task and after completion will be blocked. 
-                    while (waitingFlag == TThreadCtx::WS_NONE && blockedFlag == TThreadCtx::BS_BLOCKING) { -                        waitingFlag = AtomicGet(threadCtx.WaitingFlag); -                        blockedFlag = AtomicGet(threadCtx.BlockedFlag); -                    } -                    // next states: -                    // 1) WS_ACTIVE  BS_BLOCKING - waiting and start spinig | need wake up to block -                    // 2) WS_BLOCKED BS_BLOCKING - waiting and start sleep  | need wake up to block -                    // 3) WS_RUNNING BS_BLOCKING - start execute | not need wake up, will block after executing -                    // 4) WS_NONE    BS_BLOCKED  - blocked       | not need wake up, already blocked - -                    if (waitingFlag == TThreadCtx::WS_ACTIVE || waitingFlag == TThreadCtx::WS_BLOCKED) { -                        // need wake up -                        Y_VERIFY(blockedFlag == TThreadCtx::BS_BLOCKING); - -                        // creaty empty mailBoxHint, where LineIndex == 1 and LineHint == 0, and activations will be ignored -                        constexpr auto emptyMailBoxHint = TMailboxTable::LineIndexMask & -TMailboxTable::LineIndexMask; -                        ui64 revolvingCounter = AtomicGet(ActivationsRevolvingCounter); - -                        Activations.Push(emptyMailBoxHint, revolvingCounter); - -                        auto x = AtomicIncrement(Semaphore); -                        if (x <= 0) { -                            // try wake up. if success then go to next thread -                            switch (waitingFlag){ -                                case TThreadCtx::WS_ACTIVE: // in active spin-lock, just set flag -                                    if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_ACTIVE)) { -                                        continue; -                                    } -                                    break; -                                case TThreadCtx::WS_BLOCKED: -                                    if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_BLOCKED)) { -                                        threadCtx.Pad.Unpark(); -                                        continue; -                                    } -                                    break; -                                default: -                                    ; // other thread woke this sleeping thread -                            } -                            // if thread has already been awakened then we must awaken the other -                            WakeUpLoop(); -                        } -                    } -                } -            } + +            TSemaphore semaphore = TSemaphore::GetSemaphore(AtomicGet(Semaphore)); +            i64 oldX = semaphore.ConverToI64(); +            semaphore.CurrentSleepThreadCount += threads - prevCount; +            semaphore.OldSemaphore -= threads - prevCount; +            AtomicAdd(Semaphore, semaphore.ConverToI64() - oldX);          }      }  } diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h index 023190f7fe3..e65ad20a480 100644 --- a/library/cpp/actors/core/executor_pool_basic.h +++ b/library/cpp/actors/core/executor_pool_basic.h @@ -45,6 +45,14 @@ namespace NActors {              }          }; +        struct TTimers { +            NHPTimer::STime Elapsed = 0; +            NHPTimer::STime Parked = 0; +            NHPTimer::STime Blocked = 0; +   
         NHPTimer::STime HPStart = GetCycleCountFast(); +            NHPTimer::STime HPNow; +        }; +          const ui64 SpinThreshold;          const ui64 SpinThresholdCycles; @@ -67,6 +75,31 @@ namespace NActors {          TMutex ChangeThreadsLock;      public: +        struct TSemaphore { +            i64 OldSemaphore = 0; // 34 bits +            // Sign bit +            i8 Reserved1 = 0; // 5 bits +            i16 CurrentSleepThreadCount = 0; // 16 bits +            i8 Reserved2 = 0; // 8 bits + +            inline i64 ConverToI64() { +                i64 value = (1ll << 34) + OldSemaphore; +                return value +                    | ((i64)Reserved1 << 35) +                    | ((i64)CurrentSleepThreadCount << 40) +                    | ((i64)Reserved2 << 56); +            } + +            static inline TSemaphore GetSemaphore(i64 value) { +                TSemaphore semaphore; +                semaphore.OldSemaphore = (value & 0x7ffffffffll) - (1ll << 34); +                semaphore.Reserved1 = (value >> 35) & 0x1f; +                semaphore.CurrentSleepThreadCount = (value >> 40) & 0xffff; +                semaphore.Reserved2 = (value >> 56) & 0xff; +                return semaphore; +            } +        }; +          static constexpr TDuration DEFAULT_TIME_PER_MAILBOX = TBasicExecutorPoolConfig::DEFAULT_TIME_PER_MAILBOX;          static constexpr ui32 DEFAULT_EVENTS_PER_MAILBOX = TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX; @@ -107,5 +140,9 @@ namespace NActors {      private:          void WakeUpLoop(); +        bool GoToWaiting(TThreadCtx& threadCtx, TTimers &timers, bool needToBlock); +        void GoToSpin(TThreadCtx& threadCtx); +        bool GoToSleep(TThreadCtx& threadCtx, TTimers &timers); +        bool GoToBeBlocked(TThreadCtx& threadCtx, TTimers &timers);      };  } diff --git a/library/cpp/actors/core/executor_pool_basic_ut.cpp b/library/cpp/actors/core/executor_pool_basic_ut.cpp index 3ef5808d725..574b0b07593 100644 --- a/library/cpp/actors/core/executor_pool_basic_ut.cpp +++ b/library/cpp/actors/core/executor_pool_basic_ut.cpp @@ -6,10 +6,15 @@  #include <library/cpp/actors/util/should_continue.h>  #include <library/cpp/testing/unittest/registar.h> -#include <library/cpp/actors/protos/unittests.pb.h>  using namespace NActors; +#define VALUES_EQUAL(a, b, ...) 
\ +        UNIT_ASSERT_VALUES_EQUAL_C((a), (b), (i64)semaphore.OldSemaphore \ +                << ' ' << (i64)semaphore.Reserved1 \ +                << ' ' << (i64)semaphore.CurrentSleepThreadCount \ +                << ' ' << (i64)semaphore.Reserved2 __VA_ARGS__); +  ////////////////////////////////////////////////////////////////////////////////  struct TEvMsg : public NActors::TEventBase<TEvMsg, 10347> { @@ -90,138 +95,59 @@ THolder<TActorSystemSetup> GetActorSystemSetup(TBasicExecutorPool* pool)  Y_UNIT_TEST_SUITE(BasicExecutorPool) { -    Y_UNIT_TEST(DecreaseIncreaseThreadsCount) { -        const size_t msgCount = 1e4; -        const size_t size = 4; -        const size_t halfSize = size / 2; -        TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50); +    Y_UNIT_TEST(Semaphore) { +        TBasicExecutorPool::TSemaphore semaphore; +        semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(0); -        auto setup = GetActorSystemSetup(executorPool); -        TActorSystem actorSystem(setup); -        actorSystem.Start(); +        VALUES_EQUAL(0, semaphore.ConverToI64()); +        semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(-1); +        VALUES_EQUAL(-1, semaphore.ConverToI64()); +        semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(1); +        VALUES_EQUAL(1, semaphore.ConverToI64()); -        executorPool->SetThreadCount(halfSize); -        TTestSenderActor* actors[size]; -        TActorId actorIds[size]; -        for (size_t i = 0; i < size; ++i) { -            actors[i] = new TTestSenderActor(); -            actorIds[i] = actorSystem.Register(actors[i]); +        for (i64 value = -1'000'000; value <= 1'000'000; ++value) { +            VALUES_EQUAL(TBasicExecutorPool::TSemaphore::GetSemaphore(value).ConverToI64(), value);          } -        const int testCount = 2; - -        TExecutorPoolStats poolStats[testCount]; -        TVector<TExecutorThreadStats> statsCopy[testCount]; - -        for (size_t testIdx = 0; testIdx < testCount; ++testIdx) { -            for (size_t i = 0; i < size; ++i) { -                actors[i]->Start(actors[i]->SelfId(), msgCount); +        for (i8 sleepThreads = -10; sleepThreads <= 10; ++sleepThreads) { + +            semaphore = TBasicExecutorPool::TSemaphore(); +            semaphore.CurrentSleepThreadCount = sleepThreads; +            i64 initialValue = semaphore.ConverToI64(); + +            semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(initialValue - 1); +            VALUES_EQUAL(-1, semaphore.OldSemaphore); + +            i64 value = initialValue; +            value -= 100; +            for (i32 expected = -100; expected <= 100; ++expected) { +                semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(value); +                UNIT_ASSERT_VALUES_EQUAL_C(expected, semaphore.OldSemaphore, (i64)semaphore.OldSemaphore +                        << ' ' << (i64)semaphore.CurrentSleepThreadCount +                        << ' ' << (i64)semaphore.Reserved2); +                UNIT_ASSERT_VALUES_EQUAL_C(sleepThreads, semaphore.CurrentSleepThreadCount, (i64)semaphore.OldSemaphore +                        << ' ' << (i64)semaphore.CurrentSleepThreadCount +                        << ' ' << (i64)semaphore.Reserved2); +                semaphore = TBasicExecutorPool::TSemaphore(); +                semaphore.OldSemaphore = expected; +                semaphore.CurrentSleepThreadCount = sleepThreads; +                UNIT_ASSERT_VALUES_EQUAL(semaphore.ConverToI64(), value); +                
value++;              } -            for (size_t i = 0; i < size; ++i) { -                actorSystem.Send(actorIds[i], new TEvMsg()); -            } - -            Sleep(TDuration::MilliSeconds(100)); -            for (size_t i = 0; i < size; ++i) { -                actors[i]->Stop(); +            for (i32 expected = 101; expected >= -101; --expected) { +                semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(value); +                UNIT_ASSERT_VALUES_EQUAL_C(expected, semaphore.OldSemaphore, (i64)semaphore.OldSemaphore +                        << ' ' << (i64)semaphore.CurrentSleepThreadCount +                        << ' ' << (i64)semaphore.Reserved2); +                UNIT_ASSERT_VALUES_EQUAL_C(sleepThreads, semaphore.CurrentSleepThreadCount, (i64)semaphore.OldSemaphore +                        << ' ' << (i64)semaphore.CurrentSleepThreadCount +                        << ' ' << (i64)semaphore.Reserved2); +                value--;              } - -            executorPool->GetCurrentStats(poolStats[testIdx], statsCopy[testIdx]);          } -        for (size_t i = 1; i <= halfSize; ++i) { -            UNIT_ASSERT_UNEQUAL(statsCopy[0][i].ReceivedEvents, statsCopy[1][i].ReceivedEvents); -        } - -        for (size_t i = halfSize + 1; i <= size; ++i) { -            UNIT_ASSERT_EQUAL(statsCopy[0][i].ReceivedEvents, statsCopy[1][i].ReceivedEvents); -        } - -        executorPool->SetThreadCount(size); - -        for (size_t testIdx = 0; testIdx < testCount; ++testIdx) { -            for (size_t i = 0; i < size; ++i) { -                actors[i]->Start(actors[i]->SelfId(), msgCount); -            } -            for (size_t i = 0; i < size; ++i) { -                actorSystem.Send(actorIds[i], new TEvMsg()); -            } - -            Sleep(TDuration::MilliSeconds(100)); - -            for (size_t i = 0; i < size; ++i) { -                actors[i]->Stop(); -            } - -            executorPool->GetCurrentStats(poolStats[testIdx], statsCopy[testIdx]); -        } - -        for (size_t i = 1; i <= size; ++i) { -            UNIT_ASSERT_UNEQUAL(statsCopy[0][i].ReceivedEvents, statsCopy[1][i].ReceivedEvents); -        } -    } - -    Y_UNIT_TEST(ChangeCount) { -        const size_t msgCount = 1e3; -        const size_t size = 4; -        const size_t halfSize = size / 2; -        TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50); - -        auto begin = TInstant::Now(); - -        auto setup = GetActorSystemSetup(executorPool); -        TActorSystem actorSystem(setup); -        actorSystem.Start(); -        executorPool->SetThreadCount(halfSize); - -        TTestSenderActor* actors[size]; -        TActorId actorIds[size]; -        for (size_t i = 0; i < size; ++i) { -            actors[i] = new TTestSenderActor(); -            actorIds[i] = actorSystem.Register(actors[i]); -        } - -        for (size_t i = 0; i < size; ++i) { -            actors[i]->Start(actorIds[i], msgCount); -        } -        for (size_t i = 0; i < size; ++i) { -            actorSystem.Send(actorIds[i], new TEvMsg()); -        } - -        const i32 N = 6; -        const i32 threadsCouns[N] = { 1, 3, 2, 3, 1, 4 }; - -        ui64 counter = 0; - -        TTestSenderActor* changerActor = new TTestSenderActor([&]{ -            executorPool->SetThreadCount(threadsCouns[counter]); -            counter++; -            if (counter == N) { -                counter = 0; -            } -        }); -        TActorId changerActorId = actorSystem.Register(changerActor); -        
changerActor->Start(changerActorId, msgCount); -        actorSystem.Send(changerActorId, new TEvMsg()); - -        while (true) { -            size_t maxCounter = 0; -            for (size_t i = 0; i < size; ++i) { -                maxCounter = Max(maxCounter, actors[i]->GetCounter()); -            } - -            if (maxCounter == 0) { -                break; -            } - -            auto now = TInstant::Now(); -            UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter); - -            Sleep(TDuration::MilliSeconds(1)); -        } - -        changerActor->Stop(); +        //UNIT_ASSERT_VALUES_EQUAL_C(-1, TBasicExecutorPool::TSemaphore::GetSemaphore(value-1).OldSemaphore);      }      Y_UNIT_TEST(CheckCompleteOne) { @@ -433,3 +359,182 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) {          UNIT_ASSERT_VALUES_EQUAL(stats[0].MailboxPushedOutBySoftPreemption, 0);      }  } + +Y_UNIT_TEST_SUITE(ChangingThreadsCountInBasicExecutorPool) { + +    struct TMockState { +        void ActorDo() {} +    }; + +    struct TTestActors { +        const size_t Count; +        TArrayHolder<TTestSenderActor*> Actors; +        TArrayHolder<TActorId> ActorIds; + +        TTestActors(size_t count) +            : Count(count) +            , Actors(new TTestSenderActor*[count]) +            , ActorIds(new TActorId[count]) +        { } + +        void Start(TActorSystem &actorSystem, size_t msgCount) { +            for (size_t i = 0; i < Count; ++i) { +                Actors[i]->Start(Actors[i]->SelfId(), msgCount); +            } +            for (size_t i = 0; i < Count; ++i) { +                actorSystem.Send(ActorIds[i], new TEvMsg()); +            } +        } + +        void Stop() { +            for (size_t i = 0; i < Count; ++i) { +                Actors[i]->Stop(); +            } +        } +    }; + +    template <typename TState = TMockState> +    struct TTestCtx { +        const size_t MaxThreadCount; +        const size_t SendingMessageCount; +        std::unique_ptr<TBasicExecutorPool> ExecutorPool; +        THolder<TActorSystemSetup> Setup; +        TActorSystem ActorSystem; + +        TState State; + +        TTestCtx(size_t maxThreadCount, size_t sendingMessageCount) +            : MaxThreadCount(maxThreadCount) +            , SendingMessageCount(sendingMessageCount) +            , ExecutorPool(new TBasicExecutorPool(0, MaxThreadCount, 50)) +            , Setup(GetActorSystemSetup(ExecutorPool.get())) +            , ActorSystem(Setup) +        { +        } + +        TTestCtx(size_t maxThreadCount, size_t sendingMessageCount, const TState &state) +            : MaxThreadCount(maxThreadCount) +            , SendingMessageCount(sendingMessageCount) +            , ExecutorPool(new TBasicExecutorPool(0, MaxThreadCount, 50)) +            , Setup(GetActorSystemSetup(ExecutorPool.get())) +            , ActorSystem(Setup) +            , State(state) +        { +        } + +        ~TTestCtx() { +            ExecutorPool.release(); +        } + +        TTestActors RegisterCheckActors(size_t actorCount) { +            TTestActors res(actorCount); +            for (size_t i = 0; i < actorCount; ++i) { +                res.Actors[i] = new TTestSenderActor([&] { +                    State.ActorDo(); +                }); +                res.ActorIds[i] = ActorSystem.Register(res.Actors[i]); +            } +            return res; +        } +    }; + +    struct TCheckingInFlightState { +        TAtomic ExpectedMaximum = 0; +        TAtomic CurrentInFlight = 0; + +       
 void ActorStartProcessing() { +            ui32 inFlight = AtomicIncrement(CurrentInFlight); +            ui32 maximum = AtomicGet(ExpectedMaximum); +            if (maximum) { +                UNIT_ASSERT_C(inFlight <= maximum, "inFlight# " << inFlight << " maximum# " << maximum); +            } +        } + +        void ActorStopProcessing() { +            AtomicDecrement(CurrentInFlight); +        } + +        void ActorDo() { +            ActorStartProcessing(); +            NanoSleep(1'000'000); +            ActorStopProcessing(); +        } +    }; + +    Y_UNIT_TEST(DecreaseIncreaseThreadCount) { +        const size_t msgCount = 1e2; +        const size_t size = 4; +        const size_t testCount = 2; +        TTestCtx<TCheckingInFlightState> ctx(size, msgCount); +        ctx.ActorSystem.Start(); + +        TVector<TExecutorThreadStats> statsCopy[testCount]; + +        TTestActors testActors = ctx.RegisterCheckActors(size); + +        const size_t N = 6; +        const size_t threadsCounts[N] = { 1, 3, 2, 3, 1, 4 }; +        for (ui32 idx = 0; idx < 4 * N; ++idx) { +            size_t currentThreadCount = threadsCounts[idx]; +            ctx.ExecutorPool->SetThreadCount(currentThreadCount); +            AtomicSet(ctx.State.ExpectedMaximum, currentThreadCount); + +            for (size_t testIdx = 0; testIdx < testCount; ++testIdx) { +                testActors.Start(ctx.ActorSystem, msgCount); +                Sleep(TDuration::MilliSeconds(100)); +                testActors.Stop(); +            } +            Sleep(TDuration::MilliSeconds(10)); +        } +        ctx.ActorSystem.Stop(); +    } + +    Y_UNIT_TEST(ContiniousChangingThreadCount) { +        const size_t msgCount = 1e2; +        const size_t size = 4; + +        auto begin = TInstant::Now(); +        TTestCtx<TCheckingInFlightState> ctx(size, msgCount, TCheckingInFlightState{msgCount}); +        ctx.ActorSystem.Start(); +        TTestActors testActors = ctx.RegisterCheckActors(size); + +        testActors.Start(ctx.ActorSystem, msgCount); + +        const size_t N = 6; +        const size_t threadsCouns[N] = { 1, 3, 2, 3, 1, 4 }; + +        ui64 counter = 0; + +        TTestSenderActor* changerActor = new TTestSenderActor([&]{ +            ctx.State.ActorStartProcessing(); +            AtomicSet(ctx.State.ExpectedMaximum, 0); +            ctx.ExecutorPool->SetThreadCount(threadsCouns[counter]); +            NanoSleep(10'000'000); +            AtomicSet(ctx.State.ExpectedMaximum, threadsCouns[counter]); +            counter++; +            if (counter == N) { +                counter = 0; +            } +            ctx.State.ActorStopProcessing(); +        }); +        TActorId changerActorId = ctx.ActorSystem.Register(changerActor); +        changerActor->Start(changerActorId, msgCount); +        ctx.ActorSystem.Send(changerActorId, new TEvMsg()); + +        while (true) { +            size_t maxCounter = 0; +            for (size_t i = 0; i < size; ++i) { +                maxCounter = Max(maxCounter, testActors.Actors[i]->GetCounter()); +            } +            if (maxCounter == 0) { +                break; +            } +            auto now = TInstant::Now(); +            UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter); +            Sleep(TDuration::MilliSeconds(1)); +        } + +        changerActor->Stop(); +        ctx.ActorSystem.Stop(); +    } +}  | 
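
For reference, the producer-side bookkeeping that replaces the old AtomicIncrement(Semaphore) in ScheduleActivationEx is a compare-and-swap loop over the packed word. A minimal sketch of that pattern, assuming the TPackedSemaphore helper from the sketch above and using std::atomic in place of the library's TAtomic/AtomicGetAndCas:

```cpp
#include <atomic>
#include <cstdint>

// Packed semaphore word for the pool; TPackedSemaphore is the helper
// sketched above (an illustrative stand-in for TBasicExecutorPool::TSemaphore).
std::atomic<int64_t> Semaphore{TPackedSemaphore{}.ConverToI64()};

// Mirrors the new ScheduleActivationEx logic: bump the work counter and,
// if some thread is recorded as sleeping, claim it so exactly one waker
// goes on to wake a thread.
bool IncrementAndCheckNeedToWakeUp() {
    int64_t expected = Semaphore.load(std::memory_order_acquire);
    for (;;) {
        TPackedSemaphore s = TPackedSemaphore::GetSemaphore(expected);
        const bool needToWakeUp = s.CurrentSleepThreadCount > 0;
        s.OldSemaphore++;                 // one more activation queued
        if (needToWakeUp) {
            s.CurrentSleepThreadCount--;  // reserve a sleeping thread to wake
        }
        // On failure compare_exchange_weak reloads `expected`, so the loop
        // re-decodes the fresh value and retries, like the AtomicGetAndCas
        // retry loop in the diff.
        if (Semaphore.compare_exchange_weak(expected, s.ConverToI64(),
                                            std::memory_order_acq_rel)) {
            return needToWakeUp;
        }
    }
}
```

The consumer side in GetReadyActivation performs the mirror-image update — it decrements OldSemaphore and, when it decides to wait, increments CurrentSleepThreadCount before parking — and SetThreadCount now applies the thread-count delta to both fields with a single AtomicAdd, replacing the per-thread wake-up walk the old code needed.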
