aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/libs/grpc/src/cpp/server
diff options
context:
space:
mode:
authorheretic <heretic@yandex-team.ru>2022-03-25 12:34:53 +0300
committerheretic <heretic@yandex-team.ru>2022-03-25 12:34:53 +0300
commita41f3739eed6fceb6f62056a7620d220958a47e7 (patch)
tree278103258b510cb4a96761ea79d6ccd397ca05a0 /contrib/libs/grpc/src/cpp/server
parent73d3613a82e5c217fcbe0ab8bbf8120c1ed1af55 (diff)
downloadydb-a41f3739eed6fceb6f62056a7620d220958a47e7.tar.gz
Update grpc to 1.43.2 DTCC-864
ref:50a492c335cda70f458797cf945e49fe739c2715
Diffstat (limited to 'contrib/libs/grpc/src/cpp/server')
-rw-r--r--contrib/libs/grpc/src/cpp/server/async_generic_service.cc1
-rw-r--r--contrib/libs/grpc/src/cpp/server/channelz/channelz_service.h1
-rw-r--r--contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc24
-rw-r--r--contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc3
-rw-r--r--contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc5
-rw-r--r--contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc1
-rw-r--r--contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc4
-rw-r--r--contrib/libs/grpc/src/cpp/server/load_reporter/util.cc3
-rw-r--r--contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc5
-rw-r--r--contrib/libs/grpc/src/cpp/server/secure_server_credentials.h3
-rw-r--r--contrib/libs/grpc/src/cpp/server/server_builder.cc63
-rw-r--r--contrib/libs/grpc/src/cpp/server/server_callback.cc4
-rw-r--r--contrib/libs/grpc/src/cpp/server/server_cc.cc109
-rw-r--r--contrib/libs/grpc/src/cpp/server/server_context.cc10
-rw-r--r--contrib/libs/grpc/src/cpp/server/server_credentials.cc3
-rw-r--r--contrib/libs/grpc/src/cpp/server/server_posix.cc3
-rw-r--r--contrib/libs/grpc/src/cpp/server/xds_server_credentials.cc8
17 files changed, 130 insertions, 120 deletions
diff --git a/contrib/libs/grpc/src/cpp/server/async_generic_service.cc b/contrib/libs/grpc/src/cpp/server/async_generic_service.cc
index 07697a52d1..fdb3da830c 100644
--- a/contrib/libs/grpc/src/cpp/server/async_generic_service.cc
+++ b/contrib/libs/grpc/src/cpp/server/async_generic_service.cc
@@ -17,7 +17,6 @@
*/
#include <grpcpp/generic/async_generic_service.h>
-
#include <grpcpp/server.h>
namespace grpc {
diff --git a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.h b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.h
index b4a66ba1c6..824f0a9fe1 100644
--- a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.h
+++ b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.h
@@ -22,6 +22,7 @@
#include <grpc/support/port_platform.h>
#include <grpcpp/grpcpp.h>
+
#include "src/proto/grpc/channelz/channelz.grpc.pb.h"
namespace grpc {
diff --git a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc
index f39270924b..52341c1612 100644
--- a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc
+++ b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc
@@ -16,6 +16,8 @@
*
*/
+#include "src/cpp/server/health/default_health_check_service.h"
+
#include <memory>
#include "y_absl/memory/memory.h"
@@ -26,9 +28,7 @@
#include <grpc/support/log.h>
#include <grpcpp/impl/codegen/method_handler.h>
-#include "src/cpp/server/health/default_health_check_service.h"
#include "src/proto/grpc/health/v1/health.upb.h"
-#include "upb/upb.hpp"
#define MAX_SERVICE_NAME_LENGTH 200
@@ -202,28 +202,16 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) {
bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
const ByteBuffer& request, TString* service_name) {
- std::vector<Slice> slices;
- if (!request.Dump(&slices).ok()) return false;
+ Slice slice;
+ if (!request.DumpToSingleSlice(&slice).ok()) return false;
uint8_t* request_bytes = nullptr;
size_t request_size = 0;
- if (slices.size() == 1) {
- request_bytes = const_cast<uint8_t*>(slices[0].begin());
- request_size = slices[0].size();
- } else if (slices.size() > 1) {
- request_bytes = static_cast<uint8_t*>(gpr_malloc(request.Length()));
- uint8_t* copy_to = request_bytes;
- for (size_t i = 0; i < slices.size(); i++) {
- memcpy(copy_to, slices[i].begin(), slices[i].size());
- copy_to += slices[i].size();
- }
- }
+ request_bytes = const_cast<uint8_t*>(slice.begin());
+ request_size = slice.size();
upb::Arena arena;
grpc_health_v1_HealthCheckRequest* request_struct =
grpc_health_v1_HealthCheckRequest_parse(
reinterpret_cast<char*>(request_bytes), request_size, arena.ptr());
- if (slices.size() > 1) {
- gpr_free(request_bytes);
- }
if (request_struct == nullptr) {
return false;
}
diff --git a/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc b/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc
index 4eb492e073..133647a5ed 100644
--- a/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc
+++ b/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc
@@ -16,10 +16,9 @@
*
*/
-#include <grpcpp/security/server_credentials.h>
-
#include <grpc/grpc.h>
#include <grpc/support/log.h>
+#include <grpcpp/security/server_credentials.h>
namespace grpc {
namespace {
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc
index 561d4f5048..f778b13785 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc
@@ -20,6 +20,8 @@
#ifdef GPR_LINUX
+#include <inttypes.h>
+
#include <cstdio>
#include "src/cpp/server/load_reporter/get_cpu_stats.h"
@@ -32,7 +34,8 @@ std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
FILE* fp;
fp = fopen("/proc/stat", "r");
uint64_t user, nice, system, idle;
- if (fscanf(fp, "cpu %lu %lu %lu %lu", &user, &nice, &system, &idle) != 4) {
+ if (fscanf(fp, "cpu %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64, &user,
+ &nice, &system, &idle) != 4) {
// Something bad happened with the information, so assume it's all invalid
user = nice = system = idle = 0;
}
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc
index 0a98e848a2..c03daddb35 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc
@@ -21,6 +21,7 @@
#ifdef GPR_WINDOWS
#include <windows.h>
+
#include <cstdint>
#include "src/cpp/server/load_reporter/get_cpu_stats.h"
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc
index 16542bfddf..78e6c32864 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc
@@ -18,14 +18,16 @@
#include <grpc/impl/codegen/port_platform.h>
+#include "src/cpp/server/load_reporter/load_data_store.h"
+
#include <stdio.h>
+
#include <cstdlib>
#include <set>
#include <unordered_map>
#include <vector>
#include "src/core/lib/iomgr/socket_utils.h"
-#include "src/cpp/server/load_reporter/load_data_store.h"
namespace grpc {
namespace load_reporter {
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc
index 24ad9f3f24..4b4e778a62 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc
@@ -18,11 +18,10 @@
#include <grpc/impl/codegen/port_platform.h>
-#include <grpcpp/ext/server_load_reporting.h>
-
#include <cmath>
#include <grpc/support/log.h>
+#include <grpcpp/ext/server_load_reporting.h>
namespace grpc {
namespace load_reporter {
diff --git a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc
index b6ea87f4fe..819a12e294 100644
--- a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc
+++ b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc
@@ -16,6 +16,8 @@
*
*/
+#include "src/cpp/server/secure_server_credentials.h"
+
#include <functional>
#include <map>
#include <memory>
@@ -25,7 +27,6 @@
#include <grpcpp/security/auth_metadata_processor.h>
#include "src/cpp/common/secure_auth_context.h"
-#include "src/cpp/server/secure_server_credentials.h"
namespace grpc {
@@ -74,7 +75,6 @@ void AuthMetadataProcessorAyncWrapper::InvokeProcessor(
grpc_metadata md_entry;
md_entry.key = SliceReferencingString(consumed.first);
md_entry.value = SliceReferencingString(consumed.second);
- md_entry.flags = 0;
consumed_md.push_back(md_entry);
}
std::vector<grpc_metadata> response_md;
@@ -82,7 +82,6 @@ void AuthMetadataProcessorAyncWrapper::InvokeProcessor(
grpc_metadata md_entry;
md_entry.key = SliceReferencingString(response.first);
md_entry.value = SliceReferencingString(response.second);
- md_entry.flags = 0;
response_md.push_back(md_entry);
}
auto consumed_md_data = consumed_md.empty() ? nullptr : &consumed_md[0];
diff --git a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h
index f2b65c2864..95b4cab958 100644
--- a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h
+++ b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h
@@ -21,11 +21,10 @@
#include <memory>
+#include <grpc/grpc_security.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/security/tls_credentials_options.h>
-#include <grpc/grpc_security.h>
-
#include "src/cpp/server/thread_pool_interface.h"
namespace grpc {
diff --git a/contrib/libs/grpc/src/cpp/server/server_builder.cc b/contrib/libs/grpc/src/cpp/server/server_builder.cc
index 502d4f20ae..abe95fd834 100644
--- a/contrib/libs/grpc/src/cpp/server/server_builder.cc
+++ b/contrib/libs/grpc/src/cpp/server/server_builder.cc
@@ -16,15 +16,14 @@
*
*/
-#include <grpcpp/server_builder.h>
+#include <utility>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/service_type.h>
#include <grpcpp/resource_quota.h>
#include <grpcpp/server.h>
-
-#include <utility>
+#include <grpcpp/server_builder.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
@@ -102,38 +101,23 @@ ServerBuilder& ServerBuilder::RegisterAsyncGenericService(
return *this;
}
-#ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
ServerBuilder& ServerBuilder::RegisterCallbackGenericService(
CallbackGenericService* service) {
if (generic_service_ || callback_generic_service_) {
gpr_log(GPR_ERROR,
"Adding multiple generic services is unsupported for now. "
"Dropping the service %p",
- (void*)service);
+ service);
} else {
callback_generic_service_ = service;
}
return *this;
}
-#else
-ServerBuilder& ServerBuilder::experimental_type::RegisterCallbackGenericService(
- experimental::CallbackGenericService* service) {
- if (builder_->generic_service_ || builder_->callback_generic_service_) {
- gpr_log(GPR_ERROR,
- "Adding multiple generic services is unsupported for now. "
- "Dropping the service %p",
- service);
- } else {
- builder_->callback_generic_service_ = service;
- }
- return *builder_;
-}
-#endif
-ServerBuilder& ServerBuilder::experimental_type::SetContextAllocator(
+ServerBuilder& ServerBuilder::SetContextAllocator(
std::unique_ptr<grpc::ContextAllocator> context_allocator) {
- builder_->context_allocator_ = std::move(context_allocator);
- return *builder_;
+ context_allocator_ = std::move(context_allocator);
+ return *this;
}
std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor>
@@ -149,6 +133,12 @@ ServerBuilder::experimental_type::AddExternalConnectionAcceptor(
return builder_->acceptors_.back()->GetAcceptor();
}
+void ServerBuilder::experimental_type::SetAuthorizationPolicyProvider(
+ std::shared_ptr<experimental::AuthorizationPolicyProviderInterface>
+ provider) {
+ builder_->authorization_provider_ = std::move(provider);
+}
+
ServerBuilder& ServerBuilder::SetOption(
std::unique_ptr<ServerBuilderOption> option) {
options_.push_back(std::move(option));
@@ -177,9 +167,9 @@ ServerBuilder& ServerBuilder::SetSyncServerOption(
ServerBuilder& ServerBuilder::SetCompressionAlgorithmSupportStatus(
grpc_compression_algorithm algorithm, bool enabled) {
if (enabled) {
- GPR_BITSET(&enabled_compression_algorithms_bitset_, algorithm);
+ grpc_core::SetBit(&enabled_compression_algorithms_bitset_, algorithm);
} else {
- GPR_BITCLEAR(&enabled_compression_algorithms_bitset_, algorithm);
+ grpc_core::ClearBit(&enabled_compression_algorithms_bitset_, algorithm);
}
return *this;
}
@@ -223,8 +213,8 @@ ServerBuilder& ServerBuilder::AddListeningPort(
return *this;
}
-std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
- grpc::ChannelArguments args;
+ChannelArguments ServerBuilder::BuildChannelArgs() {
+ ChannelArguments args;
if (max_receive_message_size_ >= -1) {
args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_);
}
@@ -245,16 +235,24 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM,
maybe_default_compression_algorithm_.algorithm);
}
-
if (resource_quota_ != nullptr) {
args.SetPointerWithVtable(GRPC_ARG_RESOURCE_QUOTA, resource_quota_,
grpc_resource_quota_arg_vtable());
}
-
for (const auto& plugin : plugins_) {
plugin->UpdateServerBuilder(this);
plugin->UpdateChannelArguments(&args);
}
+ if (authorization_provider_ != nullptr) {
+ args.SetPointerWithVtable(GRPC_ARG_AUTHORIZATION_POLICY_PROVIDER,
+ authorization_provider_->c_provider(),
+ grpc_authorization_policy_provider_arg_vtable());
+ }
+ return args;
+}
+
+std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
+ ChannelArguments args = BuildChannelArgs();
// == Determine if the server has any syncrhonous methods ==
bool has_sync_methods = false;
@@ -304,6 +302,10 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
}
}
+ if (callback_generic_service_ != nullptr) {
+ has_frequently_polled_cqs = true;
+ }
+
const bool is_hybrid_server = has_sync_methods && has_frequently_polled_cqs;
if (has_sync_methods) {
@@ -375,12 +377,7 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
return nullptr;
}
-#ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
server->RegisterContextAllocator(std::move(context_allocator_));
-#else
- server->experimental_registration()->RegisterContextAllocator(
- std::move(context_allocator_));
-#endif
for (const auto& value : services_) {
if (!server->RegisterService(value->host.get(), value->service)) {
diff --git a/contrib/libs/grpc/src/cpp/server/server_callback.cc b/contrib/libs/grpc/src/cpp/server/server_callback.cc
index f6b72c0fcc..5b2d328b81 100644
--- a/contrib/libs/grpc/src/cpp/server/server_callback.cc
+++ b/contrib/libs/grpc/src/cpp/server/server_callback.cc
@@ -37,7 +37,7 @@ void ServerCallbackCall::ScheduleOnDone(bool inline_ondone) {
explicit ClosureWithArg(ServerCallbackCall* call_arg) : call(call_arg) {
GRPC_CLOSURE_INIT(
&closure,
- [](void* void_arg, grpc_error*) {
+ [](void* void_arg, grpc_error_handle) {
ClosureWithArg* arg = static_cast<ClosureWithArg*>(void_arg);
arg->call->CallOnDone();
delete arg;
@@ -66,7 +66,7 @@ void ServerCallbackCall::CallOnCancel(ServerReactor* reactor) {
: call(call_arg), reactor(reactor_arg) {
GRPC_CLOSURE_INIT(
&closure,
- [](void* void_arg, grpc_error*) {
+ [](void* void_arg, grpc_error_handle) {
ClosureWithArg* arg = static_cast<ClosureWithArg*>(void_arg);
arg->reactor->OnCancel();
arg->call->MaybeDone();
diff --git a/contrib/libs/grpc/src/cpp/server/server_cc.cc b/contrib/libs/grpc/src/cpp/server/server_cc.cc
index dab7f488a7..edb1fe4d1d 100644
--- a/contrib/libs/grpc/src/cpp/server/server_cc.cc
+++ b/contrib/libs/grpc/src/cpp/server/server_cc.cc
@@ -15,13 +15,13 @@
*
*/
-#include <grpcpp/server.h>
-
#include <cstdlib>
#include <sstream>
#include <type_traits>
#include <utility>
+#include "y_absl/memory/memory.h"
+
#include <grpc/grpc.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/alloc.h>
@@ -39,11 +39,10 @@
#include <grpcpp/impl/server_initializer.h>
#include <grpcpp/impl/service_type.h>
#include <grpcpp/security/server_credentials.h>
+#include <grpcpp/server.h>
#include <grpcpp/server_context.h>
#include <grpcpp/support/time.h>
-#include "y_absl/memory/memory.h"
-
#include "src/core/ext/transport/inproc/inproc_transport.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/exec_ctx.h"
@@ -69,6 +68,15 @@ namespace {
// max-threads set) to the server builder.
#define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX
+// Give a useful status error message if the resource is exhausted specifically
+// because the server threadpool is full.
+const char* kServerThreadpoolExhausted = "Server Threadpool Exhausted";
+
+// Although we might like to give a useful status error message on unimplemented
+// RPCs, it's not always possible since that also would need to be added across
+// languages and isn't actually required by the spec.
+const char* kUnknownRpcMethod = "";
+
class DefaultGlobalCallbacks final : public Server::GlobalCallbacks {
public:
~DefaultGlobalCallbacks() override {}
@@ -107,15 +115,6 @@ class UnimplementedAsyncRequestContext {
GenericServerAsyncReaderWriter generic_stream_;
};
-// TODO(vjpai): Just for this file, use some contents of the experimental
-// namespace here to make the code easier to read below. Remove this when
-// de-experimentalized fully.
-#ifndef GRPC_CALLBACK_API_NONEXPERIMENTAL
-using ::grpc::experimental::CallbackGenericService;
-using ::grpc::experimental::CallbackServerContext;
-using ::grpc::experimental::GenericCallbackServerContext;
-#endif
-
} // namespace
ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
@@ -266,7 +265,7 @@ bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag,
}
namespace {
-class ShutdownCallback : public grpc_experimental_completion_queue_functor {
+class ShutdownCallback : public grpc_completion_queue_functor {
public:
ShutdownCallback() {
functor_run = &ShutdownCallback::Run;
@@ -282,7 +281,7 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor {
// The Run function will get invoked by the completion queue library
// when the shutdown is actually complete
- static void Run(grpc_experimental_completion_queue_functor* cb, int) {
+ static void Run(grpc_completion_queue_functor* cb, int) {
auto* callback = static_cast<ShutdownCallback*>(cb);
delete callback->cq_;
delete callback;
@@ -410,7 +409,9 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
: server_->resource_exhausted_handler_.get();
deserialized_request_ = handler->Deserialize(call_, request_payload_,
&request_status_, nullptr);
-
+ if (!request_status_.ok()) {
+ gpr_log(GPR_DEBUG, "Failed to deserialize message.");
+ }
request_payload_ = nullptr;
interceptor_methods_.AddInterceptionHookPoint(
grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
@@ -576,7 +577,7 @@ class Server::CallbackRequest final
// method_name needs to be specialized between named method and generic
const char* method_name() const;
- class CallbackCallTag : public grpc_experimental_completion_queue_functor {
+ class CallbackCallTag : public grpc_completion_queue_functor {
public:
explicit CallbackCallTag(Server::CallbackRequest<ServerContextType>* req)
: req_(req) {
@@ -599,8 +600,7 @@ class Server::CallbackRequest final
Server::CallbackRequest<ServerContextType>* req_;
grpc::internal::Call* call_;
- static void StaticRun(grpc_experimental_completion_queue_functor* cb,
- int ok) {
+ static void StaticRun(grpc_completion_queue_functor* cb, int ok) {
static_cast<CallbackCallTag*>(cb)->Run(static_cast<bool>(ok));
}
void Run(bool ok) {
@@ -650,6 +650,9 @@ class Server::CallbackRequest final
req_->request_ = req_->method_->handler()->Deserialize(
req_->call_, req_->request_payload_, &req_->request_status_,
&req_->handler_data_);
+ if (!(req_->request_status_.ok())) {
+ gpr_log(GPR_DEBUG, "Failed to deserialize message.");
+ }
req_->request_payload_ = nullptr;
req_->interceptor_methods_.AddInterceptionHookPoint(
grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
@@ -791,8 +794,8 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
}
void AddSyncMethod(grpc::internal::RpcServiceMethod* method, void* tag) {
- server_->server()->core_server->SetRegisteredMethodAllocator(
- server_cq_->cq(), tag, [this, method] {
+ grpc_core::Server::FromC(server_->server())
+ ->SetRegisteredMethodAllocator(server_cq_->cq(), tag, [this, method] {
grpc_core::Server::RegisteredCallAllocation result;
new SyncRequest(server_, method, &result);
return result;
@@ -804,9 +807,9 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
if (has_sync_method_) {
unknown_method_ = y_absl::make_unique<grpc::internal::RpcServiceMethod>(
"unknown", grpc::internal::RpcMethod::BIDI_STREAMING,
- new grpc::internal::UnknownMethodHandler);
- server_->server()->core_server->SetBatchMethodAllocator(
- server_cq_->cq(), [this] {
+ new grpc::internal::UnknownMethodHandler(kUnknownRpcMethod));
+ grpc_core::Server::FromC(server_->server())
+ ->SetBatchMethodAllocator(server_cq_->cq(), [this] {
grpc_core::Server::BatchCallAllocation result;
new SyncRequest(server_, unknown_method_.get(), &result);
return result;
@@ -932,14 +935,16 @@ Server::~Server() {
for (const auto& value : sync_req_mgrs_) {
value->Shutdown();
}
- if (callback_cq_ != nullptr) {
+ CompletionQueue* callback_cq =
+ callback_cq_.load(std::memory_order_relaxed);
+ if (callback_cq != nullptr) {
if (grpc_iomgr_run_in_background()) {
// gRPC-core provides the backing needed for the preferred CQ type
- callback_cq_->Shutdown();
+ callback_cq->Shutdown();
} else {
- CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq_);
+ CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq);
}
- callback_cq_ = nullptr;
+ callback_cq_.store(nullptr, std::memory_order_release);
}
}
}
@@ -1028,7 +1033,7 @@ bool Server::RegisterService(const TString* addr, grpc::Service* service) {
has_callback_methods_ = true;
grpc::internal::RpcServiceMethod* method_value = method.get();
grpc::CompletionQueue* cq = CallbackCQ();
- server_->core_server->SetRegisteredMethodAllocator(
+ grpc_core::Server::FromC(server_)->SetRegisteredMethodAllocator(
cq->cq(), method_registration_tag, [this, cq, method_value] {
grpc_core::Server::RegisteredCallAllocation result;
new CallbackRequest<grpc::CallbackServerContext>(this, method_value,
@@ -1069,7 +1074,8 @@ void Server::RegisterCallbackGenericService(
generic_handler_.reset(service->Handler());
grpc::CompletionQueue* cq = CallbackCQ();
- server_->core_server->SetBatchMethodAllocator(cq->cq(), [this, cq] {
+ grpc_core::Server::FromC(server_)->SetBatchMethodAllocator(cq->cq(), [this,
+ cq] {
grpc_core::Server::BatchCallAllocation result;
new CallbackRequest<grpc::GenericCallbackServerContext>(this, cq, &result);
return result;
@@ -1106,9 +1112,9 @@ void Server::UnrefAndWaitLocked() {
shutdown_done_ = true;
return; // no need to wait on CV since done condition already set
}
- grpc::internal::WaitUntil(
- &shutdown_done_cv_, &mu_,
- [this]() Y_ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { return shutdown_done_; });
+ while (!shutdown_done_) {
+ shutdown_done_cv_.Wait(&mu_);
+ }
}
void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
@@ -1194,7 +1200,8 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
// to deal with the case of thread exhaustion
if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) {
resource_exhausted_handler_ =
- y_absl::make_unique<grpc::internal::ResourceExhaustedHandler>();
+ y_absl::make_unique<grpc::internal::ResourceExhaustedHandler>(
+ kServerThreadpoolExhausted);
}
for (const auto& value : sync_req_mgrs_) {
@@ -1258,14 +1265,15 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
// Shutdown the callback CQ. The CQ is owned by its own shutdown tag, so it
// will delete itself at true shutdown.
- if (callback_cq_ != nullptr) {
+ CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_relaxed);
+ if (callback_cq != nullptr) {
if (grpc_iomgr_run_in_background()) {
// gRPC-core provides the backing needed for the preferred CQ type
- callback_cq_->Shutdown();
+ callback_cq->Shutdown();
} else {
- CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq_);
+ CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq);
}
- callback_cq_ = nullptr;
+ callback_cq_.store(nullptr, std::memory_order_release);
}
// Drain the shutdown queue (if the previous call to AsyncNext() timed out
@@ -1320,8 +1328,9 @@ bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
UnimplementedAsyncRequest* request)
: request_(request) {
- grpc::Status status(grpc::StatusCode::UNIMPLEMENTED, "");
- grpc::internal::UnknownMethodHandler::FillOps(request_->context(), this);
+ grpc::Status status(grpc::StatusCode::UNIMPLEMENTED, kUnknownRpcMethod);
+ grpc::internal::UnknownMethodHandler::FillOps(request_->context(),
+ kUnknownRpcMethod, this);
request_->stream()->call_.PerformOps(this);
}
@@ -1332,25 +1341,33 @@ grpc::ServerInitializer* Server::initializer() {
grpc::CompletionQueue* Server::CallbackCQ() {
// TODO(vjpai): Consider using a single global CQ for the default CQ
// if there is no explicit per-server CQ registered
+ CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_acquire);
+ if (callback_cq != nullptr) {
+ return callback_cq;
+ }
+ // The callback_cq_ wasn't already set, so grab a lock and set it up exactly
+ // once for this server.
grpc::internal::MutexLock l(&mu_);
- if (callback_cq_ != nullptr) {
- return callback_cq_;
+ callback_cq = callback_cq_.load(std::memory_order_relaxed);
+ if (callback_cq != nullptr) {
+ return callback_cq;
}
if (grpc_iomgr_run_in_background()) {
// gRPC-core provides the backing needed for the preferred CQ type
auto* shutdown_callback = new grpc::ShutdownCallback;
- callback_cq_ = new grpc::CompletionQueue(grpc_completion_queue_attributes{
+ callback_cq = new grpc::CompletionQueue(grpc_completion_queue_attributes{
GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
shutdown_callback});
// Transfer ownership of the new cq to its own shutdown callback
- shutdown_callback->TakeCQ(callback_cq_);
+ shutdown_callback->TakeCQ(callback_cq);
} else {
// Otherwise we need to use the alternative CQ variant
- callback_cq_ = CompletionQueue::CallbackAlternativeCQ();
+ callback_cq = CompletionQueue::CallbackAlternativeCQ();
}
- return callback_cq_;
+ callback_cq_.store(callback_cq, std::memory_order_release);
+ return callback_cq;
}
} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/server/server_context.cc b/contrib/libs/grpc/src/cpp/server/server_context.cc
index df82c69ed8..c8ace3d914 100644
--- a/contrib/libs/grpc/src/cpp/server/server_context.cc
+++ b/contrib/libs/grpc/src/cpp/server/server_context.cc
@@ -16,9 +16,8 @@
*
*/
-#include <grpcpp/impl/codegen/server_context.h>
-
#include <algorithm>
+#include <atomic>
#include <utility>
#include <grpc/compression.h>
@@ -28,6 +27,7 @@
#include <grpc/support/log.h>
#include <grpcpp/impl/call.h>
#include <grpcpp/impl/codegen/completion_queue.h>
+#include <grpcpp/impl/codegen/server_context.h>
#include <grpcpp/impl/grpc_library.h>
#include <grpcpp/support/server_callback.h>
#include <grpcpp/support/time.h>
@@ -327,14 +327,16 @@ void ServerContextBase::TryCancel() const {
bool ServerContextBase::IsCancelled() const {
if (completion_tag_) {
// When using callback API, this result is always valid.
- return completion_op_->CheckCancelledAsync();
+ return marked_cancelled_.load(std::memory_order_acquire) ||
+ completion_op_->CheckCancelledAsync();
} else if (has_notify_when_done_tag_) {
// When using async API, the result is only valid
// if the tag has already been delivered at the completion queue
return completion_op_ && completion_op_->CheckCancelledAsync();
} else {
// when using sync API, the result is always valid
- return completion_op_ && completion_op_->CheckCancelled(cq_);
+ return marked_cancelled_.load(std::memory_order_acquire) ||
+ (completion_op_ && completion_op_->CheckCancelled(cq_));
}
}
diff --git a/contrib/libs/grpc/src/cpp/server/server_credentials.cc b/contrib/libs/grpc/src/cpp/server/server_credentials.cc
index 36b5a52dc7..454e8b4e9d 100644
--- a/contrib/libs/grpc/src/cpp/server/server_credentials.cc
+++ b/contrib/libs/grpc/src/cpp/server/server_credentials.cc
@@ -16,9 +16,8 @@
*
*/
-#include <grpcpp/security/server_credentials.h>
-
#include <grpcpp/impl/grpc_library.h>
+#include <grpcpp/security/server_credentials.h>
namespace grpc {
diff --git a/contrib/libs/grpc/src/cpp/server/server_posix.cc b/contrib/libs/grpc/src/cpp/server/server_posix.cc
index c3d40d4fa2..f2452cc326 100644
--- a/contrib/libs/grpc/src/cpp/server/server_posix.cc
+++ b/contrib/libs/grpc/src/cpp/server/server_posix.cc
@@ -16,9 +16,8 @@
*
*/
-#include <grpcpp/server_posix.h>
-
#include <grpc/grpc_posix.h>
+#include <grpcpp/server_posix.h>
namespace grpc {
diff --git a/contrib/libs/grpc/src/cpp/server/xds_server_credentials.cc b/contrib/libs/grpc/src/cpp/server/xds_server_credentials.cc
index b543f3f172..f184238906 100644
--- a/contrib/libs/grpc/src/cpp/server/xds_server_credentials.cc
+++ b/contrib/libs/grpc/src/cpp/server/xds_server_credentials.cc
@@ -19,7 +19,6 @@
#include "src/cpp/server/secure_server_credentials.h"
namespace grpc {
-namespace experimental {
std::shared_ptr<ServerCredentials> XdsServerCredentials(
const std::shared_ptr<ServerCredentials>& fallback_credentials) {
@@ -37,5 +36,12 @@ std::shared_ptr<ServerCredentials> XdsServerCredentials(
fallback_credentials->AsSecureServerCredentials()->c_creds()));
}
+namespace experimental {
+
+std::shared_ptr<ServerCredentials> XdsServerCredentials(
+ const std::shared_ptr<ServerCredentials>& fallback_credentials) {
+ return grpc::XdsServerCredentials(fallback_credentials);
+}
+
} // namespace experimental
} // namespace grpc