aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-01-29 08:57:26 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-01-29 09:08:36 +0300
commit5e3cebce04bcb2566d4e8cbdf023c61932f39ad4 (patch)
tree5874d7650f9779855d30bd4318ee36447fa6669b /yt
parent6922728a2f27338a00b68625e876d33f04530f28 (diff)
downloadydb-5e3cebce04bcb2566d4e8cbdf023c61932f39ad4.tar.gz
Intermediate changes
Diffstat (limited to 'yt')
-rw-r--r--yt/yt/core/bus/tcp/connection.cpp6
-rw-r--r--yt/yt/core/bus/tcp/connection.h2
-rw-r--r--yt/yt/core/concurrency/poller.h5
-rw-r--r--yt/yt/core/concurrency/thread_pool_poller.cpp158
4 files changed, 98 insertions, 73 deletions
diff --git a/yt/yt/core/bus/tcp/connection.cpp b/yt/yt/core/bus/tcp/connection.cpp
index 02c8209651..c602964133 100644
--- a/yt/yt/core/bus/tcp/connection.cpp
+++ b/yt/yt/core/bus/tcp/connection.cpp
@@ -793,6 +793,12 @@ void TTcpConnection::UnsubscribeTerminated(const TCallback<void(const TError&)>&
void TTcpConnection::OnEvent(EPollControl control)
{
+ auto multiplexingBand = MultiplexingBand_.load();
+ if (multiplexingBand != ActualMultiplexingBand_) {
+ Poller_->SetExecutionPool(this, FormatEnum(multiplexingBand));
+ ActualMultiplexingBand_ = multiplexingBand;
+ }
+
EPollControl action;
{
auto rawPendingControl = PendingControl_.load(std::memory_order::acquire);
diff --git a/yt/yt/core/bus/tcp/connection.h b/yt/yt/core/bus/tcp/connection.h
index 722f3308db..39c5dea4d2 100644
--- a/yt/yt/core/bus/tcp/connection.h
+++ b/yt/yt/core/bus/tcp/connection.h
@@ -224,6 +224,8 @@ private:
std::atomic<EMultiplexingBand> MultiplexingBand_ = EMultiplexingBand::Default;
+ EMultiplexingBand ActualMultiplexingBand_ = EMultiplexingBand::Default;
+
TAtomicObject<TError> Error_;
NNet::IAsyncDialerSessionPtr DialerSession_;
diff --git a/yt/yt/core/concurrency/poller.h b/yt/yt/core/concurrency/poller.h
index d9c25295bb..30340eea85 100644
--- a/yt/yt/core/concurrency/poller.h
+++ b/yt/yt/core/concurrency/poller.h
@@ -82,7 +82,10 @@ struct IPoller
//! Tries to register a pollable entity but does not arm the poller yet.
//! Returns |false| if the poller is already shutting down.
- virtual bool TryRegister(const IPollablePtr& pollable) = 0;
+ virtual bool TryRegister(const IPollablePtr& pollable, TString poolName = "default") = 0;
+
+ //! Method must be called inside OnEvent.
+ virtual void SetExecutionPool(const IPollablePtr& pollable, TString poolName) = 0;
//! Unregisters the previously registered entity.
/*!
diff --git a/yt/yt/core/concurrency/thread_pool_poller.cpp b/yt/yt/core/concurrency/thread_pool_poller.cpp
index f93d507673..4aa261a548 100644
--- a/yt/yt/core/concurrency/thread_pool_poller.cpp
+++ b/yt/yt/core/concurrency/thread_pool_poller.cpp
@@ -1,9 +1,6 @@
-#include "thread_pool.h"
#include "poller.h"
#include "thread_pool_poller.h"
#include "private.h"
-#include "profiling_helpers.h"
-#include "scheduler_thread.h"
#include "two_level_fair_share_thread_pool.h"
#include "new_fair_share_thread_pool.h"
@@ -13,6 +10,8 @@
#include <yt/yt/core/profiling/tscp.h>
+#include <yt/yt/core/threading/thread.h>
+
#include <library/cpp/yt/threading/notification_handle.h>
#include <library/cpp/yt/memory/ref_tracked.h>
@@ -40,10 +39,16 @@ class TThreadPoolPoller;
namespace {
+DEFINE_ENUM(EFinishResult,
+ (None)
+ (Repeat)
+ (Shutdown)
+);
+
class TCookieState
{
public:
- // AquireControl is called from poller thread.
+ // AquireControl is called from poller thread and from Retry.
bool AquireControl(ui32 control)
{
auto currentState = State_.load();
@@ -55,19 +60,17 @@ public:
if ((static_cast<ui32>(currentState) & control) == control) {
return false;
}
- } while (!State_.compare_exchange_weak(currentState, (currentState | static_cast<ui64>(control)) + RefValue));
+ } while (!State_.compare_exchange_weak(currentState, currentState | static_cast<ui64>(control) | RunningFlag));
- return true;
+ return !(currentState & RunningFlag);
}
- void ResetControl(ui32 control)
+ // Resets control and returns previous value.
+ ui32 ResetControl()
{
auto currentState = State_.load();
- do {
- if (!(static_cast<ui32>(currentState) & control)) {
- break;
- }
- } while (!State_.compare_exchange_weak(currentState, currentState & ~static_cast<ui64>(control)));
+ while (!State_.compare_exchange_weak(currentState, currentState & (UnregisterFlag | RunningFlag)));
+ return static_cast<ui32>(currentState);
}
// Returns destroy flag.
@@ -81,29 +84,36 @@ public:
}
} while (!State_.compare_exchange_weak(currentState, currentState | UnregisterFlag));
- // No refs.
- return currentState == 0;
+ return !(currentState & RunningFlag);
}
- // Returns destroy flag.
- bool ReleaseRef()
+ EFinishResult Finish()
{
- auto prevState = State_.fetch_sub(RefValue);
+ auto currentState = State_.load();
- YT_VERIFY(prevState >= RefValue);
- auto currentState = prevState - RefValue;
- if ((currentState >> ControlShift) == 1) {
- // Verify that control flags are empty when there are no refs and unregister flag is set.
- YT_VERIFY(static_cast<ui32>(currentState) == 0);
- }
+ YT_VERIFY(currentState & RunningFlag);
+
+ do {
+ if (currentState & UnregisterFlag) {
+ // Run destroy.
+ return EFinishResult::Shutdown;
+ }
+
+ if (currentState & ~(UnregisterFlag | RunningFlag)) {
+ // Has state. Retry.
+ return EFinishResult::Repeat;
+ }
+
+ } while (!State_.compare_exchange_weak(currentState, currentState & ~RunningFlag));
- return prevState == (RefValue | UnregisterFlag);
+ return EFinishResult::None;
}
private:
static constexpr auto ControlShift = sizeof(ui32) * 8;
static constexpr ui64 UnregisterFlag = 1ULL << ControlShift;
- static constexpr ui64 RefValue = UnregisterFlag * 2;
+ static constexpr ui64 RunningFlag = 1ULL << (ControlShift + 1);
+
// No contention expected when accessing this atomic variable.
// So we can safely (regarding to performance) use CAS.
std::atomic<ui64> State_ = 0;
@@ -139,6 +149,7 @@ EContPoll ToImplControl(EPollControl control)
{
int implControl = CONT_POLL_ONE_SHOT;
if (Any(control & EPollControl::EdgeTriggered)) {
+ // N.B. Edge-triggered mode disables one shot mode.
implControl = CONT_POLL_EDGE_TRIGGERED;
}
if (Any(control & EPollControl::BacklogEmpty)) {
@@ -204,7 +215,7 @@ public:
FairShareThreadPool_->Configure(threadCount);
}
- bool TryRegister(const IPollablePtr& pollable) override
+ bool TryRegister(const IPollablePtr& pollable, TString poolName) override
{
// FIXME(lukyan): Enqueueing in register queue may happen after stopping.
// Create cookie when dequeueing from register queue?
@@ -214,7 +225,9 @@ public:
}
auto cookie = New<TPollableCookie>(this);
- cookie->Invoker = FairShareThreadPool_->GetInvoker("main", Format("%v", pollable.Get()));
+ cookie->Invoker = FairShareThreadPool_->GetInvoker(
+ poolName,
+ Format("%v", pollable.Get()));
pollable->SetCookie(std::move(cookie));
RegisterQueue_.Enqueue(pollable);
@@ -224,6 +237,14 @@ public:
return true;
}
+ void SetExecutionPool(const IPollablePtr& pollable, TString poolName) override
+ {
+ auto* cookie = TPollableCookie::FromPollable(pollable.Get());
+ cookie->Invoker = FairShareThreadPool_->GetInvoker(
+ poolName,
+ Format("%v", pollable.Get()));
+ }
+
// TODO(lukyan): Method OnShutdown in the interface and returned future are redundant.
// Shutdown can be done by subscribing returned future or some promise can be set inside OnShutdown.
TFuture<void> Unregister(const IPollablePtr& pollable) override
@@ -257,10 +278,7 @@ public:
void Retry(const IPollablePtr& pollable) override
{
- if (auto guard = TryAcquireRunEventGuard(pollable.Get(), EPollControl::Retry)) {
- auto* cookie = TPollableCookie::FromPollable(pollable.Get());
- cookie->Invoker->Invoke(BIND(std::move(guard)));
- }
+ ScheduleEvent(pollable, EPollControl::Retry);
}
IInvokerPtr GetInvoker() const override
@@ -274,34 +292,17 @@ public:
}
private:
- static void DoShutdownPollable(TPollableCookie* cookie, IPollable* pollable)
- {
- // Poller guarantees that OnShutdown is never executed concurrently with OnEvent().
- // Otherwise it will be removed in TRunEventGuard.
- RunNoExcept([&] {
- pollable->OnShutdown();
- });
-
- cookie->UnregisterPromise.Set();
- cookie->Invoker.Reset();
- auto pollerThread = std::move(cookie->PollerThread);
- pollerThread->UnregisterQueue_.Enqueue(pollable);
- pollerThread->WakeupHandle_.Raise();
- }
-
class TRunEventGuard
{
public:
TRunEventGuard() = default;
- TRunEventGuard(IPollable* pollable, EPollControl control)
+ explicit TRunEventGuard(IPollable* pollable)
: Pollable_(pollable)
- , Control_(control)
{ }
explicit TRunEventGuard(TRunEventGuard&& other)
: Pollable_(std::move(other.Pollable_))
- , Control_(std::move(other.Control_))
{
other.Pollable_ = nullptr;
}
@@ -318,34 +319,38 @@ private:
}
auto* cookie = TPollableCookie::FromPollable(Pollable_);
- cookie->ResetControl(ToUnderlying(Control_));
+ cookie->ResetControl();
Destroy(Pollable_);
}
- explicit operator bool() const
- {
- return static_cast<bool>(Pollable_);
- }
-
void operator()()
{
auto* cookie = TPollableCookie::FromPollable(Pollable_);
- cookie->ResetControl(ToUnderlying(Control_));
-
- Pollable_->OnEvent(Control_);
+ auto control = EPollControl(cookie->ResetControl());
+ RunNoExcept([&] {
+ Pollable_->OnEvent(control);
+ });
Destroy(Pollable_);
Pollable_ = nullptr;
}
private:
IPollable* Pollable_ = nullptr;
- EPollControl Control_ = EPollControl::None;
static void Destroy(IPollable* pollable)
{
auto* cookie = TPollableCookie::FromPollable(pollable);
- if (cookie->ReleaseRef()) {
- DoShutdownPollable(cookie, pollable);
+
+ auto result = cookie->Finish();
+ switch (result) {
+ case EFinishResult::Shutdown:
+ DoShutdownPollable(cookie, pollable);
+ break;
+ case EFinishResult::Repeat:
+ cookie->Invoker->Invoke(BIND(TRunEventGuard(pollable)));
+ break;
+ case EFinishResult::None:
+ break;
}
}
};
@@ -371,16 +376,29 @@ private:
std::array<TPollerImpl::TEvent, MaxEventsPerPoll> PooledImplEvents_;
- static TRunEventGuard TryAcquireRunEventGuard(IPollable* pollable, EPollControl control)
+ // TODO(lukyan): Move static functions in Cookie?
+ static void ScheduleEvent(const IPollablePtr& pollable, EPollControl control)
{
- auto* cookie = TPollableCookie::FromPollable(pollable);
- YT_VERIFY(cookie->GetRefCount() > 0);
-
+ // Can safely dereference pollable because even unregistered pollables are hold in Pollables_.
+ auto* cookie = TPollableCookie::FromPollable(pollable.Get());
if (cookie->AquireControl(ToUnderlying(control))) {
- return {pollable, control};
+ cookie->Invoker->Invoke(BIND(TRunEventGuard(pollable.Get())));
}
+ }
- return {};
+ static void DoShutdownPollable(TPollableCookie* cookie, IPollable* pollable)
+ {
+ // Poller guarantees that OnShutdown is never executed concurrently with OnEvent().
+ // Otherwise it will be removed in TRunEventGuard.
+ RunNoExcept([&] {
+ pollable->OnShutdown();
+ });
+
+ cookie->UnregisterPromise.Set();
+ cookie->Invoker.Reset();
+ auto pollerThread = std::move(cookie->PollerThread);
+ pollerThread->UnregisterQueue_.Enqueue(pollable);
+ pollerThread->WakeupHandle_.Raise();
}
void DoUnregister(const IPollablePtr& pollable)
@@ -415,11 +433,7 @@ private:
YT_VERIFY(pollable->GetRefCount() > 0);
- // Can safely dereference pollable because even unregistered pollables are hold in Pollables_.
- if (auto guard = TryAcquireRunEventGuard(pollable, control)) {
- auto* cookie = TPollableCookie::FromPollable(pollable);
- cookie->Invoker->Invoke(BIND(std::move(guard)));
- }
+ ScheduleEvent(pollable, control);
}
}