diff options
author | babenko <babenko@yandex-team.com> | 2025-01-20 22:40:49 +0300 |
---|---|---|
committer | babenko <babenko@yandex-team.com> | 2025-01-20 23:26:30 +0300 |
commit | 6d1e12141d962a0b930005bd2019fe29aee28ace (patch) | |
tree | 113b102fbec35e7e4410d59453e5f7605de7a869 | |
parent | 3278759bf794f497e25675f20e4fdb8deab0b87d (diff) | |
download | ydb-6d1e12141d962a0b930005bd2019fe29aee28ace.tar.gz |
Renames around thread count for Quantized Executor
commit_hash:4205378633fa55367cc737db13587b8dd471fa7d
-rw-r--r-- | yt/yt/core/concurrency/quantized_executor.cpp | 36 | ||||
-rw-r--r-- | yt/yt/core/concurrency/quantized_executor.h | 6 | ||||
-rw-r--r-- | yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp | 30 |
3 files changed, 36 insertions, 36 deletions
diff --git a/yt/yt/core/concurrency/quantized_executor.cpp b/yt/yt/core/concurrency/quantized_executor.cpp index 106215d5e6..5e859cc1d6 100644 --- a/yt/yt/core/concurrency/quantized_executor.cpp +++ b/yt/yt/core/concurrency/quantized_executor.cpp @@ -28,7 +28,7 @@ public: , Logger(ConcurrencyLogger().WithTag("Executor: %v", Name_)) , ControlQueue_(New<TActionQueue>(Format("%vCtl", Name_))) , ControlInvoker_(ControlQueue_->GetInvoker()) - , DesiredWorkerCount_(options.WorkerCount) + , DesiredThreadCount_(options.ThreadCount) { YT_ASSERT_INVOKER_THREAD_AFFINITY(ControlInvoker_, ControlThread); } @@ -42,11 +42,11 @@ public: .Run(); } - void Reconfigure(int workerCount) override + void SetThreadCount(int threadCount) override { YT_ASSERT_THREAD_AFFINITY_ANY(); - DesiredWorkerCount_.store(workerCount); + DesiredThreadCount_.store(threadCount); } private: @@ -63,8 +63,8 @@ private: std::vector<ISuspendableActionQueuePtr> Workers_; std::vector<IInvokerPtr> Invokers_; - std::atomic<int> ActiveWorkerCount_ = 0; - std::atomic<int> DesiredWorkerCount_ = 0; + std::atomic<int> ActiveThreadCount_ = 0; + std::atomic<int> DesiredThreadCount_ = 0; int QuantumIndex_ = 0; @@ -80,21 +80,21 @@ private: YT_ASSERT_THREAD_AFFINITY(ControlThread); YT_VERIFY(!Running_); - int desiredWorkerCount = DesiredWorkerCount_.load(); - if (ActiveWorkerCount_ == desiredWorkerCount) { + int desiredThreadCount = DesiredThreadCount_.load(); + if (ActiveThreadCount_ == desiredThreadCount) { return; } - int currentWorkerCount = std::ssize(Workers_); + int currentThreadCount = std::ssize(Workers_); - YT_LOG_DEBUG("Updating worker count (WorkerCount: %v -> %v)", - currentWorkerCount, - desiredWorkerCount); + YT_LOG_DEBUG("Updating thread count (Count: %v -> %v)", + currentThreadCount, + desiredThreadCount); - if (desiredWorkerCount > currentWorkerCount) { - Workers_.reserve(desiredWorkerCount); - Invokers_.reserve(desiredWorkerCount); - for (int index = currentWorkerCount; index < desiredWorkerCount; ++index) { + if (desiredThreadCount > currentThreadCount) { + Workers_.reserve(desiredThreadCount); + Invokers_.reserve(desiredThreadCount); + for (int index = currentThreadCount; index < desiredThreadCount; ++index) { auto worker = CreateSuspendableActionQueue( /*threadName*/ Format("%v:%v", Name_, index), {.ThreadInitializer = Options_.ThreadInitializer}); @@ -109,8 +109,8 @@ private: } } - YT_VERIFY(std::ssize(Workers_) >= desiredWorkerCount); - ActiveWorkerCount_ = desiredWorkerCount; + YT_VERIFY(std::ssize(Workers_) >= desiredThreadCount); + ActiveThreadCount_ = desiredThreadCount; } TFuture<void> StartQuantum(TDuration timeout) @@ -200,7 +200,7 @@ private: } // Worker is disabled, do not schedule new callbacks to it. - if (workerIndex >= ActiveWorkerCount_) { + if (workerIndex >= ActiveThreadCount_) { return; } diff --git a/yt/yt/core/concurrency/quantized_executor.h b/yt/yt/core/concurrency/quantized_executor.h index bc6c78bad1..d676e01570 100644 --- a/yt/yt/core/concurrency/quantized_executor.h +++ b/yt/yt/core/concurrency/quantized_executor.h @@ -36,8 +36,8 @@ struct IQuantizedExecutor */ virtual TFuture<void> Run(TDuration timeout) = 0; - //! Updates the number of workers. - virtual void Reconfigure(int workerCount) = 0; + //! Updates the number of threads. + virtual void SetThreadCount(int threadCount) = 0; }; DEFINE_REFCOUNTED_TYPE(IQuantizedExecutor) @@ -46,7 +46,7 @@ DEFINE_REFCOUNTED_TYPE(IQuantizedExecutor) struct TQuantizedExecutorOptions { - int WorkerCount = 1; + int ThreadCount = 1; std::function<void()> ThreadInitializer; }; diff --git a/yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp b/yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp index 4ddc546cdb..b684006a5b 100644 --- a/yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp +++ b/yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp @@ -127,22 +127,22 @@ protected: TIntrusivePtr<TLongCallbackProvider> LongCallbackProvider_; IQuantizedExecutorPtr Executor_; - void InitSimple(int workerCount, i64 iterationCount) + void InitSimple(int threadCount, i64 iterationCount) { SimpleCallbackProvider_ = New<TSimpleCallbackProvider>(iterationCount); - Executor_ = CreateQuantizedExecutor("test", SimpleCallbackProvider_, {.WorkerCount = workerCount}); + Executor_ = CreateQuantizedExecutor("test", SimpleCallbackProvider_, {.ThreadCount = threadCount}); } - void InitLong(int workerCount, i64 iterationCount) + void InitLong(int threadCount, i64 iterationCount) { LongCallbackProvider_ = New<TLongCallbackProvider>(iterationCount); - Executor_ = CreateQuantizedExecutor("test", LongCallbackProvider_, {.WorkerCount = workerCount}); + Executor_ = CreateQuantizedExecutor("test", LongCallbackProvider_, {.ThreadCount = threadCount}); } }; TEST_F(TQuantizedExecutorTest, Simple) { - InitSimple(/*workerCount*/ 1, /*iterationCount*/ 100); + InitSimple(/*threadCount*/ 1, /*iterationCount*/ 100); WaitFor(Executor_->Run(TDuration::Max())) .ThrowOnError(); @@ -152,7 +152,7 @@ TEST_F(TQuantizedExecutorTest, Simple) TEST_F(TQuantizedExecutorTest, Timeout) { - InitSimple(/*workerCount*/ 2, /*iterationCount*/ std::numeric_limits<i64>::max()); + InitSimple(/*threadCount*/ 2, /*iterationCount*/ std::numeric_limits<i64>::max()); WaitFor(Executor_->Run(TDuration::MilliSeconds(100))) .ThrowOnError(); @@ -168,7 +168,7 @@ TEST_F(TQuantizedExecutorTest, Timeout) TEST_F(TQuantizedExecutorTest, LongCallback1) { - InitLong(/*workerCount*/ 4, /*iterationCount*/ 20); + InitLong(/*threadCount*/ 4, /*iterationCount*/ 20); auto future = Executor_->Run(TDuration::MilliSeconds(500)); @@ -179,7 +179,7 @@ TEST_F(TQuantizedExecutorTest, LongCallback1) TEST_F(TQuantizedExecutorTest, LongCallback2) { - InitLong(/*workerCount*/ 4, /*iterationCount*/ 100); + InitLong(/*threadCount*/ 4, /*iterationCount*/ 100); for (int index = 0; index < 9; ++index) { WaitFor(Executor_->Run(TDuration::MilliSeconds(100))) @@ -194,12 +194,12 @@ TEST_F(TQuantizedExecutorTest, LongCallback2) TEST_F(TQuantizedExecutorTest, Reconfigure) { - InitSimple(/*workerCount*/ 10, /*iterationCount*/ std::numeric_limits<i64>::max()); + InitSimple(/*threadCount*/ 10, /*iterationCount*/ std::numeric_limits<i64>::max()); i64 lastCounter = 0; - auto run = [&] (int workerCount) { - Executor_->Reconfigure(workerCount); + auto run = [&] (int threadCount) { + Executor_->SetThreadCount(threadCount); WaitFor(Executor_->Run(TDuration::MilliSeconds(100))) .ThrowOnError(); @@ -214,14 +214,14 @@ TEST_F(TQuantizedExecutorTest, Reconfigure) std::mt19937 rng(42); for (int index = 0; index < 10; ++index) { - auto workerCount = rng() % 5 + 1; - auto increment = run(workerCount); + auto threadCount = rng() % 5 + 1; + auto increment = run(threadCount); - EXPECT_LE(increment, workerCount * /*milliseconds*/ 100.0 / /*period*/ 5 * 1.25); + EXPECT_LE(increment, threadCount * /*milliseconds*/ 100.0 / /*period*/ 5 * 1.25); } } -TEST_F(TQuantizedExecutorTest, WorkerInitializer) +TEST_F(TQuantizedExecutorTest, ThreadInitializer) { auto callbackProvider = New<TInitializingCallbackProvider>(); EXPECT_FALSE(callbackProvider->IsFinished()); |