summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorarkady-e1ppa <[email protected]>2024-07-23 00:21:25 +0300
committerarkady-e1ppa <[email protected]>2024-07-23 00:31:10 +0300
commitbf4bccbe064e695aa38836745858955613a416fd (patch)
treed5c2e5a6f566684f532553fffccf04498076688c
parent9a1971ccc4226a82034c2b820a6520e12f779e09 (diff)
Issues of rXXXXXX
a42c1a186ad053457477b49f944c23ce13e05372
-rw-r--r--yt/yt/core/concurrency/async_looper.cpp70
-rw-r--r--yt/yt/core/concurrency/async_looper.h18
-rw-r--r--yt/yt/core/concurrency/unittests/async_looper_ut.cpp232
3 files changed, 167 insertions, 153 deletions
diff --git a/yt/yt/core/concurrency/async_looper.cpp b/yt/yt/core/concurrency/async_looper.cpp
index 05ad633c1d9..e347e3fc2f1 100644
--- a/yt/yt/core/concurrency/async_looper.cpp
+++ b/yt/yt/core/concurrency/async_looper.cpp
@@ -12,12 +12,11 @@ TAsyncLooper::TAsyncLooper(
IInvokerPtr invoker,
TCallback<TFuture<void>(bool)> asyncStart,
TCallback<void(bool)> syncFinish,
- TString name,
const NLogging::TLogger& logger)
: Invoker_(std::move(invoker))
, AsyncStart_(std::move(asyncStart))
, SyncFinish_(std::move(syncFinish))
- , Logger(logger.WithTag("Looper: %v", name))
+ , Logger(logger)
{
YT_VERIFY(Invoker_);
YT_VERIFY(Invoker_ != GetSyncInvoker());
@@ -29,7 +28,7 @@ void TAsyncLooper::Start()
{
auto traceContext = NTracing::GetOrCreateTraceContext("LooperStart");
auto traceGuard = NTracing::TCurrentTraceContextGuard(traceContext);
- YT_LOG_DEBUG("Requesting AsyncLooper to start");
+ YT_LOG_DEBUG("Requesting looper to start");
Invoker_->Invoke(
BIND(&TAsyncLooper::DoStart, MakeStrong(this)));
@@ -73,7 +72,7 @@ void TAsyncLooper::DoStart()
// will bail out due to epoch mismatch -> we are free to
// ignore this case on our side.
State_ = EState::Running;
- YT_LOG_DEBUG("Starting the looper");
+ YT_LOG_DEBUG("Starting looper");
StartLoop(/*cleanStart*/ true, guard);
break;
@@ -271,40 +270,45 @@ void TAsyncLooper::FinishStep(bool wasRestarted)
void TAsyncLooper::Stop()
{
- auto guard = Guard(StateLock_);
+ TFuture<void> future;
- if (State_ == EState::NotRunning) {
- // Already stopping
- // -> bail out.
- YT_LOG_DEBUG("Trying to stop looper that is already stopped");
- return;
- }
+ {
+ auto guard = Guard(StateLock_);
- State_ = EState::NotRunning;
- ++EpochNumber_;
- YT_LOG_DEBUG("Stopping the looper");
-
- // We could be in one of three possible situations (for each stage):
- // 1. EStage::AsyncBusy -- |StartLoop| caller will observe
- // state |NotRunning| eventually and will cancel the future for us
- // (or there will be a restart which is also handled by caller above).
- // 2. EStage::Idle -- intermission between async and sync parts
- // -- if there is not restart, sync part will just bail out
- // if there is a restart, sync part will observe a different epoch
- // and bail out as well.
- // 3. EStage::Busy -- sync part is running and will simply not
- // continue the chain once it observes NotRunning state.
-
- if (!Future_) {
- // If there was a null future produced for async part
- // or simply while Stage_ == EStage::AsyncBusy.
- return;
+ if (State_ == EState::NotRunning) {
+ // Already stopping
+ // -> bail out.
+ YT_LOG_DEBUG("Trying to stop looper that is already stopped");
+ return;
+ }
+
+ State_ = EState::NotRunning;
+ ++EpochNumber_;
+ YT_LOG_DEBUG("Stopping the looper");
+
+ // We could be in one of three possible situations (for each stage):
+ // 1. EStage::AsyncBusy -- |StartLoop| caller will observe
+ // state |NotRunning| eventually and will cancel the future for us
+ // (or there will be a restart which is also handled by caller above).
+ // 2. EStage::Idle -- intermission between async and sync parts
+ // -- if there is not restart, sync part will just bail out
+ // if there is a restart, sync part will observe a different epoch
+ // and bail out as well.
+ // 3. EStage::Busy -- sync part is running and will simply not
+ // continue the chain once it observes NotRunning state.
+
+ if (!Future_) {
+ // If there was a null future produced for async part
+ // or simply while Stage_ == EStage::AsyncBusy.
+ return;
+ }
+
+ future = std::exchange(Future_, TFuture<void>());
}
- Future_.Cancel(TError("Looper stopped"));
- Future_.Reset();
+ future.Cancel(TError("Looper stopped"));
}
////////////////////////////////////////////////////////////////////////////////
-} // NYT::NConcurrency
+} // namespace NYT::NConcurrency
diff --git a/yt/yt/core/concurrency/async_looper.h b/yt/yt/core/concurrency/async_looper.h
index 3f356d267b5..93b31a7dc86 100644
--- a/yt/yt/core/concurrency/async_looper.h
+++ b/yt/yt/core/concurrency/async_looper.h
@@ -10,11 +10,9 @@ namespace NYT::NConcurrency {
////////////////////////////////////////////////////////////////////////////////
-// Class which indefenetely runs
-// two tasks:
+// Class which indefenetely runs two tasks:
// 1. Async start which creates async action.
-// 2. Sync finish which is called after async action
-// is finished.
+// 2. Sync finish which is called after async action is finished.
// Both are ran in Invoker_.
// Both |Start| and |Stop| are completely
// thread-safe, can be executed in any order and any number of times.
@@ -27,8 +25,7 @@ public:
IInvokerPtr invoker,
TCallback<TFuture<void>(bool cleanStart)> asyncStart,
TCallback<void(bool cleanStart)> syncFinish,
- TString name,
- const NLogging::TLogger& logger = NLogging::TLogger("Looper logger"));
+ const NLogging::TLogger& logger = NLogging::TLogger("AsyncLooper"));
// Starts polling.
// First loop will have cleanStart == true
@@ -41,8 +38,9 @@ public:
private:
const IInvokerPtr Invoker_;
- TCallback<TFuture<void>(bool)> AsyncStart_;
- TCallback<void(bool)> SyncFinish_;
+ const TCallback<TFuture<void>(bool)> AsyncStart_;
+ const TCallback<void(bool)> SyncFinish_;
+ const NLogging::TLogger Logger;
YT_DECLARE_SPIN_LOCK(NYT::NThreading::TSpinLock, StateLock_);
using TGuard = TGuard<NYT::NThreading::TSpinLock>;
@@ -75,8 +73,6 @@ private:
ui64 EpochNumber_ = 0;
TFuture<void> Future_;
- const NLogging::TLogger Logger;
-
void DoStart();
void StartLoop(bool cleanStart, const TGuard& guard);
@@ -89,4 +85,4 @@ DEFINE_REFCOUNTED_TYPE(TAsyncLooper);
////////////////////////////////////////////////////////////////////////////////
-} // NYT::NConcurrency
+} // namespace NYT::NConcurrency
diff --git a/yt/yt/core/concurrency/unittests/async_looper_ut.cpp b/yt/yt/core/concurrency/unittests/async_looper_ut.cpp
index 0b48079a901..ca9d9fe2d12 100644
--- a/yt/yt/core/concurrency/unittests/async_looper_ut.cpp
+++ b/yt/yt/core/concurrency/unittests/async_looper_ut.cpp
@@ -7,13 +7,13 @@
#include <library/cpp/yt/threading/event_count.h>
+#include <thread>
+
namespace NYT::NConcurrency {
namespace {
////////////////////////////////////////////////////////////////////////////////
-const TString LooperName = "TestLooper";
-
// TODO(arkady-e1ppa): Add ManualInvoker which only runs callbacks when
// manually requested. Add test when Stop/Restart occurs during the
// intermission between Async and Sync steps.
@@ -22,13 +22,13 @@ TEST(TAsyncLooperTest, JustWorks)
{
auto queue = New<TActionQueue>();
- TCallback asyncStart = BIND([invoker = queue->GetInvoker()] (bool) {
+ auto asyncStart = BIND([invoker = queue->GetInvoker()] (bool) {
VERIFY_INVOKER_AFFINITY(invoker);
return BIND([] {}).AsyncVia(invoker).Run();
});
auto progress = std::make_shared<std::atomic<int>>(0);
- TCallback syncFinish = BIND([progress, invoker = queue->GetInvoker()] (bool) {
+ auto syncFinish = BIND([progress, invoker = queue->GetInvoker()] (bool) {
VERIFY_INVOKER_AFFINITY(invoker);
progress->fetch_add(1);
});
@@ -38,19 +38,22 @@ TEST(TAsyncLooperTest, JustWorks)
auto looper = New<TAsyncLooper>(
queue->GetInvoker(),
asyncStart,
- syncFinish,
- LooperName);
+ syncFinish);
Sleep(TDuration::Seconds(1));
EXPECT_EQ(currentProgress, progress->load());
looper->Start();
- while (currentProgress == progress->load());
+ while (currentProgress == progress->load()) {
+ Sleep(TDuration::MilliSeconds(1));
+ }
currentProgress = progress->load();
- while (currentProgress == progress->load());
+ while (currentProgress == progress->load()) {
+ Sleep(TDuration::MilliSeconds(1));
+ }
looper->Stop();
}
@@ -59,12 +62,12 @@ TEST(TAsyncLooperTest, Restart)
{
auto queue = New<TActionQueue>();
- TCallback asyncStart = BIND([invoker = queue->GetInvoker()] (bool) {
+ auto asyncStart = BIND([invoker = queue->GetInvoker()] (bool) {
return BIND([] {}).AsyncVia(invoker).Run();
});
auto cleanStarts = std::make_shared<std::atomic<int>>(0);
- TCallback syncFinish = BIND([cleanStarts] (bool cleanStart) {
+ auto syncFinish = BIND([cleanStarts] (bool cleanStart) {
if (cleanStart) {
cleanStarts->fetch_add(1);
}
@@ -73,12 +76,13 @@ TEST(TAsyncLooperTest, Restart)
auto looper = New<TAsyncLooper>(
queue->GetInvoker(),
asyncStart,
- syncFinish,
- LooperName);
+ syncFinish);
looper->Start();
- while (cleanStarts->load() == 0);
+ while (cleanStarts->load() == 0) {
+ Sleep(TDuration::MilliSeconds(1));
+ }
EXPECT_EQ(cleanStarts->load(), 1);
@@ -90,7 +94,9 @@ TEST(TAsyncLooperTest, Restart)
looper->Start();
- while (cleanStarts->load() == 1);
+ while (cleanStarts->load() == 1) {
+ Sleep(TDuration::MilliSeconds(1));
+ }
EXPECT_EQ(cleanStarts->load(), 2);
@@ -104,7 +110,7 @@ TEST(TAsyncLooperTest, CancelAsyncStep)
NThreading::TEvent started;
auto promise = NewPromise<void>();
- TCallback asyncStart = BIND([invoker = queue->GetInvoker(), promise, &started] (bool) {
+ auto asyncStart = BIND([invoker = queue->GetInvoker(), promise, &started] (bool) {
return BIND([promise, &started] {
started.NotifyAll();
WaitFor(promise.ToFuture())
@@ -112,14 +118,13 @@ TEST(TAsyncLooperTest, CancelAsyncStep)
}).AsyncVia(invoker).Run();
});
- TCallback syncFinish = BIND([] (bool) {
+ auto syncFinish = BIND([] (bool) {
});
auto looper = New<TAsyncLooper>(
queue->GetInvoker(),
asyncStart,
- syncFinish,
- LooperName);
+ syncFinish);
looper->Start();
@@ -137,12 +142,12 @@ TEST(TAsyncLooperTest, CancelSyncStep)
NThreading::TEvent started;
auto promise = NewPromise<void>();
- TCallback asyncStart = BIND([invoker = queue->GetInvoker()] (bool) {
+ auto asyncStart = BIND([invoker = queue->GetInvoker()] (bool) {
return BIND([] {
}).AsyncVia(invoker).Run();
});
- TCallback syncFinish = BIND([promise, &started] (bool) {
+ auto syncFinish = BIND([promise, &started] (bool) {
started.NotifyAll();
WaitFor(promise.ToFuture())
.ThrowOnError();
@@ -151,8 +156,7 @@ TEST(TAsyncLooperTest, CancelSyncStep)
auto looper = New<TAsyncLooper>(
queue->GetInvoker(),
asyncStart,
- syncFinish,
- LooperName);
+ syncFinish);
looper->Start();
@@ -172,7 +176,7 @@ TEST(TAsyncLooperTest, StopDuringAsyncStep)
NThreading::TEvent releaseAsync;
NThreading::TEvent started;
- TCallback asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, &started] (bool) {
+ auto asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, &started] (bool) {
return BIND([&releaseAsync, &started] {
started.NotifyAll();
releaseAsync.Wait();
@@ -180,15 +184,14 @@ TEST(TAsyncLooperTest, StopDuringAsyncStep)
});
auto mustBeFalse = std::make_shared<std::atomic<bool>>(false);
- TCallback syncFinish = BIND([mustBeFalse] (bool) {
+ auto syncFinish = BIND([mustBeFalse] (bool) {
mustBeFalse->store(true);
});
auto looper = New<TAsyncLooper>(
queue->GetInvoker(),
asyncStart,
- syncFinish,
- LooperName);
+ syncFinish);
looper->Start();
@@ -215,7 +218,7 @@ TEST(TAsyncLooperTest, StopDuringAsyncStepWaitFor)
auto releaseAsync = NewPromise<void>();
NThreading::TEvent started;
- TCallback asyncStart = BIND([invoker = queue->GetInvoker(), &started, releaseAsync] (bool) {
+ auto asyncStart = BIND([invoker = queue->GetInvoker(), &started, releaseAsync] (bool) {
return BIND([releaseAsync, &started] {
started.NotifyAll();
WaitFor(releaseAsync.ToFuture())
@@ -224,15 +227,14 @@ TEST(TAsyncLooperTest, StopDuringAsyncStepWaitFor)
});
auto mustBeFalse = std::make_shared<std::atomic<bool>>(false);
- TCallback syncFinish = BIND([mustBeFalse] (bool) {
+ auto syncFinish = BIND([mustBeFalse] (bool) {
mustBeFalse->store(true);
});
auto looper = New<TAsyncLooper>(
queue->GetInvoker(),
asyncStart,
- syncFinish,
- LooperName);
+ syncFinish);
looper->Start();
@@ -260,36 +262,39 @@ TEST(TAsyncLooperTest, RestartDuringAsyncStep)
// ignore cancelation in this test.
NThreading::TEvent releaseAsync;
- auto asyncRunsCount = std::make_shared<std::atomic<int>>(0);
+ auto asyncRunCount = std::make_shared<std::atomic<int>>(0);
- TCallback asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, asyncRunsCount] (bool) {
- return BIND([&releaseAsync, asyncRunsCount] {
- asyncRunsCount->fetch_add(1);
+ auto asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, asyncRunCount] (bool) {
+ return BIND([&releaseAsync, asyncRunCount] {
+ asyncRunCount->fetch_add(1);
releaseAsync.Wait();
}).AsyncVia(invoker).Run();
});
- TCallback syncFinish = BIND([] (bool) {
+ auto syncFinish = BIND([] (bool) {
});
auto looper = New<TAsyncLooper>(
queue->GetInvoker(),
asyncStart,
- syncFinish,
- LooperName);
+ syncFinish);
looper->Start();
- while (asyncRunsCount->load() == 0);
+ while (asyncRunCount->load() == 0) {
+ Sleep(TDuration::MilliSeconds(1));
+ }
- EXPECT_EQ(asyncRunsCount->load(), 1);
+ EXPECT_EQ(asyncRunCount->load(), 1);
looper->Stop();
looper->Start();
releaseAsync.NotifyAll();
- while (asyncRunsCount->load() == 1);
+ while (asyncRunCount->load() == 1) {
+ Sleep(TDuration::MilliSeconds(1));
+ }
looper->Stop();
}
@@ -300,37 +305,40 @@ TEST(TAsyncLooperTest, RestartDuringAsyncStepWaitFor)
auto releaseAsync = NewPromise<void>();
- auto asyncRunsCount = std::make_shared<std::atomic<int>>(0);
+ auto asyncRunCount = std::make_shared<std::atomic<int>>(0);
- TCallback asyncStart = BIND([invoker = queue->GetInvoker(), releaseAsync, asyncRunsCount] (bool) {
- return BIND([releaseAsync, asyncRunsCount] {
- asyncRunsCount->fetch_add(1);
+ auto asyncStart = BIND([invoker = queue->GetInvoker(), releaseAsync, asyncRunCount] (bool) {
+ return BIND([releaseAsync, asyncRunCount] {
+ asyncRunCount->fetch_add(1);
WaitFor(releaseAsync.ToFuture())
.ThrowOnError();
}).AsyncVia(invoker).Run();
});
- TCallback syncFinish = BIND([] (bool) {
+ auto syncFinish = BIND([] (bool) {
});
auto looper = New<TAsyncLooper>(
queue->GetInvoker(),
asyncStart,
- syncFinish,
- LooperName);
+ syncFinish);
looper->Start();
- while (asyncRunsCount->load() == 0);
+ while (asyncRunCount->load() == 0) {
+ Sleep(TDuration::MilliSeconds(1));
+ }
- EXPECT_EQ(asyncRunsCount->load(), 1);
+ EXPECT_EQ(asyncRunCount->load(), 1);
looper->Stop();
looper->Start();
releaseAsync.Set();
- while (asyncRunsCount->load() == 1);
+ while (asyncRunCount->load() == 1) {
+ Sleep(TDuration::MilliSeconds(1));
+ }
looper->Stop();
}
@@ -345,7 +353,7 @@ TEST(TAsyncLooperTest, StopDuringAsyncStepPreparation)
NThreading::TEvent started;
auto mustBeFalse = std::make_shared<std::atomic<bool>>(false);
- TCallback asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, &started, mustBeFalse] (bool) {
+ auto asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, &started, mustBeFalse] (bool) {
started.NotifyAll();
releaseAsync.Wait();
@@ -358,14 +366,13 @@ TEST(TAsyncLooperTest, StopDuringAsyncStepPreparation)
}).AsyncVia(invoker).Run();
});
- TCallback syncFinish = BIND([] (bool) {
+ auto syncFinish = BIND([] (bool) {
});
auto looper = New<TAsyncLooper>(
queue->GetInvoker(),
asyncStart,
- syncFinish,
- LooperName);
+ syncFinish);
queue->GetInvoker()->Invoke(BIND([looper] {
looper->Start();
@@ -396,24 +403,23 @@ TEST(TAsyncLooperTest, RestartDuringAsyncStepPreparation1)
NThreading::TEvent releaseAsync;
NThreading::TEvent started;
- auto asyncRunsCount = std::make_shared<std::atomic<int>>(0);
+ auto asyncRunCount = std::make_shared<std::atomic<int>>(0);
- TCallback asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, &started, asyncRunsCount] (bool) {
+ auto asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, &started, asyncRunCount] (bool) {
started.NotifyAll();
releaseAsync.Wait();
- return BIND([asyncRunsCount] {
- asyncRunsCount->fetch_add(1);
+ return BIND([asyncRunCount] {
+ asyncRunCount->fetch_add(1);
}).AsyncVia(invoker).Run();
});
- TCallback syncFinish = BIND([] (bool) {
+ auto syncFinish = BIND([] (bool) {
});
auto looper = New<TAsyncLooper>(
queue->GetInvoker(),
asyncStart,
- syncFinish,
- LooperName);
+ syncFinish);
looper->Start();
@@ -424,7 +430,9 @@ TEST(TAsyncLooperTest, RestartDuringAsyncStepPreparation1)
releaseAsync.NotifyAll();
- while (asyncRunsCount->load() == 0);
+ while (asyncRunCount->load() == 0) {
+ Sleep(TDuration::MilliSeconds(1));
+ }
looper->Stop();
}
@@ -441,7 +449,7 @@ TEST(TAsyncLooperTest, RestartDuringAsyncStepPreparation2)
auto asyncCleanStarts = std::make_shared<std::atomic<int>>(0);
auto syncCleanStarts = std::make_shared<std::atomic<int>>(0);
- TCallback asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, &secondIterationStarted, asyncCleanStarts, syncCleanStarts] (bool cleanStart) {
+ auto asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, &secondIterationStarted, asyncCleanStarts, syncCleanStarts] (bool cleanStart) {
if (cleanStart) {
asyncCleanStarts->fetch_add(1);
}
@@ -456,7 +464,7 @@ TEST(TAsyncLooperTest, RestartDuringAsyncStepPreparation2)
}).AsyncVia(invoker).Run();
});
- TCallback syncFinish = BIND([syncCleanStarts] (bool cleanStart) {
+ auto syncFinish = BIND([syncCleanStarts] (bool cleanStart) {
if (cleanStart) {
syncCleanStarts->fetch_add(1);
}
@@ -465,8 +473,7 @@ TEST(TAsyncLooperTest, RestartDuringAsyncStepPreparation2)
auto looper = New<TAsyncLooper>(
queue->GetInvoker(),
asyncStart,
- syncFinish,
- LooperName);
+ syncFinish);
looper->Start();
@@ -480,7 +487,9 @@ TEST(TAsyncLooperTest, RestartDuringAsyncStepPreparation2)
releaseAsync.NotifyAll();
- while (syncCleanStarts->load() == 1);
+ while (syncCleanStarts->load() == 1) {
+ Sleep(TDuration::MilliSeconds(1));
+ }
EXPECT_EQ(asyncCleanStarts->load(), 2);
EXPECT_EQ(syncCleanStarts->load(), 2);
@@ -497,14 +506,14 @@ TEST(TAsyncLooperTest, StopDuringSyncStep)
NThreading::TEvent releaseAsync;
NThreading::TEvent started;
- auto asyncRunsCount = std::make_shared<std::atomic<int>>(0);
- TCallback asyncStart = BIND([invoker = queue->GetInvoker(), asyncRunsCount] (bool) {
- return BIND([asyncRunsCount] {
- asyncRunsCount->fetch_add(1);
+ auto asyncRunCount = std::make_shared<std::atomic<int>>(0);
+ auto asyncStart = BIND([invoker = queue->GetInvoker(), asyncRunCount] (bool) {
+ return BIND([asyncRunCount] {
+ asyncRunCount->fetch_add(1);
}).AsyncVia(invoker).Run();
});
- TCallback syncFinish = BIND([&releaseAsync, &started] (bool) {
+ auto syncFinish = BIND([&releaseAsync, &started] (bool) {
started.NotifyAll();
releaseAsync.Wait();
});
@@ -512,8 +521,7 @@ TEST(TAsyncLooperTest, StopDuringSyncStep)
auto looper = New<TAsyncLooper>(
queue->GetInvoker(),
asyncStart,
- syncFinish,
- LooperName);
+ syncFinish);
looper->Start();
@@ -530,7 +538,7 @@ TEST(TAsyncLooperTest, StopDuringSyncStep)
// Ensure queue is empty
queue->Shutdown(/*graceful*/ true);
- EXPECT_EQ(asyncRunsCount->load(), 1);
+ EXPECT_EQ(asyncRunCount->load(), 1);
}
TEST(TAsyncLooperTest, StopDuringSyncStepWaitFor)
@@ -542,14 +550,14 @@ TEST(TAsyncLooperTest, StopDuringSyncStepWaitFor)
auto releaseAsync = NewPromise<void>();
NThreading::TEvent started;
- auto asyncRunsCount = std::make_shared<std::atomic<int>>(0);
- TCallback asyncStart = BIND([invoker = queue->GetInvoker(), asyncRunsCount] (bool) {
- return BIND([asyncRunsCount] {
- asyncRunsCount->fetch_add(1);
+ auto asyncRunCount = std::make_shared<std::atomic<int>>(0);
+ auto asyncStart = BIND([invoker = queue->GetInvoker(), asyncRunCount] (bool) {
+ return BIND([asyncRunCount] {
+ asyncRunCount->fetch_add(1);
}).AsyncVia(invoker).Run();
});
- TCallback syncFinish = BIND([releaseAsync, &started] (bool) {
+ auto syncFinish = BIND([releaseAsync, &started] (bool) {
started.NotifyAll();
WaitFor(releaseAsync.ToFuture())
.ThrowOnError();
@@ -558,8 +566,7 @@ TEST(TAsyncLooperTest, StopDuringSyncStepWaitFor)
auto looper = New<TAsyncLooper>(
queue->GetInvoker(),
asyncStart,
- syncFinish,
- LooperName);
+ syncFinish);
looper->Start();
@@ -576,7 +583,7 @@ TEST(TAsyncLooperTest, StopDuringSyncStepWaitFor)
// Ensure queue is empty
queue->Shutdown(/*graceful*/ true);
- EXPECT_EQ(asyncRunsCount->load(), 1);
+ EXPECT_EQ(asyncRunCount->load(), 1);
}
TEST(TAsyncLooperTest, RestartDuringSyncStep)
@@ -587,36 +594,39 @@ TEST(TAsyncLooperTest, RestartDuringSyncStep)
// ignore cancelation in this test.
NThreading::TEvent releaseAsync;
- auto syncRunsCount = std::make_shared<std::atomic<int>>(0);
+ auto syncRunCount = std::make_shared<std::atomic<int>>(0);
- TCallback asyncStart = BIND([invoker = queue->GetInvoker()] (bool) {
+ auto asyncStart = BIND([invoker = queue->GetInvoker()] (bool) {
return BIND([] {
}).AsyncVia(invoker).Run();
});
- TCallback syncFinish = BIND([&releaseAsync, syncRunsCount] (bool) {
- syncRunsCount->fetch_add(1);
+ auto syncFinish = BIND([&releaseAsync, syncRunCount] (bool) {
+ syncRunCount->fetch_add(1);
releaseAsync.Wait();
});
auto looper = New<TAsyncLooper>(
queue->GetInvoker(),
asyncStart,
- syncFinish,
- LooperName);
+ syncFinish);
looper->Start();
- while (syncRunsCount->load() == 0);
+ while (syncRunCount->load() == 0) {
+ Sleep(TDuration::MilliSeconds(1));
+ }
- EXPECT_EQ(syncRunsCount->load(), 1);
+ EXPECT_EQ(syncRunCount->load(), 1);
looper->Stop();
looper->Start();
releaseAsync.NotifyAll();
- while (syncRunsCount->load() == 1);
+ while (syncRunCount->load() == 1) {
+ Sleep(TDuration::MilliSeconds(1));
+ }
looper->Stop();
}
@@ -627,15 +637,15 @@ TEST(TAsyncLooperTest, RestartDuringSyncStepWaitFor)
auto releaseAsync = NewPromise<void>();
- auto syncRunsCount = std::make_shared<std::atomic<int>>(0);
+ auto syncRunCount = std::make_shared<std::atomic<int>>(0);
- TCallback asyncStart = BIND([invoker = queue->GetInvoker()] (bool) {
+ auto asyncStart = BIND([invoker = queue->GetInvoker()] (bool) {
return BIND([] {
}).AsyncVia(invoker).Run();
});
- TCallback syncFinish = BIND([releaseAsync, syncRunsCount] (bool) {
- syncRunsCount->fetch_add(1);
+ auto syncFinish = BIND([releaseAsync, syncRunCount] (bool) {
+ syncRunCount->fetch_add(1);
WaitFor(releaseAsync.ToFuture())
.ThrowOnError();
});
@@ -643,21 +653,24 @@ TEST(TAsyncLooperTest, RestartDuringSyncStepWaitFor)
auto looper = New<TAsyncLooper>(
queue->GetInvoker(),
asyncStart,
- syncFinish,
- LooperName);
+ syncFinish);
looper->Start();
- while (syncRunsCount->load() == 0);
+ while (syncRunCount->load() == 0) {
+ Sleep(TDuration::MilliSeconds(1));
+ }
- EXPECT_EQ(syncRunsCount->load(), 1);
+ EXPECT_EQ(syncRunCount->load(), 1);
looper->Stop();
looper->Start();
releaseAsync.Set();
- while (syncRunsCount->load() == 1);
+ while (syncRunCount->load() == 1) {
+ Sleep(TDuration::MilliSeconds(1));
+ }
looper->Stop();
}
@@ -669,7 +682,7 @@ TEST(TAsyncLooperTest, NullFuture)
auto switcher = std::make_shared<std::atomic<bool>>(false);
NThreading::TEvent loopBroken;
- TCallback asyncStart = BIND([invoker = queue->GetInvoker(), switcher, &loopBroken] (bool) {
+ auto asyncStart = BIND([invoker = queue->GetInvoker(), switcher, &loopBroken] (bool) {
if (!switcher->load()) {
loopBroken.NotifyAll();
return TFuture<void>();
@@ -678,28 +691,29 @@ TEST(TAsyncLooperTest, NullFuture)
return BIND([] {}).AsyncVia(invoker).Run();
});
- auto syncRunsCount = std::make_shared<std::atomic<int>>(0);
- TCallback syncFinish = BIND([syncRunsCount] (bool) {
- syncRunsCount->fetch_add(1);
+ auto syncRunCount = std::make_shared<std::atomic<int>>(0);
+ auto syncFinish = BIND([syncRunCount] (bool) {
+ syncRunCount->fetch_add(1);
});
auto looper = New<TAsyncLooper>(
queue->GetInvoker(),
asyncStart,
- syncFinish,
- LooperName);
+ syncFinish);
looper->Start();
loopBroken.Wait();
- EXPECT_EQ(syncRunsCount->load(), 0);
+ EXPECT_EQ(syncRunCount->load(), 0);
switcher->store(true);
looper->Stop();
looper->Start();
- while (syncRunsCount->load() == 0);
+ while (syncRunCount->load() == 0) {
+ Sleep(TDuration::MilliSeconds(1));
+ }
looper->Stop();
}