diff options
author | heretic <heretic@yandex-team.ru> | 2022-03-25 12:34:53 +0300 |
---|---|---|
committer | heretic <heretic@yandex-team.ru> | 2022-03-25 12:34:53 +0300 |
commit | a41f3739eed6fceb6f62056a7620d220958a47e7 (patch) | |
tree | 278103258b510cb4a96761ea79d6ccd397ca05a0 /contrib/libs/grpc/src/cpp/server | |
parent | 73d3613a82e5c217fcbe0ab8bbf8120c1ed1af55 (diff) | |
download | ydb-a41f3739eed6fceb6f62056a7620d220958a47e7.tar.gz |
Update grpc to 1.43.2 DTCC-864
ref:50a492c335cda70f458797cf945e49fe739c2715
Diffstat (limited to 'contrib/libs/grpc/src/cpp/server')
17 files changed, 130 insertions, 120 deletions
diff --git a/contrib/libs/grpc/src/cpp/server/async_generic_service.cc b/contrib/libs/grpc/src/cpp/server/async_generic_service.cc index 07697a52d1..fdb3da830c 100644 --- a/contrib/libs/grpc/src/cpp/server/async_generic_service.cc +++ b/contrib/libs/grpc/src/cpp/server/async_generic_service.cc @@ -17,7 +17,6 @@ */ #include <grpcpp/generic/async_generic_service.h> - #include <grpcpp/server.h> namespace grpc { diff --git a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.h b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.h index b4a66ba1c6..824f0a9fe1 100644 --- a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.h +++ b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.h @@ -22,6 +22,7 @@ #include <grpc/support/port_platform.h> #include <grpcpp/grpcpp.h> + #include "src/proto/grpc/channelz/channelz.grpc.pb.h" namespace grpc { diff --git a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc index f39270924b..52341c1612 100644 --- a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc +++ b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc @@ -16,6 +16,8 @@ * */ +#include "src/cpp/server/health/default_health_check_service.h" + #include <memory> #include "y_absl/memory/memory.h" @@ -26,9 +28,7 @@ #include <grpc/support/log.h> #include <grpcpp/impl/codegen/method_handler.h> -#include "src/cpp/server/health/default_health_check_service.h" #include "src/proto/grpc/health/v1/health.upb.h" -#include "upb/upb.hpp" #define MAX_SERVICE_NAME_LENGTH 200 @@ -202,28 +202,16 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) { bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest( const ByteBuffer& request, TString* service_name) { - std::vector<Slice> slices; - if (!request.Dump(&slices).ok()) return false; + Slice slice; + if (!request.DumpToSingleSlice(&slice).ok()) return false; uint8_t* request_bytes = nullptr; size_t request_size = 0; - if (slices.size() == 1) { - request_bytes = const_cast<uint8_t*>(slices[0].begin()); - request_size = slices[0].size(); - } else if (slices.size() > 1) { - request_bytes = static_cast<uint8_t*>(gpr_malloc(request.Length())); - uint8_t* copy_to = request_bytes; - for (size_t i = 0; i < slices.size(); i++) { - memcpy(copy_to, slices[i].begin(), slices[i].size()); - copy_to += slices[i].size(); - } - } + request_bytes = const_cast<uint8_t*>(slice.begin()); + request_size = slice.size(); upb::Arena arena; grpc_health_v1_HealthCheckRequest* request_struct = grpc_health_v1_HealthCheckRequest_parse( reinterpret_cast<char*>(request_bytes), request_size, arena.ptr()); - if (slices.size() > 1) { - gpr_free(request_bytes); - } if (request_struct == nullptr) { return false; } diff --git a/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc b/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc index 4eb492e073..133647a5ed 100644 --- a/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc +++ b/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc @@ -16,10 +16,9 @@ * */ -#include <grpcpp/security/server_credentials.h> - #include <grpc/grpc.h> #include <grpc/support/log.h> +#include <grpcpp/security/server_credentials.h> namespace grpc { namespace { diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc index 561d4f5048..f778b13785 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc @@ -20,6 +20,8 @@ #ifdef GPR_LINUX +#include <inttypes.h> + #include <cstdio> #include "src/cpp/server/load_reporter/get_cpu_stats.h" @@ -32,7 +34,8 @@ std::pair<uint64_t, uint64_t> GetCpuStatsImpl() { FILE* fp; fp = fopen("/proc/stat", "r"); uint64_t user, nice, system, idle; - if (fscanf(fp, "cpu %lu %lu %lu %lu", &user, &nice, &system, &idle) != 4) { + if (fscanf(fp, "cpu %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64, &user, + &nice, &system, &idle) != 4) { // Something bad happened with the information, so assume it's all invalid user = nice = system = idle = 0; } diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc index 0a98e848a2..c03daddb35 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc @@ -21,6 +21,7 @@ #ifdef GPR_WINDOWS #include <windows.h> + #include <cstdint> #include "src/cpp/server/load_reporter/get_cpu_stats.h" diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc index 16542bfddf..78e6c32864 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc @@ -18,14 +18,16 @@ #include <grpc/impl/codegen/port_platform.h> +#include "src/cpp/server/load_reporter/load_data_store.h" + #include <stdio.h> + #include <cstdlib> #include <set> #include <unordered_map> #include <vector> #include "src/core/lib/iomgr/socket_utils.h" -#include "src/cpp/server/load_reporter/load_data_store.h" namespace grpc { namespace load_reporter { diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc index 24ad9f3f24..4b4e778a62 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc @@ -18,11 +18,10 @@ #include <grpc/impl/codegen/port_platform.h> -#include <grpcpp/ext/server_load_reporting.h> - #include <cmath> #include <grpc/support/log.h> +#include <grpcpp/ext/server_load_reporting.h> namespace grpc { namespace load_reporter { diff --git a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc index b6ea87f4fe..819a12e294 100644 --- a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc +++ b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc @@ -16,6 +16,8 @@ * */ +#include "src/cpp/server/secure_server_credentials.h" + #include <functional> #include <map> #include <memory> @@ -25,7 +27,6 @@ #include <grpcpp/security/auth_metadata_processor.h> #include "src/cpp/common/secure_auth_context.h" -#include "src/cpp/server/secure_server_credentials.h" namespace grpc { @@ -74,7 +75,6 @@ void AuthMetadataProcessorAyncWrapper::InvokeProcessor( grpc_metadata md_entry; md_entry.key = SliceReferencingString(consumed.first); md_entry.value = SliceReferencingString(consumed.second); - md_entry.flags = 0; consumed_md.push_back(md_entry); } std::vector<grpc_metadata> response_md; @@ -82,7 +82,6 @@ void AuthMetadataProcessorAyncWrapper::InvokeProcessor( grpc_metadata md_entry; md_entry.key = SliceReferencingString(response.first); md_entry.value = SliceReferencingString(response.second); - md_entry.flags = 0; response_md.push_back(md_entry); } auto consumed_md_data = consumed_md.empty() ? nullptr : &consumed_md[0]; diff --git a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h index f2b65c2864..95b4cab958 100644 --- a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h +++ b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h @@ -21,11 +21,10 @@ #include <memory> +#include <grpc/grpc_security.h> #include <grpcpp/security/server_credentials.h> #include <grpcpp/security/tls_credentials_options.h> -#include <grpc/grpc_security.h> - #include "src/cpp/server/thread_pool_interface.h" namespace grpc { diff --git a/contrib/libs/grpc/src/cpp/server/server_builder.cc b/contrib/libs/grpc/src/cpp/server/server_builder.cc index 502d4f20ae..abe95fd834 100644 --- a/contrib/libs/grpc/src/cpp/server/server_builder.cc +++ b/contrib/libs/grpc/src/cpp/server/server_builder.cc @@ -16,15 +16,14 @@ * */ -#include <grpcpp/server_builder.h> +#include <utility> #include <grpc/support/cpu.h> #include <grpc/support/log.h> #include <grpcpp/impl/service_type.h> #include <grpcpp/resource_quota.h> #include <grpcpp/server.h> - -#include <utility> +#include <grpcpp/server_builder.h> #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/string.h" @@ -102,38 +101,23 @@ ServerBuilder& ServerBuilder::RegisterAsyncGenericService( return *this; } -#ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL ServerBuilder& ServerBuilder::RegisterCallbackGenericService( CallbackGenericService* service) { if (generic_service_ || callback_generic_service_) { gpr_log(GPR_ERROR, "Adding multiple generic services is unsupported for now. " "Dropping the service %p", - (void*)service); + service); } else { callback_generic_service_ = service; } return *this; } -#else -ServerBuilder& ServerBuilder::experimental_type::RegisterCallbackGenericService( - experimental::CallbackGenericService* service) { - if (builder_->generic_service_ || builder_->callback_generic_service_) { - gpr_log(GPR_ERROR, - "Adding multiple generic services is unsupported for now. " - "Dropping the service %p", - service); - } else { - builder_->callback_generic_service_ = service; - } - return *builder_; -} -#endif -ServerBuilder& ServerBuilder::experimental_type::SetContextAllocator( +ServerBuilder& ServerBuilder::SetContextAllocator( std::unique_ptr<grpc::ContextAllocator> context_allocator) { - builder_->context_allocator_ = std::move(context_allocator); - return *builder_; + context_allocator_ = std::move(context_allocator); + return *this; } std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor> @@ -149,6 +133,12 @@ ServerBuilder::experimental_type::AddExternalConnectionAcceptor( return builder_->acceptors_.back()->GetAcceptor(); } +void ServerBuilder::experimental_type::SetAuthorizationPolicyProvider( + std::shared_ptr<experimental::AuthorizationPolicyProviderInterface> + provider) { + builder_->authorization_provider_ = std::move(provider); +} + ServerBuilder& ServerBuilder::SetOption( std::unique_ptr<ServerBuilderOption> option) { options_.push_back(std::move(option)); @@ -177,9 +167,9 @@ ServerBuilder& ServerBuilder::SetSyncServerOption( ServerBuilder& ServerBuilder::SetCompressionAlgorithmSupportStatus( grpc_compression_algorithm algorithm, bool enabled) { if (enabled) { - GPR_BITSET(&enabled_compression_algorithms_bitset_, algorithm); + grpc_core::SetBit(&enabled_compression_algorithms_bitset_, algorithm); } else { - GPR_BITCLEAR(&enabled_compression_algorithms_bitset_, algorithm); + grpc_core::ClearBit(&enabled_compression_algorithms_bitset_, algorithm); } return *this; } @@ -223,8 +213,8 @@ ServerBuilder& ServerBuilder::AddListeningPort( return *this; } -std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { - grpc::ChannelArguments args; +ChannelArguments ServerBuilder::BuildChannelArgs() { + ChannelArguments args; if (max_receive_message_size_ >= -1) { args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_); } @@ -245,16 +235,24 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, maybe_default_compression_algorithm_.algorithm); } - if (resource_quota_ != nullptr) { args.SetPointerWithVtable(GRPC_ARG_RESOURCE_QUOTA, resource_quota_, grpc_resource_quota_arg_vtable()); } - for (const auto& plugin : plugins_) { plugin->UpdateServerBuilder(this); plugin->UpdateChannelArguments(&args); } + if (authorization_provider_ != nullptr) { + args.SetPointerWithVtable(GRPC_ARG_AUTHORIZATION_POLICY_PROVIDER, + authorization_provider_->c_provider(), + grpc_authorization_policy_provider_arg_vtable()); + } + return args; +} + +std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { + ChannelArguments args = BuildChannelArgs(); // == Determine if the server has any syncrhonous methods == bool has_sync_methods = false; @@ -304,6 +302,10 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { } } + if (callback_generic_service_ != nullptr) { + has_frequently_polled_cqs = true; + } + const bool is_hybrid_server = has_sync_methods && has_frequently_polled_cqs; if (has_sync_methods) { @@ -375,12 +377,7 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { return nullptr; } -#ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL server->RegisterContextAllocator(std::move(context_allocator_)); -#else - server->experimental_registration()->RegisterContextAllocator( - std::move(context_allocator_)); -#endif for (const auto& value : services_) { if (!server->RegisterService(value->host.get(), value->service)) { diff --git a/contrib/libs/grpc/src/cpp/server/server_callback.cc b/contrib/libs/grpc/src/cpp/server/server_callback.cc index f6b72c0fcc..5b2d328b81 100644 --- a/contrib/libs/grpc/src/cpp/server/server_callback.cc +++ b/contrib/libs/grpc/src/cpp/server/server_callback.cc @@ -37,7 +37,7 @@ void ServerCallbackCall::ScheduleOnDone(bool inline_ondone) { explicit ClosureWithArg(ServerCallbackCall* call_arg) : call(call_arg) { GRPC_CLOSURE_INIT( &closure, - [](void* void_arg, grpc_error*) { + [](void* void_arg, grpc_error_handle) { ClosureWithArg* arg = static_cast<ClosureWithArg*>(void_arg); arg->call->CallOnDone(); delete arg; @@ -66,7 +66,7 @@ void ServerCallbackCall::CallOnCancel(ServerReactor* reactor) { : call(call_arg), reactor(reactor_arg) { GRPC_CLOSURE_INIT( &closure, - [](void* void_arg, grpc_error*) { + [](void* void_arg, grpc_error_handle) { ClosureWithArg* arg = static_cast<ClosureWithArg*>(void_arg); arg->reactor->OnCancel(); arg->call->MaybeDone(); diff --git a/contrib/libs/grpc/src/cpp/server/server_cc.cc b/contrib/libs/grpc/src/cpp/server/server_cc.cc index dab7f488a7..edb1fe4d1d 100644 --- a/contrib/libs/grpc/src/cpp/server/server_cc.cc +++ b/contrib/libs/grpc/src/cpp/server/server_cc.cc @@ -15,13 +15,13 @@ * */ -#include <grpcpp/server.h> - #include <cstdlib> #include <sstream> #include <type_traits> #include <utility> +#include "y_absl/memory/memory.h" + #include <grpc/grpc.h> #include <grpc/impl/codegen/grpc_types.h> #include <grpc/support/alloc.h> @@ -39,11 +39,10 @@ #include <grpcpp/impl/server_initializer.h> #include <grpcpp/impl/service_type.h> #include <grpcpp/security/server_credentials.h> +#include <grpcpp/server.h> #include <grpcpp/server_context.h> #include <grpcpp/support/time.h> -#include "y_absl/memory/memory.h" - #include "src/core/ext/transport/inproc/inproc_transport.h" #include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/exec_ctx.h" @@ -69,6 +68,15 @@ namespace { // max-threads set) to the server builder. #define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX +// Give a useful status error message if the resource is exhausted specifically +// because the server threadpool is full. +const char* kServerThreadpoolExhausted = "Server Threadpool Exhausted"; + +// Although we might like to give a useful status error message on unimplemented +// RPCs, it's not always possible since that also would need to be added across +// languages and isn't actually required by the spec. +const char* kUnknownRpcMethod = ""; + class DefaultGlobalCallbacks final : public Server::GlobalCallbacks { public: ~DefaultGlobalCallbacks() override {} @@ -107,15 +115,6 @@ class UnimplementedAsyncRequestContext { GenericServerAsyncReaderWriter generic_stream_; }; -// TODO(vjpai): Just for this file, use some contents of the experimental -// namespace here to make the code easier to read below. Remove this when -// de-experimentalized fully. -#ifndef GRPC_CALLBACK_API_NONEXPERIMENTAL -using ::grpc::experimental::CallbackGenericService; -using ::grpc::experimental::CallbackServerContext; -using ::grpc::experimental::GenericCallbackServerContext; -#endif - } // namespace ServerInterface::BaseAsyncRequest::BaseAsyncRequest( @@ -266,7 +265,7 @@ bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag, } namespace { -class ShutdownCallback : public grpc_experimental_completion_queue_functor { +class ShutdownCallback : public grpc_completion_queue_functor { public: ShutdownCallback() { functor_run = &ShutdownCallback::Run; @@ -282,7 +281,7 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor { // The Run function will get invoked by the completion queue library // when the shutdown is actually complete - static void Run(grpc_experimental_completion_queue_functor* cb, int) { + static void Run(grpc_completion_queue_functor* cb, int) { auto* callback = static_cast<ShutdownCallback*>(cb); delete callback->cq_; delete callback; @@ -410,7 +409,9 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { : server_->resource_exhausted_handler_.get(); deserialized_request_ = handler->Deserialize(call_, request_payload_, &request_status_, nullptr); - + if (!request_status_.ok()) { + gpr_log(GPR_DEBUG, "Failed to deserialize message."); + } request_payload_ = nullptr; interceptor_methods_.AddInterceptionHookPoint( grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); @@ -576,7 +577,7 @@ class Server::CallbackRequest final // method_name needs to be specialized between named method and generic const char* method_name() const; - class CallbackCallTag : public grpc_experimental_completion_queue_functor { + class CallbackCallTag : public grpc_completion_queue_functor { public: explicit CallbackCallTag(Server::CallbackRequest<ServerContextType>* req) : req_(req) { @@ -599,8 +600,7 @@ class Server::CallbackRequest final Server::CallbackRequest<ServerContextType>* req_; grpc::internal::Call* call_; - static void StaticRun(grpc_experimental_completion_queue_functor* cb, - int ok) { + static void StaticRun(grpc_completion_queue_functor* cb, int ok) { static_cast<CallbackCallTag*>(cb)->Run(static_cast<bool>(ok)); } void Run(bool ok) { @@ -650,6 +650,9 @@ class Server::CallbackRequest final req_->request_ = req_->method_->handler()->Deserialize( req_->call_, req_->request_payload_, &req_->request_status_, &req_->handler_data_); + if (!(req_->request_status_.ok())) { + gpr_log(GPR_DEBUG, "Failed to deserialize message."); + } req_->request_payload_ = nullptr; req_->interceptor_methods_.AddInterceptionHookPoint( grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); @@ -791,8 +794,8 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { } void AddSyncMethod(grpc::internal::RpcServiceMethod* method, void* tag) { - server_->server()->core_server->SetRegisteredMethodAllocator( - server_cq_->cq(), tag, [this, method] { + grpc_core::Server::FromC(server_->server()) + ->SetRegisteredMethodAllocator(server_cq_->cq(), tag, [this, method] { grpc_core::Server::RegisteredCallAllocation result; new SyncRequest(server_, method, &result); return result; @@ -804,9 +807,9 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { if (has_sync_method_) { unknown_method_ = y_absl::make_unique<grpc::internal::RpcServiceMethod>( "unknown", grpc::internal::RpcMethod::BIDI_STREAMING, - new grpc::internal::UnknownMethodHandler); - server_->server()->core_server->SetBatchMethodAllocator( - server_cq_->cq(), [this] { + new grpc::internal::UnknownMethodHandler(kUnknownRpcMethod)); + grpc_core::Server::FromC(server_->server()) + ->SetBatchMethodAllocator(server_cq_->cq(), [this] { grpc_core::Server::BatchCallAllocation result; new SyncRequest(server_, unknown_method_.get(), &result); return result; @@ -932,14 +935,16 @@ Server::~Server() { for (const auto& value : sync_req_mgrs_) { value->Shutdown(); } - if (callback_cq_ != nullptr) { + CompletionQueue* callback_cq = + callback_cq_.load(std::memory_order_relaxed); + if (callback_cq != nullptr) { if (grpc_iomgr_run_in_background()) { // gRPC-core provides the backing needed for the preferred CQ type - callback_cq_->Shutdown(); + callback_cq->Shutdown(); } else { - CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq_); + CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq); } - callback_cq_ = nullptr; + callback_cq_.store(nullptr, std::memory_order_release); } } } @@ -1028,7 +1033,7 @@ bool Server::RegisterService(const TString* addr, grpc::Service* service) { has_callback_methods_ = true; grpc::internal::RpcServiceMethod* method_value = method.get(); grpc::CompletionQueue* cq = CallbackCQ(); - server_->core_server->SetRegisteredMethodAllocator( + grpc_core::Server::FromC(server_)->SetRegisteredMethodAllocator( cq->cq(), method_registration_tag, [this, cq, method_value] { grpc_core::Server::RegisteredCallAllocation result; new CallbackRequest<grpc::CallbackServerContext>(this, method_value, @@ -1069,7 +1074,8 @@ void Server::RegisterCallbackGenericService( generic_handler_.reset(service->Handler()); grpc::CompletionQueue* cq = CallbackCQ(); - server_->core_server->SetBatchMethodAllocator(cq->cq(), [this, cq] { + grpc_core::Server::FromC(server_)->SetBatchMethodAllocator(cq->cq(), [this, + cq] { grpc_core::Server::BatchCallAllocation result; new CallbackRequest<grpc::GenericCallbackServerContext>(this, cq, &result); return result; @@ -1106,9 +1112,9 @@ void Server::UnrefAndWaitLocked() { shutdown_done_ = true; return; // no need to wait on CV since done condition already set } - grpc::internal::WaitUntil( - &shutdown_done_cv_, &mu_, - [this]() Y_ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { return shutdown_done_; }); + while (!shutdown_done_) { + shutdown_done_cv_.Wait(&mu_); + } } void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { @@ -1194,7 +1200,8 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { // to deal with the case of thread exhaustion if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) { resource_exhausted_handler_ = - y_absl::make_unique<grpc::internal::ResourceExhaustedHandler>(); + y_absl::make_unique<grpc::internal::ResourceExhaustedHandler>( + kServerThreadpoolExhausted); } for (const auto& value : sync_req_mgrs_) { @@ -1258,14 +1265,15 @@ void Server::ShutdownInternal(gpr_timespec deadline) { // Shutdown the callback CQ. The CQ is owned by its own shutdown tag, so it // will delete itself at true shutdown. - if (callback_cq_ != nullptr) { + CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_relaxed); + if (callback_cq != nullptr) { if (grpc_iomgr_run_in_background()) { // gRPC-core provides the backing needed for the preferred CQ type - callback_cq_->Shutdown(); + callback_cq->Shutdown(); } else { - CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq_); + CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq); } - callback_cq_ = nullptr; + callback_cq_.store(nullptr, std::memory_order_release); } // Drain the shutdown queue (if the previous call to AsyncNext() timed out @@ -1320,8 +1328,9 @@ bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag, Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( UnimplementedAsyncRequest* request) : request_(request) { - grpc::Status status(grpc::StatusCode::UNIMPLEMENTED, ""); - grpc::internal::UnknownMethodHandler::FillOps(request_->context(), this); + grpc::Status status(grpc::StatusCode::UNIMPLEMENTED, kUnknownRpcMethod); + grpc::internal::UnknownMethodHandler::FillOps(request_->context(), + kUnknownRpcMethod, this); request_->stream()->call_.PerformOps(this); } @@ -1332,25 +1341,33 @@ grpc::ServerInitializer* Server::initializer() { grpc::CompletionQueue* Server::CallbackCQ() { // TODO(vjpai): Consider using a single global CQ for the default CQ // if there is no explicit per-server CQ registered + CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_acquire); + if (callback_cq != nullptr) { + return callback_cq; + } + // The callback_cq_ wasn't already set, so grab a lock and set it up exactly + // once for this server. grpc::internal::MutexLock l(&mu_); - if (callback_cq_ != nullptr) { - return callback_cq_; + callback_cq = callback_cq_.load(std::memory_order_relaxed); + if (callback_cq != nullptr) { + return callback_cq; } if (grpc_iomgr_run_in_background()) { // gRPC-core provides the backing needed for the preferred CQ type auto* shutdown_callback = new grpc::ShutdownCallback; - callback_cq_ = new grpc::CompletionQueue(grpc_completion_queue_attributes{ + callback_cq = new grpc::CompletionQueue(grpc_completion_queue_attributes{ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, shutdown_callback}); // Transfer ownership of the new cq to its own shutdown callback - shutdown_callback->TakeCQ(callback_cq_); + shutdown_callback->TakeCQ(callback_cq); } else { // Otherwise we need to use the alternative CQ variant - callback_cq_ = CompletionQueue::CallbackAlternativeCQ(); + callback_cq = CompletionQueue::CallbackAlternativeCQ(); } - return callback_cq_; + callback_cq_.store(callback_cq, std::memory_order_release); + return callback_cq; } } // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/server/server_context.cc b/contrib/libs/grpc/src/cpp/server/server_context.cc index df82c69ed8..c8ace3d914 100644 --- a/contrib/libs/grpc/src/cpp/server/server_context.cc +++ b/contrib/libs/grpc/src/cpp/server/server_context.cc @@ -16,9 +16,8 @@ * */ -#include <grpcpp/impl/codegen/server_context.h> - #include <algorithm> +#include <atomic> #include <utility> #include <grpc/compression.h> @@ -28,6 +27,7 @@ #include <grpc/support/log.h> #include <grpcpp/impl/call.h> #include <grpcpp/impl/codegen/completion_queue.h> +#include <grpcpp/impl/codegen/server_context.h> #include <grpcpp/impl/grpc_library.h> #include <grpcpp/support/server_callback.h> #include <grpcpp/support/time.h> @@ -327,14 +327,16 @@ void ServerContextBase::TryCancel() const { bool ServerContextBase::IsCancelled() const { if (completion_tag_) { // When using callback API, this result is always valid. - return completion_op_->CheckCancelledAsync(); + return marked_cancelled_.load(std::memory_order_acquire) || + completion_op_->CheckCancelledAsync(); } else if (has_notify_when_done_tag_) { // When using async API, the result is only valid // if the tag has already been delivered at the completion queue return completion_op_ && completion_op_->CheckCancelledAsync(); } else { // when using sync API, the result is always valid - return completion_op_ && completion_op_->CheckCancelled(cq_); + return marked_cancelled_.load(std::memory_order_acquire) || + (completion_op_ && completion_op_->CheckCancelled(cq_)); } } diff --git a/contrib/libs/grpc/src/cpp/server/server_credentials.cc b/contrib/libs/grpc/src/cpp/server/server_credentials.cc index 36b5a52dc7..454e8b4e9d 100644 --- a/contrib/libs/grpc/src/cpp/server/server_credentials.cc +++ b/contrib/libs/grpc/src/cpp/server/server_credentials.cc @@ -16,9 +16,8 @@ * */ -#include <grpcpp/security/server_credentials.h> - #include <grpcpp/impl/grpc_library.h> +#include <grpcpp/security/server_credentials.h> namespace grpc { diff --git a/contrib/libs/grpc/src/cpp/server/server_posix.cc b/contrib/libs/grpc/src/cpp/server/server_posix.cc index c3d40d4fa2..f2452cc326 100644 --- a/contrib/libs/grpc/src/cpp/server/server_posix.cc +++ b/contrib/libs/grpc/src/cpp/server/server_posix.cc @@ -16,9 +16,8 @@ * */ -#include <grpcpp/server_posix.h> - #include <grpc/grpc_posix.h> +#include <grpcpp/server_posix.h> namespace grpc { diff --git a/contrib/libs/grpc/src/cpp/server/xds_server_credentials.cc b/contrib/libs/grpc/src/cpp/server/xds_server_credentials.cc index b543f3f172..f184238906 100644 --- a/contrib/libs/grpc/src/cpp/server/xds_server_credentials.cc +++ b/contrib/libs/grpc/src/cpp/server/xds_server_credentials.cc @@ -19,7 +19,6 @@ #include "src/cpp/server/secure_server_credentials.h" namespace grpc { -namespace experimental { std::shared_ptr<ServerCredentials> XdsServerCredentials( const std::shared_ptr<ServerCredentials>& fallback_credentials) { @@ -37,5 +36,12 @@ std::shared_ptr<ServerCredentials> XdsServerCredentials( fallback_credentials->AsSecureServerCredentials()->c_creds())); } +namespace experimental { + +std::shared_ptr<ServerCredentials> XdsServerCredentials( + const std::shared_ptr<ServerCredentials>& fallback_credentials) { + return grpc::XdsServerCredentials(fallback_credentials); +} + } // namespace experimental } // namespace grpc |