aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-04-20 13:34:06 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-04-20 13:42:49 +0300
commite2e52cb94e08f5675363513babfff7b040d95781 (patch)
tree3b0150470050f1f5f8db56a6b8a38aca50aa97e9
parent7358d1854b8132957a8a61d3513faa73187dcd7d (diff)
downloadydb-e2e52cb94e08f5675363513babfff7b040d95781.tar.gz
Intermediate changes
-rw-r--r--yt/yt/core/concurrency/parallel_runner-inl.h78
-rw-r--r--yt/yt/core/concurrency/parallel_runner.h54
-rw-r--r--yt/yt/core/concurrency/unittests/parallel_runner_ut.cpp78
-rw-r--r--yt/yt/core/concurrency/unittests/ya.make1
4 files changed, 211 insertions, 0 deletions
diff --git a/yt/yt/core/concurrency/parallel_runner-inl.h b/yt/yt/core/concurrency/parallel_runner-inl.h
new file mode 100644
index 00000000000..ca906dfe185
--- /dev/null
+++ b/yt/yt/core/concurrency/parallel_runner-inl.h
@@ -0,0 +1,78 @@
+#ifndef PARALLEL_RUNNER_INL_H_
+#error "Direct inclusion of this file is not allowed, include parallel_runner.h"
+// For the sake of sane code completion.
+#include "parallel_runner.h"
+#endif
+#undef PARALLEL_RUNNER_INL_H_
+
+namespace NYT::NConcurrency {
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class T>
+TParallelRunner<T>::TParallelRunner(
+ IInvokerPtr invoker,
+ i64 batchSize)
+ : Invoker_(std::move(invoker))
+ , BatchSize_(batchSize)
+{ }
+
+template <class T>
+TParallelRunner<T> TParallelRunner<T>::CreateAsync(
+ IInvokerPtr invoker,
+ i64 batchSize)
+{
+ return TParallelRunner(std::move(invoker), batchSize);
+}
+
+template <class T>
+TParallelRunner<T> TParallelRunner<T>::CreateSync()
+{
+ return TParallelRunner(
+ /*invoker*/ nullptr,
+ /*batchSize*/ std::numeric_limits<i64>::max());
+}
+
+template <class T>
+void TParallelRunner<T>::Add(T item)
+{
+ CurrentBatch_.push_back(std::move(item));
+ if (std::ssize(CurrentBatch_) >= BatchSize_) {
+ Batches_.push_back(std::exchange(CurrentBatch_, {}));
+ }
+}
+
+template <class T>
+template <class F>
+TFuture<void> TParallelRunner<T>::Run(F func)
+{
+ if (Invoker_) {
+ if (!CurrentBatch_.empty()) {
+ Batches_.push_back(std::move(CurrentBatch_));
+ }
+
+ std::vector<TFuture<void>> futures;
+ futures.reserve(Batches_.size());
+ for (auto& batch : Batches_) {
+ futures.push_back(
+ BIND([func, batch = std::move(batch)] () mutable {
+ for (auto& item : batch) {
+ func(item);
+ }
+ })
+ .AsyncVia(Invoker_)
+ .Run());
+ }
+
+ return AllSucceeded(std::move(futures));
+ } else {
+ for (auto& item : CurrentBatch_) {
+ func(item);
+ }
+ return VoidFuture;
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NConcurrency
diff --git a/yt/yt/core/concurrency/parallel_runner.h b/yt/yt/core/concurrency/parallel_runner.h
new file mode 100644
index 00000000000..17bc22d1ba2
--- /dev/null
+++ b/yt/yt/core/concurrency/parallel_runner.h
@@ -0,0 +1,54 @@
+#pragma once
+
+#include "public.h"
+
+#include <yt/yt/core/actions/future.h>
+
+namespace NYT::NConcurrency {
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! Groups given items into batches (of a given size) and invokes
+//! a function for each batch in parallel.
+template <class T>
+class TParallelRunner
+{
+public:
+ static TParallelRunner CreateAsync(
+ IInvokerPtr invoker,
+ i64 batchSize);
+ static TParallelRunner CreateSync();
+
+ //! Registers an item to be processed.
+ void Add(T item);
+
+ //! Runs #func for each registered item.
+ /*!
+ * For a runner created via #CreateAsync, function invocations happen asynchronously
+ * in a given invoker.
+ *
+ * For a runner created via #CreateSync, these invocations happen synchronously
+ * in the current thread.
+ */
+ template <class F>
+ TFuture<void> Run(F func);
+
+private:
+ TParallelRunner(
+ IInvokerPtr invoker,
+ i64 batchSize);
+
+ const IInvokerPtr Invoker_;
+ const i64 BatchSize_;
+
+ std::vector<std::vector<T>> Batches_;
+ std::vector<T> CurrentBatch_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NConcurrency
+
+#define PARALLEL_RUNNER_INL_H_
+#include "parallel_runner-inl.h"
+#undef PARALLEL_RUNNER_INL_H_
diff --git a/yt/yt/core/concurrency/unittests/parallel_runner_ut.cpp b/yt/yt/core/concurrency/unittests/parallel_runner_ut.cpp
new file mode 100644
index 00000000000..16966a96e6b
--- /dev/null
+++ b/yt/yt/core/concurrency/unittests/parallel_runner_ut.cpp
@@ -0,0 +1,78 @@
+#include <yt/yt/core/test_framework/framework.h>
+
+#include <yt/yt/core/concurrency/action_queue.h>
+#include <yt/yt/core/concurrency/parallel_runner.h>
+
+#include <atomic>
+
+namespace NYT::NConcurrency {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TSyncParallelRunnerTest
+ : public ::testing::Test
+ , public ::testing::WithParamInterface<std::tuple<int, int>>
+{ };
+
+TEST_P(TSyncParallelRunnerTest, Do)
+{
+ auto [count, expectedSum] = GetParam();
+ auto runner = TParallelRunner<int>::CreateSync();
+ for (int index = 0; index < count; ++index) {
+ runner.Add(index);
+ }
+ int sum = 0;
+ auto future = runner.Run([&] (int arg) {
+ sum += arg;
+ });
+ EXPECT_TRUE(future.IsSet());
+ EXPECT_TRUE(future.Get().IsOK());
+ EXPECT_EQ(sum, expectedSum);
+}
+
+INSTANTIATE_TEST_SUITE_P(
+ TSyncParallelRunnerTest,
+ TSyncParallelRunnerTest,
+ ::testing::Values(
+ std::tuple( 0, 0),
+ std::tuple(10, 45)));
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TAsyncParallelRunnerTest
+ : public ::testing::Test
+ , public ::testing::WithParamInterface<std::tuple<int, int, int>>
+{
+protected:
+ const TActionQueuePtr ActionQueue_ = New<TActionQueue>("Queue");
+};
+
+TEST_P(TAsyncParallelRunnerTest, Do)
+{
+ auto [batchSize, count, expectedSum] = GetParam();
+ auto runner = TParallelRunner<int>::CreateAsync(ActionQueue_->GetInvoker(), batchSize);
+ for (int index = 0; index < count; ++index) {
+ runner.Add(index);
+ }
+ std::atomic<int> sum = 0;
+ runner.Run([&] (int arg) {
+ sum += arg;
+ })
+ .Get()
+ .ThrowOnError();
+ EXPECT_EQ(sum.load(), expectedSum);
+}
+
+INSTANTIATE_TEST_SUITE_P(
+ TAsyncParallelRunnerTest,
+ TAsyncParallelRunnerTest,
+ ::testing::Values(
+ std::tuple(1, 0, 0),
+ std::tuple(3, 10, 45),
+ std::tuple(3, 9, 36)));
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT:::NConcurrency
diff --git a/yt/yt/core/concurrency/unittests/ya.make b/yt/yt/core/concurrency/unittests/ya.make
index 4abc7ebc0ea..ab8db2de4ca 100644
--- a/yt/yt/core/concurrency/unittests/ya.make
+++ b/yt/yt/core/concurrency/unittests/ya.make
@@ -26,6 +26,7 @@ SRCS(
invoker_pool_ut.cpp
nonblocking_batcher_ut.cpp
nonblocking_queue_ut.cpp
+ parallel_runner_ut.cpp
periodic_ut.cpp
profiled_fair_share_invoker_pool_ut.cpp
propagating_storage_ut.cpp