aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-07-13 10:07:37 +0300
committerarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-07-13 10:17:32 +0300
commit7fa6e8fc2355ab3dea321089c1fb973928e12892 (patch)
tree2b357aa922dc94d514142456ccfb3abe7f594aba
parent473ffd9303aee1ebd2b1087bfd05d0fa08539510 (diff)
downloadydb-7fa6e8fc2355ab3dea321089c1fb973928e12892.tar.gz
YT-22219: Introduce AsyncLooper and use it in chunk scraper
2e433124f2c0565c823a46b5bc36dc66105a106b
-rw-r--r--yt/yt/core/concurrency/async_looper.cpp310
-rw-r--r--yt/yt/core/concurrency/async_looper.h92
-rw-r--r--yt/yt/core/concurrency/public.h2
-rw-r--r--yt/yt/core/concurrency/unittests/async_looper_ut.cpp710
-rw-r--r--yt/yt/core/concurrency/unittests/ya.make1
-rw-r--r--yt/yt/core/ya.make1
6 files changed, 1116 insertions, 0 deletions
diff --git a/yt/yt/core/concurrency/async_looper.cpp b/yt/yt/core/concurrency/async_looper.cpp
new file mode 100644
index 0000000000..05ad633c1d
--- /dev/null
+++ b/yt/yt/core/concurrency/async_looper.cpp
@@ -0,0 +1,310 @@
+#include "async_looper.h"
+
+#include <yt/yt/core/tracing/trace_context.h>
+
+#include <yt/yt/core/misc/finally.h>
+
+namespace NYT::NConcurrency {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TAsyncLooper::TAsyncLooper(
+ IInvokerPtr invoker,
+ TCallback<TFuture<void>(bool)> asyncStart,
+ TCallback<void(bool)> syncFinish,
+ TString name,
+ const NLogging::TLogger& logger)
+ : Invoker_(std::move(invoker))
+ , AsyncStart_(std::move(asyncStart))
+ , SyncFinish_(std::move(syncFinish))
+ , Logger(logger.WithTag("Looper: %v", name))
+{
+ YT_VERIFY(Invoker_);
+ YT_VERIFY(Invoker_ != GetSyncInvoker());
+ YT_VERIFY(AsyncStart_);
+ YT_VERIFY(SyncFinish_);
+}
+
+void TAsyncLooper::Start()
+{
+ auto traceContext = NTracing::GetOrCreateTraceContext("LooperStart");
+ auto traceGuard = NTracing::TCurrentTraceContextGuard(traceContext);
+ YT_LOG_DEBUG("Requesting AsyncLooper to start");
+
+ Invoker_->Invoke(
+ BIND(&TAsyncLooper::DoStart, MakeStrong(this)));
+}
+
+void TAsyncLooper::DoStart()
+{
+ auto guard = Guard(StateLock_);
+ switch (State_) {
+ case EState::Running:
+ // Already Running.
+ case EState::Restarting:
+ // Soon to be Running.
+ YT_LOG_DEBUG("Looper is either already running or restarting");
+ return;
+
+ case EState::NotRunning:
+ break;
+
+ default:
+ YT_ABORT();
+ }
+
+ switch (Stage_) {
+ case EStage::AsyncBusy:
+ // Has been stopped during async step
+ // -> request restart.
+ case EStage::Busy:
+ // Has been stopped during sync step
+ // -> request restart.
+ YT_LOG_DEBUG("Looper is busy but was stopped. Requesting restart");
+ State_ = EState::Restarting;
+ return;
+
+ case EStage::Idle:
+ // Two possibilities:
+ // 1. Nothing is happening -> we just start as normal
+ // 2. We have been stopped during intermission between
+ // async step and the sync one. Call to stop must have
+ // incremented the epoch and therefore imminent sync step
+ // will bail out due to epoch mismatch -> we are free to
+ // ignore this case on our side.
+ State_ = EState::Running;
+ YT_LOG_DEBUG("Starting the looper");
+ StartLoop(/*cleanStart*/ true, guard);
+ break;
+
+ default:
+ YT_ABORT();
+ }
+}
+
+void TAsyncLooper::StartLoop(bool cleanStart, const TGuard& guard)
+{
+ VERIFY_SPINLOCK_AFFINITY(StateLock_);
+ YT_VERIFY(State_ == EState::Running);
+ YT_VERIFY(Stage_ == EStage::Idle);
+
+ Future_.Reset();
+
+ auto traceContext = NTracing::GetOrCreateTraceContext("StartLoop");
+ auto traceGuard = NTracing::TCurrentTraceContextGuard(traceContext);
+
+ Stage_ = EStage::AsyncBusy;
+
+ TFuture<void> future;
+
+ try {
+ auto cleanup = Finally([&] {
+ Stage_ = EStage::Idle;
+ });
+ auto unguard = Unguard(guard);
+ future = AsyncStart_(cleanStart);
+
+ } catch (const TFiberCanceledException&) {
+ // We got canceled -- this is normal.
+ throw;
+ } catch (const std::exception& ex) {
+ YT_LOG_FATAL(ex, "Unexpected error encountered during the async step");
+ } catch (...) {
+ YT_LOG_FATAL("Unexpected error encountered during the async step");
+ }
+
+ bool wasRestarted = false;
+ switch (State_) {
+ case EState::NotRunning:
+ // We have been stopped during the async step
+ // -> cancel the future if there is one.
+ YT_LOG_DEBUG("Looper stop occured during the async part");
+
+ if (future) {
+ future.Cancel(TError("Looper stopped"));
+ }
+ return;
+
+ case EState::Restarting:
+ // We have been restarted during the async step
+ // -> convert to running.
+ YT_LOG_DEBUG("Looper restart occured during the async part. Next loop will be a clean start");
+
+ State_ = EState::Running;
+ wasRestarted = true;
+ // Next step, should it occur
+ // will have cleanStart == true
+ break;
+
+ case EState::Running:
+ // Nothing happened during async step
+ // -> proceed as normal.
+ break;
+
+ default:
+ YT_ABORT();
+ }
+
+ if (future) {
+ // NB(arkady-e1ppa): We read epoch here (and not earlier)
+ // because we could have been restarted during the async step
+ // in which case we take up the role of the restarted loop.
+ // Thus we have to have get the most up to date epoch here.
+ Future_ = future
+ .Apply(BIND(
+ &TAsyncLooper::AfterStart,
+ MakeWeak(this),
+ cleanStart,
+ wasRestarted,
+ EpochNumber_)
+ .AsyncVia(Invoker_));
+ }
+}
+
+void TAsyncLooper::AfterStart(bool cleanStart, bool wasRestarted, ui64 epochNumber, const TError& error)
+{
+ if (!error.IsOK()) {
+ YT_LOG_WARNING(error, "Async start failed unexpectedly. Stopping looper");
+ return;
+ }
+
+ {
+ auto guard = Guard(StateLock_);
+
+ YT_ASSERT(Stage_ == EStage::Idle);
+
+ switch (State_) {
+ case EState::NotRunning:
+ YT_LOG_DEBUG("Looper stop occured during the intermission between async and sync steps");
+ // We have been stopped -> bail out.
+ return;
+
+ case EState::Running:
+ if (epochNumber != EpochNumber_) {
+ // We got restarted during the intermission.
+ // Caller of |Start| will start the new chain
+ // and we just bail out.
+ YT_LOG_DEBUG("Looper restart occured during the intermission between async and sync steps");
+
+ return;
+ }
+ break;
+
+ case EState::Restarting:
+ // Restarting requires stage to
+ // not be |Idle|.
+ // NB(arkady-e1ppa): Since enum is not macro generated
+ // it is not serializable.
+ YT_LOG_ERROR(
+ "Looper encountered impossible state while starting sync step (State: Restarting)");
+
+ default:
+ YT_ABORT();
+ }
+
+ Stage_ = EStage::Busy;
+ }
+
+ DoStep(cleanStart, wasRestarted);
+}
+
+void TAsyncLooper::DoStep(bool cleanStart, bool wasRestarted)
+{
+ auto cleanup = Finally([this, wasRestarted] {
+ FinishStep(wasRestarted);
+ });
+
+ try {
+ SyncFinish_(cleanStart);
+ } catch (const TFiberCanceledException&) {
+ // We got canceled -- this is normal.
+ throw;
+ } catch (const std::exception& ex) {
+ YT_LOG_FATAL(ex, "Unexpected error encountered during the sync step");
+ } catch (...) {
+ YT_LOG_FATAL("Unexpected error encountered during the sync step");
+ }
+}
+
+void TAsyncLooper::FinishStep(bool wasRestarted)
+{
+ auto guard = Guard(StateLock_);
+
+ YT_ASSERT(Stage_ == EStage::Busy);
+ // NB(arkady-e1ppa): Since this transition happens
+ // under spinlock and below we would go to AsyncBusy before the release
+ // this stage switch will not be observed by users.
+ Stage_ = EStage::Idle;
+
+ switch (State_) {
+ case EState::NotRunning:
+ // We have been stopped
+ // -> bail out.
+ YT_LOG_DEBUG("Looper stop occured during the sync step");
+
+ return;
+
+ case EState::Restarting:
+ // We have been restarted
+ // -> start the new chain of loops.
+ YT_LOG_DEBUG("Looper restart occured during the sync step");
+
+ State_ = EState::Running;
+ StartLoop(/*cleanStart*/ true, guard);
+ return;
+
+ case EState::Running:
+ // Nothing has happened
+ // -> continue the current chain of loops.
+ // NB(arkady-e1ppa): If |wasRestarted| is |true|
+ // then we have been restarted during the async step
+ // of this loop iteration. In order to be able to
+ // reliable ever restart the async part, we will
+ // enforce clean start of the next iteration.
+ StartLoop(/*cleanStart*/ wasRestarted, guard);
+ return;
+
+ default:
+ YT_ABORT();
+ }
+}
+
+void TAsyncLooper::Stop()
+{
+ auto guard = Guard(StateLock_);
+
+ if (State_ == EState::NotRunning) {
+ // Already stopping
+ // -> bail out.
+ YT_LOG_DEBUG("Trying to stop looper that is already stopped");
+ return;
+ }
+
+ State_ = EState::NotRunning;
+ ++EpochNumber_;
+ YT_LOG_DEBUG("Stopping the looper");
+
+ // We could be in one of three possible situations (for each stage):
+ // 1. EStage::AsyncBusy -- |StartLoop| caller will observe
+ // state |NotRunning| eventually and will cancel the future for us
+ // (or there will be a restart which is also handled by caller above).
+ // 2. EStage::Idle -- intermission between async and sync parts
+ // -- if there is not restart, sync part will just bail out
+ // if there is a restart, sync part will observe a different epoch
+ // and bail out as well.
+ // 3. EStage::Busy -- sync part is running and will simply not
+ // continue the chain once it observes NotRunning state.
+
+ if (!Future_) {
+ // If there was a null future produced for async part
+ // or simply while Stage_ == EStage::AsyncBusy.
+ return;
+ }
+
+ Future_.Cancel(TError("Looper stopped"));
+ Future_.Reset();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // NYT::NConcurrency
diff --git a/yt/yt/core/concurrency/async_looper.h b/yt/yt/core/concurrency/async_looper.h
new file mode 100644
index 0000000000..3f356d267b
--- /dev/null
+++ b/yt/yt/core/concurrency/async_looper.h
@@ -0,0 +1,92 @@
+#pragma once
+
+#include "public.h"
+
+#include <yt/yt/core/actions/future.h>
+
+#include <yt/yt/core/logging/log.h>
+
+namespace NYT::NConcurrency {
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Class which indefenetely runs
+// two tasks:
+// 1. Async start which creates async action.
+// 2. Sync finish which is called after async action
+// is finished.
+// Both are ran in Invoker_.
+// Both |Start| and |Stop| are completely
+// thread-safe, can be executed in any order and any number of times.
+// Dropping last reference to looper at any point is also safe.
+class TAsyncLooper
+ : public TRefCounted
+{
+public:
+ TAsyncLooper(
+ IInvokerPtr invoker,
+ TCallback<TFuture<void>(bool cleanStart)> asyncStart,
+ TCallback<void(bool cleanStart)> syncFinish,
+ TString name,
+ const NLogging::TLogger& logger = NLogging::TLogger("Looper logger"));
+
+ // Starts polling.
+ // First loop will have cleanStart == true
+ // Calling after stop will act as
+ // if for the first time.
+ void Start();
+
+ // Cancels the current loop if one is present.
+ void Stop();
+
+private:
+ const IInvokerPtr Invoker_;
+ TCallback<TFuture<void>(bool)> AsyncStart_;
+ TCallback<void(bool)> SyncFinish_;
+
+ YT_DECLARE_SPIN_LOCK(NYT::NThreading::TSpinLock, StateLock_);
+ using TGuard = TGuard<NYT::NThreading::TSpinLock>;
+
+ enum class EState
+ {
+ NotRunning,
+ Running,
+ Restarting,
+ };
+
+ // Default transitions are (only observable ones are listed):
+ // Idle -> AsyncBusy -> Idle -> Busy ->
+ // AsyncBusy -> Idle -> Busy -> ...
+ // NB(arkady-e1ppa): Technically, AsyncBusy
+ // is redundant and can be replaced with Busy.
+ // It does not produce noticeable overhead to keep it
+ // and it is helpful for understanding what is going on
+ // in the implementation (e.g. why it is correct).
+ enum class EStage
+ {
+ Idle,
+ AsyncBusy,
+ Busy,
+ };
+
+ EState State_ = EState::NotRunning;
+ EStage Stage_ = EStage::Idle;
+
+ ui64 EpochNumber_ = 0;
+ TFuture<void> Future_;
+
+ const NLogging::TLogger Logger;
+
+ void DoStart();
+
+ void StartLoop(bool cleanStart, const TGuard& guard);
+ void AfterStart(bool cleanStart, bool wasRestarted, ui64 epochNumber, const TError& error);
+ void DoStep(bool cleanStart, bool wasRestarted);
+ void FinishStep(bool wasRestarted);
+};
+
+DEFINE_REFCOUNTED_TYPE(TAsyncLooper);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // NYT::NConcurrency
diff --git a/yt/yt/core/concurrency/public.h b/yt/yt/core/concurrency/public.h
index d7533f1232..4cab384b88 100644
--- a/yt/yt/core/concurrency/public.h
+++ b/yt/yt/core/concurrency/public.h
@@ -23,6 +23,8 @@ DECLARE_REFCOUNTED_STRUCT(IFairShareActionQueue)
DECLARE_REFCOUNTED_STRUCT(IQuantizedExecutor)
+DECLARE_REFCOUNTED_CLASS(TAsyncLooper);
+
namespace NDetail {
DECLARE_REFCOUNTED_STRUCT(TDelayedExecutorEntry)
diff --git a/yt/yt/core/concurrency/unittests/async_looper_ut.cpp b/yt/yt/core/concurrency/unittests/async_looper_ut.cpp
new file mode 100644
index 0000000000..0b48079a90
--- /dev/null
+++ b/yt/yt/core/concurrency/unittests/async_looper_ut.cpp
@@ -0,0 +1,710 @@
+#include <yt/yt/core/test_framework/framework.h>
+
+#include <yt/yt/core/concurrency/async_looper.h>
+
+#include <yt/yt/core/concurrency/action_queue.h>
+#include <yt/yt/core/concurrency/scheduler_api.h>
+
+#include <library/cpp/yt/threading/event_count.h>
+
+namespace NYT::NConcurrency {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+const TString LooperName = "TestLooper";
+
+// TODO(arkady-e1ppa): Add ManualInvoker which only runs callbacks when
+// manually requested. Add test when Stop/Restart occurs during the
+// intermission between Async and Sync steps.
+
+TEST(TAsyncLooperTest, JustWorks)
+{
+ auto queue = New<TActionQueue>();
+
+ TCallback asyncStart = BIND([invoker = queue->GetInvoker()] (bool) {
+ VERIFY_INVOKER_AFFINITY(invoker);
+ return BIND([] {}).AsyncVia(invoker).Run();
+ });
+
+ auto progress = std::make_shared<std::atomic<int>>(0);
+ TCallback syncFinish = BIND([progress, invoker = queue->GetInvoker()] (bool) {
+ VERIFY_INVOKER_AFFINITY(invoker);
+ progress->fetch_add(1);
+ });
+
+ auto currentProgress = progress->load();
+
+ auto looper = New<TAsyncLooper>(
+ queue->GetInvoker(),
+ asyncStart,
+ syncFinish,
+ LooperName);
+
+ Sleep(TDuration::Seconds(1));
+ EXPECT_EQ(currentProgress, progress->load());
+
+ looper->Start();
+
+ while (currentProgress == progress->load());
+
+ currentProgress = progress->load();
+
+ while (currentProgress == progress->load());
+
+ looper->Stop();
+}
+
+TEST(TAsyncLooperTest, Restart)
+{
+ auto queue = New<TActionQueue>();
+
+ TCallback asyncStart = BIND([invoker = queue->GetInvoker()] (bool) {
+ return BIND([] {}).AsyncVia(invoker).Run();
+ });
+
+ auto cleanStarts = std::make_shared<std::atomic<int>>(0);
+ TCallback syncFinish = BIND([cleanStarts] (bool cleanStart) {
+ if (cleanStart) {
+ cleanStarts->fetch_add(1);
+ }
+ });
+
+ auto looper = New<TAsyncLooper>(
+ queue->GetInvoker(),
+ asyncStart,
+ syncFinish,
+ LooperName);
+
+ looper->Start();
+
+ while (cleanStarts->load() == 0);
+
+ EXPECT_EQ(cleanStarts->load(), 1);
+
+ Sleep(TDuration::Seconds(1));
+
+ EXPECT_EQ(cleanStarts->load(), 1);
+
+ looper->Stop();
+
+ looper->Start();
+
+ while (cleanStarts->load() == 1);
+
+ EXPECT_EQ(cleanStarts->load(), 2);
+
+ looper->Stop();
+}
+
+TEST(TAsyncLooperTest, CancelAsyncStep)
+{
+ auto queue = New<TActionQueue>();
+
+ NThreading::TEvent started;
+ auto promise = NewPromise<void>();
+
+ TCallback asyncStart = BIND([invoker = queue->GetInvoker(), promise, &started] (bool) {
+ return BIND([promise, &started] {
+ started.NotifyAll();
+ WaitFor(promise.ToFuture())
+ .ThrowOnError();
+ }).AsyncVia(invoker).Run();
+ });
+
+ TCallback syncFinish = BIND([] (bool) {
+ });
+
+ auto looper = New<TAsyncLooper>(
+ queue->GetInvoker(),
+ asyncStart,
+ syncFinish,
+ LooperName);
+
+ looper->Start();
+
+ started.Wait();
+
+ looper->Stop();
+
+ EXPECT_TRUE(promise.IsCanceled());
+}
+
+TEST(TAsyncLooperTest, CancelSyncStep)
+{
+ auto queue = New<TActionQueue>();
+
+ NThreading::TEvent started;
+ auto promise = NewPromise<void>();
+
+ TCallback asyncStart = BIND([invoker = queue->GetInvoker()] (bool) {
+ return BIND([] {
+ }).AsyncVia(invoker).Run();
+ });
+
+ TCallback syncFinish = BIND([promise, &started] (bool) {
+ started.NotifyAll();
+ WaitFor(promise.ToFuture())
+ .ThrowOnError();
+ });
+
+ auto looper = New<TAsyncLooper>(
+ queue->GetInvoker(),
+ asyncStart,
+ syncFinish,
+ LooperName);
+
+ looper->Start();
+
+ started.Wait();
+
+ looper->Stop();
+
+ EXPECT_TRUE(promise.IsCanceled());
+}
+
+TEST(TAsyncLooperTest, StopDuringAsyncStep)
+{
+ auto queue = New<TActionQueue>();
+
+ // We use event and not future to
+ // ignore cancelation in this test.
+ NThreading::TEvent releaseAsync;
+ NThreading::TEvent started;
+
+ TCallback asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, &started] (bool) {
+ return BIND([&releaseAsync, &started] {
+ started.NotifyAll();
+ releaseAsync.Wait();
+ }).AsyncVia(invoker).Run();
+ });
+
+ auto mustBeFalse = std::make_shared<std::atomic<bool>>(false);
+ TCallback syncFinish = BIND([mustBeFalse] (bool) {
+ mustBeFalse->store(true);
+ });
+
+ auto looper = New<TAsyncLooper>(
+ queue->GetInvoker(),
+ asyncStart,
+ syncFinish,
+ LooperName);
+
+ looper->Start();
+
+ started.Wait();
+
+ looper->Stop();
+
+ releaseAsync.NotifyAll();
+
+ // We cannot ensure that callback will be submitted
+ // So we just wait a little bit.
+ Sleep(TDuration::Seconds(1));
+
+ // Ensure queue is empty
+ queue->Shutdown(/*graceful*/ true);
+
+ EXPECT_FALSE(mustBeFalse->load());
+}
+
+TEST(TAsyncLooperTest, StopDuringAsyncStepWaitFor)
+{
+ auto queue = New<TActionQueue>();
+
+ auto releaseAsync = NewPromise<void>();
+ NThreading::TEvent started;
+
+ TCallback asyncStart = BIND([invoker = queue->GetInvoker(), &started, releaseAsync] (bool) {
+ return BIND([releaseAsync, &started] {
+ started.NotifyAll();
+ WaitFor(releaseAsync.ToFuture())
+ .ThrowOnError();
+ }).AsyncVia(invoker).Run();
+ });
+
+ auto mustBeFalse = std::make_shared<std::atomic<bool>>(false);
+ TCallback syncFinish = BIND([mustBeFalse] (bool) {
+ mustBeFalse->store(true);
+ });
+
+ auto looper = New<TAsyncLooper>(
+ queue->GetInvoker(),
+ asyncStart,
+ syncFinish,
+ LooperName);
+
+ looper->Start();
+
+ started.Wait();
+
+ looper->Stop();
+
+ releaseAsync.Set();
+
+ // We cannot ensure that callback will be submitted
+ // So we just wait a little bit.
+ Sleep(TDuration::Seconds(1));
+
+ // Ensure queue is empty
+ queue->Shutdown(/*graceful*/ true);
+
+ EXPECT_FALSE(mustBeFalse->load());
+}
+
+TEST(TAsyncLooperTest, RestartDuringAsyncStep)
+{
+ auto queue = New<TActionQueue>();
+
+ // We use event and not future to
+ // ignore cancelation in this test.
+ NThreading::TEvent releaseAsync;
+
+ auto asyncRunsCount = std::make_shared<std::atomic<int>>(0);
+
+ TCallback asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, asyncRunsCount] (bool) {
+ return BIND([&releaseAsync, asyncRunsCount] {
+ asyncRunsCount->fetch_add(1);
+ releaseAsync.Wait();
+ }).AsyncVia(invoker).Run();
+ });
+
+ TCallback syncFinish = BIND([] (bool) {
+ });
+
+ auto looper = New<TAsyncLooper>(
+ queue->GetInvoker(),
+ asyncStart,
+ syncFinish,
+ LooperName);
+
+ looper->Start();
+
+ while (asyncRunsCount->load() == 0);
+
+ EXPECT_EQ(asyncRunsCount->load(), 1);
+
+ looper->Stop();
+ looper->Start();
+
+ releaseAsync.NotifyAll();
+
+ while (asyncRunsCount->load() == 1);
+
+ looper->Stop();
+}
+
+TEST(TAsyncLooperTest, RestartDuringAsyncStepWaitFor)
+{
+ auto queue = New<TActionQueue>();
+
+ auto releaseAsync = NewPromise<void>();
+
+ auto asyncRunsCount = std::make_shared<std::atomic<int>>(0);
+
+ TCallback asyncStart = BIND([invoker = queue->GetInvoker(), releaseAsync, asyncRunsCount] (bool) {
+ return BIND([releaseAsync, asyncRunsCount] {
+ asyncRunsCount->fetch_add(1);
+ WaitFor(releaseAsync.ToFuture())
+ .ThrowOnError();
+ }).AsyncVia(invoker).Run();
+ });
+
+ TCallback syncFinish = BIND([] (bool) {
+ });
+
+ auto looper = New<TAsyncLooper>(
+ queue->GetInvoker(),
+ asyncStart,
+ syncFinish,
+ LooperName);
+
+ looper->Start();
+
+ while (asyncRunsCount->load() == 0);
+
+ EXPECT_EQ(asyncRunsCount->load(), 1);
+
+ looper->Stop();
+ looper->Start();
+
+ releaseAsync.Set();
+
+ while (asyncRunsCount->load() == 1);
+
+ looper->Stop();
+}
+
+TEST(TAsyncLooperTest, StopDuringAsyncStepPreparation)
+{
+ auto queue = New<TActionQueue>();
+
+ // We use event and not future to
+ // ignore cancelation in this test.
+ NThreading::TEvent releaseAsync;
+ NThreading::TEvent started;
+
+ auto mustBeFalse = std::make_shared<std::atomic<bool>>(false);
+ TCallback asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, &started, mustBeFalse] (bool) {
+ started.NotifyAll();
+ releaseAsync.Wait();
+
+ // NB(arkady-e1ppa): Callback below will be submitted to the same action queue
+ // current callback is running on. Thus we guarantee that it will not
+ // be finished before looper internals get to cancel it thus
+ // preventing the loop from occuring.
+ return BIND([mustBeFalse] {
+ mustBeFalse->store(true);
+ }).AsyncVia(invoker).Run();
+ });
+
+ TCallback syncFinish = BIND([] (bool) {
+ });
+
+ auto looper = New<TAsyncLooper>(
+ queue->GetInvoker(),
+ asyncStart,
+ syncFinish,
+ LooperName);
+
+ queue->GetInvoker()->Invoke(BIND([looper] {
+ looper->Start();
+ }));
+
+ started.Wait();
+
+ looper->Stop();
+
+ releaseAsync.NotifyAll();
+
+ // We cannot ensure that callback will be submitted
+ // So we just wait a little bit.
+ Sleep(TDuration::Seconds(1));
+
+ // Ensure queue is empty
+ queue->Shutdown(/*graceful*/ true);
+
+ EXPECT_FALSE(mustBeFalse->load());
+}
+
+TEST(TAsyncLooperTest, RestartDuringAsyncStepPreparation1)
+{
+ auto queue = New<TActionQueue>();
+
+ // We use event and not future to
+ // ignore cancelation in this test.
+ NThreading::TEvent releaseAsync;
+ NThreading::TEvent started;
+
+ auto asyncRunsCount = std::make_shared<std::atomic<int>>(0);
+
+ TCallback asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, &started, asyncRunsCount] (bool) {
+ started.NotifyAll();
+ releaseAsync.Wait();
+ return BIND([asyncRunsCount] {
+ asyncRunsCount->fetch_add(1);
+ }).AsyncVia(invoker).Run();
+ });
+
+ TCallback syncFinish = BIND([] (bool) {
+ });
+
+ auto looper = New<TAsyncLooper>(
+ queue->GetInvoker(),
+ asyncStart,
+ syncFinish,
+ LooperName);
+
+ looper->Start();
+
+ started.Wait();
+
+ looper->Stop();
+ looper->Start();
+
+ releaseAsync.NotifyAll();
+
+ while (asyncRunsCount->load() == 0);
+
+ looper->Stop();
+}
+
+TEST(TAsyncLooperTest, RestartDuringAsyncStepPreparation2)
+{
+ auto queue = New<TActionQueue>();
+
+ // We use event and not future to
+ // ignore cancelation in this test.
+ NThreading::TEvent releaseAsync;
+ NThreading::TEvent secondIterationStarted;
+
+ auto asyncCleanStarts = std::make_shared<std::atomic<int>>(0);
+ auto syncCleanStarts = std::make_shared<std::atomic<int>>(0);
+
+ TCallback asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, &secondIterationStarted, asyncCleanStarts, syncCleanStarts] (bool cleanStart) {
+ if (cleanStart) {
+ asyncCleanStarts->fetch_add(1);
+ }
+
+ if (syncCleanStarts->load() == 1) {
+ // Clean start has fully finished.
+ secondIterationStarted.NotifyAll();
+ releaseAsync.Wait();
+ }
+
+ return BIND([] {
+ }).AsyncVia(invoker).Run();
+ });
+
+ TCallback syncFinish = BIND([syncCleanStarts] (bool cleanStart) {
+ if (cleanStart) {
+ syncCleanStarts->fetch_add(1);
+ }
+ });
+
+ auto looper = New<TAsyncLooper>(
+ queue->GetInvoker(),
+ asyncStart,
+ syncFinish,
+ LooperName);
+
+ looper->Start();
+
+ secondIterationStarted.Wait();
+
+ EXPECT_EQ(asyncCleanStarts->load(), 1);
+ EXPECT_EQ(syncCleanStarts->load(), 1);
+
+ looper->Stop();
+ looper->Start();
+
+ releaseAsync.NotifyAll();
+
+ while (syncCleanStarts->load() == 1);
+
+ EXPECT_EQ(asyncCleanStarts->load(), 2);
+ EXPECT_EQ(syncCleanStarts->load(), 2);
+
+ looper->Stop();
+}
+
+TEST(TAsyncLooperTest, StopDuringSyncStep)
+{
+ auto queue = New<TActionQueue>();
+
+ // We use event and not future to
+ // ignore cancelation in this test.
+ NThreading::TEvent releaseAsync;
+ NThreading::TEvent started;
+
+ auto asyncRunsCount = std::make_shared<std::atomic<int>>(0);
+ TCallback asyncStart = BIND([invoker = queue->GetInvoker(), asyncRunsCount] (bool) {
+ return BIND([asyncRunsCount] {
+ asyncRunsCount->fetch_add(1);
+ }).AsyncVia(invoker).Run();
+ });
+
+ TCallback syncFinish = BIND([&releaseAsync, &started] (bool) {
+ started.NotifyAll();
+ releaseAsync.Wait();
+ });
+
+ auto looper = New<TAsyncLooper>(
+ queue->GetInvoker(),
+ asyncStart,
+ syncFinish,
+ LooperName);
+
+ looper->Start();
+
+ started.Wait();
+
+ looper->Stop();
+
+ releaseAsync.NotifyAll();
+
+ // We cannot ensure that callback will be submitted
+ // So we just wait a little bit.
+ Sleep(TDuration::Seconds(1));
+
+ // Ensure queue is empty
+ queue->Shutdown(/*graceful*/ true);
+
+ EXPECT_EQ(asyncRunsCount->load(), 1);
+}
+
+TEST(TAsyncLooperTest, StopDuringSyncStepWaitFor)
+{
+ auto queue = New<TActionQueue>();
+
+ // We use event and not future to
+ // ignore cancelation in this test.
+ auto releaseAsync = NewPromise<void>();
+ NThreading::TEvent started;
+
+ auto asyncRunsCount = std::make_shared<std::atomic<int>>(0);
+ TCallback asyncStart = BIND([invoker = queue->GetInvoker(), asyncRunsCount] (bool) {
+ return BIND([asyncRunsCount] {
+ asyncRunsCount->fetch_add(1);
+ }).AsyncVia(invoker).Run();
+ });
+
+ TCallback syncFinish = BIND([releaseAsync, &started] (bool) {
+ started.NotifyAll();
+ WaitFor(releaseAsync.ToFuture())
+ .ThrowOnError();
+ });
+
+ auto looper = New<TAsyncLooper>(
+ queue->GetInvoker(),
+ asyncStart,
+ syncFinish,
+ LooperName);
+
+ looper->Start();
+
+ started.Wait();
+
+ looper->Stop();
+
+ releaseAsync.Set();
+
+ // We cannot ensure that callback will be submitted
+ // So we just wait a little bit.
+ Sleep(TDuration::Seconds(1));
+
+ // Ensure queue is empty
+ queue->Shutdown(/*graceful*/ true);
+
+ EXPECT_EQ(asyncRunsCount->load(), 1);
+}
+
+TEST(TAsyncLooperTest, RestartDuringSyncStep)
+{
+ auto queue = New<TActionQueue>();
+
+ // We use event and not future to
+ // ignore cancelation in this test.
+ NThreading::TEvent releaseAsync;
+
+ auto syncRunsCount = std::make_shared<std::atomic<int>>(0);
+
+ TCallback asyncStart = BIND([invoker = queue->GetInvoker()] (bool) {
+ return BIND([] {
+ }).AsyncVia(invoker).Run();
+ });
+
+ TCallback syncFinish = BIND([&releaseAsync, syncRunsCount] (bool) {
+ syncRunsCount->fetch_add(1);
+ releaseAsync.Wait();
+ });
+
+ auto looper = New<TAsyncLooper>(
+ queue->GetInvoker(),
+ asyncStart,
+ syncFinish,
+ LooperName);
+
+ looper->Start();
+
+ while (syncRunsCount->load() == 0);
+
+ EXPECT_EQ(syncRunsCount->load(), 1);
+
+ looper->Stop();
+ looper->Start();
+
+ releaseAsync.NotifyAll();
+
+ while (syncRunsCount->load() == 1);
+
+ looper->Stop();
+}
+
+TEST(TAsyncLooperTest, RestartDuringSyncStepWaitFor)
+{
+ auto queue = New<TActionQueue>();
+
+ auto releaseAsync = NewPromise<void>();
+
+ auto syncRunsCount = std::make_shared<std::atomic<int>>(0);
+
+ TCallback asyncStart = BIND([invoker = queue->GetInvoker()] (bool) {
+ return BIND([] {
+ }).AsyncVia(invoker).Run();
+ });
+
+ TCallback syncFinish = BIND([releaseAsync, syncRunsCount] (bool) {
+ syncRunsCount->fetch_add(1);
+ WaitFor(releaseAsync.ToFuture())
+ .ThrowOnError();
+ });
+
+ auto looper = New<TAsyncLooper>(
+ queue->GetInvoker(),
+ asyncStart,
+ syncFinish,
+ LooperName);
+
+ looper->Start();
+
+ while (syncRunsCount->load() == 0);
+
+ EXPECT_EQ(syncRunsCount->load(), 1);
+
+ looper->Stop();
+ looper->Start();
+
+ releaseAsync.Set();
+
+ while (syncRunsCount->load() == 1);
+
+ looper->Stop();
+}
+
+TEST(TAsyncLooperTest, NullFuture)
+{
+ auto queue = New<TActionQueue>();
+
+ auto switcher = std::make_shared<std::atomic<bool>>(false);
+ NThreading::TEvent loopBroken;
+
+ TCallback asyncStart = BIND([invoker = queue->GetInvoker(), switcher, &loopBroken] (bool) {
+ if (!switcher->load()) {
+ loopBroken.NotifyAll();
+ return TFuture<void>();
+ }
+
+ return BIND([] {}).AsyncVia(invoker).Run();
+ });
+
+ auto syncRunsCount = std::make_shared<std::atomic<int>>(0);
+ TCallback syncFinish = BIND([syncRunsCount] (bool) {
+ syncRunsCount->fetch_add(1);
+ });
+
+ auto looper = New<TAsyncLooper>(
+ queue->GetInvoker(),
+ asyncStart,
+ syncFinish,
+ LooperName);
+
+ looper->Start();
+
+ loopBroken.Wait();
+
+ EXPECT_EQ(syncRunsCount->load(), 0);
+
+ switcher->store(true);
+ looper->Stop();
+ looper->Start();
+
+ while (syncRunsCount->load() == 0);
+
+ looper->Stop();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT:::NConcurrency
diff --git a/yt/yt/core/concurrency/unittests/ya.make b/yt/yt/core/concurrency/unittests/ya.make
index d98c8bf285..ae8a5c6f99 100644
--- a/yt/yt/core/concurrency/unittests/ya.make
+++ b/yt/yt/core/concurrency/unittests/ya.make
@@ -10,6 +10,7 @@ PROTO_NAMESPACE(yt)
SRCS(
async_barrier_ut.cpp
+ async_looper_ut.cpp
async_rw_lock_ut.cpp
async_semaphore_ut.cpp
async_stream_pipe_ut.cpp
diff --git a/yt/yt/core/ya.make b/yt/yt/core/ya.make
index 83a4017b3d..9ce00aca5b 100644
--- a/yt/yt/core/ya.make
+++ b/yt/yt/core/ya.make
@@ -45,6 +45,7 @@ SRCS(
concurrency/action_queue.cpp
concurrency/async_barrier.cpp
+ concurrency/async_looper.cpp
concurrency/async_rw_lock.cpp
concurrency/async_semaphore.cpp
concurrency/async_stream_pipe.cpp