diff options
author | somov <somov@yandex-team.com> | 2023-07-07 10:43:23 +0300 |
---|---|---|
committer | somov <somov@yandex-team.com> | 2023-07-07 10:43:23 +0300 |
commit | 76a8d10767f44c35f29a333cca2573b090a52a7f (patch) | |
tree | 9eeb994a2ddca2ac7c526f3773a70730f1014d47 /contrib/libs | |
parent | c1710d05f5ce29696c2b28cfbbb0b64e7bb1215c (diff) | |
download | ydb-76a8d10767f44c35f29a333cca2573b090a52a7f.tar.gz |
Backport gRPC forking assert fix
https://github.com/grpc/grpc/commit/51f276e7beef3a9e4e9f96d4525e2035169d24be
Diffstat (limited to 'contrib/libs')
-rw-r--r-- | contrib/libs/grpc/src/core/lib/event_engine/thread_pool.cc | 45 | ||||
-rw-r--r-- | contrib/libs/grpc/src/core/lib/event_engine/thread_pool.h | 15 |
2 files changed, 26 insertions, 34 deletions
diff --git a/contrib/libs/grpc/src/core/lib/event_engine/thread_pool.cc b/contrib/libs/grpc/src/core/lib/event_engine/thread_pool.cc index 5fa18ab7cc..76797a8ae1 100644 --- a/contrib/libs/grpc/src/core/lib/event_engine/thread_pool.cc +++ b/contrib/libs/grpc/src/core/lib/event_engine/thread_pool.cc @@ -71,7 +71,7 @@ void ThreadPool::ThreadFunc(StatePtr state) { bool ThreadPool::Queue::Step() { grpc_core::ReleasableMutexLock lock(&mu_); // Wait until work is available or we are shutting down. - while (state_ == State::kRunning && callbacks_.empty()) { + while (!shutdown_ && !forking_ && callbacks_.empty()) { // If there are too many threads waiting, then quit this thread. // TODO(ctiller): wait some time in this case to be sure. if (threads_waiting_ >= reserve_threads_) return false; @@ -79,14 +79,8 @@ bool ThreadPool::Queue::Step() { cv_.Wait(&mu_); threads_waiting_--; } - switch (state_) { - case State::kRunning: - break; - case State::kShutdown: - case State::kForking: - if (!callbacks_.empty()) break; - return false; - } + if (forking_) return false; + if (shutdown_ && callbacks_.empty()) return false; GPR_ASSERT(!callbacks_.empty()); auto callback = std::move(callbacks_.front()); callbacks_.pop(); @@ -103,7 +97,7 @@ ThreadPool::ThreadPool(int reserve_threads) } ThreadPool::~ThreadPool() { - state_->queue.SetShutdown(); + state_->queue.SetShutdown(true); // Wait until all threads are exited. // Note that if this is a threadpool thread then we won't exit this thread // until the callstack unwinds a little, so we need to wait for just one @@ -126,27 +120,24 @@ bool ThreadPool::Queue::Add(y_absl::AnyInvocable<void()> callback) { // Add works to the callbacks list callbacks_.push(std::move(callback)); cv_.Signal(); - switch (state_) { - case State::kRunning: - case State::kShutdown: - return threads_waiting_ == 0; - case State::kForking: - return false; - } - GPR_UNREACHABLE_CODE(return false); + if (forking_) return false; + return threads_waiting_ == 0; } -void ThreadPool::Queue::SetState(State state) { +void ThreadPool::Queue::SetShutdown(bool is_shutdown) { grpc_core::MutexLock lock(&mu_); - if (state == State::kRunning) { - GPR_ASSERT(state_ != State::kRunning); - } else { - GPR_ASSERT(state_ == State::kRunning); - } - state_ = state; + auto was_shutdown = std::exchange(shutdown_, is_shutdown); + GPR_ASSERT(is_shutdown != was_shutdown); cv_.SignalAll(); } +void ThreadPool::Queue::SetForking(bool is_forking) { + grpc_core::MutexLock lock(&mu_); + auto was_forking = std::exchange(forking_, is_forking); + GPR_ASSERT(is_forking != was_forking); + cv_.SignalAll(); +} + void ThreadPool::ThreadCount::Add() { grpc_core::MutexLock lock(&mu_); ++threads_; @@ -176,7 +167,7 @@ void ThreadPool::ThreadCount::BlockUntilThreadCount(int threads, } void ThreadPool::PrepareFork() { - state_->queue.SetForking(); + state_->queue.SetForking(true); state_->thread_count.BlockUntilThreadCount(0, "forking"); } @@ -185,7 +176,7 @@ void ThreadPool::PostforkParent() { Postfork(); } void ThreadPool::PostforkChild() { Postfork(); } void ThreadPool::Postfork() { - state_->queue.Reset(); + state_->queue.SetForking(false); for (int i = 0; i < reserve_threads_; i++) { StartThread(state_, /*throttled=*/false); } diff --git a/contrib/libs/grpc/src/core/lib/event_engine/thread_pool.h b/contrib/libs/grpc/src/core/lib/event_engine/thread_pool.h index db66ff4b75..dfe1d9ca10 100644 --- a/contrib/libs/grpc/src/core/lib/event_engine/thread_pool.h +++ b/contrib/libs/grpc/src/core/lib/event_engine/thread_pool.h @@ -53,24 +53,25 @@ class ThreadPool final : public grpc_event_engine::experimental::Forkable { public: explicit Queue(int reserve_threads) : reserve_threads_(reserve_threads) {} bool Step(); - void SetShutdown() { SetState(State::kShutdown); } - void SetForking() { SetState(State::kForking); } // Add a callback to the queue. // Return true if we should also spin up a new thread. bool Add(y_absl::AnyInvocable<void()> callback); - void Reset() { SetState(State::kRunning); } + void SetShutdown(bool is_shutdown); + void SetForking(bool is_forking); private: - enum class State { kRunning, kShutdown, kForking }; - - void SetState(State state); grpc_core::Mutex mu_; grpc_core::CondVar cv_; std::queue<y_absl::AnyInvocable<void()>> callbacks_ Y_ABSL_GUARDED_BY(mu_); int threads_waiting_ Y_ABSL_GUARDED_BY(mu_) = 0; const int reserve_threads_; - State state_ Y_ABSL_GUARDED_BY(mu_) = State::kRunning; + // Track shutdown and fork bits separately. + // It's possible for a ThreadPool to initiate shut down while fork handlers + // are running, and similarly possible for a fork event to occur during + // shutdown. + bool shutdown_ Y_ABSL_GUARDED_BY(mu_) = false; + bool forking_ Y_ABSL_GUARDED_BY(mu_) = false; }; class ThreadCount { |