diff options
| author | arkady-e1ppa <[email protected]> | 2024-07-23 00:21:25 +0300 |
|---|---|---|
| committer | arkady-e1ppa <[email protected]> | 2024-07-23 00:31:10 +0300 |
| commit | bf4bccbe064e695aa38836745858955613a416fd (patch) | |
| tree | d5c2e5a6f566684f532553fffccf04498076688c | |
| parent | 9a1971ccc4226a82034c2b820a6520e12f779e09 (diff) | |
Issues of rXXXXXX
a42c1a186ad053457477b49f944c23ce13e05372
| -rw-r--r-- | yt/yt/core/concurrency/async_looper.cpp | 70 | ||||
| -rw-r--r-- | yt/yt/core/concurrency/async_looper.h | 18 | ||||
| -rw-r--r-- | yt/yt/core/concurrency/unittests/async_looper_ut.cpp | 232 |
3 files changed, 167 insertions, 153 deletions
diff --git a/yt/yt/core/concurrency/async_looper.cpp b/yt/yt/core/concurrency/async_looper.cpp index 05ad633c1d9..e347e3fc2f1 100644 --- a/yt/yt/core/concurrency/async_looper.cpp +++ b/yt/yt/core/concurrency/async_looper.cpp @@ -12,12 +12,11 @@ TAsyncLooper::TAsyncLooper( IInvokerPtr invoker, TCallback<TFuture<void>(bool)> asyncStart, TCallback<void(bool)> syncFinish, - TString name, const NLogging::TLogger& logger) : Invoker_(std::move(invoker)) , AsyncStart_(std::move(asyncStart)) , SyncFinish_(std::move(syncFinish)) - , Logger(logger.WithTag("Looper: %v", name)) + , Logger(logger) { YT_VERIFY(Invoker_); YT_VERIFY(Invoker_ != GetSyncInvoker()); @@ -29,7 +28,7 @@ void TAsyncLooper::Start() { auto traceContext = NTracing::GetOrCreateTraceContext("LooperStart"); auto traceGuard = NTracing::TCurrentTraceContextGuard(traceContext); - YT_LOG_DEBUG("Requesting AsyncLooper to start"); + YT_LOG_DEBUG("Requesting looper to start"); Invoker_->Invoke( BIND(&TAsyncLooper::DoStart, MakeStrong(this))); @@ -73,7 +72,7 @@ void TAsyncLooper::DoStart() // will bail out due to epoch mismatch -> we are free to // ignore this case on our side. State_ = EState::Running; - YT_LOG_DEBUG("Starting the looper"); + YT_LOG_DEBUG("Starting looper"); StartLoop(/*cleanStart*/ true, guard); break; @@ -271,40 +270,45 @@ void TAsyncLooper::FinishStep(bool wasRestarted) void TAsyncLooper::Stop() { - auto guard = Guard(StateLock_); + TFuture<void> future; - if (State_ == EState::NotRunning) { - // Already stopping - // -> bail out. - YT_LOG_DEBUG("Trying to stop looper that is already stopped"); - return; - } + { + auto guard = Guard(StateLock_); - State_ = EState::NotRunning; - ++EpochNumber_; - YT_LOG_DEBUG("Stopping the looper"); - - // We could be in one of three possible situations (for each stage): - // 1. EStage::AsyncBusy -- |StartLoop| caller will observe - // state |NotRunning| eventually and will cancel the future for us - // (or there will be a restart which is also handled by caller above). - // 2. EStage::Idle -- intermission between async and sync parts - // -- if there is not restart, sync part will just bail out - // if there is a restart, sync part will observe a different epoch - // and bail out as well. - // 3. EStage::Busy -- sync part is running and will simply not - // continue the chain once it observes NotRunning state. - - if (!Future_) { - // If there was a null future produced for async part - // or simply while Stage_ == EStage::AsyncBusy. - return; + if (State_ == EState::NotRunning) { + // Already stopping + // -> bail out. + YT_LOG_DEBUG("Trying to stop looper that is already stopped"); + return; + } + + State_ = EState::NotRunning; + ++EpochNumber_; + YT_LOG_DEBUG("Stopping the looper"); + + // We could be in one of three possible situations (for each stage): + // 1. EStage::AsyncBusy -- |StartLoop| caller will observe + // state |NotRunning| eventually and will cancel the future for us + // (or there will be a restart which is also handled by caller above). + // 2. EStage::Idle -- intermission between async and sync parts + // -- if there is not restart, sync part will just bail out + // if there is a restart, sync part will observe a different epoch + // and bail out as well. + // 3. EStage::Busy -- sync part is running and will simply not + // continue the chain once it observes NotRunning state. + + if (!Future_) { + // If there was a null future produced for async part + // or simply while Stage_ == EStage::AsyncBusy. + return; + } + + future = std::exchange(Future_, TFuture<void>()); } - Future_.Cancel(TError("Looper stopped")); - Future_.Reset(); + future.Cancel(TError("Looper stopped")); } //////////////////////////////////////////////////////////////////////////////// -} // NYT::NConcurrency +} // namespace NYT::NConcurrency diff --git a/yt/yt/core/concurrency/async_looper.h b/yt/yt/core/concurrency/async_looper.h index 3f356d267b5..93b31a7dc86 100644 --- a/yt/yt/core/concurrency/async_looper.h +++ b/yt/yt/core/concurrency/async_looper.h @@ -10,11 +10,9 @@ namespace NYT::NConcurrency { //////////////////////////////////////////////////////////////////////////////// -// Class which indefenetely runs -// two tasks: +// Class which indefenetely runs two tasks: // 1. Async start which creates async action. -// 2. Sync finish which is called after async action -// is finished. +// 2. Sync finish which is called after async action is finished. // Both are ran in Invoker_. // Both |Start| and |Stop| are completely // thread-safe, can be executed in any order and any number of times. @@ -27,8 +25,7 @@ public: IInvokerPtr invoker, TCallback<TFuture<void>(bool cleanStart)> asyncStart, TCallback<void(bool cleanStart)> syncFinish, - TString name, - const NLogging::TLogger& logger = NLogging::TLogger("Looper logger")); + const NLogging::TLogger& logger = NLogging::TLogger("AsyncLooper")); // Starts polling. // First loop will have cleanStart == true @@ -41,8 +38,9 @@ public: private: const IInvokerPtr Invoker_; - TCallback<TFuture<void>(bool)> AsyncStart_; - TCallback<void(bool)> SyncFinish_; + const TCallback<TFuture<void>(bool)> AsyncStart_; + const TCallback<void(bool)> SyncFinish_; + const NLogging::TLogger Logger; YT_DECLARE_SPIN_LOCK(NYT::NThreading::TSpinLock, StateLock_); using TGuard = TGuard<NYT::NThreading::TSpinLock>; @@ -75,8 +73,6 @@ private: ui64 EpochNumber_ = 0; TFuture<void> Future_; - const NLogging::TLogger Logger; - void DoStart(); void StartLoop(bool cleanStart, const TGuard& guard); @@ -89,4 +85,4 @@ DEFINE_REFCOUNTED_TYPE(TAsyncLooper); //////////////////////////////////////////////////////////////////////////////// -} // NYT::NConcurrency +} // namespace NYT::NConcurrency diff --git a/yt/yt/core/concurrency/unittests/async_looper_ut.cpp b/yt/yt/core/concurrency/unittests/async_looper_ut.cpp index 0b48079a901..ca9d9fe2d12 100644 --- a/yt/yt/core/concurrency/unittests/async_looper_ut.cpp +++ b/yt/yt/core/concurrency/unittests/async_looper_ut.cpp @@ -7,13 +7,13 @@ #include <library/cpp/yt/threading/event_count.h> +#include <thread> + namespace NYT::NConcurrency { namespace { //////////////////////////////////////////////////////////////////////////////// -const TString LooperName = "TestLooper"; - // TODO(arkady-e1ppa): Add ManualInvoker which only runs callbacks when // manually requested. Add test when Stop/Restart occurs during the // intermission between Async and Sync steps. @@ -22,13 +22,13 @@ TEST(TAsyncLooperTest, JustWorks) { auto queue = New<TActionQueue>(); - TCallback asyncStart = BIND([invoker = queue->GetInvoker()] (bool) { + auto asyncStart = BIND([invoker = queue->GetInvoker()] (bool) { VERIFY_INVOKER_AFFINITY(invoker); return BIND([] {}).AsyncVia(invoker).Run(); }); auto progress = std::make_shared<std::atomic<int>>(0); - TCallback syncFinish = BIND([progress, invoker = queue->GetInvoker()] (bool) { + auto syncFinish = BIND([progress, invoker = queue->GetInvoker()] (bool) { VERIFY_INVOKER_AFFINITY(invoker); progress->fetch_add(1); }); @@ -38,19 +38,22 @@ TEST(TAsyncLooperTest, JustWorks) auto looper = New<TAsyncLooper>( queue->GetInvoker(), asyncStart, - syncFinish, - LooperName); + syncFinish); Sleep(TDuration::Seconds(1)); EXPECT_EQ(currentProgress, progress->load()); looper->Start(); - while (currentProgress == progress->load()); + while (currentProgress == progress->load()) { + Sleep(TDuration::MilliSeconds(1)); + } currentProgress = progress->load(); - while (currentProgress == progress->load()); + while (currentProgress == progress->load()) { + Sleep(TDuration::MilliSeconds(1)); + } looper->Stop(); } @@ -59,12 +62,12 @@ TEST(TAsyncLooperTest, Restart) { auto queue = New<TActionQueue>(); - TCallback asyncStart = BIND([invoker = queue->GetInvoker()] (bool) { + auto asyncStart = BIND([invoker = queue->GetInvoker()] (bool) { return BIND([] {}).AsyncVia(invoker).Run(); }); auto cleanStarts = std::make_shared<std::atomic<int>>(0); - TCallback syncFinish = BIND([cleanStarts] (bool cleanStart) { + auto syncFinish = BIND([cleanStarts] (bool cleanStart) { if (cleanStart) { cleanStarts->fetch_add(1); } @@ -73,12 +76,13 @@ TEST(TAsyncLooperTest, Restart) auto looper = New<TAsyncLooper>( queue->GetInvoker(), asyncStart, - syncFinish, - LooperName); + syncFinish); looper->Start(); - while (cleanStarts->load() == 0); + while (cleanStarts->load() == 0) { + Sleep(TDuration::MilliSeconds(1)); + } EXPECT_EQ(cleanStarts->load(), 1); @@ -90,7 +94,9 @@ TEST(TAsyncLooperTest, Restart) looper->Start(); - while (cleanStarts->load() == 1); + while (cleanStarts->load() == 1) { + Sleep(TDuration::MilliSeconds(1)); + } EXPECT_EQ(cleanStarts->load(), 2); @@ -104,7 +110,7 @@ TEST(TAsyncLooperTest, CancelAsyncStep) NThreading::TEvent started; auto promise = NewPromise<void>(); - TCallback asyncStart = BIND([invoker = queue->GetInvoker(), promise, &started] (bool) { + auto asyncStart = BIND([invoker = queue->GetInvoker(), promise, &started] (bool) { return BIND([promise, &started] { started.NotifyAll(); WaitFor(promise.ToFuture()) @@ -112,14 +118,13 @@ TEST(TAsyncLooperTest, CancelAsyncStep) }).AsyncVia(invoker).Run(); }); - TCallback syncFinish = BIND([] (bool) { + auto syncFinish = BIND([] (bool) { }); auto looper = New<TAsyncLooper>( queue->GetInvoker(), asyncStart, - syncFinish, - LooperName); + syncFinish); looper->Start(); @@ -137,12 +142,12 @@ TEST(TAsyncLooperTest, CancelSyncStep) NThreading::TEvent started; auto promise = NewPromise<void>(); - TCallback asyncStart = BIND([invoker = queue->GetInvoker()] (bool) { + auto asyncStart = BIND([invoker = queue->GetInvoker()] (bool) { return BIND([] { }).AsyncVia(invoker).Run(); }); - TCallback syncFinish = BIND([promise, &started] (bool) { + auto syncFinish = BIND([promise, &started] (bool) { started.NotifyAll(); WaitFor(promise.ToFuture()) .ThrowOnError(); @@ -151,8 +156,7 @@ TEST(TAsyncLooperTest, CancelSyncStep) auto looper = New<TAsyncLooper>( queue->GetInvoker(), asyncStart, - syncFinish, - LooperName); + syncFinish); looper->Start(); @@ -172,7 +176,7 @@ TEST(TAsyncLooperTest, StopDuringAsyncStep) NThreading::TEvent releaseAsync; NThreading::TEvent started; - TCallback asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, &started] (bool) { + auto asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, &started] (bool) { return BIND([&releaseAsync, &started] { started.NotifyAll(); releaseAsync.Wait(); @@ -180,15 +184,14 @@ TEST(TAsyncLooperTest, StopDuringAsyncStep) }); auto mustBeFalse = std::make_shared<std::atomic<bool>>(false); - TCallback syncFinish = BIND([mustBeFalse] (bool) { + auto syncFinish = BIND([mustBeFalse] (bool) { mustBeFalse->store(true); }); auto looper = New<TAsyncLooper>( queue->GetInvoker(), asyncStart, - syncFinish, - LooperName); + syncFinish); looper->Start(); @@ -215,7 +218,7 @@ TEST(TAsyncLooperTest, StopDuringAsyncStepWaitFor) auto releaseAsync = NewPromise<void>(); NThreading::TEvent started; - TCallback asyncStart = BIND([invoker = queue->GetInvoker(), &started, releaseAsync] (bool) { + auto asyncStart = BIND([invoker = queue->GetInvoker(), &started, releaseAsync] (bool) { return BIND([releaseAsync, &started] { started.NotifyAll(); WaitFor(releaseAsync.ToFuture()) @@ -224,15 +227,14 @@ TEST(TAsyncLooperTest, StopDuringAsyncStepWaitFor) }); auto mustBeFalse = std::make_shared<std::atomic<bool>>(false); - TCallback syncFinish = BIND([mustBeFalse] (bool) { + auto syncFinish = BIND([mustBeFalse] (bool) { mustBeFalse->store(true); }); auto looper = New<TAsyncLooper>( queue->GetInvoker(), asyncStart, - syncFinish, - LooperName); + syncFinish); looper->Start(); @@ -260,36 +262,39 @@ TEST(TAsyncLooperTest, RestartDuringAsyncStep) // ignore cancelation in this test. NThreading::TEvent releaseAsync; - auto asyncRunsCount = std::make_shared<std::atomic<int>>(0); + auto asyncRunCount = std::make_shared<std::atomic<int>>(0); - TCallback asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, asyncRunsCount] (bool) { - return BIND([&releaseAsync, asyncRunsCount] { - asyncRunsCount->fetch_add(1); + auto asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, asyncRunCount] (bool) { + return BIND([&releaseAsync, asyncRunCount] { + asyncRunCount->fetch_add(1); releaseAsync.Wait(); }).AsyncVia(invoker).Run(); }); - TCallback syncFinish = BIND([] (bool) { + auto syncFinish = BIND([] (bool) { }); auto looper = New<TAsyncLooper>( queue->GetInvoker(), asyncStart, - syncFinish, - LooperName); + syncFinish); looper->Start(); - while (asyncRunsCount->load() == 0); + while (asyncRunCount->load() == 0) { + Sleep(TDuration::MilliSeconds(1)); + } - EXPECT_EQ(asyncRunsCount->load(), 1); + EXPECT_EQ(asyncRunCount->load(), 1); looper->Stop(); looper->Start(); releaseAsync.NotifyAll(); - while (asyncRunsCount->load() == 1); + while (asyncRunCount->load() == 1) { + Sleep(TDuration::MilliSeconds(1)); + } looper->Stop(); } @@ -300,37 +305,40 @@ TEST(TAsyncLooperTest, RestartDuringAsyncStepWaitFor) auto releaseAsync = NewPromise<void>(); - auto asyncRunsCount = std::make_shared<std::atomic<int>>(0); + auto asyncRunCount = std::make_shared<std::atomic<int>>(0); - TCallback asyncStart = BIND([invoker = queue->GetInvoker(), releaseAsync, asyncRunsCount] (bool) { - return BIND([releaseAsync, asyncRunsCount] { - asyncRunsCount->fetch_add(1); + auto asyncStart = BIND([invoker = queue->GetInvoker(), releaseAsync, asyncRunCount] (bool) { + return BIND([releaseAsync, asyncRunCount] { + asyncRunCount->fetch_add(1); WaitFor(releaseAsync.ToFuture()) .ThrowOnError(); }).AsyncVia(invoker).Run(); }); - TCallback syncFinish = BIND([] (bool) { + auto syncFinish = BIND([] (bool) { }); auto looper = New<TAsyncLooper>( queue->GetInvoker(), asyncStart, - syncFinish, - LooperName); + syncFinish); looper->Start(); - while (asyncRunsCount->load() == 0); + while (asyncRunCount->load() == 0) { + Sleep(TDuration::MilliSeconds(1)); + } - EXPECT_EQ(asyncRunsCount->load(), 1); + EXPECT_EQ(asyncRunCount->load(), 1); looper->Stop(); looper->Start(); releaseAsync.Set(); - while (asyncRunsCount->load() == 1); + while (asyncRunCount->load() == 1) { + Sleep(TDuration::MilliSeconds(1)); + } looper->Stop(); } @@ -345,7 +353,7 @@ TEST(TAsyncLooperTest, StopDuringAsyncStepPreparation) NThreading::TEvent started; auto mustBeFalse = std::make_shared<std::atomic<bool>>(false); - TCallback asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, &started, mustBeFalse] (bool) { + auto asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, &started, mustBeFalse] (bool) { started.NotifyAll(); releaseAsync.Wait(); @@ -358,14 +366,13 @@ TEST(TAsyncLooperTest, StopDuringAsyncStepPreparation) }).AsyncVia(invoker).Run(); }); - TCallback syncFinish = BIND([] (bool) { + auto syncFinish = BIND([] (bool) { }); auto looper = New<TAsyncLooper>( queue->GetInvoker(), asyncStart, - syncFinish, - LooperName); + syncFinish); queue->GetInvoker()->Invoke(BIND([looper] { looper->Start(); @@ -396,24 +403,23 @@ TEST(TAsyncLooperTest, RestartDuringAsyncStepPreparation1) NThreading::TEvent releaseAsync; NThreading::TEvent started; - auto asyncRunsCount = std::make_shared<std::atomic<int>>(0); + auto asyncRunCount = std::make_shared<std::atomic<int>>(0); - TCallback asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, &started, asyncRunsCount] (bool) { + auto asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, &started, asyncRunCount] (bool) { started.NotifyAll(); releaseAsync.Wait(); - return BIND([asyncRunsCount] { - asyncRunsCount->fetch_add(1); + return BIND([asyncRunCount] { + asyncRunCount->fetch_add(1); }).AsyncVia(invoker).Run(); }); - TCallback syncFinish = BIND([] (bool) { + auto syncFinish = BIND([] (bool) { }); auto looper = New<TAsyncLooper>( queue->GetInvoker(), asyncStart, - syncFinish, - LooperName); + syncFinish); looper->Start(); @@ -424,7 +430,9 @@ TEST(TAsyncLooperTest, RestartDuringAsyncStepPreparation1) releaseAsync.NotifyAll(); - while (asyncRunsCount->load() == 0); + while (asyncRunCount->load() == 0) { + Sleep(TDuration::MilliSeconds(1)); + } looper->Stop(); } @@ -441,7 +449,7 @@ TEST(TAsyncLooperTest, RestartDuringAsyncStepPreparation2) auto asyncCleanStarts = std::make_shared<std::atomic<int>>(0); auto syncCleanStarts = std::make_shared<std::atomic<int>>(0); - TCallback asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, &secondIterationStarted, asyncCleanStarts, syncCleanStarts] (bool cleanStart) { + auto asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, &secondIterationStarted, asyncCleanStarts, syncCleanStarts] (bool cleanStart) { if (cleanStart) { asyncCleanStarts->fetch_add(1); } @@ -456,7 +464,7 @@ TEST(TAsyncLooperTest, RestartDuringAsyncStepPreparation2) }).AsyncVia(invoker).Run(); }); - TCallback syncFinish = BIND([syncCleanStarts] (bool cleanStart) { + auto syncFinish = BIND([syncCleanStarts] (bool cleanStart) { if (cleanStart) { syncCleanStarts->fetch_add(1); } @@ -465,8 +473,7 @@ TEST(TAsyncLooperTest, RestartDuringAsyncStepPreparation2) auto looper = New<TAsyncLooper>( queue->GetInvoker(), asyncStart, - syncFinish, - LooperName); + syncFinish); looper->Start(); @@ -480,7 +487,9 @@ TEST(TAsyncLooperTest, RestartDuringAsyncStepPreparation2) releaseAsync.NotifyAll(); - while (syncCleanStarts->load() == 1); + while (syncCleanStarts->load() == 1) { + Sleep(TDuration::MilliSeconds(1)); + } EXPECT_EQ(asyncCleanStarts->load(), 2); EXPECT_EQ(syncCleanStarts->load(), 2); @@ -497,14 +506,14 @@ TEST(TAsyncLooperTest, StopDuringSyncStep) NThreading::TEvent releaseAsync; NThreading::TEvent started; - auto asyncRunsCount = std::make_shared<std::atomic<int>>(0); - TCallback asyncStart = BIND([invoker = queue->GetInvoker(), asyncRunsCount] (bool) { - return BIND([asyncRunsCount] { - asyncRunsCount->fetch_add(1); + auto asyncRunCount = std::make_shared<std::atomic<int>>(0); + auto asyncStart = BIND([invoker = queue->GetInvoker(), asyncRunCount] (bool) { + return BIND([asyncRunCount] { + asyncRunCount->fetch_add(1); }).AsyncVia(invoker).Run(); }); - TCallback syncFinish = BIND([&releaseAsync, &started] (bool) { + auto syncFinish = BIND([&releaseAsync, &started] (bool) { started.NotifyAll(); releaseAsync.Wait(); }); @@ -512,8 +521,7 @@ TEST(TAsyncLooperTest, StopDuringSyncStep) auto looper = New<TAsyncLooper>( queue->GetInvoker(), asyncStart, - syncFinish, - LooperName); + syncFinish); looper->Start(); @@ -530,7 +538,7 @@ TEST(TAsyncLooperTest, StopDuringSyncStep) // Ensure queue is empty queue->Shutdown(/*graceful*/ true); - EXPECT_EQ(asyncRunsCount->load(), 1); + EXPECT_EQ(asyncRunCount->load(), 1); } TEST(TAsyncLooperTest, StopDuringSyncStepWaitFor) @@ -542,14 +550,14 @@ TEST(TAsyncLooperTest, StopDuringSyncStepWaitFor) auto releaseAsync = NewPromise<void>(); NThreading::TEvent started; - auto asyncRunsCount = std::make_shared<std::atomic<int>>(0); - TCallback asyncStart = BIND([invoker = queue->GetInvoker(), asyncRunsCount] (bool) { - return BIND([asyncRunsCount] { - asyncRunsCount->fetch_add(1); + auto asyncRunCount = std::make_shared<std::atomic<int>>(0); + auto asyncStart = BIND([invoker = queue->GetInvoker(), asyncRunCount] (bool) { + return BIND([asyncRunCount] { + asyncRunCount->fetch_add(1); }).AsyncVia(invoker).Run(); }); - TCallback syncFinish = BIND([releaseAsync, &started] (bool) { + auto syncFinish = BIND([releaseAsync, &started] (bool) { started.NotifyAll(); WaitFor(releaseAsync.ToFuture()) .ThrowOnError(); @@ -558,8 +566,7 @@ TEST(TAsyncLooperTest, StopDuringSyncStepWaitFor) auto looper = New<TAsyncLooper>( queue->GetInvoker(), asyncStart, - syncFinish, - LooperName); + syncFinish); looper->Start(); @@ -576,7 +583,7 @@ TEST(TAsyncLooperTest, StopDuringSyncStepWaitFor) // Ensure queue is empty queue->Shutdown(/*graceful*/ true); - EXPECT_EQ(asyncRunsCount->load(), 1); + EXPECT_EQ(asyncRunCount->load(), 1); } TEST(TAsyncLooperTest, RestartDuringSyncStep) @@ -587,36 +594,39 @@ TEST(TAsyncLooperTest, RestartDuringSyncStep) // ignore cancelation in this test. NThreading::TEvent releaseAsync; - auto syncRunsCount = std::make_shared<std::atomic<int>>(0); + auto syncRunCount = std::make_shared<std::atomic<int>>(0); - TCallback asyncStart = BIND([invoker = queue->GetInvoker()] (bool) { + auto asyncStart = BIND([invoker = queue->GetInvoker()] (bool) { return BIND([] { }).AsyncVia(invoker).Run(); }); - TCallback syncFinish = BIND([&releaseAsync, syncRunsCount] (bool) { - syncRunsCount->fetch_add(1); + auto syncFinish = BIND([&releaseAsync, syncRunCount] (bool) { + syncRunCount->fetch_add(1); releaseAsync.Wait(); }); auto looper = New<TAsyncLooper>( queue->GetInvoker(), asyncStart, - syncFinish, - LooperName); + syncFinish); looper->Start(); - while (syncRunsCount->load() == 0); + while (syncRunCount->load() == 0) { + Sleep(TDuration::MilliSeconds(1)); + } - EXPECT_EQ(syncRunsCount->load(), 1); + EXPECT_EQ(syncRunCount->load(), 1); looper->Stop(); looper->Start(); releaseAsync.NotifyAll(); - while (syncRunsCount->load() == 1); + while (syncRunCount->load() == 1) { + Sleep(TDuration::MilliSeconds(1)); + } looper->Stop(); } @@ -627,15 +637,15 @@ TEST(TAsyncLooperTest, RestartDuringSyncStepWaitFor) auto releaseAsync = NewPromise<void>(); - auto syncRunsCount = std::make_shared<std::atomic<int>>(0); + auto syncRunCount = std::make_shared<std::atomic<int>>(0); - TCallback asyncStart = BIND([invoker = queue->GetInvoker()] (bool) { + auto asyncStart = BIND([invoker = queue->GetInvoker()] (bool) { return BIND([] { }).AsyncVia(invoker).Run(); }); - TCallback syncFinish = BIND([releaseAsync, syncRunsCount] (bool) { - syncRunsCount->fetch_add(1); + auto syncFinish = BIND([releaseAsync, syncRunCount] (bool) { + syncRunCount->fetch_add(1); WaitFor(releaseAsync.ToFuture()) .ThrowOnError(); }); @@ -643,21 +653,24 @@ TEST(TAsyncLooperTest, RestartDuringSyncStepWaitFor) auto looper = New<TAsyncLooper>( queue->GetInvoker(), asyncStart, - syncFinish, - LooperName); + syncFinish); looper->Start(); - while (syncRunsCount->load() == 0); + while (syncRunCount->load() == 0) { + Sleep(TDuration::MilliSeconds(1)); + } - EXPECT_EQ(syncRunsCount->load(), 1); + EXPECT_EQ(syncRunCount->load(), 1); looper->Stop(); looper->Start(); releaseAsync.Set(); - while (syncRunsCount->load() == 1); + while (syncRunCount->load() == 1) { + Sleep(TDuration::MilliSeconds(1)); + } looper->Stop(); } @@ -669,7 +682,7 @@ TEST(TAsyncLooperTest, NullFuture) auto switcher = std::make_shared<std::atomic<bool>>(false); NThreading::TEvent loopBroken; - TCallback asyncStart = BIND([invoker = queue->GetInvoker(), switcher, &loopBroken] (bool) { + auto asyncStart = BIND([invoker = queue->GetInvoker(), switcher, &loopBroken] (bool) { if (!switcher->load()) { loopBroken.NotifyAll(); return TFuture<void>(); @@ -678,28 +691,29 @@ TEST(TAsyncLooperTest, NullFuture) return BIND([] {}).AsyncVia(invoker).Run(); }); - auto syncRunsCount = std::make_shared<std::atomic<int>>(0); - TCallback syncFinish = BIND([syncRunsCount] (bool) { - syncRunsCount->fetch_add(1); + auto syncRunCount = std::make_shared<std::atomic<int>>(0); + auto syncFinish = BIND([syncRunCount] (bool) { + syncRunCount->fetch_add(1); }); auto looper = New<TAsyncLooper>( queue->GetInvoker(), asyncStart, - syncFinish, - LooperName); + syncFinish); looper->Start(); loopBroken.Wait(); - EXPECT_EQ(syncRunsCount->load(), 0); + EXPECT_EQ(syncRunCount->load(), 0); switcher->store(true); looper->Stop(); looper->Start(); - while (syncRunsCount->load() == 0); + while (syncRunCount->load() == 0) { + Sleep(TDuration::MilliSeconds(1)); + } looper->Stop(); } |
