aboutsummaryrefslogtreecommitdiffstats
path: root/library
diff options
context:
space:
mode:
authorkruall <kruall@ydb.tech>2022-11-15 13:32:41 +0300
committerkruall <kruall@ydb.tech>2022-11-15 13:32:41 +0300
commitb77b7e9f09f1e17a832b2879f3a5e8cb2e2c072f (patch)
tree92d43929bb664ade3f2c75b474881a7694967b56 /library
parent660fa18bb981ed3fa5b10b6ec2d9248d80e33f87 (diff)
downloadydb-b77b7e9f09f1e17a832b2879f3a5e8cb2e2c072f.tar.gz
Improve semaphore,
Diffstat (limited to 'library')
-rw-r--r--library/cpp/actors/core/executor_pool_basic.cpp365
-rw-r--r--library/cpp/actors/core/executor_pool_basic.h37
-rw-r--r--library/cpp/actors/core/executor_pool_basic_ut.cpp353
3 files changed, 438 insertions, 317 deletions
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp
index 4dce16939a..7dff052d3e 100644
--- a/library/cpp/actors/core/executor_pool_basic.cpp
+++ b/library/cpp/actors/core/executor_pool_basic.cpp
@@ -36,6 +36,8 @@ namespace NActors {
, MaxUtilizationAccumulator(0)
, ThreadCount(threads)
{
+ auto semaphore = TSemaphore();
+ Semaphore = semaphore.ConverToI64();
}
TBasicExecutorPool::TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg)
@@ -56,126 +58,161 @@ namespace NActors {
Threads.Destroy();
}
+ bool TBasicExecutorPool::GoToBeBlocked(TThreadCtx& threadCtx, TTimers &timers) {
+ do {
+ if (AtomicCas(&threadCtx.BlockedFlag, TThreadCtx::BS_BLOCKED, TThreadCtx::BS_BLOCKING)) {
+ timers.HPNow = GetCycleCountFast();
+ timers.Elapsed += timers.HPNow - timers.HPStart;
+ if (threadCtx.BlockedPad.Park()) // interrupted
+ return true;
+ timers.HPStart = GetCycleCountFast();
+ timers.Blocked += timers.HPStart - timers.HPNow;
+ }
+ } while (AtomicGet(threadCtx.BlockedFlag) != TThreadCtx::BS_NONE && !RelaxedLoad(&StopFlag));
+ return false;
+ }
+
+ bool TBasicExecutorPool::GoToSleep(TThreadCtx& threadCtx, TTimers &timers) {
+ do {
+ timers.HPNow = GetCycleCountFast();
+ timers.Elapsed += timers.HPNow - timers.HPStart;
+ if (threadCtx.Pad.Park()) // interrupted
+ return true;
+ timers.HPStart = GetCycleCountFast();
+ timers.Parked += timers.HPStart - timers.HPNow;
+ } while (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_BLOCKED && !RelaxedLoad(&StopFlag));
+ return false;
+ }
+
+ void TBasicExecutorPool::GoToSpin(TThreadCtx& threadCtx) {
+ ui64 start = GetCycleCountFast();
+ bool doSpin = true;
+ while (true) {
+ for (ui32 j = 0; doSpin && j < 12; ++j) {
+ if (GetCycleCountFast() >= (start + SpinThresholdCycles)) {
+ doSpin = false;
+ break;
+ }
+ for (ui32 i = 0; i < 12; ++i) {
+ if (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_ACTIVE) {
+ SpinLockPause();
+ } else {
+ doSpin = false;
+ break;
+ }
+ }
+ }
+ if (!doSpin) {
+ break;
+ }
+ if (RelaxedLoad(&StopFlag)) {
+ break;
+ }
+ }
+ }
+
+ bool TBasicExecutorPool::GoToWaiting(TThreadCtx& threadCtx, TTimers &timers, bool needToBlock) {
+#if defined ACTORSLIB_COLLECT_EXEC_STATS
+ if (AtomicGetAndIncrement(ThreadUtilization) == 0) {
+ // Initially counter contains -t0, the pool start timestamp
+ // When the first thread goes to sleep we add t1, so the counter
+ // becomes t1-t0 >= 0, or the duration of max utilization so far.
+ // If the counter was negative and becomes positive, that means
+ // counter just turned into a duration and we should store that
+ // duration. Otherwise another thread raced with us and
+ // subtracted some other timestamp t2.
+ const i64 t = GetCycleCountFast();
+ const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, t);
+ if (x < 0 && x + t > 0)
+ AtomicStore(&MaxUtilizationAccumulator, x + t);
+ }
+#endif
+
+ Y_VERIFY(AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_NONE);
+
+ if (SpinThreshold > 0 && !needToBlock) {
+ // spin configured period
+ AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_ACTIVE);
+ GoToSpin(threadCtx);
+ // then - sleep
+ if (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_ACTIVE) {
+ if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED, TThreadCtx::WS_ACTIVE)) {
+ if (GoToSleep(threadCtx, timers)) { // interrupted
+ return true;
+ }
+ }
+ }
+ } else {
+ AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED);
+ if (GoToSleep(threadCtx, timers)) { // interrupted
+ return true;
+ }
+ }
+
+ Y_VERIFY_DEBUG(AtomicLoad(&StopFlag) || AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_RUNNING);
+
+#if defined ACTORSLIB_COLLECT_EXEC_STATS
+ if (AtomicDecrement(ThreadUtilization) == 0) {
+ // When we started sleeping counter contained t1-t0, or the
+ // last duration of max utilization. Now we subtract t2 >= t1,
+ // which turns counter negative again, and the next sleep cycle
+ // at timestamp t3 would be adding some new duration t3-t2.
+ // If the counter was positive and becomes negative that means
+ // there are no current races with other threads and we should
+ // store the last positive duration we observed. Multiple
+ // threads may be adding and subtracting values in potentially
+ // arbitrary order, which would cause counter to oscillate
+ // around zero. When it crosses zero is a good indication of a
+ // correct value.
+ const i64 t = GetCycleCountFast();
+ const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, -t);
+ if (x > 0 && x - t < 0)
+ AtomicStore(&MaxUtilizationAccumulator, x);
+ }
+#endif
+ return false;
+ }
+
ui32 TBasicExecutorPool::GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) {
ui32 workerId = wctx.WorkerId;
Y_VERIFY_DEBUG(workerId < PoolThreads);
- NHPTimer::STime elapsed = 0;
- NHPTimer::STime parked = 0;
- NHPTimer::STime blocked = 0;
- NHPTimer::STime hpstart = GetCycleCountFast();
- NHPTimer::STime hpnow;
+ TTimers timers;
TThreadCtx& threadCtx = Threads[workerId];
AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_NONE);
if (Y_UNLIKELY(AtomicGet(threadCtx.BlockedFlag) != TThreadCtx::BS_NONE)) {
- do {
- if (AtomicCas(&threadCtx.BlockedFlag, TThreadCtx::BS_BLOCKED, TThreadCtx::BS_BLOCKING)) {
- hpnow = GetCycleCountFast();
- elapsed += hpnow - hpstart;
- if (threadCtx.BlockedPad.Park()) // interrupted
- return 0;
- hpstart = GetCycleCountFast();
- blocked += hpstart - hpnow;
- }
- } while (AtomicGet(threadCtx.BlockedFlag) != TThreadCtx::BS_NONE && !AtomicLoad(&StopFlag));
+ if (GoToBeBlocked(threadCtx, timers)) { // interrupted
+ return 0;
+ }
}
- const TAtomic x = AtomicDecrement(Semaphore);
+ bool needToWait = false;
+ bool needToBlock = false;
- if (x < 0) {
-#if defined ACTORSLIB_COLLECT_EXEC_STATS
- if (AtomicGetAndIncrement(ThreadUtilization) == 0) {
- // Initially counter contains -t0, the pool start timestamp
- // When the first thread goes to sleep we add t1, so the counter
- // becomes t1-t0 >= 0, or the duration of max utilization so far.
- // If the counter was negative and becomes positive, that means
- // counter just turned into a duration and we should store that
- // duration. Otherwise another thread raced with us and
- // subtracted some other timestamp t2.
- const i64 t = GetCycleCountFast();
- const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, t);
- if (x < 0 && x + t > 0)
- AtomicStore(&MaxUtilizationAccumulator, x + t);
- }
-#endif
+ TAtomic x = AtomicGet(Semaphore);
+ do {
+ i64 oldX = x;
+ TSemaphore semaphore = TSemaphore::GetSemaphore(x);
+ needToBlock = semaphore.CurrentSleepThreadCount < 0;
+ needToWait = needToBlock || semaphore.OldSemaphore <= -semaphore.CurrentSleepThreadCount;
- Y_VERIFY(AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_NONE);
-
- if (SpinThreshold > 0) {
- // spin configured period
- AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_ACTIVE);
- ui64 start = GetCycleCountFast();
- bool doSpin = true;
- while (true) {
- for (ui32 j = 0; doSpin && j < 12; ++j) {
- if (GetCycleCountFast() >= (start + SpinThresholdCycles)) {
- doSpin = false;
- break;
- }
- for (ui32 i = 0; i < 12; ++i) {
- if (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_ACTIVE) {
- SpinLockPause();
- } else {
- doSpin = false;
- break;
- }
- }
- }
- if (!doSpin) {
- break;
- }
- if (RelaxedLoad(&StopFlag)) {
- break;
- }
- }
- // then - sleep
- if (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_ACTIVE) {
- if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED, TThreadCtx::WS_ACTIVE)) {
- do {
- hpnow = GetCycleCountFast();
- elapsed += hpnow - hpstart;
- if (threadCtx.Pad.Park()) // interrupted
- return 0;
- hpstart = GetCycleCountFast();
- parked += hpstart - hpnow;
- } while (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_BLOCKED);
- }
- }
- } else {
- AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED);
- do {
- hpnow = GetCycleCountFast();
- elapsed += hpnow - hpstart;
- if (threadCtx.Pad.Park()) // interrupted
- return 0;
- hpstart = GetCycleCountFast();
- parked += hpstart - hpnow;
- } while (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_BLOCKED);
+ semaphore.OldSemaphore--;
+ if (needToWait) {
+ semaphore.CurrentSleepThreadCount++;
}
- Y_VERIFY_DEBUG(AtomicLoad(&StopFlag) || AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_RUNNING);
+ x = AtomicGetAndCas(&Semaphore, semaphore.ConverToI64(), x);
+ if (x == oldX) {
+ break;
+ }
+ } while (!StopFlag);
-#if defined ACTORSLIB_COLLECT_EXEC_STATS
- if (AtomicDecrement(ThreadUtilization) == 0) {
- // When we started sleeping counter contained t1-t0, or the
- // last duration of max utilization. Now we subtract t2 >= t1,
- // which turns counter negative again, and the next sleep cycle
- // at timestamp t3 would be adding some new duration t3-t2.
- // If the counter was positive and becomes negative that means
- // there are no current races with other threads and we should
- // store the last positive duration we observed. Multiple
- // threads may be adding and subtracting values in potentially
- // arbitrary order, which would cause counter to oscillate
- // around zero. When it crosses zero is a good indication of a
- // correct value.
- const i64 t = GetCycleCountFast();
- const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, -t);
- if (x > 0 && x - t < 0)
- AtomicStore(&MaxUtilizationAccumulator, x);
+ if (needToWait) {
+ if (GoToWaiting(threadCtx, timers, needToBlock)) { // interrupted
+ return 0;
}
-#endif
} else {
AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING);
}
@@ -183,14 +220,14 @@ namespace NActors {
// ok, has work suggested, must dequeue
while (!RelaxedLoad(&StopFlag)) {
if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
- hpnow = GetCycleCountFast();
- elapsed += hpnow - hpstart;
- wctx.AddElapsedCycles(IActor::ACTOR_SYSTEM, elapsed);
- if (parked > 0) {
- wctx.AddParkedCycles(parked);
+ timers.HPNow = GetCycleCountFast();
+ timers.Elapsed += timers.HPNow - timers.HPStart;
+ wctx.AddElapsedCycles(IActor::ACTOR_SYSTEM, timers.Elapsed);
+ if (timers.Parked > 0) {
+ wctx.AddParkedCycles(timers.Parked);
}
- if (blocked > 0) {
- wctx.AddBlockedCycles(blocked);
+ if (timers.Blocked > 0) {
+ wctx.AddBlockedCycles(timers.Blocked);
}
return activation;
}
@@ -228,8 +265,25 @@ namespace NActors {
void TBasicExecutorPool::ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) {
Activations.Push(activation, revolvingCounter);
- const TAtomic x = AtomicIncrement(Semaphore);
- if (x <= 0) { // we must find someone to wake-up
+ bool needToWakeUp = false;
+
+ TAtomic x = AtomicGet(Semaphore);
+ TSemaphore semaphore = TSemaphore::GetSemaphore(x);
+ do {
+ needToWakeUp = semaphore.CurrentSleepThreadCount > 0;
+ i64 oldX = semaphore.ConverToI64();
+ semaphore.OldSemaphore++;
+ if (needToWakeUp) {
+ semaphore.CurrentSleepThreadCount--;
+ }
+ x = AtomicGetAndCas(&Semaphore, semaphore.ConverToI64(), oldX);
+ if (x == oldX) {
+ break;
+ }
+ semaphore = TSemaphore::GetSemaphore(x);
+ } while (true);
+
+ if (needToWakeUp) { // we must find someone to wake-up
WakeUpLoop();
}
}
@@ -345,87 +399,12 @@ namespace NActors {
with_lock (ChangeThreadsLock) {
size_t prevCount = GetThreadCount();
AtomicSet(ThreadCount, threads);
- if (prevCount < threads) {
- for (size_t i = prevCount; i < threads; ++i) {
- bool repeat = true;
- while (repeat) {
- switch (AtomicGet(Threads[i].BlockedFlag)) {
- case TThreadCtx::BS_BLOCKING:
- if (AtomicCas(&Threads[i].BlockedFlag, TThreadCtx::BS_NONE, TThreadCtx::BS_BLOCKING)) {
- // thread not entry to blocked loop
- repeat = false;
- }
- break;
- case TThreadCtx::BS_BLOCKED:
- // thread entry to blocked loop and we wake it
- AtomicSet(Threads[i].BlockedFlag, TThreadCtx::BS_NONE);
- Threads[i].BlockedPad.Unpark();
- repeat = false;
- break;
- default:
- // thread mustn't has TThreadCtx::BS_NONE because last time it was started to block
- Y_FAIL("BlockedFlag is not TThreadCtx::BS_BLOCKING and TThreadCtx::BS_BLOCKED when thread was waked up");
- }
- }
- }
- } else if (prevCount > threads) {
- // at first, start to block
- for (size_t i = threads; i < prevCount; ++i) {
- Y_VERIFY(AtomicGet(Threads[i].BlockedFlag) == TThreadCtx::BS_NONE);
- AtomicSet(Threads[i].BlockedFlag, TThreadCtx::BS_BLOCKING);
- }
- // after check need to wake up threads
- for (size_t idx = threads; idx < prevCount; ++idx) {
- TThreadCtx& threadCtx = Threads[idx];
- auto waitingFlag = AtomicGet(threadCtx.WaitingFlag);
- auto blockedFlag = AtomicGet(threadCtx.BlockedFlag);
- // while thread has this states (WS_NONE and BS_BLOCKING) we can't guess which way thread will go.
- // Either go to sleep and it will have to wake up,
- // or go to execute task and after completion will be blocked.
- while (waitingFlag == TThreadCtx::WS_NONE && blockedFlag == TThreadCtx::BS_BLOCKING) {
- waitingFlag = AtomicGet(threadCtx.WaitingFlag);
- blockedFlag = AtomicGet(threadCtx.BlockedFlag);
- }
- // next states:
- // 1) WS_ACTIVE BS_BLOCKING - waiting and start spinig | need wake up to block
- // 2) WS_BLOCKED BS_BLOCKING - waiting and start sleep | need wake up to block
- // 3) WS_RUNNING BS_BLOCKING - start execute | not need wake up, will block after executing
- // 4) WS_NONE BS_BLOCKED - blocked | not need wake up, already blocked
-
- if (waitingFlag == TThreadCtx::WS_ACTIVE || waitingFlag == TThreadCtx::WS_BLOCKED) {
- // need wake up
- Y_VERIFY(blockedFlag == TThreadCtx::BS_BLOCKING);
-
- // creaty empty mailBoxHint, where LineIndex == 1 and LineHint == 0, and activations will be ignored
- constexpr auto emptyMailBoxHint = TMailboxTable::LineIndexMask & -TMailboxTable::LineIndexMask;
- ui64 revolvingCounter = AtomicGet(ActivationsRevolvingCounter);
-
- Activations.Push(emptyMailBoxHint, revolvingCounter);
-
- auto x = AtomicIncrement(Semaphore);
- if (x <= 0) {
- // try wake up. if success then go to next thread
- switch (waitingFlag){
- case TThreadCtx::WS_ACTIVE: // in active spin-lock, just set flag
- if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_ACTIVE)) {
- continue;
- }
- break;
- case TThreadCtx::WS_BLOCKED:
- if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_BLOCKED)) {
- threadCtx.Pad.Unpark();
- continue;
- }
- break;
- default:
- ; // other thread woke this sleeping thread
- }
- // if thread has already been awakened then we must awaken the other
- WakeUpLoop();
- }
- }
- }
- }
+
+ TSemaphore semaphore = TSemaphore::GetSemaphore(AtomicGet(Semaphore));
+ i64 oldX = semaphore.ConverToI64();
+ semaphore.CurrentSleepThreadCount += threads - prevCount;
+ semaphore.OldSemaphore -= threads - prevCount;
+ AtomicAdd(Semaphore, semaphore.ConverToI64() - oldX);
}
}
}
diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h
index 023190f7fe..e65ad20a48 100644
--- a/library/cpp/actors/core/executor_pool_basic.h
+++ b/library/cpp/actors/core/executor_pool_basic.h
@@ -45,6 +45,14 @@ namespace NActors {
}
};
+ struct TTimers {
+ NHPTimer::STime Elapsed = 0;
+ NHPTimer::STime Parked = 0;
+ NHPTimer::STime Blocked = 0;
+ NHPTimer::STime HPStart = GetCycleCountFast();
+ NHPTimer::STime HPNow;
+ };
+
const ui64 SpinThreshold;
const ui64 SpinThresholdCycles;
@@ -67,6 +75,31 @@ namespace NActors {
TMutex ChangeThreadsLock;
public:
+ struct TSemaphore {
+ i64 OldSemaphore = 0; // 34 bits
+ // Sign bit
+ i8 Reserved1 = 0; // 5 bits
+ i16 CurrentSleepThreadCount = 0; // 16 bits
+ i8 Reserved2 = 0; // 8 bits
+
+ inline i64 ConverToI64() {
+ i64 value = (1ll << 34) + OldSemaphore;
+ return value
+ | ((i64)Reserved1 << 35)
+ | ((i64)CurrentSleepThreadCount << 40)
+ | ((i64)Reserved2 << 56);
+ }
+
+ static inline TSemaphore GetSemaphore(i64 value) {
+ TSemaphore semaphore;
+ semaphore.OldSemaphore = (value & 0x7ffffffffll) - (1ll << 34);
+ semaphore.Reserved1 = (value >> 35) & 0x1f;
+ semaphore.CurrentSleepThreadCount = (value >> 40) & 0xffff;
+ semaphore.Reserved2 = (value >> 56) & 0xff;
+ return semaphore;
+ }
+ };
+
static constexpr TDuration DEFAULT_TIME_PER_MAILBOX = TBasicExecutorPoolConfig::DEFAULT_TIME_PER_MAILBOX;
static constexpr ui32 DEFAULT_EVENTS_PER_MAILBOX = TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX;
@@ -107,5 +140,9 @@ namespace NActors {
private:
void WakeUpLoop();
+ bool GoToWaiting(TThreadCtx& threadCtx, TTimers &timers, bool needToBlock);
+ void GoToSpin(TThreadCtx& threadCtx);
+ bool GoToSleep(TThreadCtx& threadCtx, TTimers &timers);
+ bool GoToBeBlocked(TThreadCtx& threadCtx, TTimers &timers);
};
}
diff --git a/library/cpp/actors/core/executor_pool_basic_ut.cpp b/library/cpp/actors/core/executor_pool_basic_ut.cpp
index 3ef5808d72..574b0b0759 100644
--- a/library/cpp/actors/core/executor_pool_basic_ut.cpp
+++ b/library/cpp/actors/core/executor_pool_basic_ut.cpp
@@ -6,10 +6,15 @@
#include <library/cpp/actors/util/should_continue.h>
#include <library/cpp/testing/unittest/registar.h>
-#include <library/cpp/actors/protos/unittests.pb.h>
using namespace NActors;
+#define VALUES_EQUAL(a, b, ...) \
+ UNIT_ASSERT_VALUES_EQUAL_C((a), (b), (i64)semaphore.OldSemaphore \
+ << ' ' << (i64)semaphore.Reserved1 \
+ << ' ' << (i64)semaphore.CurrentSleepThreadCount \
+ << ' ' << (i64)semaphore.Reserved2 __VA_ARGS__);
+
////////////////////////////////////////////////////////////////////////////////
struct TEvMsg : public NActors::TEventBase<TEvMsg, 10347> {
@@ -90,138 +95,59 @@ THolder<TActorSystemSetup> GetActorSystemSetup(TBasicExecutorPool* pool)
Y_UNIT_TEST_SUITE(BasicExecutorPool) {
- Y_UNIT_TEST(DecreaseIncreaseThreadsCount) {
- const size_t msgCount = 1e4;
- const size_t size = 4;
- const size_t halfSize = size / 2;
- TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50);
+ Y_UNIT_TEST(Semaphore) {
+ TBasicExecutorPool::TSemaphore semaphore;
+ semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(0);
- auto setup = GetActorSystemSetup(executorPool);
- TActorSystem actorSystem(setup);
- actorSystem.Start();
+ VALUES_EQUAL(0, semaphore.ConverToI64());
+ semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(-1);
+ VALUES_EQUAL(-1, semaphore.ConverToI64());
+ semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(1);
+ VALUES_EQUAL(1, semaphore.ConverToI64());
- executorPool->SetThreadCount(halfSize);
- TTestSenderActor* actors[size];
- TActorId actorIds[size];
- for (size_t i = 0; i < size; ++i) {
- actors[i] = new TTestSenderActor();
- actorIds[i] = actorSystem.Register(actors[i]);
+ for (i64 value = -1'000'000; value <= 1'000'000; ++value) {
+ VALUES_EQUAL(TBasicExecutorPool::TSemaphore::GetSemaphore(value).ConverToI64(), value);
}
- const int testCount = 2;
-
- TExecutorPoolStats poolStats[testCount];
- TVector<TExecutorThreadStats> statsCopy[testCount];
-
- for (size_t testIdx = 0; testIdx < testCount; ++testIdx) {
- for (size_t i = 0; i < size; ++i) {
- actors[i]->Start(actors[i]->SelfId(), msgCount);
+ for (i8 sleepThreads = -10; sleepThreads <= 10; ++sleepThreads) {
+
+ semaphore = TBasicExecutorPool::TSemaphore();
+ semaphore.CurrentSleepThreadCount = sleepThreads;
+ i64 initialValue = semaphore.ConverToI64();
+
+ semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(initialValue - 1);
+ VALUES_EQUAL(-1, semaphore.OldSemaphore);
+
+ i64 value = initialValue;
+ value -= 100;
+ for (i32 expected = -100; expected <= 100; ++expected) {
+ semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(value);
+ UNIT_ASSERT_VALUES_EQUAL_C(expected, semaphore.OldSemaphore, (i64)semaphore.OldSemaphore
+ << ' ' << (i64)semaphore.CurrentSleepThreadCount
+ << ' ' << (i64)semaphore.Reserved2);
+ UNIT_ASSERT_VALUES_EQUAL_C(sleepThreads, semaphore.CurrentSleepThreadCount, (i64)semaphore.OldSemaphore
+ << ' ' << (i64)semaphore.CurrentSleepThreadCount
+ << ' ' << (i64)semaphore.Reserved2);
+ semaphore = TBasicExecutorPool::TSemaphore();
+ semaphore.OldSemaphore = expected;
+ semaphore.CurrentSleepThreadCount = sleepThreads;
+ UNIT_ASSERT_VALUES_EQUAL(semaphore.ConverToI64(), value);
+ value++;
}
- for (size_t i = 0; i < size; ++i) {
- actorSystem.Send(actorIds[i], new TEvMsg());
- }
-
- Sleep(TDuration::MilliSeconds(100));
- for (size_t i = 0; i < size; ++i) {
- actors[i]->Stop();
+ for (i32 expected = 101; expected >= -101; --expected) {
+ semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(value);
+ UNIT_ASSERT_VALUES_EQUAL_C(expected, semaphore.OldSemaphore, (i64)semaphore.OldSemaphore
+ << ' ' << (i64)semaphore.CurrentSleepThreadCount
+ << ' ' << (i64)semaphore.Reserved2);
+ UNIT_ASSERT_VALUES_EQUAL_C(sleepThreads, semaphore.CurrentSleepThreadCount, (i64)semaphore.OldSemaphore
+ << ' ' << (i64)semaphore.CurrentSleepThreadCount
+ << ' ' << (i64)semaphore.Reserved2);
+ value--;
}
-
- executorPool->GetCurrentStats(poolStats[testIdx], statsCopy[testIdx]);
}
- for (size_t i = 1; i <= halfSize; ++i) {
- UNIT_ASSERT_UNEQUAL(statsCopy[0][i].ReceivedEvents, statsCopy[1][i].ReceivedEvents);
- }
-
- for (size_t i = halfSize + 1; i <= size; ++i) {
- UNIT_ASSERT_EQUAL(statsCopy[0][i].ReceivedEvents, statsCopy[1][i].ReceivedEvents);
- }
-
- executorPool->SetThreadCount(size);
-
- for (size_t testIdx = 0; testIdx < testCount; ++testIdx) {
- for (size_t i = 0; i < size; ++i) {
- actors[i]->Start(actors[i]->SelfId(), msgCount);
- }
- for (size_t i = 0; i < size; ++i) {
- actorSystem.Send(actorIds[i], new TEvMsg());
- }
-
- Sleep(TDuration::MilliSeconds(100));
-
- for (size_t i = 0; i < size; ++i) {
- actors[i]->Stop();
- }
-
- executorPool->GetCurrentStats(poolStats[testIdx], statsCopy[testIdx]);
- }
-
- for (size_t i = 1; i <= size; ++i) {
- UNIT_ASSERT_UNEQUAL(statsCopy[0][i].ReceivedEvents, statsCopy[1][i].ReceivedEvents);
- }
- }
-
- Y_UNIT_TEST(ChangeCount) {
- const size_t msgCount = 1e3;
- const size_t size = 4;
- const size_t halfSize = size / 2;
- TBasicExecutorPool* executorPool = new TBasicExecutorPool(0, size, 50);
-
- auto begin = TInstant::Now();
-
- auto setup = GetActorSystemSetup(executorPool);
- TActorSystem actorSystem(setup);
- actorSystem.Start();
- executorPool->SetThreadCount(halfSize);
-
- TTestSenderActor* actors[size];
- TActorId actorIds[size];
- for (size_t i = 0; i < size; ++i) {
- actors[i] = new TTestSenderActor();
- actorIds[i] = actorSystem.Register(actors[i]);
- }
-
- for (size_t i = 0; i < size; ++i) {
- actors[i]->Start(actorIds[i], msgCount);
- }
- for (size_t i = 0; i < size; ++i) {
- actorSystem.Send(actorIds[i], new TEvMsg());
- }
-
- const i32 N = 6;
- const i32 threadsCouns[N] = { 1, 3, 2, 3, 1, 4 };
-
- ui64 counter = 0;
-
- TTestSenderActor* changerActor = new TTestSenderActor([&]{
- executorPool->SetThreadCount(threadsCouns[counter]);
- counter++;
- if (counter == N) {
- counter = 0;
- }
- });
- TActorId changerActorId = actorSystem.Register(changerActor);
- changerActor->Start(changerActorId, msgCount);
- actorSystem.Send(changerActorId, new TEvMsg());
-
- while (true) {
- size_t maxCounter = 0;
- for (size_t i = 0; i < size; ++i) {
- maxCounter = Max(maxCounter, actors[i]->GetCounter());
- }
-
- if (maxCounter == 0) {
- break;
- }
-
- auto now = TInstant::Now();
- UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter);
-
- Sleep(TDuration::MilliSeconds(1));
- }
-
- changerActor->Stop();
+ //UNIT_ASSERT_VALUES_EQUAL_C(-1, TBasicExecutorPool::TSemaphore::GetSemaphore(value-1).OldSemaphore);
}
Y_UNIT_TEST(CheckCompleteOne) {
@@ -433,3 +359,182 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) {
UNIT_ASSERT_VALUES_EQUAL(stats[0].MailboxPushedOutBySoftPreemption, 0);
}
}
+
+Y_UNIT_TEST_SUITE(ChangingThreadsCountInBasicExecutorPool) {
+
+ struct TMockState {
+ void ActorDo() {}
+ };
+
+ struct TTestActors {
+ const size_t Count;
+ TArrayHolder<TTestSenderActor*> Actors;
+ TArrayHolder<TActorId> ActorIds;
+
+ TTestActors(size_t count)
+ : Count(count)
+ , Actors(new TTestSenderActor*[count])
+ , ActorIds(new TActorId[count])
+ { }
+
+ void Start(TActorSystem &actorSystem, size_t msgCount) {
+ for (size_t i = 0; i < Count; ++i) {
+ Actors[i]->Start(Actors[i]->SelfId(), msgCount);
+ }
+ for (size_t i = 0; i < Count; ++i) {
+ actorSystem.Send(ActorIds[i], new TEvMsg());
+ }
+ }
+
+ void Stop() {
+ for (size_t i = 0; i < Count; ++i) {
+ Actors[i]->Stop();
+ }
+ }
+ };
+
+ template <typename TState = TMockState>
+ struct TTestCtx {
+ const size_t MaxThreadCount;
+ const size_t SendingMessageCount;
+ std::unique_ptr<TBasicExecutorPool> ExecutorPool;
+ THolder<TActorSystemSetup> Setup;
+ TActorSystem ActorSystem;
+
+ TState State;
+
+ TTestCtx(size_t maxThreadCount, size_t sendingMessageCount)
+ : MaxThreadCount(maxThreadCount)
+ , SendingMessageCount(sendingMessageCount)
+ , ExecutorPool(new TBasicExecutorPool(0, MaxThreadCount, 50))
+ , Setup(GetActorSystemSetup(ExecutorPool.get()))
+ , ActorSystem(Setup)
+ {
+ }
+
+ TTestCtx(size_t maxThreadCount, size_t sendingMessageCount, const TState &state)
+ : MaxThreadCount(maxThreadCount)
+ , SendingMessageCount(sendingMessageCount)
+ , ExecutorPool(new TBasicExecutorPool(0, MaxThreadCount, 50))
+ , Setup(GetActorSystemSetup(ExecutorPool.get()))
+ , ActorSystem(Setup)
+ , State(state)
+ {
+ }
+
+ ~TTestCtx() {
+ ExecutorPool.release();
+ }
+
+ TTestActors RegisterCheckActors(size_t actorCount) {
+ TTestActors res(actorCount);
+ for (size_t i = 0; i < actorCount; ++i) {
+ res.Actors[i] = new TTestSenderActor([&] {
+ State.ActorDo();
+ });
+ res.ActorIds[i] = ActorSystem.Register(res.Actors[i]);
+ }
+ return res;
+ }
+ };
+
+ struct TCheckingInFlightState {
+ TAtomic ExpectedMaximum = 0;
+ TAtomic CurrentInFlight = 0;
+
+ void ActorStartProcessing() {
+ ui32 inFlight = AtomicIncrement(CurrentInFlight);
+ ui32 maximum = AtomicGet(ExpectedMaximum);
+ if (maximum) {
+ UNIT_ASSERT_C(inFlight <= maximum, "inFlight# " << inFlight << " maximum# " << maximum);
+ }
+ }
+
+ void ActorStopProcessing() {
+ AtomicDecrement(CurrentInFlight);
+ }
+
+ void ActorDo() {
+ ActorStartProcessing();
+ NanoSleep(1'000'000);
+ ActorStopProcessing();
+ }
+ };
+
+ Y_UNIT_TEST(DecreaseIncreaseThreadCount) {
+ const size_t msgCount = 1e2;
+ const size_t size = 4;
+ const size_t testCount = 2;
+ TTestCtx<TCheckingInFlightState> ctx(size, msgCount);
+ ctx.ActorSystem.Start();
+
+ TVector<TExecutorThreadStats> statsCopy[testCount];
+
+ TTestActors testActors = ctx.RegisterCheckActors(size);
+
+ const size_t N = 6;
+ const size_t threadsCounts[N] = { 1, 3, 2, 3, 1, 4 };
+ for (ui32 idx = 0; idx < 4 * N; ++idx) {
+ size_t currentThreadCount = threadsCounts[idx];
+ ctx.ExecutorPool->SetThreadCount(currentThreadCount);
+ AtomicSet(ctx.State.ExpectedMaximum, currentThreadCount);
+
+ for (size_t testIdx = 0; testIdx < testCount; ++testIdx) {
+ testActors.Start(ctx.ActorSystem, msgCount);
+ Sleep(TDuration::MilliSeconds(100));
+ testActors.Stop();
+ }
+ Sleep(TDuration::MilliSeconds(10));
+ }
+ ctx.ActorSystem.Stop();
+ }
+
+ Y_UNIT_TEST(ContiniousChangingThreadCount) {
+ const size_t msgCount = 1e2;
+ const size_t size = 4;
+
+ auto begin = TInstant::Now();
+ TTestCtx<TCheckingInFlightState> ctx(size, msgCount, TCheckingInFlightState{msgCount});
+ ctx.ActorSystem.Start();
+ TTestActors testActors = ctx.RegisterCheckActors(size);
+
+ testActors.Start(ctx.ActorSystem, msgCount);
+
+ const size_t N = 6;
+ const size_t threadsCouns[N] = { 1, 3, 2, 3, 1, 4 };
+
+ ui64 counter = 0;
+
+ TTestSenderActor* changerActor = new TTestSenderActor([&]{
+ ctx.State.ActorStartProcessing();
+ AtomicSet(ctx.State.ExpectedMaximum, 0);
+ ctx.ExecutorPool->SetThreadCount(threadsCouns[counter]);
+ NanoSleep(10'000'000);
+ AtomicSet(ctx.State.ExpectedMaximum, threadsCouns[counter]);
+ counter++;
+ if (counter == N) {
+ counter = 0;
+ }
+ ctx.State.ActorStopProcessing();
+ });
+ TActorId changerActorId = ctx.ActorSystem.Register(changerActor);
+ changerActor->Start(changerActorId, msgCount);
+ ctx.ActorSystem.Send(changerActorId, new TEvMsg());
+
+ while (true) {
+ size_t maxCounter = 0;
+ for (size_t i = 0; i < size; ++i) {
+ maxCounter = Max(maxCounter, testActors.Actors[i]->GetCounter());
+ }
+ if (maxCounter == 0) {
+ break;
+ }
+ auto now = TInstant::Now();
+ UNIT_ASSERT_C(now - begin < TDuration::Seconds(5), "Max counter is " << maxCounter);
+ Sleep(TDuration::MilliSeconds(1));
+ }
+
+ changerActor->Stop();
+ ctx.ActorSystem.Stop();
+ }
+}