diff options
author | shadchin <shadchin@yandex-team.com> | 2024-12-17 12:59:42 +0300 |
---|---|---|
committer | shadchin <shadchin@yandex-team.com> | 2024-12-17 13:19:38 +0300 |
commit | 30272794b11546a1e358cac78fc561ad1d4ef01b (patch) | |
tree | 7acbd64003d961bd7ff206155e6d0dbe3587f7b8 /contrib/libs/grpc/src | |
parent | f32d21718185bebaf27b832f94f21c02825660b3 (diff) | |
download | ydb-30272794b11546a1e358cac78fc561ad1d4ef01b.tar.gz |
Update contrib/libs/grpc and contrib/python/grpcio/py3 to 1.54.3
commit_hash:bb9eada94a255f7675a516b6c69ab9b64e741eec
Diffstat (limited to 'contrib/libs/grpc/src')
9 files changed, 109 insertions, 18 deletions
diff --git a/contrib/libs/grpc/src/compiler/grpc_cpp_plugin/ya.make b/contrib/libs/grpc/src/compiler/grpc_cpp_plugin/ya.make index 4b68d39f28..1552ac1891 100644 --- a/contrib/libs/grpc/src/compiler/grpc_cpp_plugin/ya.make +++ b/contrib/libs/grpc/src/compiler/grpc_cpp_plugin/ya.make @@ -4,7 +4,7 @@ PROGRAM() WITHOUT_LICENSE_TEXTS() -VERSION(1.54.2) +VERSION(1.54.3) LICENSE(Apache-2.0) diff --git a/contrib/libs/grpc/src/compiler/grpc_plugin_support/ya.make b/contrib/libs/grpc/src/compiler/grpc_plugin_support/ya.make index 38344b481b..6b1e5e8283 100644 --- a/contrib/libs/grpc/src/compiler/grpc_plugin_support/ya.make +++ b/contrib/libs/grpc/src/compiler/grpc_plugin_support/ya.make @@ -4,7 +4,7 @@ LIBRARY() WITHOUT_LICENSE_TEXTS() -VERSION(1.54.2) +VERSION(1.54.3) LICENSE(Apache-2.0) diff --git a/contrib/libs/grpc/src/compiler/grpc_python_plugin/ya.make b/contrib/libs/grpc/src/compiler/grpc_python_plugin/ya.make index 77d76cc825..e849105ecc 100644 --- a/contrib/libs/grpc/src/compiler/grpc_python_plugin/ya.make +++ b/contrib/libs/grpc/src/compiler/grpc_python_plugin/ya.make @@ -4,7 +4,7 @@ PROGRAM() WITHOUT_LICENSE_TEXTS() -VERSION(1.54.2) +VERSION(1.54.3) LICENSE(Apache-2.0) diff --git a/contrib/libs/grpc/src/core/lib/event_engine/posix_engine/posix_engine.h b/contrib/libs/grpc/src/core/lib/event_engine/posix_engine/posix_engine.h index a59c943d40..d8a055cc27 100644 --- a/contrib/libs/grpc/src/core/lib/event_engine/posix_engine/posix_engine.h +++ b/contrib/libs/grpc/src/core/lib/event_engine/posix_engine/posix_engine.h @@ -197,6 +197,7 @@ class PosixEventEngine final : public PosixEventEngineWithFdSupport, const DNSResolver::ResolverOptions& options) override; void Run(Closure* closure) override; void Run(y_absl::AnyInvocable<void()> closure) override; + // Caution!! The timer implementation cannot create any fds. See #20418. TaskHandle RunAfter(Duration when, Closure* closure) override; TaskHandle RunAfter(Duration when, y_absl::AnyInvocable<void()> closure) override; diff --git a/contrib/libs/grpc/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc b/contrib/libs/grpc/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc index 3230b72c7e..d6c2d9052a 100644 --- a/contrib/libs/grpc/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc +++ b/contrib/libs/grpc/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc @@ -23,8 +23,10 @@ #include <sys/socket.h> // IWYU pragma: keep #include <unistd.h> // IWYU pragma: keep +#include <atomic> #include <util/generic/string.h> #include <util/string/cast.h> +#include <tuple> #include <utility> #include "y_absl/functional/any_invocable.h" @@ -42,6 +44,7 @@ #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" #include "src/core/lib/event_engine/tcp_socket_utils.h" #include "src/core/lib/gprpp/status_helper.h" +#include "src/core/lib/gprpp/time.h" #include "src/core/lib/iomgr/socket_mutator.h" namespace grpc_event_engine { @@ -137,6 +140,32 @@ void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept( switch (errno) { case EINTR: continue; + case EMFILE: + // When the process runs out of fds, accept4() returns EMFILE. When + // this happens, the connection is left in the accept queue until + // either a read event triggers the on_read callback, or time has + // passed and the accept should be re-tried regardless. This callback + // is not cancelled, so a spurious wakeup may occur even when there's + // nothing to accept. This is not a performant code path, but if an fd + // limit has been reached, the system is likely in an unhappy state + // regardless. + GRPC_LOG_EVERY_N_SEC(1, GPR_ERROR, "%s", + "File descriptor limit reached. Retrying."); + handle_->NotifyOnRead(notify_on_accept_); + // Do not schedule another timer if one is already armed. + if (retry_timer_armed_.exchange(true)) return; + // Hold a ref while the retry timer is waiting, to prevent listener + // destruction and the races that would ensue. + Ref(); + std::ignore = + engine_->RunAfter(grpc_core::Duration::Seconds(1), [this]() { + retry_timer_armed_.store(false); + if (!handle_->IsHandleShutdown()) { + handle_->SetReadable(); + } + Unref(); + }); + return; case EAGAIN: case ECONNABORTED: handle_->NotifyOnRead(notify_on_accept_); diff --git a/contrib/libs/grpc/src/core/lib/event_engine/posix_engine/posix_engine_listener.h b/contrib/libs/grpc/src/core/lib/event_engine/posix_engine/posix_engine_listener.h index ce90abf8f4..bc91079ae4 100644 --- a/contrib/libs/grpc/src/core/lib/event_engine/posix_engine/posix_engine_listener.h +++ b/contrib/libs/grpc/src/core/lib/event_engine/posix_engine/posix_engine_listener.h @@ -122,6 +122,9 @@ class PosixEngineListenerImpl ListenerSocketsContainer::ListenerSocket socket_; EventHandle* handle_; PosixEngineClosure* notify_on_accept_; + // Tracks the status of a backup timer to retry accept4 calls after file + // descriptor exhaustion. + std::atomic<bool> retry_timer_armed_{false}; }; class ListenerAsyncAcceptors : public ListenerSocketsContainer { public: diff --git a/contrib/libs/grpc/src/core/lib/iomgr/tcp_server_posix.cc b/contrib/libs/grpc/src/core/lib/iomgr/tcp_server_posix.cc index 6906d847b6..ef099594e3 100644 --- a/contrib/libs/grpc/src/core/lib/iomgr/tcp_server_posix.cc +++ b/contrib/libs/grpc/src/core/lib/iomgr/tcp_server_posix.cc @@ -16,13 +16,17 @@ // // +#include <grpc/support/port_platform.h> + +#include <utility> + +#include <grpc/support/atm.h> + // FIXME: "posix" files shouldn't be depending on _GNU_SOURCE #ifndef _GNU_SOURCE #define _GNU_SOURCE #endif -#include <grpc/support/port_platform.h> - #include "src/core/lib/iomgr/port.h" #ifdef GRPC_POSIX_SOCKET_TCP_SERVER @@ -47,6 +51,7 @@ #include <grpc/byte_buffer.h> #include <grpc/event_engine/endpoint_config.h> +#include <grpc/event_engine/event_engine.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/sync.h> @@ -77,6 +82,8 @@ #include "src/core/lib/transport/error_utils.h" static std::atomic<int64_t> num_dropped_connections{0}; +static constexpr grpc_core::Duration kRetryAcceptWaitTime{ + grpc_core::Duration::Seconds(1)}; using ::grpc_event_engine::experimental::EndpointConfig; using ::grpc_event_engine::experimental::EventEngine; @@ -341,24 +348,40 @@ static void on_read(void* arg, grpc_error_handle err) { if (fd < 0) { if (errno == EINTR) { continue; - } else if (errno == EAGAIN || errno == ECONNABORTED || - errno == EWOULDBLOCK) { + } + // When the process runs out of fds, accept4() returns EMFILE. When this + // happens, the connection is left in the accept queue until either a + // read event triggers the on_read callback, or time has passed and the + // accept should be re-tried regardless. This callback is not cancelled, + // so a spurious wakeup may occur even when there's nothing to accept. + // This is not a performant code path, but if an fd limit has been + // reached, the system is likely in an unhappy state regardless. + if (errno == EMFILE) { + GRPC_LOG_EVERY_N_SEC(1, GPR_ERROR, "%s", + "File descriptor limit reached. Retrying."); + grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); + if (gpr_atm_full_xchg(&sp->retry_timer_armed, true)) return; + grpc_timer_init(&sp->retry_timer, + grpc_core::Timestamp::Now() + kRetryAcceptWaitTime, + &sp->retry_closure); + return; + } + if (errno == EAGAIN || errno == ECONNABORTED || errno == EWOULDBLOCK) { grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); return; - } else if (errno == EMFILE || errno == ENFILE) { + } else if (errno == ENFILE) { Y_ABORT("grpc failed accept4: %s", strerror(errno)); + } + gpr_mu_lock(&sp->server->mu); + if (!sp->server->shutdown_listeners) { + gpr_log(GPR_ERROR, "Failed accept4: %s", + grpc_core::StrError(errno).c_str()); } else { - gpr_mu_lock(&sp->server->mu); - if (!sp->server->shutdown_listeners) { - gpr_log(GPR_ERROR, "Failed accept4: %s", - grpc_core::StrError(errno).c_str()); - } else { - // if we have shutdown listeners, accept4 could fail, and we - // needn't notify users - } - gpr_mu_unlock(&sp->server->mu); - goto error; + // if we have shutdown listeners, accept4 could fail, and we + // needn't notify users } + gpr_mu_unlock(&sp->server->mu); + goto error; } if (sp->server->memory_quota->IsMemoryPressureHigh()) { @@ -551,6 +574,7 @@ static grpc_error_handle clone_port(grpc_tcp_listener* listener, sp->port_index = listener->port_index; sp->fd_index = listener->fd_index + count - i; GPR_ASSERT(sp->emfd); + grpc_tcp_server_listener_initialize_retry_timer(sp); while (listener->server->tail->next != nullptr) { listener->server->tail = listener->server->tail->next; } @@ -784,6 +808,7 @@ static void tcp_server_shutdown_listeners(grpc_tcp_server* s) { if (s->active_ports) { grpc_tcp_listener* sp; for (sp = s->head; sp; sp = sp->next) { + grpc_timer_cancel(&sp->retry_timer); grpc_fd_shutdown(sp->emfd, GRPC_ERROR_CREATE("Server shutdown")); } } diff --git a/contrib/libs/grpc/src/core/lib/iomgr/tcp_server_utils_posix.h b/contrib/libs/grpc/src/core/lib/iomgr/tcp_server_utils_posix.h index fc33fcd81e..25aa1a5894 100644 --- a/contrib/libs/grpc/src/core/lib/iomgr/tcp_server_utils_posix.h +++ b/contrib/libs/grpc/src/core/lib/iomgr/tcp_server_utils_posix.h @@ -30,6 +30,7 @@ #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/socket_utils_posix.h" #include "src/core/lib/iomgr/tcp_server.h" +#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/resource_quota/memory_quota.h" // one listening port @@ -52,6 +53,11 @@ typedef struct grpc_tcp_listener { // identified while iterating through 'next'. struct grpc_tcp_listener* sibling; int is_sibling; + // If an accept4() call fails, a timer is started to drain the accept queue in + // case no further connection attempts reach the gRPC server. + grpc_closure retry_closure; + grpc_timer retry_timer; + gpr_atm retry_timer_armed; } grpc_tcp_listener; // the overall server @@ -139,4 +145,10 @@ grpc_error_handle grpc_tcp_server_prepare_socket( // Ruturn true if the platform supports ifaddrs bool grpc_tcp_server_have_ifaddrs(void); +// Initialize (but don't start) the timer and callback to retry accept4() on a +// listening socket after file descriptors have been exhausted. This must be +// called when creating a new listener. +void grpc_tcp_server_listener_initialize_retry_timer( + grpc_tcp_listener* listener); + #endif // GRPC_SRC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H diff --git a/contrib/libs/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/contrib/libs/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.cc index dd3d68e72f..2af2fd8827 100644 --- a/contrib/libs/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.cc +++ b/contrib/libs/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.cc @@ -18,6 +18,8 @@ #include <grpc/support/port_platform.h> +#include <grpc/support/atm.h> + #include "src/core/lib/iomgr/port.h" #ifdef GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON @@ -82,6 +84,24 @@ static int get_max_accept_queue_size(void) { return s_max_accept_queue_size; } +static void listener_retry_timer_cb(void* arg, grpc_error_handle err) { + // Do nothing if cancelled. + if (!err.ok()) return; + grpc_tcp_listener* listener = static_cast<grpc_tcp_listener*>(arg); + gpr_atm_no_barrier_store(&listener->retry_timer_armed, false); + if (!grpc_fd_is_shutdown(listener->emfd)) { + grpc_fd_set_readable(listener->emfd); + } +} + +void grpc_tcp_server_listener_initialize_retry_timer( + grpc_tcp_listener* listener) { + gpr_atm_no_barrier_store(&listener->retry_timer_armed, false); + grpc_timer_init_unset(&listener->retry_timer); + GRPC_CLOSURE_INIT(&listener->retry_closure, listener_retry_timer_cb, listener, + grpc_schedule_on_exec_ctx); +} + static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, int fd, const grpc_resolved_address* addr, unsigned port_index, @@ -113,6 +133,7 @@ static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, int fd, sp->server = s; sp->fd = fd; sp->emfd = grpc_fd_create(fd, name.c_str(), true); + grpc_tcp_server_listener_initialize_retry_timer(sp); // Check and set fd as prellocated if (grpc_tcp_server_pre_allocated_fd(s) == fd) { |