diff options
| author | lukyan <[email protected]> | 2023-12-06 16:18:28 +0300 |
|---|---|---|
| committer | lukyan <[email protected]> | 2023-12-06 17:40:17 +0300 |
| commit | 13ac4eedd17b4a12baf845326ff875fca8cb6314 (patch) | |
| tree | 537b46fb7a9f7a1a1a978bb3f1c5cd7e1874cd7d | |
| parent | 615507d23aef2046761324ef9a4b26d93ab75dd2 (diff) | |
YT-20519: Optimize execution pool removal in fair share thread pool
| -rw-r--r-- | yt/yt/core/concurrency/new_fair_share_thread_pool.cpp | 44 | ||||
| -rw-r--r-- | yt/yt/core/concurrency/new_fair_share_thread_pool.h | 1 | ||||
| -rw-r--r-- | yt/yt/core/concurrency/thread_pool_poller.cpp | 3 | ||||
| -rw-r--r-- | yt/yt/core/concurrency/unittests/scheduled_executor_ut.cpp | 2 | ||||
| -rw-r--r-- | yt/yt/core/misc/mpsc_stack-inl.h | 53 | ||||
| -rw-r--r-- | yt/yt/core/misc/mpsc_stack.h | 8 | ||||
| -rw-r--r-- | yt/yt/core/misc/unittests/mpsc_stack_ut.cpp | 18 |
7 files changed, 103 insertions, 26 deletions
diff --git a/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp b/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp index 987fbb45396..3fcad5f9b4b 100644 --- a/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp +++ b/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp @@ -391,7 +391,9 @@ public: , ThreadNamePrefix_(threadNamePrefix) , Profiler_(TProfiler{"/fair_share_queue"} .WithHot()) + , CumulativeSchedulingTimeCounter_(Profiler_.TimeCounter("/time/scheduling_cumulative")) , PoolWeightProvider_(options.PoolWeightProvider) + , PoolRetentionTime_(options.PoolRetentionTime) , VerboseLogging_(options.VerboseLogging) { } @@ -434,6 +436,9 @@ public: } } + // Using non atomic Pool pointer is safe because it is set once and RemoveBucket cannot be + // concurrently executed with ConsumeInvokeQueue. + // Pool is nullptr when bucket was created but no actions were invoked. if (auto* pool = bucket->Pool) { UnlinkBucketQueue_.Enqueue(pool); } @@ -566,7 +571,9 @@ private: const TString ThreadNamePrefix_; const TProfiler Profiler_; + const NProfiling::TTimeCounter CumulativeSchedulingTimeCounter_; const IPoolWeightProviderPtr PoolWeightProvider_; + const TDuration PoolRetentionTime_; const bool VerboseLogging_; // TODO(lukyan): Sharded mapping. @@ -591,7 +598,6 @@ private: // Buffer to keep actions during distribution to threads. std::array<TAction, TThreadPoolBase::MaxThreadCount> OtherActions_; - std::atomic<int> ThreadCount_ = 0; std::atomic<int> ActiveThreads_ = 0; @@ -615,7 +621,7 @@ private: return mappingIt->second.get(); } - void ConsumeInvokeQueue(TCpuInstant currentInstant) + Y_NO_INLINE void ConsumeInvokeQueue() { VERIFY_SPINLOCK_AFFINITY(MainLock_); @@ -682,20 +688,24 @@ private: WaitHeap_.Insert(&bucket->EnqueuedTime); } }); + } - UnlinkBucketQueue_.DequeueAll(false, [&] (TExecutionPool* pool) { - YT_VERIFY(pool->BucketRefs > 0); - if (--pool->BucketRefs == 0) { + Y_NO_INLINE void ProcessUnlinkedBuckets(TCpuInstant currentInstant) + { + UnlinkBucketQueue_.FilterElements([&] (TExecutionPool* pool) { + YT_ASSERT(pool->BucketRefs > 0); + if (pool->BucketRefs == 1) { auto lastUsageTime = pool->LastUsageTime.load(std::memory_order_acquire); - if (CpuDurationToDuration(currentInstant - lastUsageTime) > TDuration::Seconds(30)) { - auto poolIt = PoolMapping_.find(pool->PoolName); - YT_VERIFY(poolIt != PoolMapping_.end() && poolIt->second.get() == pool); - PoolMapping_.erase(poolIt); - } else { - ++pool->BucketRefs; - UnlinkBucketQueue_.Enqueue(pool); + if (CpuDurationToDuration(currentInstant - lastUsageTime) < PoolRetentionTime_) { + return true; } + auto poolIt = PoolMapping_.find(pool->PoolName); + YT_ASSERT(poolIt != PoolMapping_.end() && poolIt->second.get() == pool); + PoolMapping_.erase(poolIt); + } else { + --pool->BucketRefs; } + return false; }); } @@ -739,7 +749,7 @@ private: threadState->BucketToUnref = std::move(bucket); } - void UpdateExcessTime(TBucket* bucket, TCpuDuration duration, TCpuInstant currentInstant) + Y_NO_INLINE void UpdateExcessTime(TBucket* bucket, TCpuDuration duration, TCpuInstant currentInstant) { VERIFY_SPINLOCK_AFFINITY(MainLock_); @@ -771,7 +781,7 @@ private: YT_ASSERT(!bucket->EnqueuedTime.GetPositionInHeap() == !bucket->GetPositionInHeap()); } - bool GetStarvingBucket(TAction* action) + Y_NO_INLINE bool GetStarvingBucket(TAction* action) { VERIFY_SPINLOCK_AFFINITY(MainLock_); @@ -873,7 +883,9 @@ private: YT_LOG_TRACE("Consuming invoke queue"); - ConsumeInvokeQueue(currentInstant); + ConsumeInvokeQueue(); + + ProcessUnlinkedBuckets(currentInstant); int fetchedActions = 0; int otherActionCount = 0; @@ -976,6 +988,8 @@ private: action.BucketHolder->Pool->WaitTimeCounter.Record(waitTime); ReportWaitTime(waitTime); } + + CumulativeSchedulingTimeCounter_.Add(CpuDurationToDuration(GetCpuInstant() - cpuInstant)); }); auto& request = threadState.Request; diff --git a/yt/yt/core/concurrency/new_fair_share_thread_pool.h b/yt/yt/core/concurrency/new_fair_share_thread_pool.h index 72fab955323..8be740969e8 100644 --- a/yt/yt/core/concurrency/new_fair_share_thread_pool.h +++ b/yt/yt/core/concurrency/new_fair_share_thread_pool.h @@ -13,6 +13,7 @@ struct TNewTwoLevelFairShareThreadPoolOptions IPoolWeightProviderPtr PoolWeightProvider = nullptr; bool VerboseLogging = false; TDuration PollingPeriod = TDuration::MilliSeconds(10); + TDuration PoolRetentionTime = TDuration::Seconds(30); }; ITwoLevelFairShareThreadPoolPtr CreateNewTwoLevelFairShareThreadPool( diff --git a/yt/yt/core/concurrency/thread_pool_poller.cpp b/yt/yt/core/concurrency/thread_pool_poller.cpp index 35ee8698ab4..c7d93ffb957 100644 --- a/yt/yt/core/concurrency/thread_pool_poller.cpp +++ b/yt/yt/core/concurrency/thread_pool_poller.cpp @@ -191,7 +191,8 @@ public: threadCount, threadNamePrefix + "FS", { - .PollingPeriod = pollingPeriod + .PollingPeriod = pollingPeriod, + .PoolRetentionTime = TDuration::Zero() }); AuxInvoker_ = FairShareThreadPool_->GetInvoker("aux", "default"); } diff --git a/yt/yt/core/concurrency/unittests/scheduled_executor_ut.cpp b/yt/yt/core/concurrency/unittests/scheduled_executor_ut.cpp index c225667c3a3..e04160aa3a8 100644 --- a/yt/yt/core/concurrency/unittests/scheduled_executor_ut.cpp +++ b/yt/yt/core/concurrency/unittests/scheduled_executor_ut.cpp @@ -23,7 +23,7 @@ void CheckTimeSlotCorrectness(const TDuration& interval) { const auto& now = TInstant::Now(); auto lastTick = TInstant::FromValue((now.GetValue() / interval.GetValue()) * interval.GetValue()); - YT_VERIFY(now - lastTick <= TDuration::MilliSeconds(10)); + EXPECT_LE(now - lastTick, TDuration::MilliSeconds(10)); } TEST_W(TScheduledExecutorTest, Simple) diff --git a/yt/yt/core/misc/mpsc_stack-inl.h b/yt/yt/core/misc/mpsc_stack-inl.h index 4f14d8ba5c4..e6316d03193 100644 --- a/yt/yt/core/misc/mpsc_stack-inl.h +++ b/yt/yt/core/misc/mpsc_stack-inl.h @@ -39,22 +39,24 @@ TMpscStack<T>::~TMpscStack() template <class T> void TMpscStack<T>::Enqueue(const T& value) { - DoEnqueue(new TNode(value)); + auto node = new TNode(value); + DoEnqueue(node, node); } template <class T> void TMpscStack<T>::Enqueue(T&& value) { - DoEnqueue(new TNode(std::move(value))); + auto node = new TNode(std::move(value)); + DoEnqueue(node, node); } template <class T> -void TMpscStack<T>::DoEnqueue(TNode* node) +void TMpscStack<T>::DoEnqueue(TNode* head, TNode* tail) { auto* expected = Head_.load(std::memory_order::relaxed); do { - node->Next = expected; - } while (!Head_.compare_exchange_weak(expected, node)); + tail->Next = expected; + } while (!Head_.compare_exchange_weak(expected, head)); } template <class T> @@ -84,7 +86,7 @@ std::vector<T> TMpscStack<T>::DequeueAll(bool reverse) template <class T> template <class F> -bool TMpscStack<T>::DequeueAll(bool reverse, F&& functor) +bool TMpscStack<T>::DoDequeueAll(bool reverse, F&& functor) { auto* current = Head_.exchange(nullptr); if (!current) { @@ -101,15 +103,50 @@ bool TMpscStack<T>::DequeueAll(bool reverse, F&& functor) } } while (current) { - functor(current->Value); auto* next = current->Next; - delete current; + functor(current); current = next; } return true; } template <class T> +template <class F> +bool TMpscStack<T>::DequeueAll(bool reverse, F&& functor) +{ + return DoDequeueAll(reverse, [&] (TNode* node) { + functor(node->Value); + delete node; + }); +} + +template <class T> +template <class F> +void TMpscStack<T>::FilterElements(F&& functor) +{ + TNode* filteredHead = nullptr; + TNode* filteredTail = nullptr; + + DoDequeueAll(false, [&] (TNode* node) { + if (functor(node->Value)) { + node->Next = nullptr; + if (filteredTail) { + filteredTail->Next = node; + } else { + filteredHead = node; + } + filteredTail = node; + } else { + delete node; + } + }); + + if (filteredHead) { + DoEnqueue(filteredHead, filteredTail); + } +} + +template <class T> bool TMpscStack<T>::IsEmpty() const { return !Head_.load(); diff --git a/yt/yt/core/misc/mpsc_stack.h b/yt/yt/core/misc/mpsc_stack.h index 7907c5ba145..e41b711e1d9 100644 --- a/yt/yt/core/misc/mpsc_stack.h +++ b/yt/yt/core/misc/mpsc_stack.h @@ -27,6 +27,9 @@ public: template <class F> bool DequeueAll(bool reverse, F&& functor); + template <class F> + void FilterElements(F&& functor); + bool IsEmpty() const; private: @@ -34,7 +37,10 @@ private: std::atomic<TNode*> Head_ = nullptr; - void DoEnqueue(TNode* node); + void DoEnqueue(TNode* head, TNode* tail); + + template <class F> + bool DoDequeueAll(bool reverse, F&& functor); }; ///////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/misc/unittests/mpsc_stack_ut.cpp b/yt/yt/core/misc/unittests/mpsc_stack_ut.cpp index fe3aa5878fd..903e9e16c7a 100644 --- a/yt/yt/core/misc/unittests/mpsc_stack_ut.cpp +++ b/yt/yt/core/misc/unittests/mpsc_stack_ut.cpp @@ -113,6 +113,24 @@ TEST(TMpscStackTest, ConcurrentTryDequeueAll) EXPECT_TRUE(stack.IsEmpty()); } +TEST(TMpscStackTest, DequeueFiltered) +{ + { + TMpscStack<int> stack; + for (int i = 0; i < 5; ++i) { + stack.Enqueue(i); + } + + stack.FilterElements([] (int value) { + return value % 2 == 0; + }); + EXPECT_FALSE(stack.IsEmpty()); + auto values = stack.DequeueAll(/*reverse*/ false); + + EXPECT_EQ(values, std::vector<int>({4, 2, 0})); + } +} + //////////////////////////////////////////////////////////////////////////////// } // namespace |
