summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorlukyan <[email protected]>2023-12-06 16:18:28 +0300
committerlukyan <[email protected]>2023-12-06 17:40:17 +0300
commit13ac4eedd17b4a12baf845326ff875fca8cb6314 (patch)
tree537b46fb7a9f7a1a1a978bb3f1c5cd7e1874cd7d
parent615507d23aef2046761324ef9a4b26d93ab75dd2 (diff)
YT-20519: Optimize execution pool removal in fair share thread pool
-rw-r--r--yt/yt/core/concurrency/new_fair_share_thread_pool.cpp44
-rw-r--r--yt/yt/core/concurrency/new_fair_share_thread_pool.h1
-rw-r--r--yt/yt/core/concurrency/thread_pool_poller.cpp3
-rw-r--r--yt/yt/core/concurrency/unittests/scheduled_executor_ut.cpp2
-rw-r--r--yt/yt/core/misc/mpsc_stack-inl.h53
-rw-r--r--yt/yt/core/misc/mpsc_stack.h8
-rw-r--r--yt/yt/core/misc/unittests/mpsc_stack_ut.cpp18
7 files changed, 103 insertions, 26 deletions
diff --git a/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp b/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp
index 987fbb45396..3fcad5f9b4b 100644
--- a/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp
+++ b/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp
@@ -391,7 +391,9 @@ public:
, ThreadNamePrefix_(threadNamePrefix)
, Profiler_(TProfiler{"/fair_share_queue"}
.WithHot())
+ , CumulativeSchedulingTimeCounter_(Profiler_.TimeCounter("/time/scheduling_cumulative"))
, PoolWeightProvider_(options.PoolWeightProvider)
+ , PoolRetentionTime_(options.PoolRetentionTime)
, VerboseLogging_(options.VerboseLogging)
{ }
@@ -434,6 +436,9 @@ public:
}
}
+ // Using non atomic Pool pointer is safe because it is set once and RemoveBucket cannot be
+ // concurrently executed with ConsumeInvokeQueue.
+ // Pool is nullptr when bucket was created but no actions were invoked.
if (auto* pool = bucket->Pool) {
UnlinkBucketQueue_.Enqueue(pool);
}
@@ -566,7 +571,9 @@ private:
const TString ThreadNamePrefix_;
const TProfiler Profiler_;
+ const NProfiling::TTimeCounter CumulativeSchedulingTimeCounter_;
const IPoolWeightProviderPtr PoolWeightProvider_;
+ const TDuration PoolRetentionTime_;
const bool VerboseLogging_;
// TODO(lukyan): Sharded mapping.
@@ -591,7 +598,6 @@ private:
// Buffer to keep actions during distribution to threads.
std::array<TAction, TThreadPoolBase::MaxThreadCount> OtherActions_;
-
std::atomic<int> ThreadCount_ = 0;
std::atomic<int> ActiveThreads_ = 0;
@@ -615,7 +621,7 @@ private:
return mappingIt->second.get();
}
- void ConsumeInvokeQueue(TCpuInstant currentInstant)
+ Y_NO_INLINE void ConsumeInvokeQueue()
{
VERIFY_SPINLOCK_AFFINITY(MainLock_);
@@ -682,20 +688,24 @@ private:
WaitHeap_.Insert(&bucket->EnqueuedTime);
}
});
+ }
- UnlinkBucketQueue_.DequeueAll(false, [&] (TExecutionPool* pool) {
- YT_VERIFY(pool->BucketRefs > 0);
- if (--pool->BucketRefs == 0) {
+ Y_NO_INLINE void ProcessUnlinkedBuckets(TCpuInstant currentInstant)
+ {
+ UnlinkBucketQueue_.FilterElements([&] (TExecutionPool* pool) {
+ YT_ASSERT(pool->BucketRefs > 0);
+ if (pool->BucketRefs == 1) {
auto lastUsageTime = pool->LastUsageTime.load(std::memory_order_acquire);
- if (CpuDurationToDuration(currentInstant - lastUsageTime) > TDuration::Seconds(30)) {
- auto poolIt = PoolMapping_.find(pool->PoolName);
- YT_VERIFY(poolIt != PoolMapping_.end() && poolIt->second.get() == pool);
- PoolMapping_.erase(poolIt);
- } else {
- ++pool->BucketRefs;
- UnlinkBucketQueue_.Enqueue(pool);
+ if (CpuDurationToDuration(currentInstant - lastUsageTime) < PoolRetentionTime_) {
+ return true;
}
+ auto poolIt = PoolMapping_.find(pool->PoolName);
+ YT_ASSERT(poolIt != PoolMapping_.end() && poolIt->second.get() == pool);
+ PoolMapping_.erase(poolIt);
+ } else {
+ --pool->BucketRefs;
}
+ return false;
});
}
@@ -739,7 +749,7 @@ private:
threadState->BucketToUnref = std::move(bucket);
}
- void UpdateExcessTime(TBucket* bucket, TCpuDuration duration, TCpuInstant currentInstant)
+ Y_NO_INLINE void UpdateExcessTime(TBucket* bucket, TCpuDuration duration, TCpuInstant currentInstant)
{
VERIFY_SPINLOCK_AFFINITY(MainLock_);
@@ -771,7 +781,7 @@ private:
YT_ASSERT(!bucket->EnqueuedTime.GetPositionInHeap() == !bucket->GetPositionInHeap());
}
- bool GetStarvingBucket(TAction* action)
+ Y_NO_INLINE bool GetStarvingBucket(TAction* action)
{
VERIFY_SPINLOCK_AFFINITY(MainLock_);
@@ -873,7 +883,9 @@ private:
YT_LOG_TRACE("Consuming invoke queue");
- ConsumeInvokeQueue(currentInstant);
+ ConsumeInvokeQueue();
+
+ ProcessUnlinkedBuckets(currentInstant);
int fetchedActions = 0;
int otherActionCount = 0;
@@ -976,6 +988,8 @@ private:
action.BucketHolder->Pool->WaitTimeCounter.Record(waitTime);
ReportWaitTime(waitTime);
}
+
+ CumulativeSchedulingTimeCounter_.Add(CpuDurationToDuration(GetCpuInstant() - cpuInstant));
});
auto& request = threadState.Request;
diff --git a/yt/yt/core/concurrency/new_fair_share_thread_pool.h b/yt/yt/core/concurrency/new_fair_share_thread_pool.h
index 72fab955323..8be740969e8 100644
--- a/yt/yt/core/concurrency/new_fair_share_thread_pool.h
+++ b/yt/yt/core/concurrency/new_fair_share_thread_pool.h
@@ -13,6 +13,7 @@ struct TNewTwoLevelFairShareThreadPoolOptions
IPoolWeightProviderPtr PoolWeightProvider = nullptr;
bool VerboseLogging = false;
TDuration PollingPeriod = TDuration::MilliSeconds(10);
+ TDuration PoolRetentionTime = TDuration::Seconds(30);
};
ITwoLevelFairShareThreadPoolPtr CreateNewTwoLevelFairShareThreadPool(
diff --git a/yt/yt/core/concurrency/thread_pool_poller.cpp b/yt/yt/core/concurrency/thread_pool_poller.cpp
index 35ee8698ab4..c7d93ffb957 100644
--- a/yt/yt/core/concurrency/thread_pool_poller.cpp
+++ b/yt/yt/core/concurrency/thread_pool_poller.cpp
@@ -191,7 +191,8 @@ public:
threadCount,
threadNamePrefix + "FS",
{
- .PollingPeriod = pollingPeriod
+ .PollingPeriod = pollingPeriod,
+ .PoolRetentionTime = TDuration::Zero()
});
AuxInvoker_ = FairShareThreadPool_->GetInvoker("aux", "default");
}
diff --git a/yt/yt/core/concurrency/unittests/scheduled_executor_ut.cpp b/yt/yt/core/concurrency/unittests/scheduled_executor_ut.cpp
index c225667c3a3..e04160aa3a8 100644
--- a/yt/yt/core/concurrency/unittests/scheduled_executor_ut.cpp
+++ b/yt/yt/core/concurrency/unittests/scheduled_executor_ut.cpp
@@ -23,7 +23,7 @@ void CheckTimeSlotCorrectness(const TDuration& interval)
{
const auto& now = TInstant::Now();
auto lastTick = TInstant::FromValue((now.GetValue() / interval.GetValue()) * interval.GetValue());
- YT_VERIFY(now - lastTick <= TDuration::MilliSeconds(10));
+ EXPECT_LE(now - lastTick, TDuration::MilliSeconds(10));
}
TEST_W(TScheduledExecutorTest, Simple)
diff --git a/yt/yt/core/misc/mpsc_stack-inl.h b/yt/yt/core/misc/mpsc_stack-inl.h
index 4f14d8ba5c4..e6316d03193 100644
--- a/yt/yt/core/misc/mpsc_stack-inl.h
+++ b/yt/yt/core/misc/mpsc_stack-inl.h
@@ -39,22 +39,24 @@ TMpscStack<T>::~TMpscStack()
template <class T>
void TMpscStack<T>::Enqueue(const T& value)
{
- DoEnqueue(new TNode(value));
+ auto node = new TNode(value);
+ DoEnqueue(node, node);
}
template <class T>
void TMpscStack<T>::Enqueue(T&& value)
{
- DoEnqueue(new TNode(std::move(value)));
+ auto node = new TNode(std::move(value));
+ DoEnqueue(node, node);
}
template <class T>
-void TMpscStack<T>::DoEnqueue(TNode* node)
+void TMpscStack<T>::DoEnqueue(TNode* head, TNode* tail)
{
auto* expected = Head_.load(std::memory_order::relaxed);
do {
- node->Next = expected;
- } while (!Head_.compare_exchange_weak(expected, node));
+ tail->Next = expected;
+ } while (!Head_.compare_exchange_weak(expected, head));
}
template <class T>
@@ -84,7 +86,7 @@ std::vector<T> TMpscStack<T>::DequeueAll(bool reverse)
template <class T>
template <class F>
-bool TMpscStack<T>::DequeueAll(bool reverse, F&& functor)
+bool TMpscStack<T>::DoDequeueAll(bool reverse, F&& functor)
{
auto* current = Head_.exchange(nullptr);
if (!current) {
@@ -101,15 +103,50 @@ bool TMpscStack<T>::DequeueAll(bool reverse, F&& functor)
}
}
while (current) {
- functor(current->Value);
auto* next = current->Next;
- delete current;
+ functor(current);
current = next;
}
return true;
}
template <class T>
+template <class F>
+bool TMpscStack<T>::DequeueAll(bool reverse, F&& functor)
+{
+ return DoDequeueAll(reverse, [&] (TNode* node) {
+ functor(node->Value);
+ delete node;
+ });
+}
+
+template <class T>
+template <class F>
+void TMpscStack<T>::FilterElements(F&& functor)
+{
+ TNode* filteredHead = nullptr;
+ TNode* filteredTail = nullptr;
+
+ DoDequeueAll(false, [&] (TNode* node) {
+ if (functor(node->Value)) {
+ node->Next = nullptr;
+ if (filteredTail) {
+ filteredTail->Next = node;
+ } else {
+ filteredHead = node;
+ }
+ filteredTail = node;
+ } else {
+ delete node;
+ }
+ });
+
+ if (filteredHead) {
+ DoEnqueue(filteredHead, filteredTail);
+ }
+}
+
+template <class T>
bool TMpscStack<T>::IsEmpty() const
{
return !Head_.load();
diff --git a/yt/yt/core/misc/mpsc_stack.h b/yt/yt/core/misc/mpsc_stack.h
index 7907c5ba145..e41b711e1d9 100644
--- a/yt/yt/core/misc/mpsc_stack.h
+++ b/yt/yt/core/misc/mpsc_stack.h
@@ -27,6 +27,9 @@ public:
template <class F>
bool DequeueAll(bool reverse, F&& functor);
+ template <class F>
+ void FilterElements(F&& functor);
+
bool IsEmpty() const;
private:
@@ -34,7 +37,10 @@ private:
std::atomic<TNode*> Head_ = nullptr;
- void DoEnqueue(TNode* node);
+ void DoEnqueue(TNode* head, TNode* tail);
+
+ template <class F>
+ bool DoDequeueAll(bool reverse, F&& functor);
};
/////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/misc/unittests/mpsc_stack_ut.cpp b/yt/yt/core/misc/unittests/mpsc_stack_ut.cpp
index fe3aa5878fd..903e9e16c7a 100644
--- a/yt/yt/core/misc/unittests/mpsc_stack_ut.cpp
+++ b/yt/yt/core/misc/unittests/mpsc_stack_ut.cpp
@@ -113,6 +113,24 @@ TEST(TMpscStackTest, ConcurrentTryDequeueAll)
EXPECT_TRUE(stack.IsEmpty());
}
+TEST(TMpscStackTest, DequeueFiltered)
+{
+ {
+ TMpscStack<int> stack;
+ for (int i = 0; i < 5; ++i) {
+ stack.Enqueue(i);
+ }
+
+ stack.FilterElements([] (int value) {
+ return value % 2 == 0;
+ });
+ EXPECT_FALSE(stack.IsEmpty());
+ auto values = stack.DequeueAll(/*reverse*/ false);
+
+ EXPECT_EQ(values, std::vector<int>({4, 2, 0}));
+ }
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace