diff options
author | arkady-e1ppa <arkady-e1ppa@yandex-team.com> | 2024-07-13 10:07:37 +0300 |
---|---|---|
committer | arkady-e1ppa <arkady-e1ppa@yandex-team.com> | 2024-07-13 10:17:32 +0300 |
commit | 7fa6e8fc2355ab3dea321089c1fb973928e12892 (patch) | |
tree | 2b357aa922dc94d514142456ccfb3abe7f594aba | |
parent | 473ffd9303aee1ebd2b1087bfd05d0fa08539510 (diff) | |
download | ydb-7fa6e8fc2355ab3dea321089c1fb973928e12892.tar.gz |
YT-22219: Introduce AsyncLooper and use it in chunk scraper
2e433124f2c0565c823a46b5bc36dc66105a106b
-rw-r--r-- | yt/yt/core/concurrency/async_looper.cpp | 310 | ||||
-rw-r--r-- | yt/yt/core/concurrency/async_looper.h | 92 | ||||
-rw-r--r-- | yt/yt/core/concurrency/public.h | 2 | ||||
-rw-r--r-- | yt/yt/core/concurrency/unittests/async_looper_ut.cpp | 710 | ||||
-rw-r--r-- | yt/yt/core/concurrency/unittests/ya.make | 1 | ||||
-rw-r--r-- | yt/yt/core/ya.make | 1 |
6 files changed, 1116 insertions, 0 deletions
diff --git a/yt/yt/core/concurrency/async_looper.cpp b/yt/yt/core/concurrency/async_looper.cpp new file mode 100644 index 0000000000..05ad633c1d --- /dev/null +++ b/yt/yt/core/concurrency/async_looper.cpp @@ -0,0 +1,310 @@ +#include "async_looper.h" + +#include <yt/yt/core/tracing/trace_context.h> + +#include <yt/yt/core/misc/finally.h> + +namespace NYT::NConcurrency { + +//////////////////////////////////////////////////////////////////////////////// + +TAsyncLooper::TAsyncLooper( + IInvokerPtr invoker, + TCallback<TFuture<void>(bool)> asyncStart, + TCallback<void(bool)> syncFinish, + TString name, + const NLogging::TLogger& logger) + : Invoker_(std::move(invoker)) + , AsyncStart_(std::move(asyncStart)) + , SyncFinish_(std::move(syncFinish)) + , Logger(logger.WithTag("Looper: %v", name)) +{ + YT_VERIFY(Invoker_); + YT_VERIFY(Invoker_ != GetSyncInvoker()); + YT_VERIFY(AsyncStart_); + YT_VERIFY(SyncFinish_); +} + +void TAsyncLooper::Start() +{ + auto traceContext = NTracing::GetOrCreateTraceContext("LooperStart"); + auto traceGuard = NTracing::TCurrentTraceContextGuard(traceContext); + YT_LOG_DEBUG("Requesting AsyncLooper to start"); + + Invoker_->Invoke( + BIND(&TAsyncLooper::DoStart, MakeStrong(this))); +} + +void TAsyncLooper::DoStart() +{ + auto guard = Guard(StateLock_); + switch (State_) { + case EState::Running: + // Already Running. + case EState::Restarting: + // Soon to be Running. + YT_LOG_DEBUG("Looper is either already running or restarting"); + return; + + case EState::NotRunning: + break; + + default: + YT_ABORT(); + } + + switch (Stage_) { + case EStage::AsyncBusy: + // Has been stopped during async step + // -> request restart. + case EStage::Busy: + // Has been stopped during sync step + // -> request restart. + YT_LOG_DEBUG("Looper is busy but was stopped. Requesting restart"); + State_ = EState::Restarting; + return; + + case EStage::Idle: + // Two possibilities: + // 1. Nothing is happening -> we just start as normal + // 2. We have been stopped during intermission between + // async step and the sync one. Call to stop must have + // incremented the epoch and therefore imminent sync step + // will bail out due to epoch mismatch -> we are free to + // ignore this case on our side. + State_ = EState::Running; + YT_LOG_DEBUG("Starting the looper"); + StartLoop(/*cleanStart*/ true, guard); + break; + + default: + YT_ABORT(); + } +} + +void TAsyncLooper::StartLoop(bool cleanStart, const TGuard& guard) +{ + VERIFY_SPINLOCK_AFFINITY(StateLock_); + YT_VERIFY(State_ == EState::Running); + YT_VERIFY(Stage_ == EStage::Idle); + + Future_.Reset(); + + auto traceContext = NTracing::GetOrCreateTraceContext("StartLoop"); + auto traceGuard = NTracing::TCurrentTraceContextGuard(traceContext); + + Stage_ = EStage::AsyncBusy; + + TFuture<void> future; + + try { + auto cleanup = Finally([&] { + Stage_ = EStage::Idle; + }); + auto unguard = Unguard(guard); + future = AsyncStart_(cleanStart); + + } catch (const TFiberCanceledException&) { + // We got canceled -- this is normal. + throw; + } catch (const std::exception& ex) { + YT_LOG_FATAL(ex, "Unexpected error encountered during the async step"); + } catch (...) { + YT_LOG_FATAL("Unexpected error encountered during the async step"); + } + + bool wasRestarted = false; + switch (State_) { + case EState::NotRunning: + // We have been stopped during the async step + // -> cancel the future if there is one. + YT_LOG_DEBUG("Looper stop occured during the async part"); + + if (future) { + future.Cancel(TError("Looper stopped")); + } + return; + + case EState::Restarting: + // We have been restarted during the async step + // -> convert to running. + YT_LOG_DEBUG("Looper restart occured during the async part. Next loop will be a clean start"); + + State_ = EState::Running; + wasRestarted = true; + // Next step, should it occur + // will have cleanStart == true + break; + + case EState::Running: + // Nothing happened during async step + // -> proceed as normal. + break; + + default: + YT_ABORT(); + } + + if (future) { + // NB(arkady-e1ppa): We read epoch here (and not earlier) + // because we could have been restarted during the async step + // in which case we take up the role of the restarted loop. + // Thus we have to have get the most up to date epoch here. + Future_ = future + .Apply(BIND( + &TAsyncLooper::AfterStart, + MakeWeak(this), + cleanStart, + wasRestarted, + EpochNumber_) + .AsyncVia(Invoker_)); + } +} + +void TAsyncLooper::AfterStart(bool cleanStart, bool wasRestarted, ui64 epochNumber, const TError& error) +{ + if (!error.IsOK()) { + YT_LOG_WARNING(error, "Async start failed unexpectedly. Stopping looper"); + return; + } + + { + auto guard = Guard(StateLock_); + + YT_ASSERT(Stage_ == EStage::Idle); + + switch (State_) { + case EState::NotRunning: + YT_LOG_DEBUG("Looper stop occured during the intermission between async and sync steps"); + // We have been stopped -> bail out. + return; + + case EState::Running: + if (epochNumber != EpochNumber_) { + // We got restarted during the intermission. + // Caller of |Start| will start the new chain + // and we just bail out. + YT_LOG_DEBUG("Looper restart occured during the intermission between async and sync steps"); + + return; + } + break; + + case EState::Restarting: + // Restarting requires stage to + // not be |Idle|. + // NB(arkady-e1ppa): Since enum is not macro generated + // it is not serializable. + YT_LOG_ERROR( + "Looper encountered impossible state while starting sync step (State: Restarting)"); + + default: + YT_ABORT(); + } + + Stage_ = EStage::Busy; + } + + DoStep(cleanStart, wasRestarted); +} + +void TAsyncLooper::DoStep(bool cleanStart, bool wasRestarted) +{ + auto cleanup = Finally([this, wasRestarted] { + FinishStep(wasRestarted); + }); + + try { + SyncFinish_(cleanStart); + } catch (const TFiberCanceledException&) { + // We got canceled -- this is normal. + throw; + } catch (const std::exception& ex) { + YT_LOG_FATAL(ex, "Unexpected error encountered during the sync step"); + } catch (...) { + YT_LOG_FATAL("Unexpected error encountered during the sync step"); + } +} + +void TAsyncLooper::FinishStep(bool wasRestarted) +{ + auto guard = Guard(StateLock_); + + YT_ASSERT(Stage_ == EStage::Busy); + // NB(arkady-e1ppa): Since this transition happens + // under spinlock and below we would go to AsyncBusy before the release + // this stage switch will not be observed by users. + Stage_ = EStage::Idle; + + switch (State_) { + case EState::NotRunning: + // We have been stopped + // -> bail out. + YT_LOG_DEBUG("Looper stop occured during the sync step"); + + return; + + case EState::Restarting: + // We have been restarted + // -> start the new chain of loops. + YT_LOG_DEBUG("Looper restart occured during the sync step"); + + State_ = EState::Running; + StartLoop(/*cleanStart*/ true, guard); + return; + + case EState::Running: + // Nothing has happened + // -> continue the current chain of loops. + // NB(arkady-e1ppa): If |wasRestarted| is |true| + // then we have been restarted during the async step + // of this loop iteration. In order to be able to + // reliable ever restart the async part, we will + // enforce clean start of the next iteration. + StartLoop(/*cleanStart*/ wasRestarted, guard); + return; + + default: + YT_ABORT(); + } +} + +void TAsyncLooper::Stop() +{ + auto guard = Guard(StateLock_); + + if (State_ == EState::NotRunning) { + // Already stopping + // -> bail out. + YT_LOG_DEBUG("Trying to stop looper that is already stopped"); + return; + } + + State_ = EState::NotRunning; + ++EpochNumber_; + YT_LOG_DEBUG("Stopping the looper"); + + // We could be in one of three possible situations (for each stage): + // 1. EStage::AsyncBusy -- |StartLoop| caller will observe + // state |NotRunning| eventually and will cancel the future for us + // (or there will be a restart which is also handled by caller above). + // 2. EStage::Idle -- intermission between async and sync parts + // -- if there is not restart, sync part will just bail out + // if there is a restart, sync part will observe a different epoch + // and bail out as well. + // 3. EStage::Busy -- sync part is running and will simply not + // continue the chain once it observes NotRunning state. + + if (!Future_) { + // If there was a null future produced for async part + // or simply while Stage_ == EStage::AsyncBusy. + return; + } + + Future_.Cancel(TError("Looper stopped")); + Future_.Reset(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // NYT::NConcurrency diff --git a/yt/yt/core/concurrency/async_looper.h b/yt/yt/core/concurrency/async_looper.h new file mode 100644 index 0000000000..3f356d267b --- /dev/null +++ b/yt/yt/core/concurrency/async_looper.h @@ -0,0 +1,92 @@ +#pragma once + +#include "public.h" + +#include <yt/yt/core/actions/future.h> + +#include <yt/yt/core/logging/log.h> + +namespace NYT::NConcurrency { + +//////////////////////////////////////////////////////////////////////////////// + +// Class which indefenetely runs +// two tasks: +// 1. Async start which creates async action. +// 2. Sync finish which is called after async action +// is finished. +// Both are ran in Invoker_. +// Both |Start| and |Stop| are completely +// thread-safe, can be executed in any order and any number of times. +// Dropping last reference to looper at any point is also safe. +class TAsyncLooper + : public TRefCounted +{ +public: + TAsyncLooper( + IInvokerPtr invoker, + TCallback<TFuture<void>(bool cleanStart)> asyncStart, + TCallback<void(bool cleanStart)> syncFinish, + TString name, + const NLogging::TLogger& logger = NLogging::TLogger("Looper logger")); + + // Starts polling. + // First loop will have cleanStart == true + // Calling after stop will act as + // if for the first time. + void Start(); + + // Cancels the current loop if one is present. + void Stop(); + +private: + const IInvokerPtr Invoker_; + TCallback<TFuture<void>(bool)> AsyncStart_; + TCallback<void(bool)> SyncFinish_; + + YT_DECLARE_SPIN_LOCK(NYT::NThreading::TSpinLock, StateLock_); + using TGuard = TGuard<NYT::NThreading::TSpinLock>; + + enum class EState + { + NotRunning, + Running, + Restarting, + }; + + // Default transitions are (only observable ones are listed): + // Idle -> AsyncBusy -> Idle -> Busy -> + // AsyncBusy -> Idle -> Busy -> ... + // NB(arkady-e1ppa): Technically, AsyncBusy + // is redundant and can be replaced with Busy. + // It does not produce noticeable overhead to keep it + // and it is helpful for understanding what is going on + // in the implementation (e.g. why it is correct). + enum class EStage + { + Idle, + AsyncBusy, + Busy, + }; + + EState State_ = EState::NotRunning; + EStage Stage_ = EStage::Idle; + + ui64 EpochNumber_ = 0; + TFuture<void> Future_; + + const NLogging::TLogger Logger; + + void DoStart(); + + void StartLoop(bool cleanStart, const TGuard& guard); + void AfterStart(bool cleanStart, bool wasRestarted, ui64 epochNumber, const TError& error); + void DoStep(bool cleanStart, bool wasRestarted); + void FinishStep(bool wasRestarted); +}; + +DEFINE_REFCOUNTED_TYPE(TAsyncLooper); + +//////////////////////////////////////////////////////////////////////////////// + +} // NYT::NConcurrency diff --git a/yt/yt/core/concurrency/public.h b/yt/yt/core/concurrency/public.h index d7533f1232..4cab384b88 100644 --- a/yt/yt/core/concurrency/public.h +++ b/yt/yt/core/concurrency/public.h @@ -23,6 +23,8 @@ DECLARE_REFCOUNTED_STRUCT(IFairShareActionQueue) DECLARE_REFCOUNTED_STRUCT(IQuantizedExecutor) +DECLARE_REFCOUNTED_CLASS(TAsyncLooper); + namespace NDetail { DECLARE_REFCOUNTED_STRUCT(TDelayedExecutorEntry) diff --git a/yt/yt/core/concurrency/unittests/async_looper_ut.cpp b/yt/yt/core/concurrency/unittests/async_looper_ut.cpp new file mode 100644 index 0000000000..0b48079a90 --- /dev/null +++ b/yt/yt/core/concurrency/unittests/async_looper_ut.cpp @@ -0,0 +1,710 @@ +#include <yt/yt/core/test_framework/framework.h> + +#include <yt/yt/core/concurrency/async_looper.h> + +#include <yt/yt/core/concurrency/action_queue.h> +#include <yt/yt/core/concurrency/scheduler_api.h> + +#include <library/cpp/yt/threading/event_count.h> + +namespace NYT::NConcurrency { +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +const TString LooperName = "TestLooper"; + +// TODO(arkady-e1ppa): Add ManualInvoker which only runs callbacks when +// manually requested. Add test when Stop/Restart occurs during the +// intermission between Async and Sync steps. + +TEST(TAsyncLooperTest, JustWorks) +{ + auto queue = New<TActionQueue>(); + + TCallback asyncStart = BIND([invoker = queue->GetInvoker()] (bool) { + VERIFY_INVOKER_AFFINITY(invoker); + return BIND([] {}).AsyncVia(invoker).Run(); + }); + + auto progress = std::make_shared<std::atomic<int>>(0); + TCallback syncFinish = BIND([progress, invoker = queue->GetInvoker()] (bool) { + VERIFY_INVOKER_AFFINITY(invoker); + progress->fetch_add(1); + }); + + auto currentProgress = progress->load(); + + auto looper = New<TAsyncLooper>( + queue->GetInvoker(), + asyncStart, + syncFinish, + LooperName); + + Sleep(TDuration::Seconds(1)); + EXPECT_EQ(currentProgress, progress->load()); + + looper->Start(); + + while (currentProgress == progress->load()); + + currentProgress = progress->load(); + + while (currentProgress == progress->load()); + + looper->Stop(); +} + +TEST(TAsyncLooperTest, Restart) +{ + auto queue = New<TActionQueue>(); + + TCallback asyncStart = BIND([invoker = queue->GetInvoker()] (bool) { + return BIND([] {}).AsyncVia(invoker).Run(); + }); + + auto cleanStarts = std::make_shared<std::atomic<int>>(0); + TCallback syncFinish = BIND([cleanStarts] (bool cleanStart) { + if (cleanStart) { + cleanStarts->fetch_add(1); + } + }); + + auto looper = New<TAsyncLooper>( + queue->GetInvoker(), + asyncStart, + syncFinish, + LooperName); + + looper->Start(); + + while (cleanStarts->load() == 0); + + EXPECT_EQ(cleanStarts->load(), 1); + + Sleep(TDuration::Seconds(1)); + + EXPECT_EQ(cleanStarts->load(), 1); + + looper->Stop(); + + looper->Start(); + + while (cleanStarts->load() == 1); + + EXPECT_EQ(cleanStarts->load(), 2); + + looper->Stop(); +} + +TEST(TAsyncLooperTest, CancelAsyncStep) +{ + auto queue = New<TActionQueue>(); + + NThreading::TEvent started; + auto promise = NewPromise<void>(); + + TCallback asyncStart = BIND([invoker = queue->GetInvoker(), promise, &started] (bool) { + return BIND([promise, &started] { + started.NotifyAll(); + WaitFor(promise.ToFuture()) + .ThrowOnError(); + }).AsyncVia(invoker).Run(); + }); + + TCallback syncFinish = BIND([] (bool) { + }); + + auto looper = New<TAsyncLooper>( + queue->GetInvoker(), + asyncStart, + syncFinish, + LooperName); + + looper->Start(); + + started.Wait(); + + looper->Stop(); + + EXPECT_TRUE(promise.IsCanceled()); +} + +TEST(TAsyncLooperTest, CancelSyncStep) +{ + auto queue = New<TActionQueue>(); + + NThreading::TEvent started; + auto promise = NewPromise<void>(); + + TCallback asyncStart = BIND([invoker = queue->GetInvoker()] (bool) { + return BIND([] { + }).AsyncVia(invoker).Run(); + }); + + TCallback syncFinish = BIND([promise, &started] (bool) { + started.NotifyAll(); + WaitFor(promise.ToFuture()) + .ThrowOnError(); + }); + + auto looper = New<TAsyncLooper>( + queue->GetInvoker(), + asyncStart, + syncFinish, + LooperName); + + looper->Start(); + + started.Wait(); + + looper->Stop(); + + EXPECT_TRUE(promise.IsCanceled()); +} + +TEST(TAsyncLooperTest, StopDuringAsyncStep) +{ + auto queue = New<TActionQueue>(); + + // We use event and not future to + // ignore cancelation in this test. + NThreading::TEvent releaseAsync; + NThreading::TEvent started; + + TCallback asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, &started] (bool) { + return BIND([&releaseAsync, &started] { + started.NotifyAll(); + releaseAsync.Wait(); + }).AsyncVia(invoker).Run(); + }); + + auto mustBeFalse = std::make_shared<std::atomic<bool>>(false); + TCallback syncFinish = BIND([mustBeFalse] (bool) { + mustBeFalse->store(true); + }); + + auto looper = New<TAsyncLooper>( + queue->GetInvoker(), + asyncStart, + syncFinish, + LooperName); + + looper->Start(); + + started.Wait(); + + looper->Stop(); + + releaseAsync.NotifyAll(); + + // We cannot ensure that callback will be submitted + // So we just wait a little bit. + Sleep(TDuration::Seconds(1)); + + // Ensure queue is empty + queue->Shutdown(/*graceful*/ true); + + EXPECT_FALSE(mustBeFalse->load()); +} + +TEST(TAsyncLooperTest, StopDuringAsyncStepWaitFor) +{ + auto queue = New<TActionQueue>(); + + auto releaseAsync = NewPromise<void>(); + NThreading::TEvent started; + + TCallback asyncStart = BIND([invoker = queue->GetInvoker(), &started, releaseAsync] (bool) { + return BIND([releaseAsync, &started] { + started.NotifyAll(); + WaitFor(releaseAsync.ToFuture()) + .ThrowOnError(); + }).AsyncVia(invoker).Run(); + }); + + auto mustBeFalse = std::make_shared<std::atomic<bool>>(false); + TCallback syncFinish = BIND([mustBeFalse] (bool) { + mustBeFalse->store(true); + }); + + auto looper = New<TAsyncLooper>( + queue->GetInvoker(), + asyncStart, + syncFinish, + LooperName); + + looper->Start(); + + started.Wait(); + + looper->Stop(); + + releaseAsync.Set(); + + // We cannot ensure that callback will be submitted + // So we just wait a little bit. + Sleep(TDuration::Seconds(1)); + + // Ensure queue is empty + queue->Shutdown(/*graceful*/ true); + + EXPECT_FALSE(mustBeFalse->load()); +} + +TEST(TAsyncLooperTest, RestartDuringAsyncStep) +{ + auto queue = New<TActionQueue>(); + + // We use event and not future to + // ignore cancelation in this test. + NThreading::TEvent releaseAsync; + + auto asyncRunsCount = std::make_shared<std::atomic<int>>(0); + + TCallback asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, asyncRunsCount] (bool) { + return BIND([&releaseAsync, asyncRunsCount] { + asyncRunsCount->fetch_add(1); + releaseAsync.Wait(); + }).AsyncVia(invoker).Run(); + }); + + TCallback syncFinish = BIND([] (bool) { + }); + + auto looper = New<TAsyncLooper>( + queue->GetInvoker(), + asyncStart, + syncFinish, + LooperName); + + looper->Start(); + + while (asyncRunsCount->load() == 0); + + EXPECT_EQ(asyncRunsCount->load(), 1); + + looper->Stop(); + looper->Start(); + + releaseAsync.NotifyAll(); + + while (asyncRunsCount->load() == 1); + + looper->Stop(); +} + +TEST(TAsyncLooperTest, RestartDuringAsyncStepWaitFor) +{ + auto queue = New<TActionQueue>(); + + auto releaseAsync = NewPromise<void>(); + + auto asyncRunsCount = std::make_shared<std::atomic<int>>(0); + + TCallback asyncStart = BIND([invoker = queue->GetInvoker(), releaseAsync, asyncRunsCount] (bool) { + return BIND([releaseAsync, asyncRunsCount] { + asyncRunsCount->fetch_add(1); + WaitFor(releaseAsync.ToFuture()) + .ThrowOnError(); + }).AsyncVia(invoker).Run(); + }); + + TCallback syncFinish = BIND([] (bool) { + }); + + auto looper = New<TAsyncLooper>( + queue->GetInvoker(), + asyncStart, + syncFinish, + LooperName); + + looper->Start(); + + while (asyncRunsCount->load() == 0); + + EXPECT_EQ(asyncRunsCount->load(), 1); + + looper->Stop(); + looper->Start(); + + releaseAsync.Set(); + + while (asyncRunsCount->load() == 1); + + looper->Stop(); +} + +TEST(TAsyncLooperTest, StopDuringAsyncStepPreparation) +{ + auto queue = New<TActionQueue>(); + + // We use event and not future to + // ignore cancelation in this test. + NThreading::TEvent releaseAsync; + NThreading::TEvent started; + + auto mustBeFalse = std::make_shared<std::atomic<bool>>(false); + TCallback asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, &started, mustBeFalse] (bool) { + started.NotifyAll(); + releaseAsync.Wait(); + + // NB(arkady-e1ppa): Callback below will be submitted to the same action queue + // current callback is running on. Thus we guarantee that it will not + // be finished before looper internals get to cancel it thus + // preventing the loop from occuring. + return BIND([mustBeFalse] { + mustBeFalse->store(true); + }).AsyncVia(invoker).Run(); + }); + + TCallback syncFinish = BIND([] (bool) { + }); + + auto looper = New<TAsyncLooper>( + queue->GetInvoker(), + asyncStart, + syncFinish, + LooperName); + + queue->GetInvoker()->Invoke(BIND([looper] { + looper->Start(); + })); + + started.Wait(); + + looper->Stop(); + + releaseAsync.NotifyAll(); + + // We cannot ensure that callback will be submitted + // So we just wait a little bit. + Sleep(TDuration::Seconds(1)); + + // Ensure queue is empty + queue->Shutdown(/*graceful*/ true); + + EXPECT_FALSE(mustBeFalse->load()); +} + +TEST(TAsyncLooperTest, RestartDuringAsyncStepPreparation1) +{ + auto queue = New<TActionQueue>(); + + // We use event and not future to + // ignore cancelation in this test. + NThreading::TEvent releaseAsync; + NThreading::TEvent started; + + auto asyncRunsCount = std::make_shared<std::atomic<int>>(0); + + TCallback asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, &started, asyncRunsCount] (bool) { + started.NotifyAll(); + releaseAsync.Wait(); + return BIND([asyncRunsCount] { + asyncRunsCount->fetch_add(1); + }).AsyncVia(invoker).Run(); + }); + + TCallback syncFinish = BIND([] (bool) { + }); + + auto looper = New<TAsyncLooper>( + queue->GetInvoker(), + asyncStart, + syncFinish, + LooperName); + + looper->Start(); + + started.Wait(); + + looper->Stop(); + looper->Start(); + + releaseAsync.NotifyAll(); + + while (asyncRunsCount->load() == 0); + + looper->Stop(); +} + +TEST(TAsyncLooperTest, RestartDuringAsyncStepPreparation2) +{ + auto queue = New<TActionQueue>(); + + // We use event and not future to + // ignore cancelation in this test. + NThreading::TEvent releaseAsync; + NThreading::TEvent secondIterationStarted; + + auto asyncCleanStarts = std::make_shared<std::atomic<int>>(0); + auto syncCleanStarts = std::make_shared<std::atomic<int>>(0); + + TCallback asyncStart = BIND([invoker = queue->GetInvoker(), &releaseAsync, &secondIterationStarted, asyncCleanStarts, syncCleanStarts] (bool cleanStart) { + if (cleanStart) { + asyncCleanStarts->fetch_add(1); + } + + if (syncCleanStarts->load() == 1) { + // Clean start has fully finished. + secondIterationStarted.NotifyAll(); + releaseAsync.Wait(); + } + + return BIND([] { + }).AsyncVia(invoker).Run(); + }); + + TCallback syncFinish = BIND([syncCleanStarts] (bool cleanStart) { + if (cleanStart) { + syncCleanStarts->fetch_add(1); + } + }); + + auto looper = New<TAsyncLooper>( + queue->GetInvoker(), + asyncStart, + syncFinish, + LooperName); + + looper->Start(); + + secondIterationStarted.Wait(); + + EXPECT_EQ(asyncCleanStarts->load(), 1); + EXPECT_EQ(syncCleanStarts->load(), 1); + + looper->Stop(); + looper->Start(); + + releaseAsync.NotifyAll(); + + while (syncCleanStarts->load() == 1); + + EXPECT_EQ(asyncCleanStarts->load(), 2); + EXPECT_EQ(syncCleanStarts->load(), 2); + + looper->Stop(); +} + +TEST(TAsyncLooperTest, StopDuringSyncStep) +{ + auto queue = New<TActionQueue>(); + + // We use event and not future to + // ignore cancelation in this test. + NThreading::TEvent releaseAsync; + NThreading::TEvent started; + + auto asyncRunsCount = std::make_shared<std::atomic<int>>(0); + TCallback asyncStart = BIND([invoker = queue->GetInvoker(), asyncRunsCount] (bool) { + return BIND([asyncRunsCount] { + asyncRunsCount->fetch_add(1); + }).AsyncVia(invoker).Run(); + }); + + TCallback syncFinish = BIND([&releaseAsync, &started] (bool) { + started.NotifyAll(); + releaseAsync.Wait(); + }); + + auto looper = New<TAsyncLooper>( + queue->GetInvoker(), + asyncStart, + syncFinish, + LooperName); + + looper->Start(); + + started.Wait(); + + looper->Stop(); + + releaseAsync.NotifyAll(); + + // We cannot ensure that callback will be submitted + // So we just wait a little bit. + Sleep(TDuration::Seconds(1)); + + // Ensure queue is empty + queue->Shutdown(/*graceful*/ true); + + EXPECT_EQ(asyncRunsCount->load(), 1); +} + +TEST(TAsyncLooperTest, StopDuringSyncStepWaitFor) +{ + auto queue = New<TActionQueue>(); + + // We use event and not future to + // ignore cancelation in this test. + auto releaseAsync = NewPromise<void>(); + NThreading::TEvent started; + + auto asyncRunsCount = std::make_shared<std::atomic<int>>(0); + TCallback asyncStart = BIND([invoker = queue->GetInvoker(), asyncRunsCount] (bool) { + return BIND([asyncRunsCount] { + asyncRunsCount->fetch_add(1); + }).AsyncVia(invoker).Run(); + }); + + TCallback syncFinish = BIND([releaseAsync, &started] (bool) { + started.NotifyAll(); + WaitFor(releaseAsync.ToFuture()) + .ThrowOnError(); + }); + + auto looper = New<TAsyncLooper>( + queue->GetInvoker(), + asyncStart, + syncFinish, + LooperName); + + looper->Start(); + + started.Wait(); + + looper->Stop(); + + releaseAsync.Set(); + + // We cannot ensure that callback will be submitted + // So we just wait a little bit. + Sleep(TDuration::Seconds(1)); + + // Ensure queue is empty + queue->Shutdown(/*graceful*/ true); + + EXPECT_EQ(asyncRunsCount->load(), 1); +} + +TEST(TAsyncLooperTest, RestartDuringSyncStep) +{ + auto queue = New<TActionQueue>(); + + // We use event and not future to + // ignore cancelation in this test. + NThreading::TEvent releaseAsync; + + auto syncRunsCount = std::make_shared<std::atomic<int>>(0); + + TCallback asyncStart = BIND([invoker = queue->GetInvoker()] (bool) { + return BIND([] { + }).AsyncVia(invoker).Run(); + }); + + TCallback syncFinish = BIND([&releaseAsync, syncRunsCount] (bool) { + syncRunsCount->fetch_add(1); + releaseAsync.Wait(); + }); + + auto looper = New<TAsyncLooper>( + queue->GetInvoker(), + asyncStart, + syncFinish, + LooperName); + + looper->Start(); + + while (syncRunsCount->load() == 0); + + EXPECT_EQ(syncRunsCount->load(), 1); + + looper->Stop(); + looper->Start(); + + releaseAsync.NotifyAll(); + + while (syncRunsCount->load() == 1); + + looper->Stop(); +} + +TEST(TAsyncLooperTest, RestartDuringSyncStepWaitFor) +{ + auto queue = New<TActionQueue>(); + + auto releaseAsync = NewPromise<void>(); + + auto syncRunsCount = std::make_shared<std::atomic<int>>(0); + + TCallback asyncStart = BIND([invoker = queue->GetInvoker()] (bool) { + return BIND([] { + }).AsyncVia(invoker).Run(); + }); + + TCallback syncFinish = BIND([releaseAsync, syncRunsCount] (bool) { + syncRunsCount->fetch_add(1); + WaitFor(releaseAsync.ToFuture()) + .ThrowOnError(); + }); + + auto looper = New<TAsyncLooper>( + queue->GetInvoker(), + asyncStart, + syncFinish, + LooperName); + + looper->Start(); + + while (syncRunsCount->load() == 0); + + EXPECT_EQ(syncRunsCount->load(), 1); + + looper->Stop(); + looper->Start(); + + releaseAsync.Set(); + + while (syncRunsCount->load() == 1); + + looper->Stop(); +} + +TEST(TAsyncLooperTest, NullFuture) +{ + auto queue = New<TActionQueue>(); + + auto switcher = std::make_shared<std::atomic<bool>>(false); + NThreading::TEvent loopBroken; + + TCallback asyncStart = BIND([invoker = queue->GetInvoker(), switcher, &loopBroken] (bool) { + if (!switcher->load()) { + loopBroken.NotifyAll(); + return TFuture<void>(); + } + + return BIND([] {}).AsyncVia(invoker).Run(); + }); + + auto syncRunsCount = std::make_shared<std::atomic<int>>(0); + TCallback syncFinish = BIND([syncRunsCount] (bool) { + syncRunsCount->fetch_add(1); + }); + + auto looper = New<TAsyncLooper>( + queue->GetInvoker(), + asyncStart, + syncFinish, + LooperName); + + looper->Start(); + + loopBroken.Wait(); + + EXPECT_EQ(syncRunsCount->load(), 0); + + switcher->store(true); + looper->Stop(); + looper->Start(); + + while (syncRunsCount->load() == 0); + + looper->Stop(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT:::NConcurrency diff --git a/yt/yt/core/concurrency/unittests/ya.make b/yt/yt/core/concurrency/unittests/ya.make index d98c8bf285..ae8a5c6f99 100644 --- a/yt/yt/core/concurrency/unittests/ya.make +++ b/yt/yt/core/concurrency/unittests/ya.make @@ -10,6 +10,7 @@ PROTO_NAMESPACE(yt) SRCS( async_barrier_ut.cpp + async_looper_ut.cpp async_rw_lock_ut.cpp async_semaphore_ut.cpp async_stream_pipe_ut.cpp diff --git a/yt/yt/core/ya.make b/yt/yt/core/ya.make index 83a4017b3d..9ce00aca5b 100644 --- a/yt/yt/core/ya.make +++ b/yt/yt/core/ya.make @@ -45,6 +45,7 @@ SRCS( concurrency/action_queue.cpp concurrency/async_barrier.cpp + concurrency/async_looper.cpp concurrency/async_rw_lock.cpp concurrency/async_semaphore.cpp concurrency/async_stream_pipe.cpp |