diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2025-01-09 14:00:11 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2025-01-09 14:13:35 +0300 |
commit | 8226a64bff2d23f431351e2daa829b80b493a487 (patch) | |
tree | 89df8011464a1342bae01280a5cf73302cab7602 | |
parent | 427d34bceee5c2b12a181a0eac3fd562b68e8754 (diff) | |
download | ydb-8226a64bff2d23f431351e2daa829b80b493a487.tar.gz |
Intermediate changes
commit_hash:85dacdeea9986fde1c6d03d8f9b6369c5310d0ee
-rw-r--r-- | yt/yt/core/concurrency/unittests/bounded_concurrency_invoker_ut.cpp | 284 | ||||
-rw-r--r-- | yt/yt/core/concurrency/unittests/scheduler_ut.cpp | 245 | ||||
-rw-r--r-- | yt/yt/core/concurrency/unittests/ya.make | 1 |
3 files changed, 285 insertions, 245 deletions
diff --git a/yt/yt/core/concurrency/unittests/bounded_concurrency_invoker_ut.cpp b/yt/yt/core/concurrency/unittests/bounded_concurrency_invoker_ut.cpp new file mode 100644 index 0000000000..5a22f97430 --- /dev/null +++ b/yt/yt/core/concurrency/unittests/bounded_concurrency_invoker_ut.cpp @@ -0,0 +1,284 @@ +#include <yt/yt/core/test_framework/framework.h> + +#include <yt/yt/core/actions/future.h> + +#include <yt/yt/core/concurrency/action_queue.h> + +#include <yt/yt/core/logging/log.h> + +#include <yt/yt/core/misc/lazy_ptr.h> + +namespace NYT::NConcurrency { +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +constexpr auto SleepQuantum = TDuration::MilliSeconds(100); + +//////////////////////////////////////////////////////////////////////////////// + +class TBoundedConcurrencyInvokerTest + : public ::testing::Test +{ +protected: + TLazyIntrusivePtr<TActionQueue> Queue1; + TLazyIntrusivePtr<TActionQueue> Queue2; + + void TearDown() override + { + if (Queue1.HasValue()) { + Queue1->Shutdown(); + } + if (Queue2.HasValue()) { + Queue2->Shutdown(); + } + } +}; + +TEST_F(TBoundedConcurrencyInvokerTest, WaitFor1) +{ + auto invoker = CreateBoundedConcurrencyInvoker(Queue1->GetInvoker(), 1); + BIND([&] { + for (int i = 0; i < 10; ++i) { + TDelayedExecutor::WaitForDuration(SleepQuantum); + } + }).AsyncVia(invoker).Run().Get().ThrowOnError(); +} + +TEST_F(TBoundedConcurrencyInvokerTest, WaitFor2) +{ + auto invoker = CreateBoundedConcurrencyInvoker(Queue1->GetInvoker(), 2); + + auto promise = NewPromise<void>(); + auto future = promise.ToFuture(); + + auto a1 = BIND([&] { + promise.Set(); + }); + + auto a2 = BIND([&] { + invoker->Invoke(a1); + WaitFor(future) + .ThrowOnError(); + }); + + a2.AsyncVia(invoker).Run().Get().ThrowOnError(); +} + +TEST_F(TBoundedConcurrencyInvokerTest, WaitFor3) +{ + auto invoker = CreateBoundedConcurrencyInvoker(Queue1->GetInvoker(), 1); + + auto promise = NewPromise<void>(); + auto future = promise.ToFuture(); + + bool a1called = false; + bool a1finished = false; + auto a1 = BIND([&] { + a1called = true; + WaitFor(future) + .ThrowOnError(); + a1finished = true; + }); + + bool a2called = false; + auto a2 = BIND([&] { + a2called = true; + }); + + invoker->Invoke(a1); + invoker->Invoke(a2); + + Sleep(SleepQuantum); + EXPECT_TRUE(a1called); + EXPECT_FALSE(a1finished); + EXPECT_FALSE(a2called); + + promise.Set(); + + Sleep(SleepQuantum); + EXPECT_TRUE(a1called); + EXPECT_TRUE(a1finished); + EXPECT_TRUE(a2called); +} + +//////////////////////////////////////////////////////////////////////////////// + +class TBoundedConcurrencyInvokerParametrizedReconfigureTest + : public TBoundedConcurrencyInvokerTest + , public ::testing::WithParamInterface<std::tuple<int, int, bool>> +{ }; + +INSTANTIATE_TEST_SUITE_P( + Test, + TBoundedConcurrencyInvokerParametrizedReconfigureTest, + ::testing::Values( + std::tuple(3, 5, true), + std::tuple(5, 3, true), + std::tuple(3, 5, false), + std::tuple(5, 3, false))); + +TEST_P(TBoundedConcurrencyInvokerParametrizedReconfigureTest, SetMaxConcurrentInvocations) +{ + auto [initialMaxConcurrentInvocations, finalMaxConcurrentInvocations, invokeSecondBatchRightAway] = GetParam(); + int maxConcurrentInvocations = initialMaxConcurrentInvocations; + auto invoker = CreateBoundedConcurrencyInvoker(Queue1->GetInvoker(), maxConcurrentInvocations); + + // Set firstPromise, then secondPromise. + auto firstPromise = NewPromise<void>(); + auto secondPromise = NewPromise<void>(); + + auto firstFuture = firstPromise.ToFuture(); + auto secondFuture = secondPromise.ToFuture(); + + YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, lock); + int runnedCallbacks = 0; + int finishedCallbacks = 0; + + std::vector<std::vector<TFuture<void>>> callbacks; + + auto invokeCallbacks = [&] (std::optional<TFuture<void>> startFuture = {}) { + for (int i = 0; i < 10; i++) { + callbacks.back().push_back(BIND([ + &, + callbackIndex = i, + startFuture + ] { + if (startFuture) { + WaitFor(*startFuture) + .ThrowOnError(); + } + + { + auto guard = Guard(lock); + runnedCallbacks += 1; + } + + // Later callbacks wait for the first future to set to check that + // they are not scheduled before first MaxConcurrentInvocations callbacks. + WaitFor((callbackIndex > maxConcurrentInvocations) + ? firstFuture + : secondFuture) + .ThrowOnError(); + + auto guard = Guard(lock); + + auto concurrentInvocations = runnedCallbacks - finishedCallbacks; + THROW_ERROR_EXCEPTION_UNLESS(concurrentInvocations <= maxConcurrentInvocations, "Number of concurrent invocations exceeds maximum (ConcurrentInvocations: %v, MaxConcurrentInvocations: %v)", + concurrentInvocations, + maxConcurrentInvocations); + if (callbackIndex > maxConcurrentInvocations) { + THROW_ERROR_EXCEPTION_UNLESS(finishedCallbacks > maxConcurrentInvocations, "%v-th callback was executed before first %v", + callbackIndex + 1, maxConcurrentInvocations); + } + + finishedCallbacks += 1; + }).AsyncVia(invoker).Run()); + } + }; + + auto resetState = [&] { + firstPromise = NewPromise<void>(); + secondPromise = NewPromise<void>(); + + firstFuture = firstPromise.ToFuture(); + secondFuture = secondPromise.ToFuture(); + + runnedCallbacks = 0; + finishedCallbacks = 0; + }; + + callbacks.emplace_back(); + invokeCallbacks(); + + auto startPromise = NewPromise<void>(); + auto catchUpPromise = NewPromise<void>(); + std::vector<TFuture<void>> catchUpCallbacks; + + auto invokeSecondBatch = [&] { + // To properly decrease max concurrent invocations we need some buffer callbacks. + auto catchUpFuture = catchUpPromise.ToFuture(); + if (finalMaxConcurrentInvocations < initialMaxConcurrentInvocations) { + for (int i = 0; i < 10; i++) { + catchUpCallbacks.push_back(BIND([catchUpFuture] { + WaitFor(catchUpFuture) + .ThrowOnError(); + }).AsyncVia(invoker).Run()); + } + } + + callbacks.emplace_back(); + invokeCallbacks(startPromise.ToFuture()); + }; + + if (invokeSecondBatchRightAway) { + invokeSecondBatch(); + } + + ASSERT_EQ(std::ssize(callbacks), invokeSecondBatchRightAway ? 2 : 1); + ASSERT_EQ(std::ssize(callbacks[0]), 10); + if (invokeSecondBatchRightAway) { + ASSERT_EQ(std::ssize(callbacks[1]), 10); + } + + firstPromise.Set(); + secondPromise.Set(); + + WaitFor(AllSucceeded(callbacks[0])) + .ThrowOnError(); + EXPECT_EQ(runnedCallbacks, 10); + EXPECT_EQ(finishedCallbacks, 10); + + resetState(); + + maxConcurrentInvocations = finalMaxConcurrentInvocations; + invoker->SetMaxConcurrentInvocations(maxConcurrentInvocations); + + if (!invokeSecondBatchRightAway) { + invokeSecondBatch(); + } + + ASSERT_EQ(std::ssize(callbacks), 2); + ASSERT_EQ(std::ssize(callbacks[1]), 10); + + if (finalMaxConcurrentInvocations < initialMaxConcurrentInvocations) { + // Wait until pending change is applied + catchUpPromise.Set(); + WaitFor(AllSucceeded(catchUpCallbacks)) + .ThrowOnError(); + } + + startPromise.Set(); + + firstPromise.Set(); + secondPromise.Set(); + + WaitFor(AllSucceeded(callbacks[1])) + .ThrowOnError(); + EXPECT_EQ(runnedCallbacks, 10); + EXPECT_EQ(finishedCallbacks, 10); +} + +TEST_F(TBoundedConcurrencyInvokerTest, ReconfigureBeforeFirstInvocation) +{ + auto invoker = CreateBoundedConcurrencyInvoker(Queue1->GetInvoker(), 0); + invoker->SetMaxConcurrentInvocations(1); + + auto promise = NewPromise<void>(); + + BIND([&] { + promise.Set(); + }) + .AsyncVia(invoker) + .Run() + .Get() + .ThrowOnError(); + + EXPECT_TRUE(promise.IsSet()); +} + + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT::NConcurrency diff --git a/yt/yt/core/concurrency/unittests/scheduler_ut.cpp b/yt/yt/core/concurrency/unittests/scheduler_ut.cpp index 9b6bc7e5c2..445b98543a 100644 --- a/yt/yt/core/concurrency/unittests/scheduler_ut.cpp +++ b/yt/yt/core/concurrency/unittests/scheduler_ut.cpp @@ -503,251 +503,6 @@ TEST_F(TSchedulerTest, WaitForInSerializedInvoker2) AllSucceeded(futures).Get().ThrowOnError(); } -// Bounded concurrency invoker. -class TBoundedConcurrencyInvokerTest - : public TSchedulerTest -{ }; - -TEST_F(TBoundedConcurrencyInvokerTest, WaitFor1) -{ - auto invoker = CreateBoundedConcurrencyInvoker(Queue1->GetInvoker(), 1); - BIND([&] { - for (int i = 0; i < 10; ++i) { - TDelayedExecutor::WaitForDuration(SleepQuantum); - } - }).AsyncVia(invoker).Run().Get().ThrowOnError(); -} - -TEST_F(TBoundedConcurrencyInvokerTest, WaitFor2) -{ - auto invoker = CreateBoundedConcurrencyInvoker(Queue1->GetInvoker(), 2); - - auto promise = NewPromise<void>(); - auto future = promise.ToFuture(); - - auto a1 = BIND([&] { - promise.Set(); - }); - - auto a2 = BIND([&] { - invoker->Invoke(a1); - WaitFor(future) - .ThrowOnError(); - }); - - a2.AsyncVia(invoker).Run().Get().ThrowOnError(); -} - -TEST_F(TBoundedConcurrencyInvokerTest, WaitFor3) -{ - auto invoker = CreateBoundedConcurrencyInvoker(Queue1->GetInvoker(), 1); - - auto promise = NewPromise<void>(); - auto future = promise.ToFuture(); - - bool a1called = false; - bool a1finished = false; - auto a1 = BIND([&] { - a1called = true; - WaitFor(future) - .ThrowOnError(); - a1finished = true; - }); - - bool a2called = false; - auto a2 = BIND([&] { - a2called = true; - }); - - invoker->Invoke(a1); - invoker->Invoke(a2); - - Sleep(SleepQuantum); - EXPECT_TRUE(a1called); - EXPECT_FALSE(a1finished); - EXPECT_FALSE(a2called); - - promise.Set(); - - Sleep(SleepQuantum); - EXPECT_TRUE(a1called); - EXPECT_TRUE(a1finished); - EXPECT_TRUE(a2called); -} - -class TBoundedConcurrencyInvokerParametrizedReconfigureTest - : public TBoundedConcurrencyInvokerTest - , public ::testing::WithParamInterface<std::tuple<int, int, bool>> -{ }; - -INSTANTIATE_TEST_SUITE_P( - Test, - TBoundedConcurrencyInvokerParametrizedReconfigureTest, - ::testing::Values( - std::tuple(3, 5, true), - std::tuple(5, 3, true), - std::tuple(3, 5, false), - std::tuple(5, 3, false))); - -TEST_P(TBoundedConcurrencyInvokerParametrizedReconfigureTest, SetMaxConcurrentInvocations) -{ - auto [initialMaxConcurrentInvocations, finalMaxConcurrentInvocations, invokeSecondBatchRightAway] = GetParam(); - int maxConcurrentInvocations = initialMaxConcurrentInvocations; - auto invoker = CreateBoundedConcurrencyInvoker(Queue1->GetInvoker(), maxConcurrentInvocations); - - // Set firstPromise, then secondPromise. - auto firstPromise = NewPromise<void>(); - auto secondPromise = NewPromise<void>(); - - auto firstFuture = firstPromise.ToFuture(); - auto secondFuture = secondPromise.ToFuture(); - - YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, lock); - int runnedCallbacks = 0; - int finishedCallbacks = 0; - - std::vector<std::vector<TFuture<void>>> callbacks; - - auto invokeCallbacks = [&] (std::optional<TFuture<void>> startFuture = {}) { - for (int i = 0; i < 10; i++) { - callbacks.back().push_back(BIND([ - &, - callbackIndex = i, - startFuture - ] { - if (startFuture) { - WaitFor(*startFuture) - .ThrowOnError(); - } - - { - auto guard = Guard(lock); - runnedCallbacks += 1; - } - - // Later callbacks wait for the first future to set to check that - // they are not scheduled before first MaxConcurrentInvocations callbacks. - WaitFor((callbackIndex > maxConcurrentInvocations) - ? firstFuture - : secondFuture) - .ThrowOnError(); - - auto guard = Guard(lock); - - auto concurrentInvocations = runnedCallbacks - finishedCallbacks; - THROW_ERROR_EXCEPTION_UNLESS(concurrentInvocations <= maxConcurrentInvocations, "Number of concurrent invocations exceeds maximum (ConcurrentInvocations: %v, MaxConcurrentInvocations: %v)", - concurrentInvocations, - maxConcurrentInvocations); - if (callbackIndex > maxConcurrentInvocations) { - THROW_ERROR_EXCEPTION_UNLESS(finishedCallbacks > maxConcurrentInvocations, "%v-th callback was executed before first %v", - callbackIndex + 1, maxConcurrentInvocations); - } - - finishedCallbacks += 1; - }).AsyncVia(invoker).Run()); - } - }; - - auto resetState = [&] { - firstPromise = NewPromise<void>(); - secondPromise = NewPromise<void>(); - - firstFuture = firstPromise.ToFuture(); - secondFuture = secondPromise.ToFuture(); - - runnedCallbacks = 0; - finishedCallbacks = 0; - }; - - callbacks.emplace_back(); - invokeCallbacks(); - - auto startPromise = NewPromise<void>(); - auto catchUpPromise = NewPromise<void>(); - std::vector<TFuture<void>> catchUpCallbacks; - - auto invokeSecondBatch = [&] { - // To properly decrease max concurrent invocations we need some buffer callbacks. - auto catchUpFuture = catchUpPromise.ToFuture(); - if (finalMaxConcurrentInvocations < initialMaxConcurrentInvocations) { - for (int i = 0; i < 10; i++) { - catchUpCallbacks.push_back(BIND([catchUpFuture] { - WaitFor(catchUpFuture) - .ThrowOnError(); - }).AsyncVia(invoker).Run()); - } - } - - callbacks.emplace_back(); - invokeCallbacks(startPromise.ToFuture()); - }; - - if (invokeSecondBatchRightAway) { - invokeSecondBatch(); - } - - ASSERT_EQ(std::ssize(callbacks), invokeSecondBatchRightAway ? 2 : 1); - ASSERT_EQ(std::ssize(callbacks[0]), 10); - if (invokeSecondBatchRightAway) { - ASSERT_EQ(std::ssize(callbacks[1]), 10); - } - - firstPromise.Set(); - secondPromise.Set(); - - WaitFor(AllSucceeded(callbacks[0])) - .ThrowOnError(); - EXPECT_EQ(runnedCallbacks, 10); - EXPECT_EQ(finishedCallbacks, 10); - - resetState(); - - maxConcurrentInvocations = finalMaxConcurrentInvocations; - invoker->SetMaxConcurrentInvocations(maxConcurrentInvocations); - - if (!invokeSecondBatchRightAway) { - invokeSecondBatch(); - } - - ASSERT_EQ(std::ssize(callbacks), 2); - ASSERT_EQ(std::ssize(callbacks[1]), 10); - - if (finalMaxConcurrentInvocations < initialMaxConcurrentInvocations) { - // Wait until pending change is applied - catchUpPromise.Set(); - WaitFor(AllSucceeded(catchUpCallbacks)) - .ThrowOnError(); - } - - startPromise.Set(); - - firstPromise.Set(); - secondPromise.Set(); - - WaitFor(AllSucceeded(callbacks[1])) - .ThrowOnError(); - EXPECT_EQ(runnedCallbacks, 10); - EXPECT_EQ(finishedCallbacks, 10); -} - -TEST_F(TBoundedConcurrencyInvokerTest, ReconfigureBeforeFirstInvocation) -{ - auto invoker = CreateBoundedConcurrencyInvoker(Queue1->GetInvoker(), 0); - invoker->SetMaxConcurrentInvocations(1); - - auto promise = NewPromise<void>(); - - BIND([&] { - promise.Set(); - }) - .AsyncVia(invoker) - .Run() - .Get() - .ThrowOnError(); - - EXPECT_TRUE(promise.IsSet()); -} - TEST_F(TSchedulerTest, PropagateFiberCancelationToFuture) { auto p1 = NewPromise<void>(); diff --git a/yt/yt/core/concurrency/unittests/ya.make b/yt/yt/core/concurrency/unittests/ya.make index 61f7b416ad..1d59b915ee 100644 --- a/yt/yt/core/concurrency/unittests/ya.make +++ b/yt/yt/core/concurrency/unittests/ya.make @@ -12,6 +12,7 @@ SRCS( async_stream_pipe_ut.cpp async_stream_ut.cpp async_yson_writer_ut.cpp + bounded_concurrency_invoker_ut.cpp coroutines_ut.cpp count_down_latch_ut.cpp delayed_executor_ut.cpp |