aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorlukyan <lukyan@yandex-team.com>2023-11-28 11:24:01 +0300
committerlukyan <lukyan@yandex-team.com>2023-11-28 12:32:31 +0300
commit2d9b16afa033b65670357fceee780f89297e2b68 (patch)
treeb7cbe777db4d005c954bd8cb78236225e6640b34
parent38b378a33c9814cc1cf654beceb92bf6fb0c7c2c (diff)
downloadydb-2d9b16afa033b65670357fceee780f89297e2b68.tar.gz
YT-20430: Fair share bus and skip redundant events from poller
-rw-r--r--yt/yt/core/bus/tcp/connection.cpp2
-rw-r--r--yt/yt/core/concurrency/new_fair_share_thread_pool.cpp22
-rw-r--r--yt/yt/core/concurrency/new_fair_share_thread_pool.h10
-rw-r--r--yt/yt/core/concurrency/poller.h3
-rw-r--r--yt/yt/core/concurrency/thread_pool_poller.cpp242
-rw-r--r--yt/yt/core/concurrency/unittests/scheduler_ut.cpp6
-rw-r--r--yt/yt/core/net/connection.cpp3
-rw-r--r--yt/yt/core/net/listener.cpp1
-rw-r--r--yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp5
9 files changed, 178 insertions, 116 deletions
diff --git a/yt/yt/core/bus/tcp/connection.cpp b/yt/yt/core/bus/tcp/connection.cpp
index f1d6e6c39c..d3168a89f5 100644
--- a/yt/yt/core/bus/tcp/connection.cpp
+++ b/yt/yt/core/bus/tcp/connection.cpp
@@ -879,7 +879,7 @@ void TTcpConnection::OnEvent(EPollControl control)
YT_ASSERT(Any(previousPendingControl & EPollControl::Running));
if (Any(previousPendingControl & ~EPollControl::Running) && None(previousPendingControl & EPollControl::Shutdown)) {
YT_LOG_TRACE("Retrying event processing for OnEvent (PendingControl: %v)", previousPendingControl);
- Poller_->Retry(this, false);
+ Poller_->Retry(this);
}
}
}
diff --git a/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp b/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp
index 2bf6585fd1..987fbb4539 100644
--- a/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp
+++ b/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp
@@ -1,4 +1,5 @@
#include "two_level_fair_share_thread_pool.h"
+#include "new_fair_share_thread_pool.h"
#include "private.h"
#include "notify_manager.h"
#include "profiling_helpers.h"
@@ -385,14 +386,13 @@ public:
TTwoLevelFairShareQueue(
TIntrusivePtr<NThreading::TEventCount> callbackEventCount,
const TString& threadNamePrefix,
- IPoolWeightProviderPtr poolWeightProvider,
- bool verboseLogging)
- : TNotifyManager(std::move(callbackEventCount), GetThreadTags(threadNamePrefix), TDuration::MilliSeconds(10))
+ const TNewTwoLevelFairShareThreadPoolOptions& options)
+ : TNotifyManager(std::move(callbackEventCount), GetThreadTags(threadNamePrefix), options.PollingPeriod)
, ThreadNamePrefix_(threadNamePrefix)
, Profiler_(TProfiler{"/fair_share_queue"}
.WithHot())
- , PoolWeightProvider_(std::move(poolWeightProvider))
- , VerboseLogging_(verboseLogging)
+ , PoolWeightProvider_(options.PoolWeightProvider)
+ , VerboseLogging_(options.VerboseLogging)
{ }
~TTwoLevelFairShareQueue()
@@ -1104,14 +1104,12 @@ public:
TTwoLevelFairShareThreadPool(
int threadCount,
const TString& threadNamePrefix,
- IPoolWeightProviderPtr poolWeightProvider,
- bool verboseLogging)
+ const TNewTwoLevelFairShareThreadPoolOptions& options)
: TThreadPoolBase(threadNamePrefix)
, Queue_(New<TTwoLevelFairShareQueue>(
CallbackEventCount_,
ThreadNamePrefix_,
- std::move(poolWeightProvider),
- verboseLogging))
+ options))
{
Configure(threadCount);
}
@@ -1190,14 +1188,12 @@ private:
ITwoLevelFairShareThreadPoolPtr CreateNewTwoLevelFairShareThreadPool(
int threadCount,
const TString& threadNamePrefix,
- IPoolWeightProviderPtr poolWeightProvider = nullptr,
- bool verboseLogging = false)
+ const TNewTwoLevelFairShareThreadPoolOptions& options)
{
return New<TTwoLevelFairShareThreadPool>(
threadCount,
threadNamePrefix,
- std::move(poolWeightProvider),
- verboseLogging);
+ options);
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/concurrency/new_fair_share_thread_pool.h b/yt/yt/core/concurrency/new_fair_share_thread_pool.h
index abc8c74fab..72fab95532 100644
--- a/yt/yt/core/concurrency/new_fair_share_thread_pool.h
+++ b/yt/yt/core/concurrency/new_fair_share_thread_pool.h
@@ -8,11 +8,17 @@ namespace NYT::NConcurrency {
////////////////////////////////////////////////////////////////////////////////
+struct TNewTwoLevelFairShareThreadPoolOptions
+{
+ IPoolWeightProviderPtr PoolWeightProvider = nullptr;
+ bool VerboseLogging = false;
+ TDuration PollingPeriod = TDuration::MilliSeconds(10);
+};
+
ITwoLevelFairShareThreadPoolPtr CreateNewTwoLevelFairShareThreadPool(
int threadCount,
const TString& threadNamePrefix,
- IPoolWeightProviderPtr poolWeightProvider = nullptr,
- bool verboseLogging = false);
+ const TNewTwoLevelFairShareThreadPoolOptions& options = {});
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/concurrency/poller.h b/yt/yt/core/concurrency/poller.h
index 8a2c2e3a97..d9c25295bb 100644
--- a/yt/yt/core/concurrency/poller.h
+++ b/yt/yt/core/concurrency/poller.h
@@ -106,8 +106,7 @@ struct IPoller
virtual void Arm(TFileDescriptor fd, const IPollablePtr& pollable, EPollControl control) = 0;
//! Schedule call of #IPollable::OnEvent with EPollControl::Retry.
- //! From OnEvent could be called with wakeup = false to not wake new thread.
- virtual void Retry(const IPollablePtr& pollable, bool wakeup = true) = 0;
+ virtual void Retry(const IPollablePtr& pollable) = 0;
//! Unarms the poller.
virtual void Unarm(TFileDescriptor fd, const IPollablePtr& pollable) = 0;
diff --git a/yt/yt/core/concurrency/thread_pool_poller.cpp b/yt/yt/core/concurrency/thread_pool_poller.cpp
index e6df58c65e..35ee8698ab 100644
--- a/yt/yt/core/concurrency/thread_pool_poller.cpp
+++ b/yt/yt/core/concurrency/thread_pool_poller.cpp
@@ -4,6 +4,8 @@
#include "private.h"
#include "profiling_helpers.h"
#include "scheduler_thread.h"
+#include "two_level_fair_share_thread_pool.h"
+#include "new_fair_share_thread_pool.h"
#include <yt/yt/core/misc/proc.h>
#include <yt/yt/core/misc/mpsc_stack.h>
@@ -36,9 +38,84 @@ class TThreadPoolPoller;
namespace {
+class TCookieState
+{
+public:
+ // AquireControl is called from poller thread.
+ bool AquireControl(ui32 control)
+ {
+ auto currentState = State_.load();
+ do {
+ if (currentState & UnregisterFlag) {
+ return false;
+ }
+
+ if ((static_cast<ui32>(currentState) & control) == control) {
+ return false;
+ }
+ } while (!State_.compare_exchange_weak(currentState, (currentState | static_cast<ui64>(control)) + RefValue));
+
+ return true;
+ }
+
+ void ResetControl(ui32 control)
+ {
+ auto currentState = State_.load();
+ do {
+ if (!(static_cast<ui32>(currentState) & control)) {
+ break;
+ }
+ } while (!State_.compare_exchange_weak(currentState, currentState & ~static_cast<ui64>(control)));
+ }
+
+ // Returns destroy flag.
+ bool SetUnregisterFlag()
+ {
+ auto currentState = State_.load();
+ do {
+ // Pollable has been already unregistered.
+ if (currentState & UnregisterFlag) {
+ return false;
+ }
+ } while (!State_.compare_exchange_weak(currentState, currentState | UnregisterFlag));
+
+ // No refs.
+ return currentState == 0;
+ }
+
+ // Returns destroy flag.
+ bool ReleaseRef()
+ {
+ auto prevState = State_.fetch_sub(RefValue);
+
+ YT_VERIFY(prevState >= RefValue);
+ auto currentState = prevState - RefValue;
+ if ((currentState >> ControlShift) == 1) {
+ // Verify that control flags are empty when there are no refs and unregister flag is set.
+ YT_VERIFY(static_cast<ui32>(currentState) == 0);
+ }
+
+ return prevState == (RefValue | UnregisterFlag);
+ }
+
+private:
+ static constexpr auto ControlShift = sizeof(ui32) * 8;
+ static constexpr ui64 UnregisterFlag = 1ULL << ControlShift;
+ static constexpr ui64 RefValue = UnregisterFlag * 2;
+ // No contention expected when accessing this atomic variable.
+ // So we can safely (regarding to performance) use CAS.
+ std::atomic<ui64> State_ = 0;
+};
+
struct TPollableCookie
: public TRefCounted
+ , public TCookieState
{
+ const TPromise<void> UnregisterPromise = NewPromise<void>();
+
+ TIntrusivePtr<TThreadPoolPoller> PollerThread;
+ IInvokerPtr Invoker;
+
explicit TPollableCookie(TThreadPoolPoller* pollerThread)
: PollerThread(pollerThread)
{ }
@@ -54,12 +131,6 @@ struct TPollableCookie
YT_VERIFY(cookie);
return cookie;
}
-
- TThreadPoolPoller* const PollerThread = nullptr;
-
- // Active event count is equal to 2 * (active events) + (1 for unregister flag).
- std::atomic<int> ActiveEventCount = 1;
- const TPromise<void> UnregisterPromise = NewPromise<void>();
};
EContPoll ToImplControl(EPollControl control)
@@ -98,28 +169,6 @@ EPollControl FromImplControl(int implControl)
return control;
}
-EThreadPriority PollablePriorityToThreadPriority(EPollablePriority priority)
-{
- switch (priority) {
- case EPollablePriority::RealTime:
- return EThreadPriority::RealTime;
-
- default:
- return EThreadPriority::Normal;
- }
-}
-
-TString PollablePriorityToPollerThreadNameSuffix(EPollablePriority priority)
-{
- switch (priority) {
- case EPollablePriority::RealTime:
- return "RT";
-
- default:
- return "";
- }
-}
-
} // namespace
class TThreadPoolPoller
@@ -134,33 +183,36 @@ public:
: TThread(Format("%v:%v", threadNamePrefix, "Poll"))
, Logger(ConcurrencyLogger.WithTag("ThreadNamePrefix: %v", threadNamePrefix))
{
+ // Register auxilary notifictation handle to wake up poller thread when deregistering
+ // pollables and on shutdown.
PollerImpl_.Set(nullptr, WakeupHandle_.GetFD(), CONT_POLL_EDGE_TRIGGERED | CONT_POLL_READ);
- for (auto priority : TEnumTraits<EPollablePriority>::GetDomainValues()) {
- HandlerThreadPool_[priority] = CreateThreadPool(
- threadCount,
- threadNamePrefix + PollablePriorityToPollerThreadNameSuffix(priority),
- PollablePriorityToThreadPriority(priority),
- pollingPeriod);
- HandlerInvoker_[priority] = HandlerThreadPool_[priority]->GetInvoker();
- }
+ FairShareThreadPool_ = CreateNewTwoLevelFairShareThreadPool(
+ threadCount,
+ threadNamePrefix + "FS",
+ {
+ .PollingPeriod = pollingPeriod
+ });
+ AuxInvoker_ = FairShareThreadPool_->GetInvoker("aux", "default");
}
void Reconfigure(int threadCount) override
{
- for (auto priority : TEnumTraits<EPollablePriority>::GetDomainValues()) {
- HandlerThreadPool_[priority]->Configure(threadCount);
- }
+ FairShareThreadPool_->Configure(threadCount);
}
- // TODO(lukyan): Remove TryRegister and Unregister. Do it in Arm/Unarm.
bool TryRegister(const IPollablePtr& pollable) override
{
+ // FIXME(lukyan): Enqueueing in register queue may happen after stopping.
+ // Create cookie when dequeueing from register queue?
+ // How to prevent arming FD when stopping.
if (IsStopping()) {
return false;
}
- pollable->SetCookie(New<TPollableCookie>(this));
+ auto cookie = New<TPollableCookie>(this);
+ cookie->Invoker = FairShareThreadPool_->GetInvoker("main", Format("%v", pollable.Get()));
+ pollable->SetCookie(std::move(cookie));
RegisterQueue_.Enqueue(pollable);
YT_LOG_DEBUG("Pollable registered (%v)",
@@ -189,6 +241,7 @@ public:
fd,
control,
pollable->GetLoggingTag());
+
PollerImpl_.Set(pollable.Get(), fd, ToImplControl(control));
}
@@ -199,16 +252,17 @@ public:
PollerImpl_.Remove(fd);
}
- void Retry(const IPollablePtr& pollable, bool /*wakeup*/) override
+ void Retry(const IPollablePtr& pollable) override
{
if (auto guard = TryAcquireRunEventGuard(pollable.Get(), EPollControl::Retry)) {
- HandlerInvoker_[pollable->GetPriority()]->Invoke(BIND(std::move(guard)));
+ auto* cookie = TPollableCookie::FromPollable(pollable.Get());
+ cookie->Invoker->Invoke(BIND(std::move(guard)));
}
}
IInvokerPtr GetInvoker() const override
{
- return HandlerInvoker_[EPollablePriority::Normal];
+ return AuxInvoker_;
}
void Shutdown() override
@@ -243,7 +297,7 @@ private:
{
if (Pollable_) {
// This is unlikely but might happen on thread pool termination.
- GetFinalizerInvoker()->Invoke(BIND(&Destroy, Unretained(Pollable_)));
+ GetFinalizerInvoker()->Invoke(BIND(&ResetAndDestroy, Unretained(Pollable_), Control_));
}
}
@@ -254,6 +308,9 @@ private:
void operator()()
{
+ auto* cookie = TPollableCookie::FromPollable(Pollable_);
+ cookie->ResetControl(ToUnderlying(Control_));
+
Pollable_->OnEvent(Control_);
Destroy(Pollable_);
Pollable_ = nullptr;
@@ -266,21 +323,28 @@ private:
static void Destroy(IPollable* pollable)
{
auto* cookie = TPollableCookie::FromPollable(pollable);
- auto activeEventCount = cookie->ActiveEventCount.fetch_sub(2) - 2;
- if (activeEventCount == 0) {
+ if (cookie->ReleaseRef()) {
pollable->OnShutdown();
cookie->UnregisterPromise.Set();
- auto pollerThread = MakeStrong(cookie->PollerThread);
+ cookie->Invoker.Reset();
+ auto pollerThread = std::move(cookie->PollerThread);
pollerThread->UnregisterQueue_.Enqueue(pollable);
pollerThread->WakeupHandle_.Raise();
}
}
+
+ static void ResetAndDestroy(IPollable* pollable, EPollControl control)
+ {
+ auto* cookie = TPollableCookie::FromPollable(pollable);
+ cookie->ResetControl(ToUnderlying(control));
+ Destroy(pollable);
+ }
};
const NLogging::TLogger Logger;
- TEnumIndexedVector<EPollablePriority, IThreadPoolPtr> HandlerThreadPool_;
- TEnumIndexedVector<EPollablePriority, IInvokerPtr> HandlerInvoker_;
+ ITwoLevelFairShareThreadPoolPtr FairShareThreadPool_;
+ IInvokerPtr AuxInvoker_;
// Only makes sense for "select" backend.
struct TMutexLocking
@@ -298,64 +362,47 @@ private:
std::array<TPollerImpl::TEvent, MaxEventsPerPoll> PooledImplEvents_;
- TEnumIndexedVector<EPollablePriority, std::vector<TClosure>> Callbacks_;
-
static TRunEventGuard TryAcquireRunEventGuard(IPollable* pollable, EPollControl control)
{
auto* cookie = TPollableCookie::FromPollable(pollable);
YT_VERIFY(cookie->GetRefCount() > 0);
- auto oldEventCount = cookie->ActiveEventCount.fetch_add(2);
- if (oldEventCount & 1) {
- return TRunEventGuard(pollable, control);
+ if (cookie->AquireControl(ToUnderlying(control))) {
+ return {pollable, control};
}
- cookie->ActiveEventCount.fetch_sub(2);
- return TRunEventGuard();
+ return {};
}
- bool DoUnregister(const IPollablePtr& pollable)
+ void DoUnregister(const IPollablePtr& pollable)
{
YT_LOG_DEBUG("Requesting pollable unregistration (%v)",
pollable->GetLoggingTag());
auto* cookie = TPollableCookie::TryFromPollable(pollable.Get());
YT_VERIFY(cookie);
- auto activeEventCount = cookie->ActiveEventCount.load();
- while (true) {
- // Otherwise pollable has been already unregistered.
- if (!(activeEventCount & 1)) {
- YT_LOG_DEBUG("Pollable is already unregistered (%v)",
- pollable->GetLoggingTag());
- return false;
- }
+ if (cookie->SetUnregisterFlag()) {
+ // Poller guarantees that OnShutdown is never executed concurrently with OnEvent().
+ // Otherwise it will be removed in TRunEventGuard.
- if (cookie->ActiveEventCount.compare_exchange_weak(activeEventCount, activeEventCount & ~1)) {
- // Poller guarantees that OnShutdown is never executed concurrently with OnEvent().
- // Otherwise it will be removed in TRunEventGuard.
- if (activeEventCount == 1) {
- pollable->OnShutdown();
- cookie->UnregisterPromise.Set();
- cookie->PollerThread->UnregisterQueue_.Enqueue(pollable);
- cookie->PollerThread->WakeupHandle_.Raise();
- }
- return true;
- }
+ pollable->OnShutdown();
+ cookie->UnregisterPromise.Set();
+ cookie->Invoker.Reset();
+ auto pollerThread = std::move(cookie->PollerThread);
+ pollerThread->UnregisterQueue_.Enqueue(pollable);
+ pollerThread->WakeupHandle_.Raise();
}
-
- return false;
}
- void HandleEvents()
+ void HandleEvents(int eventCount)
{
- int eventCount = PollerImpl_.Wait(PooledImplEvents_.data(), PooledImplEvents_.size(), PollerThreadQuantum.MicroSeconds());
-
for (int index = 0; index < eventCount; ++index) {
const auto& event = PooledImplEvents_[index];
auto control = FromImplControl(PollerImpl_.ExtractFilter(&event));
auto* pollable = static_cast<IPollable*>(PollerImpl_.ExtractEvent(&event));
+ // Null pollable stands for wakeup handle.
if (!pollable) {
WakeupHandle_.Clear();
continue;
@@ -368,16 +415,11 @@ private:
YT_VERIFY(pollable->GetRefCount() > 0);
// Can safely dereference pollable because even unregistered pollables are hold in Pollables_.
- auto priority = pollable->GetPriority();
if (auto guard = TryAcquireRunEventGuard(pollable, control)) {
- Callbacks_[priority].push_back(BIND(std::move(guard)));
+ auto* cookie = TPollableCookie::FromPollable(pollable);
+ cookie->Invoker->Invoke(BIND(std::move(guard)));
}
}
-
- for (auto priority : TEnumTraits<EPollablePriority>::GetDomainValues()) {
- HandlerInvoker_[priority]->Invoke(Callbacks_[priority]);
- Callbacks_[priority].clear();
- }
}
void ThreadMain() override
@@ -391,7 +433,9 @@ private:
YT_LOG_DEBUG("Thread started (Name: %v)",
GetThreadName());
- while (!IsStopping()) {
+ while (true) {
+ int eventCount = PollerImpl_.Wait(PooledImplEvents_.data(), PooledImplEvents_.size(), PollerThreadQuantum.MicroSeconds());
+
// Save items from unregister queue before processing register queue.
// Otherwise registration and unregistration can be reordered:
// item was enqueued in register and unregister queues after processing register queue;
@@ -405,13 +449,23 @@ private:
InsertOrCrash(Pollables_, std::move(pollable));
});
- HandleEvents();
+ HandleEvents(eventCount);
for (const auto& pollable : unregisterItems) {
EraseOrCrash(Pollables_, pollable);
}
unregisterItems.clear();
+
+ if (IsStopping()) {
+ if (Pollables_.empty()) {
+ break;
+ }
+ // Need to unregister pollables when stopping to break reference cycles between pollables and poller.
+ for (const auto& pollable : Pollables_) {
+ DoUnregister(pollable);
+ }
+ }
}
YT_LOG_DEBUG("Thread stopped (Name: %v)",
@@ -421,10 +475,8 @@ private:
GetThreadName());
}
- // Shutdown here.
- for (const auto& pollable : Pollables_) {
- DoUnregister(pollable);
- }
+ RegisterQueue_.DequeueAll(false, [&] (const auto&) { });
+ UnregisterQueue_.DequeueAll(false, [&] (const auto&) { });
}
void StopPrologue() override
diff --git a/yt/yt/core/concurrency/unittests/scheduler_ut.cpp b/yt/yt/core/concurrency/unittests/scheduler_ut.cpp
index a7e7da4401..4e437fcec8 100644
--- a/yt/yt/core/concurrency/unittests/scheduler_ut.cpp
+++ b/yt/yt/core/concurrency/unittests/scheduler_ut.cpp
@@ -1411,8 +1411,10 @@ TEST_P(TFairShareSchedulerTest, TwoLevelFairness)
auto threadPool = CreateNewTwoLevelFairShareThreadPool(
numThreads,
"MyFairSharePool",
- /*poolWeightProvider*/ nullptr,
- /*verboseLogging*/ true);
+ {
+ /*poolWeightProvider*/ nullptr,
+ /*verboseLogging*/ true
+ });
std::vector<TDuration> progresses(numWorkers);
std::vector<TDuration> pools(numPools);
diff --git a/yt/yt/core/net/connection.cpp b/yt/yt/core/net/connection.cpp
index c0f47f1ead..38427b05b9 100644
--- a/yt/yt/core/net/connection.cpp
+++ b/yt/yt/core/net/connection.cpp
@@ -484,6 +484,7 @@ public:
WriteDirection_.Operation.reset();
}
+ Poller_->Unarm(FD_, this);
YT_VERIFY(TryClose(FD_, false));
FD_ = -1;
@@ -971,7 +972,7 @@ private:
} else if (!result.Value().Retry) {
operation->SetResult();
} else if (needRetry) {
- Poller_->Retry(this, false);
+ Poller_->Retry(this);
}
if (needUnregister) {
diff --git a/yt/yt/core/net/listener.cpp b/yt/yt/core/net/listener.cpp
index bb658fd68d..7a4bda3a64 100644
--- a/yt/yt/core/net/listener.cpp
+++ b/yt/yt/core/net/listener.cpp
@@ -93,6 +93,7 @@ public:
Error_ = TError("Listener is shut down");
}
std::swap(Queue_, queue);
+ Acceptor_->Unarm(ServerSocket_, this);
YT_VERIFY(TryClose(ServerSocket_, false));
}
diff --git a/yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp b/yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp
index a033fa9f19..c8eafd445f 100644
--- a/yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp
+++ b/yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp
@@ -42,6 +42,11 @@ TYPED_TEST(THandleChannelFailureTest, HandleChannelFailureTest)
outerServer.InitilizeAddress();
innerServer.InitilizeAddress();
+ auto finally = Finally([&] {
+ outerServer.TearDown();
+ innerServer.TearDown();
+ });
+
auto workerPool = NConcurrency::CreateThreadPool(4, "Worker");
outerServer.InitializeServer(