diff options
author | lukyan <lukyan@yandex-team.com> | 2023-11-28 11:24:01 +0300 |
---|---|---|
committer | lukyan <lukyan@yandex-team.com> | 2023-11-28 12:32:31 +0300 |
commit | 2d9b16afa033b65670357fceee780f89297e2b68 (patch) | |
tree | b7cbe777db4d005c954bd8cb78236225e6640b34 | |
parent | 38b378a33c9814cc1cf654beceb92bf6fb0c7c2c (diff) | |
download | ydb-2d9b16afa033b65670357fceee780f89297e2b68.tar.gz |
YT-20430: Fair share bus and skip redundant events from poller
-rw-r--r-- | yt/yt/core/bus/tcp/connection.cpp | 2 | ||||
-rw-r--r-- | yt/yt/core/concurrency/new_fair_share_thread_pool.cpp | 22 | ||||
-rw-r--r-- | yt/yt/core/concurrency/new_fair_share_thread_pool.h | 10 | ||||
-rw-r--r-- | yt/yt/core/concurrency/poller.h | 3 | ||||
-rw-r--r-- | yt/yt/core/concurrency/thread_pool_poller.cpp | 242 | ||||
-rw-r--r-- | yt/yt/core/concurrency/unittests/scheduler_ut.cpp | 6 | ||||
-rw-r--r-- | yt/yt/core/net/connection.cpp | 3 | ||||
-rw-r--r-- | yt/yt/core/net/listener.cpp | 1 | ||||
-rw-r--r-- | yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp | 5 |
9 files changed, 178 insertions, 116 deletions
diff --git a/yt/yt/core/bus/tcp/connection.cpp b/yt/yt/core/bus/tcp/connection.cpp index f1d6e6c39c..d3168a89f5 100644 --- a/yt/yt/core/bus/tcp/connection.cpp +++ b/yt/yt/core/bus/tcp/connection.cpp @@ -879,7 +879,7 @@ void TTcpConnection::OnEvent(EPollControl control) YT_ASSERT(Any(previousPendingControl & EPollControl::Running)); if (Any(previousPendingControl & ~EPollControl::Running) && None(previousPendingControl & EPollControl::Shutdown)) { YT_LOG_TRACE("Retrying event processing for OnEvent (PendingControl: %v)", previousPendingControl); - Poller_->Retry(this, false); + Poller_->Retry(this); } } } diff --git a/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp b/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp index 2bf6585fd1..987fbb4539 100644 --- a/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp +++ b/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp @@ -1,4 +1,5 @@ #include "two_level_fair_share_thread_pool.h" +#include "new_fair_share_thread_pool.h" #include "private.h" #include "notify_manager.h" #include "profiling_helpers.h" @@ -385,14 +386,13 @@ public: TTwoLevelFairShareQueue( TIntrusivePtr<NThreading::TEventCount> callbackEventCount, const TString& threadNamePrefix, - IPoolWeightProviderPtr poolWeightProvider, - bool verboseLogging) - : TNotifyManager(std::move(callbackEventCount), GetThreadTags(threadNamePrefix), TDuration::MilliSeconds(10)) + const TNewTwoLevelFairShareThreadPoolOptions& options) + : TNotifyManager(std::move(callbackEventCount), GetThreadTags(threadNamePrefix), options.PollingPeriod) , ThreadNamePrefix_(threadNamePrefix) , Profiler_(TProfiler{"/fair_share_queue"} .WithHot()) - , PoolWeightProvider_(std::move(poolWeightProvider)) - , VerboseLogging_(verboseLogging) + , PoolWeightProvider_(options.PoolWeightProvider) + , VerboseLogging_(options.VerboseLogging) { } ~TTwoLevelFairShareQueue() @@ -1104,14 +1104,12 @@ public: TTwoLevelFairShareThreadPool( int threadCount, const TString& threadNamePrefix, - 
IPoolWeightProviderPtr poolWeightProvider, - bool verboseLogging) + const TNewTwoLevelFairShareThreadPoolOptions& options) : TThreadPoolBase(threadNamePrefix) , Queue_(New<TTwoLevelFairShareQueue>( CallbackEventCount_, ThreadNamePrefix_, - std::move(poolWeightProvider), - verboseLogging)) + options)) { Configure(threadCount); } @@ -1190,14 +1188,12 @@ private: ITwoLevelFairShareThreadPoolPtr CreateNewTwoLevelFairShareThreadPool( int threadCount, const TString& threadNamePrefix, - IPoolWeightProviderPtr poolWeightProvider = nullptr, - bool verboseLogging = false) + const TNewTwoLevelFairShareThreadPoolOptions& options) { return New<TTwoLevelFairShareThreadPool>( threadCount, threadNamePrefix, - std::move(poolWeightProvider), - verboseLogging); + options); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/concurrency/new_fair_share_thread_pool.h b/yt/yt/core/concurrency/new_fair_share_thread_pool.h index abc8c74fab..72fab95532 100644 --- a/yt/yt/core/concurrency/new_fair_share_thread_pool.h +++ b/yt/yt/core/concurrency/new_fair_share_thread_pool.h @@ -8,11 +8,17 @@ namespace NYT::NConcurrency { //////////////////////////////////////////////////////////////////////////////// +struct TNewTwoLevelFairShareThreadPoolOptions +{ + IPoolWeightProviderPtr PoolWeightProvider = nullptr; + bool VerboseLogging = false; + TDuration PollingPeriod = TDuration::MilliSeconds(10); +}; + ITwoLevelFairShareThreadPoolPtr CreateNewTwoLevelFairShareThreadPool( int threadCount, const TString& threadNamePrefix, - IPoolWeightProviderPtr poolWeightProvider = nullptr, - bool verboseLogging = false); + const TNewTwoLevelFairShareThreadPoolOptions& options = {}); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/concurrency/poller.h b/yt/yt/core/concurrency/poller.h index 8a2c2e3a97..d9c25295bb 100644 --- a/yt/yt/core/concurrency/poller.h +++ b/yt/yt/core/concurrency/poller.h @@ -106,8 
+106,7 @@ struct IPoller virtual void Arm(TFileDescriptor fd, const IPollablePtr& pollable, EPollControl control) = 0; //! Schedule call of #IPollable::OnEvent with EPollControl::Retry. - //! From OnEvent could be called with wakeup = false to not wake new thread. - virtual void Retry(const IPollablePtr& pollable, bool wakeup = true) = 0; + virtual void Retry(const IPollablePtr& pollable) = 0; //! Unarms the poller. virtual void Unarm(TFileDescriptor fd, const IPollablePtr& pollable) = 0; diff --git a/yt/yt/core/concurrency/thread_pool_poller.cpp b/yt/yt/core/concurrency/thread_pool_poller.cpp index e6df58c65e..35ee8698ab 100644 --- a/yt/yt/core/concurrency/thread_pool_poller.cpp +++ b/yt/yt/core/concurrency/thread_pool_poller.cpp @@ -4,6 +4,8 @@ #include "private.h" #include "profiling_helpers.h" #include "scheduler_thread.h" +#include "two_level_fair_share_thread_pool.h" +#include "new_fair_share_thread_pool.h" #include <yt/yt/core/misc/proc.h> #include <yt/yt/core/misc/mpsc_stack.h> @@ -36,9 +38,84 @@ class TThreadPoolPoller; namespace { +class TCookieState +{ +public: + // AquireControl is called from poller thread. + bool AquireControl(ui32 control) + { + auto currentState = State_.load(); + do { + if (currentState & UnregisterFlag) { + return false; + } + + if ((static_cast<ui32>(currentState) & control) == control) { + return false; + } + } while (!State_.compare_exchange_weak(currentState, (currentState | static_cast<ui64>(control)) + RefValue)); + + return true; + } + + void ResetControl(ui32 control) + { + auto currentState = State_.load(); + do { + if (!(static_cast<ui32>(currentState) & control)) { + break; + } + } while (!State_.compare_exchange_weak(currentState, currentState & ~static_cast<ui64>(control))); + } + + // Returns destroy flag. + bool SetUnregisterFlag() + { + auto currentState = State_.load(); + do { + // Pollable has been already unregistered. 
+ if (currentState & UnregisterFlag) { + return false; + } + } while (!State_.compare_exchange_weak(currentState, currentState | UnregisterFlag)); + + // No refs. + return currentState == 0; + } + + // Returns destroy flag. + bool ReleaseRef() + { + auto prevState = State_.fetch_sub(RefValue); + + YT_VERIFY(prevState >= RefValue); + auto currentState = prevState - RefValue; + if ((currentState >> ControlShift) == 1) { + // Verify that control flags are empty when there are no refs and unregister flag is set. + YT_VERIFY(static_cast<ui32>(currentState) == 0); + } + + return prevState == (RefValue | UnregisterFlag); + } + +private: + static constexpr auto ControlShift = sizeof(ui32) * 8; + static constexpr ui64 UnregisterFlag = 1ULL << ControlShift; + static constexpr ui64 RefValue = UnregisterFlag * 2; + // No contention expected when accessing this atomic variable. + // So we can safely (regarding to performance) use CAS. + std::atomic<ui64> State_ = 0; +}; + struct TPollableCookie : public TRefCounted + , public TCookieState { + const TPromise<void> UnregisterPromise = NewPromise<void>(); + + TIntrusivePtr<TThreadPoolPoller> PollerThread; + IInvokerPtr Invoker; + explicit TPollableCookie(TThreadPoolPoller* pollerThread) : PollerThread(pollerThread) { } @@ -54,12 +131,6 @@ struct TPollableCookie YT_VERIFY(cookie); return cookie; } - - TThreadPoolPoller* const PollerThread = nullptr; - - // Active event count is equal to 2 * (active events) + (1 for unregister flag). 
- std::atomic<int> ActiveEventCount = 1; - const TPromise<void> UnregisterPromise = NewPromise<void>(); }; EContPoll ToImplControl(EPollControl control) @@ -98,28 +169,6 @@ EPollControl FromImplControl(int implControl) return control; } -EThreadPriority PollablePriorityToThreadPriority(EPollablePriority priority) -{ - switch (priority) { - case EPollablePriority::RealTime: - return EThreadPriority::RealTime; - - default: - return EThreadPriority::Normal; - } -} - -TString PollablePriorityToPollerThreadNameSuffix(EPollablePriority priority) -{ - switch (priority) { - case EPollablePriority::RealTime: - return "RT"; - - default: - return ""; - } -} - } // namespace class TThreadPoolPoller @@ -134,33 +183,36 @@ public: : TThread(Format("%v:%v", threadNamePrefix, "Poll")) , Logger(ConcurrencyLogger.WithTag("ThreadNamePrefix: %v", threadNamePrefix)) { + // Register auxiliary notification handle to wake up poller thread when deregistering + // pollables and on shutdown. PollerImpl_.Set(nullptr, WakeupHandle_.GetFD(), CONT_POLL_EDGE_TRIGGERED | CONT_POLL_READ); - for (auto priority : TEnumTraits<EPollablePriority>::GetDomainValues()) { - HandlerThreadPool_[priority] = CreateThreadPool( - threadCount, - threadNamePrefix + PollablePriorityToPollerThreadNameSuffix(priority), - PollablePriorityToThreadPriority(priority), - pollingPeriod); - HandlerInvoker_[priority] = HandlerThreadPool_[priority]->GetInvoker(); - } + FairShareThreadPool_ = CreateNewTwoLevelFairShareThreadPool( + threadCount, + threadNamePrefix + "FS", + { + .PollingPeriod = pollingPeriod + }); + AuxInvoker_ = FairShareThreadPool_->GetInvoker("aux", "default"); } void Reconfigure(int threadCount) override { - for (auto priority : TEnumTraits<EPollablePriority>::GetDomainValues()) { - HandlerThreadPool_[priority]->Configure(threadCount); - } + FairShareThreadPool_->Configure(threadCount); } - // TODO(lukyan): Remove TryRegister and Unregister. Do it in Arm/Unarm. 
bool TryRegister(const IPollablePtr& pollable) override { + // FIXME(lukyan): Enqueueing in register queue may happen after stopping. + // Create cookie when dequeueing from register queue? + // How to prevent arming FD when stopping. if (IsStopping()) { return false; } - pollable->SetCookie(New<TPollableCookie>(this)); + auto cookie = New<TPollableCookie>(this); + cookie->Invoker = FairShareThreadPool_->GetInvoker("main", Format("%v", pollable.Get())); + pollable->SetCookie(std::move(cookie)); RegisterQueue_.Enqueue(pollable); YT_LOG_DEBUG("Pollable registered (%v)", @@ -189,6 +241,7 @@ public: fd, control, pollable->GetLoggingTag()); + PollerImpl_.Set(pollable.Get(), fd, ToImplControl(control)); } @@ -199,16 +252,17 @@ public: PollerImpl_.Remove(fd); } - void Retry(const IPollablePtr& pollable, bool /*wakeup*/) override + void Retry(const IPollablePtr& pollable) override { if (auto guard = TryAcquireRunEventGuard(pollable.Get(), EPollControl::Retry)) { - HandlerInvoker_[pollable->GetPriority()]->Invoke(BIND(std::move(guard))); + auto* cookie = TPollableCookie::FromPollable(pollable.Get()); + cookie->Invoker->Invoke(BIND(std::move(guard))); } } IInvokerPtr GetInvoker() const override { - return HandlerInvoker_[EPollablePriority::Normal]; + return AuxInvoker_; } void Shutdown() override @@ -243,7 +297,7 @@ private: { if (Pollable_) { // This is unlikely but might happen on thread pool termination. 
- GetFinalizerInvoker()->Invoke(BIND(&Destroy, Unretained(Pollable_))); + GetFinalizerInvoker()->Invoke(BIND(&ResetAndDestroy, Unretained(Pollable_), Control_)); } } @@ -254,6 +308,9 @@ private: void operator()() { + auto* cookie = TPollableCookie::FromPollable(Pollable_); + cookie->ResetControl(ToUnderlying(Control_)); + Pollable_->OnEvent(Control_); Destroy(Pollable_); Pollable_ = nullptr; @@ -266,21 +323,28 @@ private: static void Destroy(IPollable* pollable) { auto* cookie = TPollableCookie::FromPollable(pollable); - auto activeEventCount = cookie->ActiveEventCount.fetch_sub(2) - 2; - if (activeEventCount == 0) { + if (cookie->ReleaseRef()) { pollable->OnShutdown(); cookie->UnregisterPromise.Set(); - auto pollerThread = MakeStrong(cookie->PollerThread); + cookie->Invoker.Reset(); + auto pollerThread = std::move(cookie->PollerThread); pollerThread->UnregisterQueue_.Enqueue(pollable); pollerThread->WakeupHandle_.Raise(); } } + + static void ResetAndDestroy(IPollable* pollable, EPollControl control) + { + auto* cookie = TPollableCookie::FromPollable(pollable); + cookie->ResetControl(ToUnderlying(control)); + Destroy(pollable); + } }; const NLogging::TLogger Logger; - TEnumIndexedVector<EPollablePriority, IThreadPoolPtr> HandlerThreadPool_; - TEnumIndexedVector<EPollablePriority, IInvokerPtr> HandlerInvoker_; + ITwoLevelFairShareThreadPoolPtr FairShareThreadPool_; + IInvokerPtr AuxInvoker_; // Only makes sense for "select" backend. 
struct TMutexLocking @@ -298,64 +362,47 @@ private: std::array<TPollerImpl::TEvent, MaxEventsPerPoll> PooledImplEvents_; - TEnumIndexedVector<EPollablePriority, std::vector<TClosure>> Callbacks_; - static TRunEventGuard TryAcquireRunEventGuard(IPollable* pollable, EPollControl control) { auto* cookie = TPollableCookie::FromPollable(pollable); YT_VERIFY(cookie->GetRefCount() > 0); - auto oldEventCount = cookie->ActiveEventCount.fetch_add(2); - if (oldEventCount & 1) { - return TRunEventGuard(pollable, control); + if (cookie->AquireControl(ToUnderlying(control))) { + return {pollable, control}; } - cookie->ActiveEventCount.fetch_sub(2); - return TRunEventGuard(); + return {}; } - bool DoUnregister(const IPollablePtr& pollable) + void DoUnregister(const IPollablePtr& pollable) { YT_LOG_DEBUG("Requesting pollable unregistration (%v)", pollable->GetLoggingTag()); auto* cookie = TPollableCookie::TryFromPollable(pollable.Get()); YT_VERIFY(cookie); - auto activeEventCount = cookie->ActiveEventCount.load(); - while (true) { - // Otherwise pollable has been already unregistered. - if (!(activeEventCount & 1)) { - YT_LOG_DEBUG("Pollable is already unregistered (%v)", - pollable->GetLoggingTag()); - return false; - } + if (cookie->SetUnregisterFlag()) { + // Poller guarantees that OnShutdown is never executed concurrently with OnEvent(). + // Otherwise it will be removed in TRunEventGuard. - if (cookie->ActiveEventCount.compare_exchange_weak(activeEventCount, activeEventCount & ~1)) { - // Poller guarantees that OnShutdown is never executed concurrently with OnEvent(). - // Otherwise it will be removed in TRunEventGuard. 
- if (activeEventCount == 1) { - pollable->OnShutdown(); - cookie->UnregisterPromise.Set(); - cookie->PollerThread->UnregisterQueue_.Enqueue(pollable); - cookie->PollerThread->WakeupHandle_.Raise(); - } - return true; - } + pollable->OnShutdown(); + cookie->UnregisterPromise.Set(); + cookie->Invoker.Reset(); + auto pollerThread = std::move(cookie->PollerThread); + pollerThread->UnregisterQueue_.Enqueue(pollable); + pollerThread->WakeupHandle_.Raise(); } - - return false; } - void HandleEvents() + void HandleEvents(int eventCount) { - int eventCount = PollerImpl_.Wait(PooledImplEvents_.data(), PooledImplEvents_.size(), PollerThreadQuantum.MicroSeconds()); - for (int index = 0; index < eventCount; ++index) { const auto& event = PooledImplEvents_[index]; auto control = FromImplControl(PollerImpl_.ExtractFilter(&event)); auto* pollable = static_cast<IPollable*>(PollerImpl_.ExtractEvent(&event)); + // Null pollable stands for wakeup handle. if (!pollable) { WakeupHandle_.Clear(); continue; @@ -368,16 +415,11 @@ private: YT_VERIFY(pollable->GetRefCount() > 0); // Can safely dereference pollable because even unregistered pollables are hold in Pollables_. - auto priority = pollable->GetPriority(); if (auto guard = TryAcquireRunEventGuard(pollable, control)) { - Callbacks_[priority].push_back(BIND(std::move(guard))); + auto* cookie = TPollableCookie::FromPollable(pollable); + cookie->Invoker->Invoke(BIND(std::move(guard))); } } - - for (auto priority : TEnumTraits<EPollablePriority>::GetDomainValues()) { - HandlerInvoker_[priority]->Invoke(Callbacks_[priority]); - Callbacks_[priority].clear(); - } } void ThreadMain() override @@ -391,7 +433,9 @@ private: YT_LOG_DEBUG("Thread started (Name: %v)", GetThreadName()); - while (!IsStopping()) { + while (true) { + int eventCount = PollerImpl_.Wait(PooledImplEvents_.data(), PooledImplEvents_.size(), PollerThreadQuantum.MicroSeconds()); + // Save items from unregister queue before processing register queue. 
// Otherwise registration and unregistration can be reordered: // item was enqueued in register and unregister queues after processing register queue; @@ -405,13 +449,23 @@ private: InsertOrCrash(Pollables_, std::move(pollable)); }); - HandleEvents(); + HandleEvents(eventCount); for (const auto& pollable : unregisterItems) { EraseOrCrash(Pollables_, pollable); } unregisterItems.clear(); + + if (IsStopping()) { + if (Pollables_.empty()) { + break; + } + // Need to unregister pollables when stopping to break reference cycles between pollables and poller. + for (const auto& pollable : Pollables_) { + DoUnregister(pollable); + } + } } YT_LOG_DEBUG("Thread stopped (Name: %v)", @@ -421,10 +475,8 @@ private: GetThreadName()); } - // Shutdown here. - for (const auto& pollable : Pollables_) { - DoUnregister(pollable); - } + RegisterQueue_.DequeueAll(false, [&] (const auto&) { }); + UnregisterQueue_.DequeueAll(false, [&] (const auto&) { }); } void StopPrologue() override diff --git a/yt/yt/core/concurrency/unittests/scheduler_ut.cpp b/yt/yt/core/concurrency/unittests/scheduler_ut.cpp index a7e7da4401..4e437fcec8 100644 --- a/yt/yt/core/concurrency/unittests/scheduler_ut.cpp +++ b/yt/yt/core/concurrency/unittests/scheduler_ut.cpp @@ -1411,8 +1411,10 @@ TEST_P(TFairShareSchedulerTest, TwoLevelFairness) auto threadPool = CreateNewTwoLevelFairShareThreadPool( numThreads, "MyFairSharePool", - /*poolWeightProvider*/ nullptr, - /*verboseLogging*/ true); + { + /*poolWeightProvider*/ nullptr, + /*verboseLogging*/ true + }); std::vector<TDuration> progresses(numWorkers); std::vector<TDuration> pools(numPools); diff --git a/yt/yt/core/net/connection.cpp b/yt/yt/core/net/connection.cpp index c0f47f1ead..38427b05b9 100644 --- a/yt/yt/core/net/connection.cpp +++ b/yt/yt/core/net/connection.cpp @@ -484,6 +484,7 @@ public: WriteDirection_.Operation.reset(); } + Poller_->Unarm(FD_, this); YT_VERIFY(TryClose(FD_, false)); FD_ = -1; @@ -971,7 +972,7 @@ private: } else if 
(!result.Value().Retry) { operation->SetResult(); } else if (needRetry) { - Poller_->Retry(this, false); + Poller_->Retry(this); } if (needUnregister) { diff --git a/yt/yt/core/net/listener.cpp b/yt/yt/core/net/listener.cpp index bb658fd68d..7a4bda3a64 100644 --- a/yt/yt/core/net/listener.cpp +++ b/yt/yt/core/net/listener.cpp @@ -93,6 +93,7 @@ public: Error_ = TError("Listener is shut down"); } std::swap(Queue_, queue); + Acceptor_->Unarm(ServerSocket_, this); YT_VERIFY(TryClose(ServerSocket_, false)); } diff --git a/yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp b/yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp index a033fa9f19..c8eafd445f 100644 --- a/yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp +++ b/yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp @@ -42,6 +42,11 @@ TYPED_TEST(THandleChannelFailureTest, HandleChannelFailureTest) outerServer.InitilizeAddress(); innerServer.InitilizeAddress(); + auto finally = Finally([&] { + outerServer.TearDown(); + innerServer.TearDown(); + }); + auto workerPool = NConcurrency::CreateThreadPool(4, "Worker"); outerServer.InitializeServer( |