diff options
| author | somov <[email protected]> | 2023-07-07 10:43:23 +0300 | 
|---|---|---|
| committer | somov <[email protected]> | 2023-07-07 10:43:23 +0300 | 
| commit | 76a8d10767f44c35f29a333cca2573b090a52a7f (patch) | |
| tree | 9eeb994a2ddca2ac7c526f3773a70730f1014d47 /contrib/libs/grpc | |
| parent | c1710d05f5ce29696c2b28cfbbb0b64e7bb1215c (diff) | |
Backport gRPC forking assert fix
https://github.com/grpc/grpc/commit/51f276e7beef3a9e4e9f96d4525e2035169d24be
Diffstat (limited to 'contrib/libs/grpc')
| -rw-r--r-- | contrib/libs/grpc/src/core/lib/event_engine/thread_pool.cc | 45 | ||||
| -rw-r--r-- | contrib/libs/grpc/src/core/lib/event_engine/thread_pool.h | 15 | 
2 files changed, 26 insertions, 34 deletions
diff --git a/contrib/libs/grpc/src/core/lib/event_engine/thread_pool.cc b/contrib/libs/grpc/src/core/lib/event_engine/thread_pool.cc index 5fa18ab7ccd..76797a8ae1a 100644 --- a/contrib/libs/grpc/src/core/lib/event_engine/thread_pool.cc +++ b/contrib/libs/grpc/src/core/lib/event_engine/thread_pool.cc @@ -71,7 +71,7 @@ void ThreadPool::ThreadFunc(StatePtr state) {  bool ThreadPool::Queue::Step() {    grpc_core::ReleasableMutexLock lock(&mu_);    // Wait until work is available or we are shutting down. -  while (state_ == State::kRunning && callbacks_.empty()) { +  while (!shutdown_ && !forking_ && callbacks_.empty()) {      // If there are too many threads waiting, then quit this thread.      // TODO(ctiller): wait some time in this case to be sure.      if (threads_waiting_ >= reserve_threads_) return false; @@ -79,14 +79,8 @@ bool ThreadPool::Queue::Step() {      cv_.Wait(&mu_);      threads_waiting_--;    } -  switch (state_) { -    case State::kRunning: -      break; -    case State::kShutdown: -    case State::kForking: -      if (!callbacks_.empty()) break; -      return false; -  } +  if (forking_) return false; +  if (shutdown_ && callbacks_.empty()) return false;    GPR_ASSERT(!callbacks_.empty());    auto callback = std::move(callbacks_.front());    callbacks_.pop(); @@ -103,7 +97,7 @@ ThreadPool::ThreadPool(int reserve_threads)  }  ThreadPool::~ThreadPool() { -  state_->queue.SetShutdown(); +  state_->queue.SetShutdown(true);    // Wait until all threads are exited.    // Note that if this is a threadpool thread then we won't exit this thread    // until the callstack unwinds a little, so we need to wait for just one @@ -126,27 +120,24 @@ bool ThreadPool::Queue::Add(y_absl::AnyInvocable<void()> callback) {    // Add works to the callbacks list    callbacks_.push(std::move(callback));    cv_.Signal(); -  switch (state_) { -    case State::kRunning: -    case State::kShutdown: -      return threads_waiting_ == 0; -    case State::kForking: -      return false; -  } -  GPR_UNREACHABLE_CODE(return false); +  if (forking_) return false; +  return threads_waiting_ == 0;  } -void ThreadPool::Queue::SetState(State state) { +void ThreadPool::Queue::SetShutdown(bool is_shutdown) {    grpc_core::MutexLock lock(&mu_); -  if (state == State::kRunning) { -    GPR_ASSERT(state_ != State::kRunning); -  } else { -    GPR_ASSERT(state_ == State::kRunning); -  } -  state_ = state; +  auto was_shutdown = std::exchange(shutdown_, is_shutdown); +  GPR_ASSERT(is_shutdown != was_shutdown);    cv_.SignalAll();  } +void ThreadPool::Queue::SetForking(bool is_forking) { +  grpc_core::MutexLock lock(&mu_); +  auto was_forking = std::exchange(forking_, is_forking); +  GPR_ASSERT(is_forking != was_forking); +  cv_.SignalAll(); +} +   void ThreadPool::ThreadCount::Add() {    grpc_core::MutexLock lock(&mu_);    ++threads_; @@ -176,7 +167,7 @@ void ThreadPool::ThreadCount::BlockUntilThreadCount(int threads,  }  void ThreadPool::PrepareFork() { -  state_->queue.SetForking(); +  state_->queue.SetForking(true);    state_->thread_count.BlockUntilThreadCount(0, "forking");  } @@ -185,7 +176,7 @@ void ThreadPool::PostforkParent() { Postfork(); }  void ThreadPool::PostforkChild() { Postfork(); }  void ThreadPool::Postfork() { -  state_->queue.Reset(); +  state_->queue.SetForking(false);    for (int i = 0; i < reserve_threads_; i++) {      StartThread(state_, /*throttled=*/false);    } diff --git a/contrib/libs/grpc/src/core/lib/event_engine/thread_pool.h b/contrib/libs/grpc/src/core/lib/event_engine/thread_pool.h index db66ff4b753..dfe1d9ca106 100644 --- a/contrib/libs/grpc/src/core/lib/event_engine/thread_pool.h +++ b/contrib/libs/grpc/src/core/lib/event_engine/thread_pool.h @@ -53,24 +53,25 @@ class ThreadPool final : public grpc_event_engine::experimental::Forkable {     public:      explicit Queue(int reserve_threads) : reserve_threads_(reserve_threads) {}      bool Step(); -    void SetShutdown() { SetState(State::kShutdown); } -    void SetForking() { SetState(State::kForking); }      // Add a callback to the queue.      // Return true if we should also spin up a new thread.      bool Add(y_absl::AnyInvocable<void()> callback); -    void Reset() { SetState(State::kRunning); } +    void SetShutdown(bool is_shutdown); +    void SetForking(bool is_forking);     private: -    enum class State { kRunning, kShutdown, kForking }; - -    void SetState(State state);      grpc_core::Mutex mu_;      grpc_core::CondVar cv_;      std::queue<y_absl::AnyInvocable<void()>> callbacks_ Y_ABSL_GUARDED_BY(mu_);      int threads_waiting_ Y_ABSL_GUARDED_BY(mu_) = 0;      const int reserve_threads_; -    State state_ Y_ABSL_GUARDED_BY(mu_) = State::kRunning; +    // Track shutdown and fork bits separately. +    // It's possible for a ThreadPool to initiate shut down while fork handlers +    // are running, and similarly possible for a fork event to occur during +    // shutdown. +    bool shutdown_ Y_ABSL_GUARDED_BY(mu_) = false; +    bool forking_ Y_ABSL_GUARDED_BY(mu_) = false;    };    class ThreadCount {  | 
