diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-04-20 13:34:06 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-04-20 13:42:49 +0300 |
commit | e2e52cb94e08f5675363513babfff7b040d95781 (patch) | |
tree | 3b0150470050f1f5f8db56a6b8a38aca50aa97e9 | |
parent | 7358d1854b8132957a8a61d3513faa73187dcd7d (diff) | |
download | ydb-e2e52cb94e08f5675363513babfff7b040d95781.tar.gz |
Intermediate changes
-rw-r--r-- | yt/yt/core/concurrency/parallel_runner-inl.h | 78 | ||||
-rw-r--r-- | yt/yt/core/concurrency/parallel_runner.h | 54 | ||||
-rw-r--r-- | yt/yt/core/concurrency/unittests/parallel_runner_ut.cpp | 78 | ||||
-rw-r--r-- | yt/yt/core/concurrency/unittests/ya.make | 1 |
4 files changed, 211 insertions, 0 deletions
diff --git a/yt/yt/core/concurrency/parallel_runner-inl.h b/yt/yt/core/concurrency/parallel_runner-inl.h new file mode 100644 index 00000000000..ca906dfe185 --- /dev/null +++ b/yt/yt/core/concurrency/parallel_runner-inl.h @@ -0,0 +1,78 @@ +#ifndef PARALLEL_RUNNER_INL_H_ +#error "Direct inclusion of this file is not allowed, include parallel_runner.h" +// For the sake of sane code completion. +#include "parallel_runner.h" +#endif +#undef PARALLEL_RUNNER_INL_H_ + +namespace NYT::NConcurrency { + +//////////////////////////////////////////////////////////////////////////////// + +template <class T> +TParallelRunner<T>::TParallelRunner( + IInvokerPtr invoker, + i64 batchSize) + : Invoker_(std::move(invoker)) + , BatchSize_(batchSize) +{ } + +template <class T> +TParallelRunner<T> TParallelRunner<T>::CreateAsync( + IInvokerPtr invoker, + i64 batchSize) +{ + return TParallelRunner(std::move(invoker), batchSize); +} + +template <class T> +TParallelRunner<T> TParallelRunner<T>::CreateSync() +{ + return TParallelRunner( + /*invoker*/ nullptr, + /*batchSize*/ std::numeric_limits<i64>::max()); +} + +template <class T> +void TParallelRunner<T>::Add(T item) +{ + CurrentBatch_.push_back(std::move(item)); + if (std::ssize(CurrentBatch_) >= BatchSize_) { + Batches_.push_back(std::exchange(CurrentBatch_, {})); + } +} + +template <class T> +template <class F> +TFuture<void> TParallelRunner<T>::Run(F func) +{ + if (Invoker_) { + if (!CurrentBatch_.empty()) { + Batches_.push_back(std::move(CurrentBatch_)); + } + + std::vector<TFuture<void>> futures; + futures.reserve(Batches_.size()); + for (auto& batch : Batches_) { + futures.push_back( + BIND([func, batch = std::move(batch)] () mutable { + for (auto& item : batch) { + func(item); + } + }) + .AsyncVia(Invoker_) + .Run()); + } + + return AllSucceeded(std::move(futures)); + } else { + for (auto& item : CurrentBatch_) { + func(item); + } + return VoidFuture; + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NConcurrency diff --git a/yt/yt/core/concurrency/parallel_runner.h b/yt/yt/core/concurrency/parallel_runner.h new file mode 100644 index 00000000000..17bc22d1ba2 --- /dev/null +++ b/yt/yt/core/concurrency/parallel_runner.h @@ -0,0 +1,54 @@ +#pragma once + +#include "public.h" + +#include <yt/yt/core/actions/future.h> + +namespace NYT::NConcurrency { + +//////////////////////////////////////////////////////////////////////////////// + +//! Groups given items into batches (of a given size) and invokes +//! a function for each batch in parallel. +template <class T> +class TParallelRunner +{ +public: + static TParallelRunner CreateAsync( + IInvokerPtr invoker, + i64 batchSize); + static TParallelRunner CreateSync(); + + //! Registers an item to be processed. + void Add(T item); + + //! Runs #func for each registered item. + /*! + * For a runner created via #CreateAsync, function invocations happen asynchronously + * in a given invoker. + * + * For a runner created via #CreateSync, these invocations happen synchronously + * in the current thread. + */ + template <class F> + TFuture<void> Run(F func); + +private: + TParallelRunner( + IInvokerPtr invoker, + i64 batchSize); + + const IInvokerPtr Invoker_; + const i64 BatchSize_; + + std::vector<std::vector<T>> Batches_; + std::vector<T> CurrentBatch_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NConcurrency + +#define PARALLEL_RUNNER_INL_H_ +#include "parallel_runner-inl.h" +#undef PARALLEL_RUNNER_INL_H_ diff --git a/yt/yt/core/concurrency/unittests/parallel_runner_ut.cpp b/yt/yt/core/concurrency/unittests/parallel_runner_ut.cpp new file mode 100644 index 00000000000..16966a96e6b --- /dev/null +++ b/yt/yt/core/concurrency/unittests/parallel_runner_ut.cpp @@ -0,0 +1,78 @@ +#include <yt/yt/core/test_framework/framework.h> + +#include <yt/yt/core/concurrency/action_queue.h> +#include <yt/yt/core/concurrency/parallel_runner.h> + +#include <atomic> + +namespace NYT::NConcurrency { +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +class TSyncParallelRunnerTest + : public ::testing::Test + , public ::testing::WithParamInterface<std::tuple<int, int>> +{ }; + +TEST_P(TSyncParallelRunnerTest, Do) +{ + auto [count, expectedSum] = GetParam(); + auto runner = TParallelRunner<int>::CreateSync(); + for (int index = 0; index < count; ++index) { + runner.Add(index); + } + int sum = 0; + auto future = runner.Run([&] (int arg) { + sum += arg; + }); + EXPECT_TRUE(future.IsSet()); + EXPECT_TRUE(future.Get().IsOK()); + EXPECT_EQ(sum, expectedSum); +} + +INSTANTIATE_TEST_SUITE_P( + TSyncParallelRunnerTest, + TSyncParallelRunnerTest, + ::testing::Values( + std::tuple( 0, 0), + std::tuple(10, 45))); + +//////////////////////////////////////////////////////////////////////////////// + +class TAsyncParallelRunnerTest + : public ::testing::Test + , public ::testing::WithParamInterface<std::tuple<int, int, int>> +{ +protected: + const TActionQueuePtr ActionQueue_ = New<TActionQueue>("Queue"); +}; + +TEST_P(TAsyncParallelRunnerTest, Do) +{ + auto [batchSize, count, expectedSum] = GetParam(); + auto runner = TParallelRunner<int>::CreateAsync(ActionQueue_->GetInvoker(), batchSize); + for (int index = 0; index < count; ++index) { + runner.Add(index); + } + std::atomic<int> sum = 0; + runner.Run([&] (int arg) { + sum += arg; + }) + .Get() + .ThrowOnError(); + EXPECT_EQ(sum.load(), expectedSum); +} + +INSTANTIATE_TEST_SUITE_P( + TAsyncParallelRunnerTest, + TAsyncParallelRunnerTest, + ::testing::Values( + std::tuple(1, 0, 0), + std::tuple(3, 10, 45), + std::tuple(3, 9, 36))); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT:::NConcurrency diff --git a/yt/yt/core/concurrency/unittests/ya.make b/yt/yt/core/concurrency/unittests/ya.make index 4abc7ebc0ea..ab8db2de4ca 100644 --- a/yt/yt/core/concurrency/unittests/ya.make +++ b/yt/yt/core/concurrency/unittests/ya.make @@ -26,6 +26,7 @@ SRCS( invoker_pool_ut.cpp nonblocking_batcher_ut.cpp nonblocking_queue_ut.cpp + parallel_runner_ut.cpp periodic_ut.cpp profiled_fair_share_invoker_pool_ut.cpp propagating_storage_ut.cpp |