diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-01-29 08:57:26 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-01-29 09:08:36 +0300 |
commit | 5e3cebce04bcb2566d4e8cbdf023c61932f39ad4 (patch) | |
tree | 5874d7650f9779855d30bd4318ee36447fa6669b /yt | |
parent | 6922728a2f27338a00b68625e876d33f04530f28 (diff) | |
download | ydb-5e3cebce04bcb2566d4e8cbdf023c61932f39ad4.tar.gz |
Intermediate changes
Diffstat (limited to 'yt')
-rw-r--r-- | yt/yt/core/bus/tcp/connection.cpp | 6 | ||||
-rw-r--r-- | yt/yt/core/bus/tcp/connection.h | 2 | ||||
-rw-r--r-- | yt/yt/core/concurrency/poller.h | 5 | ||||
-rw-r--r-- | yt/yt/core/concurrency/thread_pool_poller.cpp | 158 |
4 files changed, 98 insertions, 73 deletions
diff --git a/yt/yt/core/bus/tcp/connection.cpp b/yt/yt/core/bus/tcp/connection.cpp index 02c8209651..c602964133 100644 --- a/yt/yt/core/bus/tcp/connection.cpp +++ b/yt/yt/core/bus/tcp/connection.cpp @@ -793,6 +793,12 @@ void TTcpConnection::UnsubscribeTerminated(const TCallback<void(const TError&)>& void TTcpConnection::OnEvent(EPollControl control) { + auto multiplexingBand = MultiplexingBand_.load(); + if (multiplexingBand != ActualMultiplexingBand_) { + Poller_->SetExecutionPool(this, FormatEnum(multiplexingBand)); + ActualMultiplexingBand_ = multiplexingBand; + } + EPollControl action; { auto rawPendingControl = PendingControl_.load(std::memory_order::acquire); diff --git a/yt/yt/core/bus/tcp/connection.h b/yt/yt/core/bus/tcp/connection.h index 722f3308db..39c5dea4d2 100644 --- a/yt/yt/core/bus/tcp/connection.h +++ b/yt/yt/core/bus/tcp/connection.h @@ -224,6 +224,8 @@ private: std::atomic<EMultiplexingBand> MultiplexingBand_ = EMultiplexingBand::Default; + EMultiplexingBand ActualMultiplexingBand_ = EMultiplexingBand::Default; + TAtomicObject<TError> Error_; NNet::IAsyncDialerSessionPtr DialerSession_; diff --git a/yt/yt/core/concurrency/poller.h b/yt/yt/core/concurrency/poller.h index d9c25295bb..30340eea85 100644 --- a/yt/yt/core/concurrency/poller.h +++ b/yt/yt/core/concurrency/poller.h @@ -82,7 +82,10 @@ struct IPoller //! Tries to register a pollable entity but does not arm the poller yet. //! Returns |false| if the poller is already shutting down. - virtual bool TryRegister(const IPollablePtr& pollable) = 0; + virtual bool TryRegister(const IPollablePtr& pollable, TString poolName = "default") = 0; + + //! Method must be called inside OnEvent. + virtual void SetExecutionPool(const IPollablePtr& pollable, TString poolName) = 0; //! Unregisters the previously registered entity. /*! diff --git a/yt/yt/core/concurrency/thread_pool_poller.cpp b/yt/yt/core/concurrency/thread_pool_poller.cpp index f93d507673..4aa261a548 100644 --- a/yt/yt/core/concurrency/thread_pool_poller.cpp +++ b/yt/yt/core/concurrency/thread_pool_poller.cpp @@ -1,9 +1,6 @@ -#include "thread_pool.h" #include "poller.h" #include "thread_pool_poller.h" #include "private.h" -#include "profiling_helpers.h" -#include "scheduler_thread.h" #include "two_level_fair_share_thread_pool.h" #include "new_fair_share_thread_pool.h" @@ -13,6 +10,8 @@ #include <yt/yt/core/profiling/tscp.h> +#include <yt/yt/core/threading/thread.h> + #include <library/cpp/yt/threading/notification_handle.h> #include <library/cpp/yt/memory/ref_tracked.h> @@ -40,10 +39,16 @@ class TThreadPoolPoller; namespace { +DEFINE_ENUM(EFinishResult, + (None) + (Repeat) + (Shutdown) +); + class TCookieState { public: - // AquireControl is called from poller thread. + // AquireControl is called from poller thread and from Retry. bool AquireControl(ui32 control) { auto currentState = State_.load(); @@ -55,19 +60,17 @@ public: if ((static_cast<ui32>(currentState) & control) == control) { return false; } - } while (!State_.compare_exchange_weak(currentState, (currentState | static_cast<ui64>(control)) + RefValue)); + } while (!State_.compare_exchange_weak(currentState, currentState | static_cast<ui64>(control) | RunningFlag)); - return true; + return !(currentState & RunningFlag); } - void ResetControl(ui32 control) + // Resets control and returns previous value. + ui32 ResetControl() { auto currentState = State_.load(); - do { - if (!(static_cast<ui32>(currentState) & control)) { - break; - } - } while (!State_.compare_exchange_weak(currentState, currentState & ~static_cast<ui64>(control))); + while (!State_.compare_exchange_weak(currentState, currentState & (UnregisterFlag | RunningFlag))); + return static_cast<ui32>(currentState); } // Returns destroy flag. @@ -81,29 +84,36 @@ public: } } while (!State_.compare_exchange_weak(currentState, currentState | UnregisterFlag)); - // No refs. - return currentState == 0; + return !(currentState & RunningFlag); } - // Returns destroy flag. - bool ReleaseRef() + EFinishResult Finish() { - auto prevState = State_.fetch_sub(RefValue); + auto currentState = State_.load(); - YT_VERIFY(prevState >= RefValue); - auto currentState = prevState - RefValue; - if ((currentState >> ControlShift) == 1) { - // Verify that control flags are empty when there are no refs and unregister flag is set. - YT_VERIFY(static_cast<ui32>(currentState) == 0); - } + YT_VERIFY(currentState & RunningFlag); + + do { + if (currentState & UnregisterFlag) { + // Run destroy. + return EFinishResult::Shutdown; + } + + if (currentState & ~(UnregisterFlag | RunningFlag)) { + // Has state. Retry. + return EFinishResult::Repeat; + } + + } while (!State_.compare_exchange_weak(currentState, currentState & ~RunningFlag)); - return prevState == (RefValue | UnregisterFlag); + return EFinishResult::None; } private: static constexpr auto ControlShift = sizeof(ui32) * 8; static constexpr ui64 UnregisterFlag = 1ULL << ControlShift; - static constexpr ui64 RefValue = UnregisterFlag * 2; + static constexpr ui64 RunningFlag = 1ULL << (ControlShift + 1); + // No contention expected when accessing this atomic variable. // So we can safely (regarding to performance) use CAS. std::atomic<ui64> State_ = 0; @@ -139,6 +149,7 @@ EContPoll ToImplControl(EPollControl control) { int implControl = CONT_POLL_ONE_SHOT; if (Any(control & EPollControl::EdgeTriggered)) { + // N.B. Edge-triggered mode disables one shot mode. implControl = CONT_POLL_EDGE_TRIGGERED; } if (Any(control & EPollControl::BacklogEmpty)) { @@ -204,7 +215,7 @@ public: FairShareThreadPool_->Configure(threadCount); } - bool TryRegister(const IPollablePtr& pollable) override + bool TryRegister(const IPollablePtr& pollable, TString poolName) override { // FIXME(lukyan): Enqueueing in register queue may happen after stopping. // Create cookie when dequeueing from register queue? @@ -214,7 +225,9 @@ public: } auto cookie = New<TPollableCookie>(this); - cookie->Invoker = FairShareThreadPool_->GetInvoker("main", Format("%v", pollable.Get())); + cookie->Invoker = FairShareThreadPool_->GetInvoker( + poolName, + Format("%v", pollable.Get())); pollable->SetCookie(std::move(cookie)); RegisterQueue_.Enqueue(pollable); @@ -224,6 +237,14 @@ public: return true; } + void SetExecutionPool(const IPollablePtr& pollable, TString poolName) override + { + auto* cookie = TPollableCookie::FromPollable(pollable.Get()); + cookie->Invoker = FairShareThreadPool_->GetInvoker( + poolName, + Format("%v", pollable.Get())); + } + // TODO(lukyan): Method OnShutdown in the interface and returned future are redundant. // Shutdown can be done by subscribing returned future or some promise can be set inside OnShutdown. TFuture<void> Unregister(const IPollablePtr& pollable) override @@ -257,10 +278,7 @@ public: void Retry(const IPollablePtr& pollable) override { - if (auto guard = TryAcquireRunEventGuard(pollable.Get(), EPollControl::Retry)) { - auto* cookie = TPollableCookie::FromPollable(pollable.Get()); - cookie->Invoker->Invoke(BIND(std::move(guard))); - } + ScheduleEvent(pollable, EPollControl::Retry); } IInvokerPtr GetInvoker() const override @@ -274,34 +292,17 @@ public: } private: - static void DoShutdownPollable(TPollableCookie* cookie, IPollable* pollable) - { - // Poller guarantees that OnShutdown is never executed concurrently with OnEvent(). - // Otherwise it will be removed in TRunEventGuard. - RunNoExcept([&] { - pollable->OnShutdown(); - }); - - cookie->UnregisterPromise.Set(); - cookie->Invoker.Reset(); - auto pollerThread = std::move(cookie->PollerThread); - pollerThread->UnregisterQueue_.Enqueue(pollable); - pollerThread->WakeupHandle_.Raise(); - } - class TRunEventGuard { public: TRunEventGuard() = default; - TRunEventGuard(IPollable* pollable, EPollControl control) + explicit TRunEventGuard(IPollable* pollable) : Pollable_(pollable) - , Control_(control) { } explicit TRunEventGuard(TRunEventGuard&& other) : Pollable_(std::move(other.Pollable_)) - , Control_(std::move(other.Control_)) { other.Pollable_ = nullptr; } @@ -318,34 +319,38 @@ private: } auto* cookie = TPollableCookie::FromPollable(Pollable_); - cookie->ResetControl(ToUnderlying(Control_)); + cookie->ResetControl(); Destroy(Pollable_); } - explicit operator bool() const - { - return static_cast<bool>(Pollable_); - } - void operator()() { auto* cookie = TPollableCookie::FromPollable(Pollable_); - cookie->ResetControl(ToUnderlying(Control_)); - - Pollable_->OnEvent(Control_); + auto control = EPollControl(cookie->ResetControl()); + RunNoExcept([&] { + Pollable_->OnEvent(control); + }); Destroy(Pollable_); Pollable_ = nullptr; } private: IPollable* Pollable_ = nullptr; - EPollControl Control_ = EPollControl::None; static void Destroy(IPollable* pollable) { auto* cookie = TPollableCookie::FromPollable(pollable); - if (cookie->ReleaseRef()) { - DoShutdownPollable(cookie, pollable); + + auto result = cookie->Finish(); + switch (result) { + case EFinishResult::Shutdown: + DoShutdownPollable(cookie, pollable); + break; + case EFinishResult::Repeat: + cookie->Invoker->Invoke(BIND(TRunEventGuard(pollable))); + break; + case EFinishResult::None: + break; } } }; @@ -371,16 +376,29 @@ private: std::array<TPollerImpl::TEvent, MaxEventsPerPoll> PooledImplEvents_; - static TRunEventGuard TryAcquireRunEventGuard(IPollable* pollable, EPollControl control) + // TODO(lukyan): Move static functions in Cookie? + static void ScheduleEvent(const IPollablePtr& pollable, EPollControl control) { - auto* cookie = TPollableCookie::FromPollable(pollable); - YT_VERIFY(cookie->GetRefCount() > 0); - + // Can safely dereference pollable because even unregistered pollables are hold in Pollables_. + auto* cookie = TPollableCookie::FromPollable(pollable.Get()); if (cookie->AquireControl(ToUnderlying(control))) { - return {pollable, control}; + cookie->Invoker->Invoke(BIND(TRunEventGuard(pollable.Get()))); } + } - return {}; + static void DoShutdownPollable(TPollableCookie* cookie, IPollable* pollable) + { + // Poller guarantees that OnShutdown is never executed concurrently with OnEvent(). + // Otherwise it will be removed in TRunEventGuard. + RunNoExcept([&] { + pollable->OnShutdown(); + }); + + cookie->UnregisterPromise.Set(); + cookie->Invoker.Reset(); + auto pollerThread = std::move(cookie->PollerThread); + pollerThread->UnregisterQueue_.Enqueue(pollable); + pollerThread->WakeupHandle_.Raise(); } void DoUnregister(const IPollablePtr& pollable) @@ -415,11 +433,7 @@ private: YT_VERIFY(pollable->GetRefCount() > 0); - // Can safely dereference pollable because even unregistered pollables are hold in Pollables_. - if (auto guard = TryAcquireRunEventGuard(pollable, control)) { - auto* cookie = TPollableCookie::FromPollable(pollable); - cookie->Invoker->Invoke(BIND(std::move(guard))); - } + ScheduleEvent(pollable, control); } } |