aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbabenko <babenko@yandex-team.com>2025-01-20 22:40:49 +0300
committerbabenko <babenko@yandex-team.com>2025-01-20 23:26:30 +0300
commit6d1e12141d962a0b930005bd2019fe29aee28ace (patch)
tree113b102fbec35e7e4410d59453e5f7605de7a869
parent3278759bf794f497e25675f20e4fdb8deab0b87d (diff)
downloadydb-6d1e12141d962a0b930005bd2019fe29aee28ace.tar.gz
Renames around thread count for Quantized Executor
commit_hash:4205378633fa55367cc737db13587b8dd471fa7d
-rw-r--r--yt/yt/core/concurrency/quantized_executor.cpp36
-rw-r--r--yt/yt/core/concurrency/quantized_executor.h6
-rw-r--r--yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp30
3 files changed, 36 insertions, 36 deletions
diff --git a/yt/yt/core/concurrency/quantized_executor.cpp b/yt/yt/core/concurrency/quantized_executor.cpp
index 106215d5e6..5e859cc1d6 100644
--- a/yt/yt/core/concurrency/quantized_executor.cpp
+++ b/yt/yt/core/concurrency/quantized_executor.cpp
@@ -28,7 +28,7 @@ public:
, Logger(ConcurrencyLogger().WithTag("Executor: %v", Name_))
, ControlQueue_(New<TActionQueue>(Format("%vCtl", Name_)))
, ControlInvoker_(ControlQueue_->GetInvoker())
- , DesiredWorkerCount_(options.WorkerCount)
+ , DesiredThreadCount_(options.ThreadCount)
{
YT_ASSERT_INVOKER_THREAD_AFFINITY(ControlInvoker_, ControlThread);
}
@@ -42,11 +42,11 @@ public:
.Run();
}
- void Reconfigure(int workerCount) override
+ void SetThreadCount(int threadCount) override
{
YT_ASSERT_THREAD_AFFINITY_ANY();
- DesiredWorkerCount_.store(workerCount);
+ DesiredThreadCount_.store(threadCount);
}
private:
@@ -63,8 +63,8 @@ private:
std::vector<ISuspendableActionQueuePtr> Workers_;
std::vector<IInvokerPtr> Invokers_;
- std::atomic<int> ActiveWorkerCount_ = 0;
- std::atomic<int> DesiredWorkerCount_ = 0;
+ std::atomic<int> ActiveThreadCount_ = 0;
+ std::atomic<int> DesiredThreadCount_ = 0;
int QuantumIndex_ = 0;
@@ -80,21 +80,21 @@ private:
YT_ASSERT_THREAD_AFFINITY(ControlThread);
YT_VERIFY(!Running_);
- int desiredWorkerCount = DesiredWorkerCount_.load();
- if (ActiveWorkerCount_ == desiredWorkerCount) {
+ int desiredThreadCount = DesiredThreadCount_.load();
+ if (ActiveThreadCount_ == desiredThreadCount) {
return;
}
- int currentWorkerCount = std::ssize(Workers_);
+ int currentThreadCount = std::ssize(Workers_);
- YT_LOG_DEBUG("Updating worker count (WorkerCount: %v -> %v)",
- currentWorkerCount,
- desiredWorkerCount);
+ YT_LOG_DEBUG("Updating thread count (Count: %v -> %v)",
+ currentThreadCount,
+ desiredThreadCount);
- if (desiredWorkerCount > currentWorkerCount) {
- Workers_.reserve(desiredWorkerCount);
- Invokers_.reserve(desiredWorkerCount);
- for (int index = currentWorkerCount; index < desiredWorkerCount; ++index) {
+ if (desiredThreadCount > currentThreadCount) {
+ Workers_.reserve(desiredThreadCount);
+ Invokers_.reserve(desiredThreadCount);
+ for (int index = currentThreadCount; index < desiredThreadCount; ++index) {
auto worker = CreateSuspendableActionQueue(
/*threadName*/ Format("%v:%v", Name_, index),
{.ThreadInitializer = Options_.ThreadInitializer});
@@ -109,8 +109,8 @@ private:
}
}
- YT_VERIFY(std::ssize(Workers_) >= desiredWorkerCount);
- ActiveWorkerCount_ = desiredWorkerCount;
+ YT_VERIFY(std::ssize(Workers_) >= desiredThreadCount);
+ ActiveThreadCount_ = desiredThreadCount;
}
TFuture<void> StartQuantum(TDuration timeout)
@@ -200,7 +200,7 @@ private:
}
// Worker is disabled, do not schedule new callbacks to it.
- if (workerIndex >= ActiveWorkerCount_) {
+ if (workerIndex >= ActiveThreadCount_) {
return;
}
diff --git a/yt/yt/core/concurrency/quantized_executor.h b/yt/yt/core/concurrency/quantized_executor.h
index bc6c78bad1..d676e01570 100644
--- a/yt/yt/core/concurrency/quantized_executor.h
+++ b/yt/yt/core/concurrency/quantized_executor.h
@@ -36,8 +36,8 @@ struct IQuantizedExecutor
*/
virtual TFuture<void> Run(TDuration timeout) = 0;
- //! Updates the number of workers.
- virtual void Reconfigure(int workerCount) = 0;
+ //! Updates the number of threads.
+ virtual void SetThreadCount(int threadCount) = 0;
};
DEFINE_REFCOUNTED_TYPE(IQuantizedExecutor)
@@ -46,7 +46,7 @@ DEFINE_REFCOUNTED_TYPE(IQuantizedExecutor)
struct TQuantizedExecutorOptions
{
- int WorkerCount = 1;
+ int ThreadCount = 1;
std::function<void()> ThreadInitializer;
};
diff --git a/yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp b/yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp
index 4ddc546cdb..b684006a5b 100644
--- a/yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp
+++ b/yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp
@@ -127,22 +127,22 @@ protected:
TIntrusivePtr<TLongCallbackProvider> LongCallbackProvider_;
IQuantizedExecutorPtr Executor_;
- void InitSimple(int workerCount, i64 iterationCount)
+ void InitSimple(int threadCount, i64 iterationCount)
{
SimpleCallbackProvider_ = New<TSimpleCallbackProvider>(iterationCount);
- Executor_ = CreateQuantizedExecutor("test", SimpleCallbackProvider_, {.WorkerCount = workerCount});
+ Executor_ = CreateQuantizedExecutor("test", SimpleCallbackProvider_, {.ThreadCount = threadCount});
}
- void InitLong(int workerCount, i64 iterationCount)
+ void InitLong(int threadCount, i64 iterationCount)
{
LongCallbackProvider_ = New<TLongCallbackProvider>(iterationCount);
- Executor_ = CreateQuantizedExecutor("test", LongCallbackProvider_, {.WorkerCount = workerCount});
+ Executor_ = CreateQuantizedExecutor("test", LongCallbackProvider_, {.ThreadCount = threadCount});
}
};
TEST_F(TQuantizedExecutorTest, Simple)
{
- InitSimple(/*workerCount*/ 1, /*iterationCount*/ 100);
+ InitSimple(/*threadCount*/ 1, /*iterationCount*/ 100);
WaitFor(Executor_->Run(TDuration::Max()))
.ThrowOnError();
@@ -152,7 +152,7 @@ TEST_F(TQuantizedExecutorTest, Simple)
TEST_F(TQuantizedExecutorTest, Timeout)
{
- InitSimple(/*workerCount*/ 2, /*iterationCount*/ std::numeric_limits<i64>::max());
+ InitSimple(/*threadCount*/ 2, /*iterationCount*/ std::numeric_limits<i64>::max());
WaitFor(Executor_->Run(TDuration::MilliSeconds(100)))
.ThrowOnError();
@@ -168,7 +168,7 @@ TEST_F(TQuantizedExecutorTest, Timeout)
TEST_F(TQuantizedExecutorTest, LongCallback1)
{
- InitLong(/*workerCount*/ 4, /*iterationCount*/ 20);
+ InitLong(/*threadCount*/ 4, /*iterationCount*/ 20);
auto future = Executor_->Run(TDuration::MilliSeconds(500));
@@ -179,7 +179,7 @@ TEST_F(TQuantizedExecutorTest, LongCallback1)
TEST_F(TQuantizedExecutorTest, LongCallback2)
{
- InitLong(/*workerCount*/ 4, /*iterationCount*/ 100);
+ InitLong(/*threadCount*/ 4, /*iterationCount*/ 100);
for (int index = 0; index < 9; ++index) {
WaitFor(Executor_->Run(TDuration::MilliSeconds(100)))
@@ -194,12 +194,12 @@ TEST_F(TQuantizedExecutorTest, LongCallback2)
TEST_F(TQuantizedExecutorTest, Reconfigure)
{
- InitSimple(/*workerCount*/ 10, /*iterationCount*/ std::numeric_limits<i64>::max());
+ InitSimple(/*threadCount*/ 10, /*iterationCount*/ std::numeric_limits<i64>::max());
i64 lastCounter = 0;
- auto run = [&] (int workerCount) {
- Executor_->Reconfigure(workerCount);
+ auto run = [&] (int threadCount) {
+ Executor_->SetThreadCount(threadCount);
WaitFor(Executor_->Run(TDuration::MilliSeconds(100)))
.ThrowOnError();
@@ -214,14 +214,14 @@ TEST_F(TQuantizedExecutorTest, Reconfigure)
std::mt19937 rng(42);
for (int index = 0; index < 10; ++index) {
- auto workerCount = rng() % 5 + 1;
- auto increment = run(workerCount);
+ auto threadCount = rng() % 5 + 1;
+ auto increment = run(threadCount);
- EXPECT_LE(increment, workerCount * /*milliseconds*/ 100.0 / /*period*/ 5 * 1.25);
+ EXPECT_LE(increment, threadCount * /*milliseconds*/ 100.0 / /*period*/ 5 * 1.25);
}
}
-TEST_F(TQuantizedExecutorTest, WorkerInitializer)
+TEST_F(TQuantizedExecutorTest, ThreadInitializer)
{
auto callbackProvider = New<TInitializingCallbackProvider>();
EXPECT_FALSE(callbackProvider->IsFinished());