aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/libs
diff options
context:
space:
mode:
authorsomov <somov@yandex-team.com>2023-07-07 10:43:23 +0300
committersomov <somov@yandex-team.com>2023-07-07 10:43:23 +0300
commit76a8d10767f44c35f29a333cca2573b090a52a7f (patch)
tree9eeb994a2ddca2ac7c526f3773a70730f1014d47 /contrib/libs
parentc1710d05f5ce29696c2b28cfbbb0b64e7bb1215c (diff)
downloadydb-76a8d10767f44c35f29a333cca2573b090a52a7f.tar.gz
Backport gRPC forking assert fix
https://github.com/grpc/grpc/commit/51f276e7beef3a9e4e9f96d4525e2035169d24be
Diffstat (limited to 'contrib/libs')
-rw-r--r--contrib/libs/grpc/src/core/lib/event_engine/thread_pool.cc45
-rw-r--r--contrib/libs/grpc/src/core/lib/event_engine/thread_pool.h15
2 files changed, 26 insertions, 34 deletions
diff --git a/contrib/libs/grpc/src/core/lib/event_engine/thread_pool.cc b/contrib/libs/grpc/src/core/lib/event_engine/thread_pool.cc
index 5fa18ab7cc..76797a8ae1 100644
--- a/contrib/libs/grpc/src/core/lib/event_engine/thread_pool.cc
+++ b/contrib/libs/grpc/src/core/lib/event_engine/thread_pool.cc
@@ -71,7 +71,7 @@ void ThreadPool::ThreadFunc(StatePtr state) {
bool ThreadPool::Queue::Step() {
grpc_core::ReleasableMutexLock lock(&mu_);
// Wait until work is available or we are shutting down.
- while (state_ == State::kRunning && callbacks_.empty()) {
+ while (!shutdown_ && !forking_ && callbacks_.empty()) {
// If there are too many threads waiting, then quit this thread.
// TODO(ctiller): wait some time in this case to be sure.
if (threads_waiting_ >= reserve_threads_) return false;
@@ -79,14 +79,8 @@ bool ThreadPool::Queue::Step() {
cv_.Wait(&mu_);
threads_waiting_--;
}
- switch (state_) {
- case State::kRunning:
- break;
- case State::kShutdown:
- case State::kForking:
- if (!callbacks_.empty()) break;
- return false;
- }
+ if (forking_) return false;
+ if (shutdown_ && callbacks_.empty()) return false;
GPR_ASSERT(!callbacks_.empty());
auto callback = std::move(callbacks_.front());
callbacks_.pop();
@@ -103,7 +97,7 @@ ThreadPool::ThreadPool(int reserve_threads)
}
ThreadPool::~ThreadPool() {
- state_->queue.SetShutdown();
+ state_->queue.SetShutdown(true);
// Wait until all threads are exited.
// Note that if this is a threadpool thread then we won't exit this thread
// until the callstack unwinds a little, so we need to wait for just one
@@ -126,27 +120,24 @@ bool ThreadPool::Queue::Add(y_absl::AnyInvocable<void()> callback) {
// Add works to the callbacks list
callbacks_.push(std::move(callback));
cv_.Signal();
- switch (state_) {
- case State::kRunning:
- case State::kShutdown:
- return threads_waiting_ == 0;
- case State::kForking:
- return false;
- }
- GPR_UNREACHABLE_CODE(return false);
+ if (forking_) return false;
+ return threads_waiting_ == 0;
}
-void ThreadPool::Queue::SetState(State state) {
+void ThreadPool::Queue::SetShutdown(bool is_shutdown) {
grpc_core::MutexLock lock(&mu_);
- if (state == State::kRunning) {
- GPR_ASSERT(state_ != State::kRunning);
- } else {
- GPR_ASSERT(state_ == State::kRunning);
- }
- state_ = state;
+ auto was_shutdown = std::exchange(shutdown_, is_shutdown);
+ GPR_ASSERT(is_shutdown != was_shutdown);
cv_.SignalAll();
}
+void ThreadPool::Queue::SetForking(bool is_forking) {
+ grpc_core::MutexLock lock(&mu_);
+ auto was_forking = std::exchange(forking_, is_forking);
+ GPR_ASSERT(is_forking != was_forking);
+ cv_.SignalAll();
+}
+
void ThreadPool::ThreadCount::Add() {
grpc_core::MutexLock lock(&mu_);
++threads_;
@@ -176,7 +167,7 @@ void ThreadPool::ThreadCount::BlockUntilThreadCount(int threads,
}
void ThreadPool::PrepareFork() {
- state_->queue.SetForking();
+ state_->queue.SetForking(true);
state_->thread_count.BlockUntilThreadCount(0, "forking");
}
@@ -185,7 +176,7 @@ void ThreadPool::PostforkParent() { Postfork(); }
void ThreadPool::PostforkChild() { Postfork(); }
void ThreadPool::Postfork() {
- state_->queue.Reset();
+ state_->queue.SetForking(false);
for (int i = 0; i < reserve_threads_; i++) {
StartThread(state_, /*throttled=*/false);
}
diff --git a/contrib/libs/grpc/src/core/lib/event_engine/thread_pool.h b/contrib/libs/grpc/src/core/lib/event_engine/thread_pool.h
index db66ff4b75..dfe1d9ca10 100644
--- a/contrib/libs/grpc/src/core/lib/event_engine/thread_pool.h
+++ b/contrib/libs/grpc/src/core/lib/event_engine/thread_pool.h
@@ -53,24 +53,25 @@ class ThreadPool final : public grpc_event_engine::experimental::Forkable {
public:
explicit Queue(int reserve_threads) : reserve_threads_(reserve_threads) {}
bool Step();
- void SetShutdown() { SetState(State::kShutdown); }
- void SetForking() { SetState(State::kForking); }
// Add a callback to the queue.
// Return true if we should also spin up a new thread.
bool Add(y_absl::AnyInvocable<void()> callback);
- void Reset() { SetState(State::kRunning); }
+ void SetShutdown(bool is_shutdown);
+ void SetForking(bool is_forking);
private:
- enum class State { kRunning, kShutdown, kForking };
-
- void SetState(State state);
grpc_core::Mutex mu_;
grpc_core::CondVar cv_;
std::queue<y_absl::AnyInvocable<void()>> callbacks_ Y_ABSL_GUARDED_BY(mu_);
int threads_waiting_ Y_ABSL_GUARDED_BY(mu_) = 0;
const int reserve_threads_;
- State state_ Y_ABSL_GUARDED_BY(mu_) = State::kRunning;
+ // Track shutdown and fork bits separately.
+ // It's possible for a ThreadPool to initiate shut down while fork handlers
+ // are running, and similarly possible for a fork event to occur during
+ // shutdown.
+ bool shutdown_ Y_ABSL_GUARDED_BY(mu_) = false;
+ bool forking_ Y_ABSL_GUARDED_BY(mu_) = false;
};
class ThreadCount {