authorarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-05-27 11:31:21 +0300
committerarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-05-27 11:44:57 +0300
commit89f880c2c27de159962127b46e71fed21d21f7db (patch)
tree5b2326652356602bdec4a31d6a88cdea3a6b30b3
parent8252f93d81626e09c191953f7616eacd16721565 (diff)
downloadydb-89f880c2c27de159962127b46e71fed21d21f7db.tar.gz
YT-21634: Make shutdown sequences of Invokers more robust
Basically, there are two bugs/bad behaviors:

1) Some parts of shutdown are delegated to the Finalizer/Shutdown invoker. This causes a race (not a data race, just non-deterministic behaviour) between a deadlocking producer (if destruction of its callback causes a deadlock) and the finalizer/shutdown thread which closes the queue and thereby triggers the deadlock. We change the behaviour so that queues are closed immediately; thus, if any deadlocks are present, they are guaranteed to happen and be reliably detected.

2) Most queues that implement a shutdown mechanism have no proper way of double-checking whether the queue was closed after they checked it but before they enqueued their task. This results in a callback being stuck in the queue forever. On its own this is not an issue, but given that the callback can (and usually does) hold a strong reference to the class which owns the queue itself, we get a reference cycle: queue -owns-> callback -owns-> queue. We implement an appropriate double-checking mechanism which ensures such a thing never occurs.

ad8c42eb67f74bc19c32c85fd8b7790b294a7144
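To make bug 2) concrete, here is a minimal, purely illustrative C++ sketch of the ownership cycle that forms when a callback left in a queue captures a strong reference to the queue's owner. The names (TOwner, ScheduleWork) are hypothetical and are not YT classes; std::shared_ptr stands in for YT's intrusive refcounting.

#include <functional>
#include <memory>
#include <queue>

// Hypothetical owner type; in YT this role is played by, e.g., a thread pool
// or an executor that owns its invoker queue.
struct TOwner
    : public std::enable_shared_from_this<TOwner>
{
    std::queue<std::function<void()>> Queue; // owner -owns-> queue

    void ScheduleWork()
    {
        // queue -owns-> callback -owns-> owner: if shutdown never drains this
        // element, the cycle keeps TOwner (and its queue) alive forever.
        Queue.push([self = shared_from_this()] {
            // do work
        });
    }
};

If Shutdown races with ScheduleWork and the callback is enqueued after the last drain, nothing ever pops it; the double-checking described above closes exactly this window.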
-rw-r--r--yt/yt/core/concurrency/action_queue.cpp22
-rw-r--r--yt/yt/core/concurrency/delayed_executor.cpp96
-rw-r--r--yt/yt/core/concurrency/fair_share_action_queue.cpp21
-rw-r--r--yt/yt/core/concurrency/fair_share_invoker_queue.cpp15
-rw-r--r--yt/yt/core/concurrency/fair_share_invoker_queue.h7
-rw-r--r--yt/yt/core/concurrency/fair_share_thread_pool.cpp25
-rw-r--r--yt/yt/core/concurrency/invoker_queue.cpp155
-rw-r--r--yt/yt/core/concurrency/invoker_queue.h23
-rw-r--r--yt/yt/core/concurrency/new_fair_share_thread_pool.cpp78
-rw-r--r--yt/yt/core/concurrency/private.h9
-rw-r--r--yt/yt/core/concurrency/scheduler_thread.cpp4
-rw-r--r--yt/yt/core/concurrency/scheduler_thread.h9
-rw-r--r--yt/yt/core/concurrency/single_queue_scheduler_thread.cpp6
-rw-r--r--yt/yt/core/concurrency/single_queue_scheduler_thread.h1
-rw-r--r--yt/yt/core/concurrency/suspendable_action_queue.cpp21
-rw-r--r--yt/yt/core/concurrency/thread_pool.cpp11
-rw-r--r--yt/yt/core/concurrency/thread_pool_detail.cpp13
-rw-r--r--yt/yt/core/concurrency/thread_pool_detail.h1
-rw-r--r--yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp26
-rw-r--r--yt/yt/core/logging/log_manager.cpp8
-rw-r--r--yt/yt/core/misc/shutdown_priorities.h2
21 files changed, 386 insertions, 167 deletions
diff --git a/yt/yt/core/concurrency/action_queue.cpp b/yt/yt/core/concurrency/action_queue.cpp
index 5ff68e0dcc..ca30470ef4 100644
--- a/yt/yt/core/concurrency/action_queue.cpp
+++ b/yt/yt/core/concurrency/action_queue.cpp
@@ -54,16 +54,14 @@ public:
void Shutdown(bool graceful)
{
- if (Stopped_.exchange(true)) {
+ // Proper synchronization done via Queue_->Shutdown().
+ if (Stopped_.exchange(true, std::memory_order::relaxed)) {
return;
}
- Queue_->Shutdown();
-
- ShutdownInvoker_->Invoke(BIND_NO_PROPAGATE([graceful, thread = Thread_, queue = Queue_] {
- thread->Stop(graceful);
- queue->DrainConsumer();
- }));
+ Queue_->Shutdown(graceful);
+ Thread_->Stop(graceful);
+ Queue_->OnConsumerFinished();
}
const IInvokerPtr& GetInvoker()
@@ -79,20 +77,14 @@ private:
const TMpscSingleQueueSchedulerThreadPtr Thread_;
const TShutdownCookie ShutdownCookie_;
- const IInvokerPtr ShutdownInvoker_ = GetShutdownInvoker();
- std::atomic<bool> Started_ = false;
std::atomic<bool> Stopped_ = false;
void EnsureStarted()
{
- if (Started_.load(std::memory_order::relaxed)) {
- return;
- }
- if (Started_.exchange(true)) {
- return;
- }
+ // Thread::Start already has
+ // its own short-circuit mechanism.
Thread_->Start();
}
};
diff --git a/yt/yt/core/concurrency/delayed_executor.cpp b/yt/yt/core/concurrency/delayed_executor.cpp
index 23aabbff95..581163dcfa 100644
--- a/yt/yt/core/concurrency/delayed_executor.cpp
+++ b/yt/yt/core/concurrency/delayed_executor.cpp
@@ -3,6 +3,7 @@
#include "scheduler.h"
#include "private.h"
+#include <yt/yt/core/actions/invoker_util.h>
#include <yt/yt/core/misc/relaxed_mpsc_queue.h>
#include <yt/yt/core/misc/singleton.h>
@@ -60,6 +61,41 @@ DEFINE_REFCOUNTED_TYPE(TDelayedExecutorEntry)
////////////////////////////////////////////////////////////////////////////////
+//! (arkady-e1ppa) MO's/Fence explanation:
+/*
+ We want shutdown to guarantee that no callback is left in any queue
+ once it is over. Otherwise, a memory leak or a deadlock of Poller/GrpcServer
+ (or someone else who blocks a thread until some callback is run) will occur.
+
+ We model our queue with Enqueue being RMW^rel(Queue_, x, y) and Dequeue
+ being RMW^acq(Queue_, x, y), where x is what we have read and y is what we
+ have written. A missing callback would imply that in the Submit method enqueueing |CB|
+ we have observed Stopping_ |false| (e.g. TThread::Start (c) returned |true|)
+ but also in ThreadMain during the SubmitQueue drain (f) we have not observed the
+ |CB|. Execution is schematically listed below:
+ T1(Submit) T2(Shutdown)
+ RMW^rel(Queue_, empty, CB) (a) W^rel(Stopping_, true) (d)
+ |sequenced-before |simply hb
+ Fence^sc (b) Fence^sc (e)
+ |sequenced-before |sequenced-before
+ R^acq(Stopping_, false) (c) RMW^acq(Queue_, empty, empty) (f)
+
+ Since (c) reads |false| it must be reading from Stopping_ ctor which is
+ W^na(Stopping_, false) which precedes (d) in modification order. Thus
+ (c) must read-from some modification preceding (d) in modification order (ctor)
+ and therefore (c) -cob-> (d) (coherence ordered before).
+ Likewise, (f) reads |empty| which can only be read from Queue_ ctor or
+ prior Dequeue, both of which precede (a) in modification order (ctor is obvious;
+ former Dequeue by assumption that no one has read |CB| ever: if some (a) was
+ prior to some Dequeue in modification order, |CB| would inevitably be read).
+ So, (f) -cob-> (a). For fences we now have two relations:
+ (b) -sb-> (c) -cob-> (d) -simply hb-> (e) => (b) -S-> (e)
+ (e) -sb-> (f) -cob-> (a) -sb-> (b) => (e) -S-> (b)
+ Here sb is sequenced-before and S is sequentially-consistent total ordering.
+ We have formed a loop in S thus contradicting the assumption.
+*/
+
+
class TDelayedExecutorImpl
{
public:
@@ -139,9 +175,11 @@ public:
{
YT_VERIFY(callback);
auto entry = New<TDelayedExecutorEntry>(std::move(callback), deadline, std::move(invoker));
- PollerThread_->EnqueueSubmission(entry);
+ PollerThread_->EnqueueSubmission(entry); // <- (a)
+
+ std::atomic_thread_fence(std::memory_order::seq_cst); // <- (b)
- if (!PollerThread_->Start()) {
+ if (!PollerThread_->Start()) { // <- (c)
if (auto callback = TakeCallback(entry)) {
callback(/*aborted*/ true);
}
@@ -213,6 +251,43 @@ private:
NProfiling::TCounter CanceledCallbacksCounter_ = ConcurrencyProfiler.Counter("/delayed_executor/canceled_callbacks");
NProfiling::TCounter StaleCallbacksCounter_ = ConcurrencyProfiler.Counter("/delayed_executor/stale_callbacks");
+ class TCallbackGuard
+ {
+ public:
+ TCallbackGuard(TCallback<void(bool)> callback, bool aborted) noexcept
+ : Callback_(std::move(callback))
+ , Aborted_(aborted)
+ { }
+
+ TCallbackGuard(TCallbackGuard&& other) = default;
+
+ TCallbackGuard(const TCallbackGuard&) = delete;
+
+ TCallbackGuard& operator=(const TCallbackGuard&) = delete;
+ TCallbackGuard& operator=(TCallbackGuard&&) = delete;
+
+ void operator()()
+ {
+ auto callback = std::move(Callback_);
+ YT_VERIFY(callback);
+ callback.Run(Aborted_);
+ }
+
+ ~TCallbackGuard()
+ {
+ if (Callback_) {
+ YT_LOG_DEBUG("Aborting delayed executor callback");
+
+ auto callback = std::move(Callback_);
+ callback(/*aborted*/ true);
+ }
+ }
+
+ private:
+ TCallback<void(bool)> Callback_;
+ bool Aborted_;
+ };
+
void StartPrologue() override
{
@@ -240,6 +315,13 @@ private:
ProcessQueues();
if (IsStopping()) {
+ // We have Stopping_.store(true) in simply happens-before relation.
+ // Assume Stopping_.store(true, release) is (d).
+ // NB(arkady-e1ppa): At the time of writing it is actually
+ // seq_cst, but:
+ // 1. We don't need it to be for correctness.
+ // 2. It won't help us here anyway.
+ // 3. It might be changed, as it could be suboptimal.
break;
}
@@ -251,6 +333,8 @@ private:
EventCount_->Wait(cookie, deadline);
}
+ std::atomic_thread_fence(std::memory_order::seq_cst); // <- (e)
+
// Perform graceful shutdown.
// First run the scheduled callbacks with |aborted = true|.
@@ -267,7 +351,7 @@ private:
// Now we handle the queued callbacks similarly.
{
TDelayedExecutorEntryPtr entry;
- while (SubmitQueue_.TryDequeue(&entry)) {
+ while (SubmitQueue_.TryDequeue(&entry)) { // <- (f)
runAbort(entry);
}
}
@@ -349,7 +433,11 @@ private:
void RunCallback(const TDelayedExecutorEntryPtr& entry, bool abort)
{
if (auto callback = TakeCallback(entry)) {
- (entry->Invoker ? entry->Invoker : DelayedInvoker_)->Invoke(BIND_NO_PROPAGATE(std::move(callback), abort));
+ auto invoker = entry->Invoker
+ ? entry->Invoker
+ : DelayedInvoker_;
+ invoker
+ ->Invoke(BIND_NO_PROPAGATE(TCallbackGuard(std::move(callback), abort)));
}
}
};
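The fence reasoning added in delayed_executor.cpp above is the classic store-buffering (Dekker) pattern. Below is a minimal self-contained sketch of that pattern with the same (a)-(f) labels; the flags and names are illustrative stand-ins (an atomic bool in place of the real MPSC submit queue), not YT code.

#include <atomic>
#include <cassert>
#include <thread>

std::atomic<bool> QueueHasCallback{false}; // stands in for the submit queue state
std::atomic<bool> Stopping{false};

bool SubmitSawStopping = false;
bool ShutdownSawCallback = false;

void Submit()
{
    QueueHasCallback.store(true, std::memory_order::release);      // (a) enqueue
    std::atomic_thread_fence(std::memory_order::seq_cst);          // (b)
    SubmitSawStopping = Stopping.load(std::memory_order::acquire); // (c)
}

void Shutdown()
{
    Stopping.store(true, std::memory_order::release);                        // (d)
    std::atomic_thread_fence(std::memory_order::seq_cst);                    // (e)
    ShutdownSawCallback = QueueHasCallback.load(std::memory_order::acquire); // (f)
}

int main()
{
    std::thread submitter(Submit);
    std::thread stopper(Shutdown);
    submitter.join();
    stopper.join();
    // The two seq_cst fences forbid the outcome in which both loads miss the
    // other thread's store: either Submit sees Stopping or Shutdown sees the
    // callback, so nothing can be silently left behind.
    assert(SubmitSawStopping || ShutdownSawCallback);
    return 0;
}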
diff --git a/yt/yt/core/concurrency/fair_share_action_queue.cpp b/yt/yt/core/concurrency/fair_share_action_queue.cpp
index dae24f21ba..ecef8faa85 100644
--- a/yt/yt/core/concurrency/fair_share_action_queue.cpp
+++ b/yt/yt/core/concurrency/fair_share_action_queue.cpp
@@ -105,16 +105,14 @@ public:
void Shutdown(bool graceful)
{
- if (Stopped_.exchange(true)) {
+ // Synchronization done via Queue_->Shutdown().
+ if (Stopped_.exchange(true, std::memory_order::relaxed)) {
return;
}
- Queue_->Shutdown();
-
- ShutdownInvoker_->Invoke(BIND([graceful, thread = Thread_, queue = Queue_] {
- thread->Stop(graceful);
- queue->DrainConsumer();
- }));
+ Queue_->Shutdown(graceful);
+ Thread_->Stop(graceful);
+ Queue_->OnConsumerFinished();
}
const IInvokerPtr& GetInvoker(int index) override
@@ -156,18 +154,13 @@ private:
std::vector<TString> BucketNames_;
- std::atomic<bool> Started_ = false;
std::atomic<bool> Stopped_ = false;
void EnsuredStarted()
{
- if (Started_.load(std::memory_order::relaxed)) {
- return;
- }
- if (Started_.exchange(true)) {
- return;
- }
+ // Thread::Start already has
+ // its own short-circuit.
Thread_->Start();
}
};
diff --git a/yt/yt/core/concurrency/fair_share_invoker_queue.cpp b/yt/yt/core/concurrency/fair_share_invoker_queue.cpp
index e42cdc16a6..f4c073144a 100644
--- a/yt/yt/core/concurrency/fair_share_invoker_queue.cpp
+++ b/yt/yt/core/concurrency/fair_share_invoker_queue.cpp
@@ -54,24 +54,17 @@ const IInvokerPtr& TFairShareInvokerQueue::GetInvoker(int bucketIndex, int queue
return bucket.Invokers[queueIndex];
}
-void TFairShareInvokerQueue::Shutdown()
+void TFairShareInvokerQueue::Shutdown(bool graceful)
{
for (auto& bucket : Buckets_) {
- bucket.Queue->Shutdown();
+ bucket.Queue->Shutdown(graceful);
}
}
-void TFairShareInvokerQueue::DrainProducer()
+void TFairShareInvokerQueue::OnConsumerFinished()
{
for (auto& bucket : Buckets_) {
- bucket.Queue->DrainProducer();
- }
-}
-
-void TFairShareInvokerQueue::DrainConsumer()
-{
- for (auto& bucket : Buckets_) {
- bucket.Queue->DrainConsumer();
+ bucket.Queue->OnConsumerFinished();
}
}
diff --git a/yt/yt/core/concurrency/fair_share_invoker_queue.h b/yt/yt/core/concurrency/fair_share_invoker_queue.h
index c474ffe104..fe943152b7 100644
--- a/yt/yt/core/concurrency/fair_share_invoker_queue.h
+++ b/yt/yt/core/concurrency/fair_share_invoker_queue.h
@@ -40,10 +40,9 @@ public:
const IInvokerPtr& GetInvoker(int bucketIndex, int queueIndex) const;
- void Shutdown();
-
- void DrainProducer();
- void DrainConsumer();
+ // See TInvokerQueue::Shutdown/OnConsumerFinished.
+ void Shutdown(bool graceful = false);
+ void OnConsumerFinished();
bool IsRunning() const;
diff --git a/yt/yt/core/concurrency/fair_share_thread_pool.cpp b/yt/yt/core/concurrency/fair_share_thread_pool.cpp
index 4b4a48224d..91444d3f5d 100644
--- a/yt/yt/core/concurrency/fair_share_thread_pool.cpp
+++ b/yt/yt/core/concurrency/fair_share_thread_pool.cpp
@@ -187,6 +187,10 @@ public:
void Invoke(TClosure callback, TBucket* bucket)
{
auto guard = Guard(SpinLock_);
+ // See Shutdown.
+ if (Stopping_) {
+ return;
+ }
QueueSize_.fetch_add(1, std::memory_order::relaxed);
@@ -228,12 +232,13 @@ public:
void Shutdown()
{
- Drain();
- }
-
- void Drain()
- {
auto guard = Guard(SpinLock_);
+ // Setting under spinlock because this way
+ // we have atomicity of two actions:
+ // 1) Write/read flag and 2) Drain/Enqueue callback.
+ // See two_level_fair_share_thread_pool Queue
+ // for a more detailed explanation.
+ Stopping_ = true;
for (const auto& item : Heap_) {
item.Bucket->Drain();
}
@@ -339,7 +344,7 @@ private:
const TIntrusivePtr<NThreading::TEventCount> CallbackEventCount_;
YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
-
+ bool Stopping_ = false;
std::vector<THeapItem> Heap_;
std::atomic<int> ThreadCount_ = 0;
@@ -548,14 +553,6 @@ private:
TThreadPoolBase::DoShutdown();
}
- TClosure MakeFinalizerCallback() override
- {
- return BIND_NO_PROPAGATE([queue = Queue_, callback = TThreadPoolBase::MakeFinalizerCallback()] {
- callback();
- queue->Drain();
- });
- }
-
void DoConfigure(int threadCount) override
{
Queue_->Configure(threadCount);
diff --git a/yt/yt/core/concurrency/invoker_queue.cpp b/yt/yt/core/concurrency/invoker_queue.cpp
index 27d985eb8a..849fd8c255 100644
--- a/yt/yt/core/concurrency/invoker_queue.cpp
+++ b/yt/yt/core/concurrency/invoker_queue.cpp
@@ -284,6 +284,86 @@ private:
////////////////////////////////////////////////////////////////////////////////
+// (arkady-e1ppa): Memory orders explanation around Shutdown:
+/*
+ There are two guarantees we want to enforce:
+ 1) If we read Running_ |false| we want to observe the correct value
+ of Graceful_. Otherwise we may discard tasks even when graceful
+ shutdown was requested.
+ 2) We must ensure that no callback will be left in the queue. That is, if
+ shutdown is not graceful, there must be no execution of
+ concurrent Enqueue and Shutdown such that (c) (or (c'))
+ reads |true| from Running_ and (f) doesn't observe
+ the action placed by (a) (or (a')). Note: in case of
+ graceful shutdown we expect that the caller manually drains the queue
+ when they feel like it. This is sufficient because the queue will either
+ be drained by another producer, or enqueue requests will stop and
+ the caller's method of draining the queue (e.g. stopping the thread which polls
+ the queue and asking it to drain the queue) would process the queue normally.
+
+ Let's deal with 1) first since it is very easy. Consider the relevant
+ part of the EnqueueCallback call (some calls are inlined):
+ if (!Running_.load(std::memory_order::relaxed)) { <- (a0)
+ std::atomic_thread_fence(std::memory_order::acquire); <- (b0)
+ if (!Graceful_.load(std::memory_order::relaxed)) { <- (c0)
+ Queue_.DrainProducer();
+ }
+
+ return GetCpuInstant();
+ }
+
+ Relevant part of Shutdown:
+ if (graceful) {
+ Graceful_.store(true, std::memory_order::relaxed); <- (d0)
+ }
+
+ Running_.store(false, std::memory_order::release); <- (e0)
+
+ Suppose we read false in (a0). Then we must have (e0) -rf-> (a0),
+ where rf is reads-from. Then we have (a0) -sb-> (b0)
+ and b0 is acquire-fence, thus (e0) -rf-> (a0) -sb-> (b0)
+ and so (e0) -SW-> (b0) (SW is synchronizes with) implying
+ (e0) -SIHB-> (b0) (simply happens before). By transitivity
+ (d0) (if it happened) is SIHB (b0) and thus (c0) too.
+ (d0) -SIHB-> (c0) and (d0) being the last modification
+ in modification order implies that (c0) must be reading
+ from (d0) and thus (c0) always reads the correct value.
+
+ Now, let's deal with 2). Suppose otherwise. In this case
+ (c) reads |true| and (f) doesn't observe the result of (a).
+ We shall model (a) as a release RMW (currently it is
+ a seq_cst rmw, but this is suboptimal) and (f)
+ as an acquire RMW (again currently seq_cst but can be
+ optimized). Then we have the execution:
+ T1(Enqueue) T2(Shutdown)
+ RMW^rel(Queue_, 0, 1) (a) W^rlx(Running_, false) (d)
+ Fence^sc (b) Fence^sc (e)
+ R^rlx(Running_, true) (c) RMW^acq(Queue_, 0, 0) (f)
+
+ Here RMW^rel(Queue_, 0, 1) means an rmw op on Queue_ state
+ which reads 0 (empty queue) and writes 1 (some callback address).
+
+ Since (f) doesn't read the result of (a), it must read the value of
+ another modification (either ctor or another dequeue) which
+ precedes (a) in modification order. Thus we have (f) -cob-> (a)
+ (cob is coherence-ordered before, https://eel.is/c++draft/atomics.order#3.3).
+ For fences we have (e) -sb-> (f) (sequenced before) and (a) -sb-> (b).
+ Thus (e) -sb-> (f) -cob-> (a) -sb-> (b) => (e) -S-> (b)
+ (Here S is total order on sequentially consistent events,
+ see https://eel.is/c++draft/atomics.order#4.4).
+ Likewise, (c) -cob-> (d) because it must be reading from
+ another modification prior to (d) in modification order.
+ (b) -sb-> (c) -cob-> (d) -sb-> (e) => (b) -S-> (e)
+ and we have a loop in S, contradicting the assumption.
+
+ NB(arkady-e1ppa): We do force-drain after (c) and (c')
+ because otherwise our Enqueue could happen
+ concurrently with a processing thread draining the queue
+ and leave our callback in the queue forever.
+*/
+
+////////////////////////////////////////////////////////////////////////////////
+
template <class TQueueImpl>
TInvokerQueue<TQueueImpl>::TInvokerQueue(
TIntrusivePtr<NThreading::TEventCount> callbackEventCount,
@@ -375,8 +455,12 @@ TCpuInstant TInvokerQueue<TQueueImpl>::EnqueueCallback(
NProfiling::TTagId profilingTag,
TProfilerTagPtr profilerTag)
{
+ // NB: We are likely to never read false here,
+ // so we do a relaxed load but an acquire fence
+ // in the IF branch.
if (!Running_.load(std::memory_order::relaxed)) {
- DrainProducer();
+ std::atomic_thread_fence(std::memory_order::acquire);
+ TryDrainProducer();
YT_LOG_TRACE(
"Queue had been shut down, incoming action ignored (Callback: %v)",
callback.GetHandle());
@@ -392,7 +476,16 @@ TCpuInstant TInvokerQueue<TQueueImpl>::EnqueueCallback(
Counters_[profilingTag]->EnqueuedCounter.Increment();
}
- QueueImpl_.Enqueue(std::move(action));
+ QueueImpl_.Enqueue(std::move(action)); // <- (a)
+
+ std::atomic_thread_fence(std::memory_order::seq_cst); // <- (b)
+ if (!Running_.load(std::memory_order::relaxed)) { // <- (c)
+ TryDrainProducer(/*force*/ true);
+ YT_LOG_TRACE(
+ "Queue had been shut down concurrently, incoming action ignored (Callback: %v)",
+ callback.GetHandle());
+ }
+
return cpuInstant;
}
@@ -405,7 +498,10 @@ TCpuInstant TInvokerQueue<TQueueImpl>::EnqueueCallbacks(
auto cpuInstant = GetCpuInstant();
if (!Running_.load(std::memory_order::relaxed)) {
- DrainProducer();
+ std::atomic_thread_fence(std::memory_order::acquire);
+ TryDrainProducer();
+ YT_LOG_TRACE(
+ "Queue had been shut down, incoming actions ignored");
return cpuInstant;
}
@@ -421,7 +517,16 @@ TCpuInstant TInvokerQueue<TQueueImpl>::EnqueueCallbacks(
Counters_[profilingTag]->EnqueuedCounter.Increment(std::ssize(actions));
}
- QueueImpl_.Enqueue(actions);
+ QueueImpl_.Enqueue(actions); // <- (a')
+
+ std::atomic_thread_fence(std::memory_order::seq_cst); // <- (b')
+ if (!Running_.load(std::memory_order::relaxed)) { // <- (c')
+ TryDrainProducer(/*force*/ true);
+ YT_LOG_TRACE(
+ "Queue had been shut down concurrently, incoming actions ignored");
+ return cpuInstant;
+ }
+
return cpuInstant;
}
@@ -444,24 +549,36 @@ bool TInvokerQueue<TQueueImpl>::IsSerialized() const
}
template <class TQueueImpl>
-void TInvokerQueue<TQueueImpl>::Shutdown()
+void TInvokerQueue<TQueueImpl>::Shutdown(bool graceful)
{
- Running_.store(false, std::memory_order::relaxed);
-}
+ // This is done to protect Graceful_ and Running_
+ // from being modified by concurrent Shutdown calls.
+ if (Stopping_.exchange(true, std::memory_order::relaxed)) {
+ return;
+ }
-template <class TQueueImpl>
-void TInvokerQueue<TQueueImpl>::DrainProducer()
-{
- YT_VERIFY(!Running_.load(std::memory_order::relaxed));
+ if (graceful) {
+ Graceful_.store(true, std::memory_order::relaxed);
+ }
- QueueImpl_.DrainProducer();
+ Running_.store(false, std::memory_order::release); // <- (d)
+
+ std::atomic_thread_fence(std::memory_order::seq_cst); // <- (e)
+
+ if (!graceful) {
+ // NB: There may still be tasks in the
+ // local part of the Queue in the case
+ // of a single consumer.
+ // One must drain it after stopping
+ // the consumer.
+ QueueImpl_.DrainProducer(); // <- (f)
+ }
}
template <class TQueueImpl>
-void TInvokerQueue<TQueueImpl>::DrainConsumer()
+void TInvokerQueue<TQueueImpl>::OnConsumerFinished()
{
- YT_VERIFY(!Running_.load(std::memory_order::relaxed));
-
+ QueueImpl_.DrainProducer();
QueueImpl_.DrainConsumer();
}
@@ -588,6 +705,14 @@ typename TInvokerQueue<TQueueImpl>::TCountersPtr TInvokerQueue<TQueueImpl>::Crea
return counters;
}
+template <class TQueueImpl>
+void TInvokerQueue<TQueueImpl>::TryDrainProducer(bool force)
+{
+ if (force || !Graceful_.load(std::memory_order::relaxed)) {
+ QueueImpl_.DrainProducer();
+ }
+}
+
////////////////////////////////////////////////////////////////////////////////
template class TInvokerQueue<TMpmcQueueImpl>;
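A minimal sketch of the first guarantee from the comment above: reading Graceful_ correctly after a relaxed read of Running_ by upgrading the load with an acquire fence. The free functions and globals below are illustrative stand-ins for the TInvokerQueue members, not the actual implementation.

#include <atomic>

std::atomic<bool> Running{true};
std::atomic<bool> Graceful{false};

void Shutdown(bool graceful)
{
    if (graceful) {
        Graceful.store(true, std::memory_order::relaxed);  // (d0)
    }
    Running.store(false, std::memory_order::release);      // (e0)
}

// Called on the enqueue path; returns true iff the producer side may drain.
bool MayDrainOnEnqueue()
{
    if (!Running.load(std::memory_order::relaxed)) {           // (a0)
        // The acquire fence makes the relaxed load above synchronize with the
        // release store (e0), so the preceding write (d0) is visible here.
        std::atomic_thread_fence(std::memory_order::acquire);  // (b0)
        return !Graceful.load(std::memory_order::relaxed);     // (c0)
    }
    return false;
}

This is why the non-graceful drain on the enqueue path can never fire when a graceful shutdown was requested: observing Running_ == false at (a0) forces (d0) to be visible at (c0).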
diff --git a/yt/yt/core/concurrency/invoker_queue.h b/yt/yt/core/concurrency/invoker_queue.h
index 48b86df069..f4464807da 100644
--- a/yt/yt/core/concurrency/invoker_queue.h
+++ b/yt/yt/core/concurrency/invoker_queue.h
@@ -167,10 +167,21 @@ public:
bool CheckAffinity(const IInvokerPtr& invoker) const override;
bool IsSerialized() const override;
- void Shutdown();
-
- void DrainProducer();
- void DrainConsumer();
+ // NB(arkady-e1ppa): Trying to call graceful shutdown
+ // concurrently with someone calling Invoke
+ // may end up making shutdown not graceful
+ // as double-checking in Invoke would drain
+ // the queue. If you want a truly graceful
+ // Shutdown (e.g. until you run out of callbacks)
+ // just drain the queue without shutting it down.
+ void Shutdown(bool graceful = false);
+
+ // NB(arkady-e1ppa): Calling shutdown is not
+ // enough to prevent leaks of callbacks
+ // as there might be some callbacks left in
+ // the local queue of the MPSC queue if the shutdown
+ // was not graceful.
+ void OnConsumerFinished();
bool BeginExecute(TEnqueuedAction* action, typename TQueueImpl::TConsumerToken* token = nullptr);
void EndExecute(TEnqueuedAction* action);
@@ -191,6 +202,8 @@ private:
NThreading::TThreadId ThreadId_ = NThreading::InvalidThreadId;
std::atomic<bool> Running_ = true;
+ std::atomic<bool> Stopping_ = false;
+ std::atomic<bool> Graceful_ = false;
struct TCounters
{
@@ -212,6 +225,8 @@ private:
TWaitTimeObserver WaitTimeObserver_;
TCountersPtr CreateCounters(const NProfiling::TTagSet& tagSet, NProfiling::IRegistryImplPtr registry);
+
+ void TryDrainProducer(bool force = false);
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp b/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp
index f65dfdbff2..585665cdf6 100644
--- a/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp
+++ b/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp
@@ -618,10 +618,56 @@ public:
ThreadCount_.store(threadCount);
}
- // Invoke is lock free.
+ // (arkady-e1ppa): Explanation of memory orders and fences around Stopped_:
+ /*
+ We have two concurrent actions: Invoke and Shutdown.
+ For our logic Invoke method does 2 things:
+ 1) It pushes the callback to the queue (we assume it to be a release RMW operation).
+ In reality right now it is seq_cst, but this can and should be changed in the future.
+ 2) Now, the second (and third) action is a seq_cst fence and then a relaxed read of Stopping_.
+ Shutdown also does two things:
+ 1) It does a relaxed rmw on Stopping_ (effectively, just a write)
+ 2) It drains the queue.
+ In order to prevent losing callbacks in the queue,
+ we need to make sure that either Invoke observes |true| in the read
+ or Shutdown observes the callback placed by said Invoke.
+ We care about this, because if a callback is stuck in the queue it will
+ remain there until the process is killed. If said callback MUST be
+ either executed or discarded (e.g. IPollable or Fiber) in order
+ for the program to finish correctly (and not get stuck), then such a leak
+ can cause hung shutdown (and it used to do exactly that).
+
+ Relevant parts of execution are written below
+ (the letters match those in the code).
+ T1(Invoke) T2(Shutdown)
+ RMW^rel(Queue, 0, 1) (a) RMW^rlx(Stopping, false, true) (d)
+ Fence^sc (b) Fence^sc (e)
+ R^rlx(Stopping, false) (c) RMW^acq(Queue, 0, 0) (f)
+
+ We suppose that the callback is lost, so we have
+ seen neither 1 in (f) nor true in (c).
+ Since (c) reads false it reads from ctor which precedes
+ (d) in modification order. Thus we have (c) -cob-> (d)
+ (coherence-ordered before, https://eel.is/c++draft/atomics.order#3.3).
+ Likewise (f) must have read from some previous
+ dequeue attempt (or ctor) which precedes (a) in modification order.
+ Again, (f) -cob-> (a).
+ Now, for fences we have (sb means sequenced-before):
+ (b) -sb-> (c) -cob-> (d) -sb-> (e) => (b) -S-> (e)
+ (see https://eel.is/c++draft/atomics.order#4.4).
+ We also have
+ (e) -sb-> (f) -cob-> (a) -sb-> (b) => (e) -S-> (b).
+ Thus a loop in S is found, contradicting the assumption
+ that (f) doesn't read 1 and (c) doesn't read true.
+ */
+
+ // Invoke is lock free on a fast path (a.k.a. no shutdown).
void Invoke(TClosure callback, TBucket* bucket) override
{
- if (Stopped_.load()) {
+ // We can't guarantee a read of |true| in time anyway,
+ // so relaxed order is enough.
+ if (Stopped_.load(std::memory_order::relaxed)) {
+ Drain();
return;
}
@@ -638,7 +684,15 @@ public:
action.BucketHolder = MakeStrong(bucket);
action.EnqueuedThreadCookie = ThreadCookie();
- InvokeQueue_.Enqueue(std::move(action));
+ InvokeQueue_.Enqueue(std::move(action)); // <- (a)
+
+ std::atomic_thread_fence(std::memory_order::seq_cst); // <- (b)
+ if (Stopped_.load(std::memory_order::relaxed)) { // <- (c)
+ // We have encountered a stop right after we have enqueued
+ // the callback. There is a possibility that it is now stuck
+ // in the queue forever. We dequeue it ourselves.
+ Drain();
+ }
NotifyFromInvoke(cpuInstant, ActiveThreads_.load() == 0);
}
@@ -683,12 +737,16 @@ public:
void Shutdown()
{
- Drain();
+ if (Stopped_.exchange(true, std::memory_order::relaxed)) { // <- (d)
+ return;
+ }
+ std::atomic_thread_fence(std::memory_order::seq_cst); // <- (e)
+
+ Drain(); // <- (f)
}
void Drain()
{
- Stopped_.store(true);
auto guard = Guard(MainLock_);
WaitHeap_.Clear();
@@ -1290,15 +1348,9 @@ private:
void DoShutdown() override
{
+ Queue_->Shutdown();
TThreadPoolBase::DoShutdown();
- }
-
- TClosure MakeFinalizerCallback() override
- {
- return BIND([queue = Queue_, callback = TThreadPoolBase::MakeFinalizerCallback()] {
- callback();
- queue->Drain();
- });
+ Queue_->Drain();
}
void DoConfigure(int threadCount) override
diff --git a/yt/yt/core/concurrency/private.h b/yt/yt/core/concurrency/private.h
index e6ad8eaf54..bbdec430de 100644
--- a/yt/yt/core/concurrency/private.h
+++ b/yt/yt/core/concurrency/private.h
@@ -31,17 +31,8 @@ template <class TQueueImpl>
class TSingleQueueSchedulerThread;
template <class TQueueImpl>
-using TSingleQueueSchedulerThreadPtr = TIntrusivePtr<TSingleQueueSchedulerThread<TQueueImpl>>;
-
-template <class TQueueImpl>
class TSuspendableSingleQueueSchedulerThread;
-template <class TQueueImpl>
-using TSuspendableSingleQueueSchedulerThreadPtr = TIntrusivePtr<TSuspendableSingleQueueSchedulerThread<TQueueImpl>>;
-
-using TMpmcSingleQueueSchedulerThread = TSingleQueueSchedulerThread<TMpmcQueueImpl>;
-using TMpmcSingleQueueSchedulerThreadPtr = TIntrusivePtr<TMpmcSingleQueueSchedulerThread>;
-
using TMpscSingleQueueSchedulerThread = TSingleQueueSchedulerThread<TMpscQueueImpl>;
using TMpscSingleQueueSchedulerThreadPtr = TIntrusivePtr<TMpscSingleQueueSchedulerThread>;
diff --git a/yt/yt/core/concurrency/scheduler_thread.cpp b/yt/yt/core/concurrency/scheduler_thread.cpp
index b4a2999bee..2a6c7fbc3d 100644
--- a/yt/yt/core/concurrency/scheduler_thread.cpp
+++ b/yt/yt/core/concurrency/scheduler_thread.cpp
@@ -31,9 +31,6 @@ TSchedulerThread::~TSchedulerThread()
void TSchedulerThread::OnStart()
{ }
-void TSchedulerThread::OnStop()
-{ }
-
void TSchedulerThread::Stop(bool graceful)
{
GracefulStop_.store(graceful);
@@ -61,7 +58,6 @@ void TSchedulerThread::StopPrologue()
void TSchedulerThread::StopEpilogue()
{
TFiberSchedulerThread::StopEpilogue();
- OnStop();
}
TClosure TSchedulerThread::OnExecute()
diff --git a/yt/yt/core/concurrency/scheduler_thread.h b/yt/yt/core/concurrency/scheduler_thread.h
index c4953c9bd1..5eb34ae18c 100644
--- a/yt/yt/core/concurrency/scheduler_thread.h
+++ b/yt/yt/core/concurrency/scheduler_thread.h
@@ -26,8 +26,15 @@ protected:
~TSchedulerThread();
+ // NB(arkady-e1ppa): We don't need a customisation point OnStop
+ // because the only sensible case when we need to do something
+ // after stop is a graceful shutdown, for which we might want
+ // to clear the queue. Now, every shutdownable queue is
+ // either drained automatically (graceful = false) or
+ // the shutdown is graceful and TSchedulerThread::Stop(true) will
+ // be called. In the latter case the |OnExecute| loop will
+ // continue working until the queue is empty anyway. So we are safe.
virtual void OnStart();
- virtual void OnStop();
TClosure OnExecute() override;
diff --git a/yt/yt/core/concurrency/single_queue_scheduler_thread.cpp b/yt/yt/core/concurrency/single_queue_scheduler_thread.cpp
index 48525b477a..23f448db13 100644
--- a/yt/yt/core/concurrency/single_queue_scheduler_thread.cpp
+++ b/yt/yt/core/concurrency/single_queue_scheduler_thread.cpp
@@ -139,12 +139,6 @@ void TSuspendableSingleQueueSchedulerThread<TQueueImpl>::OnStart()
Queue_->SetThreadId(GetThreadId());
}
-template <class TQueueImpl>
-void TSuspendableSingleQueueSchedulerThread<TQueueImpl>::OnStop()
-{
- Queue_->DrainConsumer();
-}
-
////////////////////////////////////////////////////////////////////////////////
template class TSuspendableSingleQueueSchedulerThread<TMpscQueueImpl>;
diff --git a/yt/yt/core/concurrency/single_queue_scheduler_thread.h b/yt/yt/core/concurrency/single_queue_scheduler_thread.h
index 2adffcde9d..00540cd372 100644
--- a/yt/yt/core/concurrency/single_queue_scheduler_thread.h
+++ b/yt/yt/core/concurrency/single_queue_scheduler_thread.h
@@ -72,7 +72,6 @@ protected:
void EndExecute() override;
void OnStart() override;
- void OnStop() override;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/concurrency/suspendable_action_queue.cpp b/yt/yt/core/concurrency/suspendable_action_queue.cpp
index c5e6bed21f..8547adcff2 100644
--- a/yt/yt/core/concurrency/suspendable_action_queue.cpp
+++ b/yt/yt/core/concurrency/suspendable_action_queue.cpp
@@ -42,16 +42,14 @@ public:
void Shutdown(bool graceful) final
{
- if (Stopped_.exchange(true)) {
+ // Synchronization done via Queue_->Shutdown().
+ if (Stopped_.exchange(true, std::memory_order::relaxed)) {
return;
}
- Queue_->Shutdown();
-
- ShutdownInvoker_->Invoke(BIND([graceful, thread = Thread_, queue = Queue_] {
- thread->Shutdown(graceful);
- queue->DrainConsumer();
- }));
+ Queue_->Shutdown(graceful);
+ Thread_->Shutdown(graceful);
+ Queue_->OnConsumerFinished();
}
const IInvokerPtr& GetInvoker() override
@@ -85,17 +83,12 @@ private:
const TShutdownCookie ShutdownCookie_;
const IInvokerPtr ShutdownInvoker_ = GetShutdownInvoker();
- std::atomic<bool> Started_ = false;
std::atomic<bool> Stopped_ = false;
void EnsureStarted()
{
- if (Started_.load(std::memory_order::relaxed)) {
- return;
- }
- if (Started_.exchange(true)) {
- return;
- }
+ // Thread::Start already has
+ // its own short-circuit.
Thread_->Start();
}
};
diff --git a/yt/yt/core/concurrency/thread_pool.cpp b/yt/yt/core/concurrency/thread_pool.cpp
index aa56320bfd..4d40748f83 100644
--- a/yt/yt/core/concurrency/thread_pool.cpp
+++ b/yt/yt/core/concurrency/thread_pool.cpp
@@ -191,16 +191,9 @@ private:
void DoShutdown() override
{
- Queue_->Shutdown();
+ Queue_->Shutdown(/*graceful*/ false);
TThreadPoolBase::DoShutdown();
- }
-
- TClosure MakeFinalizerCallback() override
- {
- return BIND_NO_PROPAGATE([queue = Queue_, callback = TThreadPoolBase::MakeFinalizerCallback()] {
- callback();
- queue->DrainConsumer();
- });
+ Queue_->OnConsumerFinished();
}
TSchedulerThreadPtr SpawnThread(int index) override
diff --git a/yt/yt/core/concurrency/thread_pool_detail.cpp b/yt/yt/core/concurrency/thread_pool_detail.cpp
index cb9c5dc1e6..f51e7110f3 100644
--- a/yt/yt/core/concurrency/thread_pool_detail.cpp
+++ b/yt/yt/core/concurrency/thread_pool_detail.cpp
@@ -62,22 +62,15 @@ void TThreadPoolBase::DoStart()
void TThreadPoolBase::DoShutdown()
{
- GetFinalizerInvoker()->Invoke(MakeFinalizerCallback());
-}
-
-TClosure TThreadPoolBase::MakeFinalizerCallback()
-{
decltype(Threads_) threads;
{
auto guard = Guard(SpinLock_);
std::swap(threads, Threads_);
}
- return BIND_NO_PROPAGATE([threads = std::move(threads)] {
- for (const auto& thread : threads) {
- thread->Stop();
- }
- });
+ for (const auto& thread : threads) {
+ thread->Stop();
+ }
}
int TThreadPoolBase::GetThreadCount()
diff --git a/yt/yt/core/concurrency/thread_pool_detail.h b/yt/yt/core/concurrency/thread_pool_detail.h
index 14a306d116..31ef28b922 100644
--- a/yt/yt/core/concurrency/thread_pool_detail.h
+++ b/yt/yt/core/concurrency/thread_pool_detail.h
@@ -42,7 +42,6 @@ protected:
virtual void DoStart();
virtual void DoShutdown();
- virtual TClosure MakeFinalizerCallback();
virtual void DoConfigure(int threadCount);
virtual TSchedulerThreadPtr SpawnThread(int index) = 0;
diff --git a/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp b/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp
index cb8a8fd010..3c192b6ce1 100644
--- a/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp
+++ b/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp
@@ -219,6 +219,10 @@ public:
void Invoke(TClosure callback, TBucket* bucket)
{
auto guard = Guard(SpinLock_);
+ // See Shutdown method.
+ if (Stopping_) {
+ return;
+ }
const auto& pool = IdToPool_[bucket->PoolId];
pool->SizeCounter.Record(++pool->Size);
@@ -267,12 +271,13 @@ public:
void Shutdown()
{
- Drain();
- }
-
- void Drain()
- {
auto guard = Guard(SpinLock_);
+ // NB(arkady-e1ppa): We write/read the value under the spinlock
+ // instead of using an atomic in order to ensure that in the Invoke
+ // method we either observe |false| and enqueue a callback
+ // which will be drained here, or we observe |true|
+ // and do not enqueue a callback at all.
+ Stopping_ = true;
for (const auto& pool : IdToPool_) {
if (pool) {
@@ -453,6 +458,9 @@ private:
const IPoolWeightProviderPtr PoolWeightProvider_;
YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
+ // NB: We set this flag to true so that whoever may spam tasks to the queue
+ // will stop doing so after the shutdown.
+ bool Stopping_ = false;
std::vector<std::unique_ptr<TExecutionPool>> IdToPool_;
THashMap<TString, int> NameToPoolId_;
@@ -697,14 +705,6 @@ private:
TThreadPoolBase::DoShutdown();
}
- TClosure MakeFinalizerCallback() override
- {
- return BIND_NO_PROPAGATE([queue = Queue_, callback = TThreadPoolBase::MakeFinalizerCallback()] {
- callback();
- queue->Drain();
- });
- }
-
void DoConfigure(int threadCount) override
{
Queue_->Configure(threadCount);
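The two fair-share pools above take a lock-based route instead of fences: the Stopping_ flag and the queue are read and written under the same spinlock, so "check flag + enqueue" in Invoke and "set flag + drain" in Shutdown are atomic with respect to each other. A minimal sketch of that alternative, with hypothetical names and std::mutex standing in for NThreading::TSpinLock:

#include <deque>
#include <functional>
#include <mutex>

class TLockedQueue
{
public:
    // Returns false if the callback was rejected because of shutdown.
    bool Invoke(std::function<void()> callback)
    {
        std::lock_guard guard(Lock_);
        if (Stopping_) {
            return false;                       // shutdown already drained the queue
        }
        Queue_.push_back(std::move(callback));  // enqueued before any future drain
        return true;
    }

    void Shutdown()
    {
        std::lock_guard guard(Lock_);
        Stopping_ = true;  // flag write and drain happen atomically w.r.t. Invoke
        Queue_.clear();    // the real pools abort/drain bucket callbacks here
    }

private:
    std::mutex Lock_;
    bool Stopping_ = false;
    std::deque<std::function<void()>> Queue_;
};

Under the lock there is no interleaving in which Invoke enqueues after Shutdown's drain yet misses Stopping_, which is exactly the property the fence-based queues establish without a lock.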
diff --git a/yt/yt/core/logging/log_manager.cpp b/yt/yt/core/logging/log_manager.cpp
index 81a2a4b1d7..d74b8f3220 100644
--- a/yt/yt/core/logging/log_manager.cpp
+++ b/yt/yt/core/logging/log_manager.cpp
@@ -379,7 +379,7 @@ public:
: EventQueue_(New<TMpscInvokerQueue>(
EventCount_,
NConcurrency::GetThreadTags("Logging")))
- , LoggingThread_(New<TThread>(this))
+ , LoggingThread_(New<TLoggingThread>(this))
, SystemWriters_({
CreateStderrLogWriter(
std::make_unique<TPlainTextLogFormatter>(),
@@ -708,11 +708,11 @@ public:
}
private:
- class TThread
+ class TLoggingThread
: public TSchedulerThread
{
public:
- explicit TThread(TImpl* owner)
+ explicit TLoggingThread(TImpl* owner)
: TSchedulerThread(
owner->EventCount_,
"Logging",
@@ -1391,7 +1391,7 @@ private:
private:
const TIntrusivePtr<NThreading::TEventCount> EventCount_ = New<NThreading::TEventCount>();
const TMpscInvokerQueuePtr EventQueue_;
- const TIntrusivePtr<TThread> LoggingThread_;
+ const TIntrusivePtr<TLoggingThread> LoggingThread_;
const TShutdownCookie ShutdownCookie_ = RegisterShutdownCallback(
"LogManager",
BIND_NO_PROPAGATE(&TImpl::Shutdown, MakeWeak(this)),
diff --git a/yt/yt/core/misc/shutdown_priorities.h b/yt/yt/core/misc/shutdown_priorities.h
index f9620ba857..bd47e55a97 100644
--- a/yt/yt/core/misc/shutdown_priorities.h
+++ b/yt/yt/core/misc/shutdown_priorities.h
@@ -5,7 +5,7 @@ namespace NYT {
////////////////////////////////////////////////////////////////////////////////
constexpr int GrpcDispatcherThreadShutdownPriority = 0;
-constexpr int GrpcServerShutdownPriority = 100;
+constexpr int GrpcServerShutdownPriority = 120;
static_assert(GrpcServerShutdownPriority > GrpcDispatcherThreadShutdownPriority);