author    | dvshkurko <dvshkurko@yandex-team.ru>         | 2022-02-10 16:45:51 +0300
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:51 +0300
commit    | 321ee9bce31ec6e238be26dbcbe539cffa2c3309 (patch)
tree      | 14407a2757cbf29eb97e266b7f07e851f971000c /contrib/libs/grpc/src/cpp/server
parent    | 2f6ca198245aeffd5e2d82b65927c2465b68b4f5 (diff)
download  | ydb-321ee9bce31ec6e238be26dbcbe539cffa2c3309.tar.gz
Restoring authorship annotation for <dvshkurko@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/libs/grpc/src/cpp/server')
19 files changed, 873 insertions, 873 deletions
diff --git a/contrib/libs/grpc/src/cpp/server/channel_argument_option.cc b/contrib/libs/grpc/src/cpp/server/channel_argument_option.cc
index 9aad932429..00bbe73d88 100644
--- a/contrib/libs/grpc/src/cpp/server/channel_argument_option.cc
+++ b/contrib/libs/grpc/src/cpp/server/channel_argument_option.cc
@@ -31,8 +31,8 @@ std::unique_ptr<ServerBuilderOption> MakeChannelArgumentOption(
     args->SetString(name_, value_);
   }
   virtual void UpdatePlugins(
-      std::vector<std::unique_ptr<ServerBuilderPlugin>>* /*plugins*/)
-      override {}
+      std::vector<std::unique_ptr<ServerBuilderPlugin>>* /*plugins*/)
+      override {}

  private:
   const TString name_;
@@ -52,8 +52,8 @@ std::unique_ptr<ServerBuilderOption> MakeChannelArgumentOption(
     args->SetInt(name_, value_);
   }
   virtual void UpdatePlugins(
-      std::vector<std::unique_ptr<ServerBuilderPlugin>>* /*plugins*/)
-      override {}
+      std::vector<std::unique_ptr<ServerBuilderPlugin>>* /*plugins*/)
+      override {}

  private:
   const TString name_;
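Note: MakeChannelArgumentOption, touched above, is the public helper for attaching a channel argument to a server through a ServerBuilderOption. A minimal usage sketch, assuming only the headers this file ships with; the argument name and value are placeholders:

```cpp
#include <grpcpp/grpcpp.h>
#include <grpcpp/impl/channel_argument_option.h>

// Hypothetical: attach a string-valued channel argument to a server.
void ConfigureBuilder(grpc::ServerBuilder& builder) {
  builder.SetOption(grpc::MakeChannelArgumentOption(
      "grpc.example_argument", "example_value"));
}
```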
diff --git a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc
index 6dcf84bf40..e061c62ef4 100644
--- a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc
+++ b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc
@@ -27,25 +27,25 @@ namespace grpc {

 namespace {

-grpc::protobuf::util::Status ParseJson(const char* json_str,
-                                       grpc::protobuf::Message* message) {
-  grpc::protobuf::json::JsonParseOptions options;
+grpc::protobuf::util::Status ParseJson(const char* json_str,
+                                       grpc::protobuf::Message* message) {
+  grpc::protobuf::json::JsonParseOptions options;
   options.case_insensitive_enum_parsing = true;
-  return grpc::protobuf::json::JsonStringToMessage(json_str, message, options);
-}
+  return grpc::protobuf::json::JsonStringToMessage(json_str, message, options);
+}

 }  // namespace

 Status ChannelzService::GetTopChannels(
-    ServerContext* /*unused*/,
-    const channelz::v1::GetTopChannelsRequest* request,
+    ServerContext* /*unused*/,
+    const channelz::v1::GetTopChannelsRequest* request,
     channelz::v1::GetTopChannelsResponse* response) {
   char* json_str = grpc_channelz_get_top_channels(request->start_channel_id());
   if (json_str == nullptr) {
     return Status(StatusCode::INTERNAL,
                   "grpc_channelz_get_top_channels returned null");
   }
-  grpc::protobuf::util::Status s = ParseJson(json_str, response);
+  grpc::protobuf::util::Status s = ParseJson(json_str, response);
   gpr_free(json_str);
   if (!s.ok()) {
     return Status(StatusCode::INTERNAL, s.ToString());
@@ -54,14 +54,14 @@ Status ChannelzService::GetTopChannels(
 }

 Status ChannelzService::GetServers(
-    ServerContext* /*unused*/, const channelz::v1::GetServersRequest* request,
+    ServerContext* /*unused*/, const channelz::v1::GetServersRequest* request,
     channelz::v1::GetServersResponse* response) {
   char* json_str = grpc_channelz_get_servers(request->start_server_id());
   if (json_str == nullptr) {
     return Status(StatusCode::INTERNAL,
                   "grpc_channelz_get_servers returned null");
   }
-  grpc::protobuf::util::Status s = ParseJson(json_str, response);
+  grpc::protobuf::util::Status s = ParseJson(json_str, response);
   gpr_free(json_str);
   if (!s.ok()) {
     return Status(StatusCode::INTERNAL, s.ToString());
@@ -69,7 +69,7 @@ Status ChannelzService::GetServers(
   return Status::OK;
 }

-Status ChannelzService::GetServer(ServerContext* /*unused*/,
+Status ChannelzService::GetServer(ServerContext* /*unused*/,
                                   const channelz::v1::GetServerRequest* request,
                                   channelz::v1::GetServerResponse* response) {
   char* json_str = grpc_channelz_get_server(request->server_id());
@@ -77,7 +77,7 @@ Status ChannelzService::GetServer(ServerContext* /*unused*/,
     return Status(StatusCode::INTERNAL,
                   "grpc_channelz_get_server returned null");
   }
-  grpc::protobuf::util::Status s = ParseJson(json_str, response);
+  grpc::protobuf::util::Status s = ParseJson(json_str, response);
   gpr_free(json_str);
   if (!s.ok()) {
     return Status(StatusCode::INTERNAL, s.ToString());
@@ -86,8 +86,8 @@ Status ChannelzService::GetServer(ServerContext* /*unused*/,
 }

 Status ChannelzService::GetServerSockets(
-    ServerContext* /*unused*/,
-    const channelz::v1::GetServerSocketsRequest* request,
+    ServerContext* /*unused*/,
+    const channelz::v1::GetServerSocketsRequest* request,
     channelz::v1::GetServerSocketsResponse* response) {
   char* json_str = grpc_channelz_get_server_sockets(
       request->server_id(), request->start_socket_id(), request->max_results());
@@ -95,7 +95,7 @@ Status ChannelzService::GetServerSockets(
     return Status(StatusCode::INTERNAL,
                   "grpc_channelz_get_server_sockets returned null");
   }
-  grpc::protobuf::util::Status s = ParseJson(json_str, response);
+  grpc::protobuf::util::Status s = ParseJson(json_str, response);
   gpr_free(json_str);
   if (!s.ok()) {
     return Status(StatusCode::INTERNAL, s.ToString());
@@ -104,13 +104,13 @@ Status ChannelzService::GetServerSockets(
 }

 Status ChannelzService::GetChannel(
-    ServerContext* /*unused*/, const channelz::v1::GetChannelRequest* request,
+    ServerContext* /*unused*/, const channelz::v1::GetChannelRequest* request,
     channelz::v1::GetChannelResponse* response) {
   char* json_str = grpc_channelz_get_channel(request->channel_id());
   if (json_str == nullptr) {
     return Status(StatusCode::NOT_FOUND, "No object found for that ChannelId");
   }
-  grpc::protobuf::util::Status s = ParseJson(json_str, response);
+  grpc::protobuf::util::Status s = ParseJson(json_str, response);
   gpr_free(json_str);
   if (!s.ok()) {
     return Status(StatusCode::INTERNAL, s.ToString());
@@ -119,15 +119,15 @@ Status ChannelzService::GetChannel(
 }

 Status ChannelzService::GetSubchannel(
-    ServerContext* /*unused*/,
-    const channelz::v1::GetSubchannelRequest* request,
+    ServerContext* /*unused*/,
+    const channelz::v1::GetSubchannelRequest* request,
     channelz::v1::GetSubchannelResponse* response) {
   char* json_str = grpc_channelz_get_subchannel(request->subchannel_id());
   if (json_str == nullptr) {
     return Status(StatusCode::NOT_FOUND,
                   "No object found for that SubchannelId");
   }
-  grpc::protobuf::util::Status s = ParseJson(json_str, response);
+  grpc::protobuf::util::Status s = ParseJson(json_str, response);
   gpr_free(json_str);
   if (!s.ok()) {
     return Status(StatusCode::INTERNAL, s.ToString());
@@ -135,14 +135,14 @@ Status ChannelzService::GetSubchannel(
   return Status::OK;
 }

-Status ChannelzService::GetSocket(ServerContext* /*unused*/,
+Status ChannelzService::GetSocket(ServerContext* /*unused*/,
                                   const channelz::v1::GetSocketRequest* request,
                                   channelz::v1::GetSocketResponse* response) {
   char* json_str = grpc_channelz_get_socket(request->socket_id());
   if (json_str == nullptr) {
     return Status(StatusCode::NOT_FOUND, "No object found for that SocketId");
   }
-  grpc::protobuf::util::Status s = ParseJson(json_str, response);
+  grpc::protobuf::util::Status s = ParseJson(json_str, response);
   gpr_free(json_str);
   if (!s.ok()) {
     return Status(StatusCode::INTERNAL, s.ToString());
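Note: every method above follows the same pattern: ask grpc core for a JSON rendering of channelz state, then inflate it into the matching channelz.v1 proto via ParseJson. A standalone sketch of that conversion using the protobuf JSON util directly:

```cpp
#include <google/protobuf/util/json_util.h>

// Mirrors ParseJson above: case-insensitive enum parsing, status result.
bool JsonToProto(const char* json, google::protobuf::Message* msg) {
  google::protobuf::util::JsonParseOptions options;
  options.case_insensitive_enum_parsing = true;
  return google::protobuf::util::JsonStringToMessage(json, msg, options).ok();
}
```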
diff --git a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc
index ae26a447ab..fd1ce81424 100644
--- a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc
+++ b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc
@@ -39,7 +39,7 @@ class ChannelzServicePlugin : public ::grpc::ServerBuilderPlugin {
     si->RegisterService(channelz_service_);
   }

-  void Finish(grpc::ServerInitializer* /*si*/) override {}
+  void Finish(grpc::ServerInitializer* /*si*/) override {}

   void ChangeArguments(const TString& /*name*/, void* /*value*/) override {}

@@ -67,13 +67,13 @@ CreateChannelzServicePlugin() {
       new ChannelzServicePlugin());
 }

-}  // namespace experimental
-}  // namespace channelz
-}  // namespace grpc
-namespace grpc_impl {
-namespace channelz {
-namespace experimental {
-
+}  // namespace experimental
+}  // namespace channelz
+}  // namespace grpc
+namespace grpc_impl {
+namespace channelz {
+namespace experimental {
+
 void InitChannelzService() {
   static struct Initializer {
     Initializer() {
@@ -85,4 +85,4 @@ void InitChannelzService() {

 }  // namespace experimental
 }  // namespace channelz
-}  // namespace grpc_impl
+}  // namespace grpc_impl
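Note: a server opts into the channelz service that this plugin registers before the server is built. A minimal sketch, assuming the usual public plugin header and the "grpc.enable_channelz" channel argument:

```cpp
#include <grpcpp/ext/channelz_service_plugin.h>
#include <grpcpp/grpcpp.h>

// Register the channelz v1 service and turn channelz data collection on.
void EnableChannelz(grpc::ServerBuilder& builder) {
  grpc::channelz::experimental::InitChannelzService();
  builder.AddChannelArgument("grpc.enable_channelz", 1);
}
```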
diff --git a/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.cc b/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.cc
index 77c5d6a263..004154730b 100644
--- a/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.cc
+++ b/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.cc
@@ -19,7 +19,7 @@
 #include "src/cpp/server/dynamic_thread_pool.h"

 #include <grpc/support/log.h>
-#include <grpcpp/impl/codegen/sync.h>
+#include <grpcpp/impl/codegen/sync.h>

 #include "src/core/lib/gprpp/thd.h"

@@ -39,27 +39,27 @@ DynamicThreadPool::DynamicThread::~DynamicThread() { thd_.Join(); }
 void DynamicThreadPool::DynamicThread::ThreadFunc() {
   pool_->ThreadFunc();
   // Now that we have killed ourselves, we should reduce the thread count
-  grpc_core::MutexLock lock(&pool_->mu_);
+  grpc_core::MutexLock lock(&pool_->mu_);
   pool_->nthreads_--;
   // Move ourselves to dead list
   pool_->dead_threads_.push_back(this);

   if ((pool_->shutdown_) && (pool_->nthreads_ == 0)) {
-    pool_->shutdown_cv_.Signal();
+    pool_->shutdown_cv_.Signal();
   }
 }

 void DynamicThreadPool::ThreadFunc() {
   for (;;) {
     // Wait until work is available or we are shutting down.
-    grpc_core::ReleasableMutexLock lock(&mu_);
+    grpc_core::ReleasableMutexLock lock(&mu_);
     if (!shutdown_ && callbacks_.empty()) {
       // If there are too many threads waiting, then quit this thread
       if (threads_waiting_ >= reserve_threads_) {
         break;
       }
       threads_waiting_++;
-      cv_.Wait(&mu_);
+      cv_.Wait(&mu_);
       threads_waiting_--;
     }
     // Drain callbacks before considering shutdown to ensure all work
@@ -67,7 +67,7 @@ void DynamicThreadPool::ThreadFunc() {
     if (!callbacks_.empty()) {
       auto cb = callbacks_.front();
       callbacks_.pop();
-      lock.Unlock();
+      lock.Unlock();
       cb();
     } else if (shutdown_) {
       break;
@@ -81,7 +81,7 @@ DynamicThreadPool::DynamicThreadPool(int reserve_threads)
       nthreads_(0),
       threads_waiting_(0) {
   for (int i = 0; i < reserve_threads_; i++) {
-    grpc_core::MutexLock lock(&mu_);
+    grpc_core::MutexLock lock(&mu_);
     nthreads_++;
     new DynamicThread(this);
   }
@@ -94,17 +94,17 @@ void DynamicThreadPool::ReapThreads(std::list<DynamicThread*>* tlist) {
 }

 DynamicThreadPool::~DynamicThreadPool() {
-  grpc_core::MutexLock lock(&mu_);
+  grpc_core::MutexLock lock(&mu_);
   shutdown_ = true;
-  cv_.Broadcast();
+  cv_.Broadcast();
   while (nthreads_ != 0) {
-    shutdown_cv_.Wait(&mu_);
+    shutdown_cv_.Wait(&mu_);
   }
   ReapThreads(&dead_threads_);
 }

 void DynamicThreadPool::Add(const std::function<void()>& callback) {
-  grpc_core::MutexLock lock(&mu_);
+  grpc_core::MutexLock lock(&mu_);
   // Add works to the callbacks list
   callbacks_.push(callback);
   // Increase pool size or notify as needed
@@ -113,7 +113,7 @@ void DynamicThreadPool::Add(const std::function<void()>& callback) {
     nthreads_++;
     new DynamicThread(this);
   } else {
-    cv_.Signal();
+    cv_.Signal();
   }
   // Also use this chance to harvest dead threads
   if (!dead_threads_.empty()) {
diff --git a/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.h b/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.h
index 6f9f943bc3..ea60265165 100644
--- a/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.h
+++ b/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.h
@@ -25,7 +25,7 @@

 #include <grpcpp/support/config.h>

-#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/gprpp/sync.h"
 #include "src/core/lib/gprpp/thd.h"
 #include "src/cpp/server/thread_pool_interface.h"

@@ -49,9 +49,9 @@ class DynamicThreadPool final : public ThreadPoolInterface {
     grpc_core::Thread thd_;
     void ThreadFunc();
   };
-  grpc_core::Mutex mu_;
-  grpc_core::CondVar cv_;
-  grpc_core::CondVar shutdown_cv_;
+  grpc_core::Mutex mu_;
+  grpc_core::CondVar cv_;
+  grpc_core::CondVar shutdown_cv_;
   bool shutdown_;
   std::queue<std::function<void()>> callbacks_;
   int reserve_threads_;
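Note: DynamicThreadPool is internal to the C++ layer; the interface above reduces to a constructor taking a reserve size plus Add(). An illustrative sketch only:

```cpp
#include "src/cpp/server/dynamic_thread_pool.h"

// Queue work on a pool that keeps two reserve threads; extra threads are
// spawned under load and reaped when idle, as implemented above.
void RunTasks() {
  grpc::DynamicThreadPool pool(/*reserve_threads=*/2);
  for (int i = 0; i < 8; ++i) {
    pool.Add([i] { /* process task i */ });
  }
}  // ~DynamicThreadPool broadcasts shutdown and joins all threads
```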
diff --git a/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.cc b/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.cc
index 09d2a9d3b5..68bc20638e 100644
--- a/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.cc
+++ b/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.cc
@@ -1,96 +1,96 @@
-/*
- *
- * Copyright 2019 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "src/cpp/server/external_connection_acceptor_impl.h"
-
-#include <memory>
-
+/*
+ *
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/cpp/server/external_connection_acceptor_impl.h"
+
+#include <memory>
+
 #include <grpcpp/server_builder.h>
-#include <grpcpp/support/channel_arguments.h>
-
-namespace grpc {
-namespace internal {
-namespace {
-// The actual type to return to user. It co-owns the internal impl object with
-// the server.
-class AcceptorWrapper : public experimental::ExternalConnectionAcceptor {
- public:
-  explicit AcceptorWrapper(std::shared_ptr<ExternalConnectionAcceptorImpl> impl)
-      : impl_(std::move(impl)) {}
-  void HandleNewConnection(NewConnectionParameters* p) override {
-    impl_->HandleNewConnection(p);
-  }
-
- private:
-  std::shared_ptr<ExternalConnectionAcceptorImpl> impl_;
-};
-}  // namespace
-
-ExternalConnectionAcceptorImpl::ExternalConnectionAcceptorImpl(
+#include <grpcpp/support/channel_arguments.h>
+
+namespace grpc {
+namespace internal {
+namespace {
+// The actual type to return to user. It co-owns the internal impl object with
+// the server.
+class AcceptorWrapper : public experimental::ExternalConnectionAcceptor {
+ public:
+  explicit AcceptorWrapper(std::shared_ptr<ExternalConnectionAcceptorImpl> impl)
+      : impl_(std::move(impl)) {}
+  void HandleNewConnection(NewConnectionParameters* p) override {
+    impl_->HandleNewConnection(p);
+  }
+
+ private:
+  std::shared_ptr<ExternalConnectionAcceptorImpl> impl_;
+};
+}  // namespace
+
+ExternalConnectionAcceptorImpl::ExternalConnectionAcceptorImpl(
     const TString& name,
-    ServerBuilder::experimental_type::ExternalConnectionType type,
-    std::shared_ptr<ServerCredentials> creds)
-    : name_(name), creds_(std::move(creds)) {
-  GPR_ASSERT(type ==
-             ServerBuilder::experimental_type::ExternalConnectionType::FROM_FD);
-}
-
-std::unique_ptr<experimental::ExternalConnectionAcceptor>
-ExternalConnectionAcceptorImpl::GetAcceptor() {
-  grpc_core::MutexLock lock(&mu_);
-  GPR_ASSERT(!has_acceptor_);
-  has_acceptor_ = true;
-  return std::unique_ptr<experimental::ExternalConnectionAcceptor>(
-      new AcceptorWrapper(shared_from_this()));
-}
-
-void ExternalConnectionAcceptorImpl::HandleNewConnection(
-    experimental::ExternalConnectionAcceptor::NewConnectionParameters* p) {
-  grpc_core::MutexLock lock(&mu_);
-  if (shutdown_ || !started_) {
-    // TODO(yangg) clean up.
-    gpr_log(
-        GPR_ERROR,
-        "NOT handling external connection with fd %d, started %d, shutdown %d",
-        p->fd, started_, shutdown_);
-    return;
-  }
-  if (handler_) {
-    handler_->Handle(p->listener_fd, p->fd, p->read_buffer.c_buffer());
-  }
-}
-
-void ExternalConnectionAcceptorImpl::Shutdown() {
-  grpc_core::MutexLock lock(&mu_);
-  shutdown_ = true;
-}
-
-void ExternalConnectionAcceptorImpl::Start() {
-  grpc_core::MutexLock lock(&mu_);
-  GPR_ASSERT(!started_);
-  GPR_ASSERT(has_acceptor_);
-  GPR_ASSERT(!shutdown_);
-  started_ = true;
-}
-
-void ExternalConnectionAcceptorImpl::SetToChannelArgs(ChannelArguments* args) {
-  args->SetPointer(name_.c_str(), &handler_);
-}
-
-}  // namespace internal
-}  // namespace grpc
+    ServerBuilder::experimental_type::ExternalConnectionType type,
+    std::shared_ptr<ServerCredentials> creds)
+    : name_(name), creds_(std::move(creds)) {
+  GPR_ASSERT(type ==
+             ServerBuilder::experimental_type::ExternalConnectionType::FROM_FD);
+}
+
+std::unique_ptr<experimental::ExternalConnectionAcceptor>
+ExternalConnectionAcceptorImpl::GetAcceptor() {
+  grpc_core::MutexLock lock(&mu_);
+  GPR_ASSERT(!has_acceptor_);
+  has_acceptor_ = true;
+  return std::unique_ptr<experimental::ExternalConnectionAcceptor>(
+      new AcceptorWrapper(shared_from_this()));
+}
+
+void ExternalConnectionAcceptorImpl::HandleNewConnection(
+    experimental::ExternalConnectionAcceptor::NewConnectionParameters* p) {
+  grpc_core::MutexLock lock(&mu_);
+  if (shutdown_ || !started_) {
+    // TODO(yangg) clean up.
+    gpr_log(
+        GPR_ERROR,
+        "NOT handling external connection with fd %d, started %d, shutdown %d",
+        p->fd, started_, shutdown_);
+    return;
+  }
+  if (handler_) {
+    handler_->Handle(p->listener_fd, p->fd, p->read_buffer.c_buffer());
+  }
+}
+
+void ExternalConnectionAcceptorImpl::Shutdown() {
+  grpc_core::MutexLock lock(&mu_);
+  shutdown_ = true;
+}
+
+void ExternalConnectionAcceptorImpl::Start() {
+  grpc_core::MutexLock lock(&mu_);
+  GPR_ASSERT(!started_);
+  GPR_ASSERT(has_acceptor_);
+  GPR_ASSERT(!shutdown_);
+  started_ = true;
+}
+
+void ExternalConnectionAcceptorImpl::SetToChannelArgs(ChannelArguments* args) {
+  args->SetPointer(name_.c_str(), &handler_);
+}
+
+}  // namespace internal
+}  // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.h b/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.h
index 430c72862e..2340c6e248 100644
--- a/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.h
+++ b/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.h
@@ -1,71 +1,71 @@
-/*
- *
- * Copyright 2019 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#ifndef SRC_CPP_SERVER_EXTERNAL_CONNECTION_ACCEPTOR_IMPL_H_
-#define SRC_CPP_SERVER_EXTERNAL_CONNECTION_ACCEPTOR_IMPL_H_
-
-#include <memory>
-
-#include <grpc/impl/codegen/grpc_types.h>
-#include <grpcpp/security/server_credentials.h>
-#include <grpcpp/server_builder.h>
-#include <grpcpp/support/channel_arguments.h>
-
-#include "src/core/lib/gprpp/sync.h"
-#include "src/core/lib/iomgr/tcp_server.h"
-
-namespace grpc {
-namespace internal {
-
-class ExternalConnectionAcceptorImpl
-    : public std::enable_shared_from_this<ExternalConnectionAcceptorImpl> {
- public:
-  ExternalConnectionAcceptorImpl(
+/*
+ *
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef SRC_CPP_SERVER_EXTERNAL_CONNECTION_ACCEPTOR_IMPL_H_
+#define SRC_CPP_SERVER_EXTERNAL_CONNECTION_ACCEPTOR_IMPL_H_
+
+#include <memory>
+
+#include <grpc/impl/codegen/grpc_types.h>
+#include <grpcpp/security/server_credentials.h>
+#include <grpcpp/server_builder.h>
+#include <grpcpp/support/channel_arguments.h>
+
+#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/iomgr/tcp_server.h"
+
+namespace grpc {
+namespace internal {
+
+class ExternalConnectionAcceptorImpl
+    : public std::enable_shared_from_this<ExternalConnectionAcceptorImpl> {
+ public:
+  ExternalConnectionAcceptorImpl(
     const TString& name,
-      ServerBuilder::experimental_type::ExternalConnectionType type,
-      std::shared_ptr<ServerCredentials> creds);
-  // Should only be called once.
-  std::unique_ptr<experimental::ExternalConnectionAcceptor> GetAcceptor();
-
-  void HandleNewConnection(
-      experimental::ExternalConnectionAcceptor::NewConnectionParameters* p);
-
-  void Shutdown();
-
-  void Start();
-
-  const char* name() { return name_.c_str(); }
-
-  ServerCredentials* GetCredentials() { return creds_.get(); }
-
-  void SetToChannelArgs(::grpc::ChannelArguments* args);
-
- private:
+      ServerBuilder::experimental_type::ExternalConnectionType type,
+      std::shared_ptr<ServerCredentials> creds);
+  // Should only be called once.
+  std::unique_ptr<experimental::ExternalConnectionAcceptor> GetAcceptor();
+
+  void HandleNewConnection(
+      experimental::ExternalConnectionAcceptor::NewConnectionParameters* p);
+
+  void Shutdown();
+
+  void Start();
+
+  const char* name() { return name_.c_str(); }
+
+  ServerCredentials* GetCredentials() { return creds_.get(); }
+
+  void SetToChannelArgs(::grpc::ChannelArguments* args);
+
+ private:
   const TString name_;
-  std::shared_ptr<ServerCredentials> creds_;
-  grpc_core::TcpServerFdHandler* handler_ = nullptr;  // not owned
-  grpc_core::Mutex mu_;
-  bool has_acceptor_ = false;
-  bool started_ = false;
-  bool shutdown_ = false;
-};
-
-}  // namespace internal
-}  // namespace grpc
-
-#endif  // SRC_CPP_SERVER_EXTERNAL_CONNECTION_ACCEPTOR_IMPL_H_
+  std::shared_ptr<ServerCredentials> creds_;
+  grpc_core::TcpServerFdHandler* handler_ = nullptr;  // not owned
+  grpc_core::Mutex mu_;
+  bool has_acceptor_ = false;
+  bool started_ = false;
+  bool shutdown_ = false;
+};
+
+}  // namespace internal
+}  // namespace grpc
+
+#endif  // SRC_CPP_SERVER_EXTERNAL_CONNECTION_ACCEPTOR_IMPL_H_
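Note: these two files back ServerBuilder::experimental_type::AddExternalConnectionAcceptor (see the server_builder.cc hunks below). A hedged sketch of the intended flow; fd handling details are elided:

```cpp
#include <grpcpp/grpcpp.h>

// Sketch: obtain an acceptor before BuildAndStart(), then feed it
// connections that were accepted outside of gRPC (FROM_FD).
std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor> MakeAcceptor(
    grpc::ServerBuilder& builder) {
  return builder.experimental().AddExternalConnectionAcceptor(
      grpc::ServerBuilder::experimental_type::ExternalConnectionType::FROM_FD,
      grpc::InsecureServerCredentials());
}
```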
diff --git a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc
index 3cc508d0cb..133a13e028 100644
--- a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc
+++ b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc
@@ -26,11 +26,11 @@
 #include <grpcpp/impl/codegen/method_handler.h>

 #include "src/cpp/server/health/default_health_check_service.h"
-#include "src/proto/grpc/health/v1/health.upb.h"
+#include "src/proto/grpc/health/v1/health.upb.h"
 #include "upb/upb.hpp"

-#define MAX_SERVICE_NAME_LENGTH 200
-
+#define MAX_SERVICE_NAME_LENGTH 200
+
 namespace grpc {

 //
@@ -43,7 +43,7 @@ DefaultHealthCheckService::DefaultHealthCheckService() {

 void DefaultHealthCheckService::SetServingStatus(
     const TString& service_name, bool serving) {
-  grpc_core::MutexLock lock(&mu_);
+  grpc_core::MutexLock lock(&mu_);
   if (shutdown_) {
     // Set to NOT_SERVING in case service_name is not in the map.
     serving = false;
@@ -53,7 +53,7 @@ void DefaultHealthCheckService::SetServingStatus(
 }

 void DefaultHealthCheckService::SetServingStatus(bool serving) {
   const ServingStatus status = serving ? SERVING : NOT_SERVING;
-  grpc_core::MutexLock lock(&mu_);
+  grpc_core::MutexLock lock(&mu_);
   if (shutdown_) {
     return;
   }
@@ -64,7 +64,7 @@ void DefaultHealthCheckService::SetServingStatus(bool serving) {
 }

 void DefaultHealthCheckService::Shutdown() {
-  grpc_core::MutexLock lock(&mu_);
+  grpc_core::MutexLock lock(&mu_);
   if (shutdown_) {
     return;
   }
@@ -78,7 +78,7 @@ void DefaultHealthCheckService::Shutdown() {
 DefaultHealthCheckService::ServingStatus
 DefaultHealthCheckService::GetServingStatus(
     const TString& service_name) const {
-  grpc_core::MutexLock lock(&mu_);
+  grpc_core::MutexLock lock(&mu_);
   auto it = services_map_.find(service_name);
   if (it == services_map_.end()) {
     return NOT_FOUND;
@@ -90,7 +90,7 @@ DefaultHealthCheckService::GetServingStatus(
 void DefaultHealthCheckService::RegisterCallHandler(
     const TString& service_name,
     std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
-  grpc_core::MutexLock lock(&mu_);
+  grpc_core::MutexLock lock(&mu_);
   ServiceData& service_data = services_map_[service_name];
   service_data.AddCallHandler(handler /* copies ref */);
   HealthCheckServiceImpl::CallHandler* h = handler.get();
@@ -100,7 +100,7 @@ void DefaultHealthCheckService::RegisterCallHandler(
 void DefaultHealthCheckService::UnregisterCallHandler(
     const TString& service_name,
     const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
-  grpc_core::MutexLock lock(&mu_);
+  grpc_core::MutexLock lock(&mu_);
   auto it = services_map_.find(service_name);
   if (it == services_map_.end()) return;
   ServiceData& service_data = it->second;
@@ -168,7 +168,7 @@ DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
   // We will reach here after the server starts shutting down.
   shutdown_ = true;
   {
-    grpc_core::MutexLock lock(&cq_shutdown_mu_);
+    grpc_core::MutexLock lock(&cq_shutdown_mu_);
     cq_->Shutdown();
   }
   thread_->Join();
@@ -216,43 +216,43 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
       copy_to += slices[i].size();
     }
   }
-  upb::Arena arena;
-  grpc_health_v1_HealthCheckRequest* request_struct =
-      grpc_health_v1_HealthCheckRequest_parse(
-          reinterpret_cast<char*>(request_bytes), request_size, arena.ptr());
+  upb::Arena arena;
+  grpc_health_v1_HealthCheckRequest* request_struct =
+      grpc_health_v1_HealthCheckRequest_parse(
+          reinterpret_cast<char*>(request_bytes), request_size, arena.ptr());
   if (slices.size() > 1) {
     gpr_free(request_bytes);
   }
-  if (request_struct == nullptr) {
-    return false;
-  }
-  upb_strview service =
-      grpc_health_v1_HealthCheckRequest_service(request_struct);
-  if (service.size > MAX_SERVICE_NAME_LENGTH) {
-    return false;
-  }
-  service_name->assign(service.data, service.size);
+  if (request_struct == nullptr) {
+    return false;
+  }
+  upb_strview service =
+      grpc_health_v1_HealthCheckRequest_service(request_struct);
+  if (service.size > MAX_SERVICE_NAME_LENGTH) {
+    return false;
+  }
+  service_name->assign(service.data, service.size);
   return true;
 }

 bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
     ServingStatus status, ByteBuffer* response) {
-  upb::Arena arena;
-  grpc_health_v1_HealthCheckResponse* response_struct =
-      grpc_health_v1_HealthCheckResponse_new(arena.ptr());
-  grpc_health_v1_HealthCheckResponse_set_status(
-      response_struct,
+  upb::Arena arena;
+  grpc_health_v1_HealthCheckResponse* response_struct =
+      grpc_health_v1_HealthCheckResponse_new(arena.ptr());
+  grpc_health_v1_HealthCheckResponse_set_status(
+      response_struct,
       status == NOT_FOUND
-          ? grpc_health_v1_HealthCheckResponse_SERVICE_UNKNOWN
-          : status == SERVING ? grpc_health_v1_HealthCheckResponse_SERVING
-                              : grpc_health_v1_HealthCheckResponse_NOT_SERVING);
-  size_t buf_length;
-  char* buf = grpc_health_v1_HealthCheckResponse_serialize(
-      response_struct, arena.ptr(), &buf_length);
-  if (buf == nullptr) {
-    return false;
-  }
-  grpc_slice response_slice = grpc_slice_from_copied_buffer(buf, buf_length);
+          ? grpc_health_v1_HealthCheckResponse_SERVICE_UNKNOWN
+          : status == SERVING ? grpc_health_v1_HealthCheckResponse_SERVING
+                              : grpc_health_v1_HealthCheckResponse_NOT_SERVING);
+  size_t buf_length;
+  char* buf = grpc_health_v1_HealthCheckResponse_serialize(
+      response_struct, arena.ptr(), &buf_length);
+  if (buf == nullptr) {
+    return false;
+  }
+  grpc_slice response_slice = grpc_slice_from_copied_buffer(buf, buf_length);
   Slice encoded_response(response_slice, Slice::STEAL_REF);
   ByteBuffer response_buffer(&encoded_response, 1);
   response->Swap(&response_buffer);
@@ -271,7 +271,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
       std::make_shared<CheckCallHandler>(cq, database, service);
   CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get());
   {
-    grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
+    grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
     if (service->shutdown_) return;
     // Request a Check() call.
     handler->next_ =
@@ -316,7 +316,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
   }
   // Send response.
   {
-    grpc_core::MutexLock lock(&service_->cq_shutdown_mu_);
+    grpc_core::MutexLock lock(&service_->cq_shutdown_mu_);
     if (!service_->shutdown_) {
       next_ =
           CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this,
@@ -352,7 +352,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
       std::make_shared<WatchCallHandler>(cq, database, service);
   WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get());
   {
-    grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
+    grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
     if (service->shutdown_) return;
     // Request AsyncNotifyWhenDone().
     handler->on_done_notified_ =
@@ -407,7 +407,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::

 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
     SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) {
-  grpc_core::MutexLock lock(&send_mu_);
+  grpc_core::MutexLock lock(&send_mu_);
   // If there's already a send in flight, cache the new status, and
   // we'll start a new send for it when the one in flight completes.
   if (send_in_flight_) {
@@ -425,7 +425,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
   ByteBuffer response;
   bool success = service_->EncodeResponse(status, &response);
   // Grab shutdown lock and send response.
-  grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
+  grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
   if (service_->shutdown_) {
     SendFinishLocked(std::move(self), Status::CANCELLED);
     return;
@@ -447,7 +447,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
     SendFinish(std::move(self), Status::CANCELLED);
     return;
   }
-  grpc_core::MutexLock lock(&send_mu_);
+  grpc_core::MutexLock lock(&send_mu_);
   send_in_flight_ = false;
   // If we got a new status since we started the last send, start a
   // new send for it.
@@ -461,7 +461,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
     SendFinish(std::shared_ptr<CallHandler> self, const Status& status) {
   if (finish_called_) return;
-  grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
+  grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
   if (service_->shutdown_) return;
   SendFinishLocked(std::move(self), status);
 }
diff --git a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h
index 9da1dfc15f..ad6a583294 100644
--- a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h
+++ b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h
@@ -27,11 +27,11 @@
 #include <grpcpp/health_check_service_interface.h>
 #include <grpcpp/impl/codegen/async_generic_service.h>
 #include <grpcpp/impl/codegen/async_unary_call.h>
-#include <grpcpp/impl/codegen/completion_queue.h>
+#include <grpcpp/impl/codegen/completion_queue.h>
 #include <grpcpp/impl/codegen/service_type.h>
 #include <grpcpp/support/byte_buffer.h>

-#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/gprpp/sync.h"
 #include "src/core/lib/gprpp/thd.h"

 namespace grpc {
@@ -119,8 +119,8 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
                        HealthCheckServiceImpl* service);

       // Not used for Check.
-      void SendHealth(std::shared_ptr<CallHandler> /*self*/,
-                      ServingStatus /*status*/) override {}
+      void SendHealth(std::shared_ptr<CallHandler> /*self*/,
+                      ServingStatus /*status*/) override {}

      private:
       // Called when we receive a call.
@@ -198,7 +198,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
       GenericServerAsyncWriter stream_;
       ServerContext ctx_;

-      grpc_core::Mutex send_mu_;
+      grpc_core::Mutex send_mu_;
       bool send_in_flight_ = false;               // Guarded by mu_.
       ServingStatus pending_status_ = NOT_FOUND;  // Guarded by mu_.
@@ -227,7 +227,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {

     // To synchronize the operations related to shutdown state of cq_, so that
     // we don't enqueue new tags into cq_ after it is already shut down.
-    grpc_core::Mutex cq_shutdown_mu_;
+    grpc_core::Mutex cq_shutdown_mu_;
     std::atomic_bool shutdown_{false};
     std::unique_ptr<::grpc_core::Thread> thread_;
   };
@@ -273,7 +273,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
       const TString& service_name,
       const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler);

-  mutable grpc_core::Mutex mu_;
+  mutable grpc_core::Mutex mu_;
   bool shutdown_ = false;                        // Guarded by mu_.
   std::map<TString, ServiceData> services_map_;  // Guarded by mu_.
   std::unique_ptr<HealthCheckServiceImpl> impl_;
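Note: applications reach this default implementation through the public health-check switch rather than directly. A minimal sketch ("my.package.MyService" is a placeholder):

```cpp
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>

// grpc::EnableDefaultHealthCheckService(true) must be called before the
// server is built; afterwards serving status can be flipped at runtime.
void MarkServing(grpc::Server& server) {
  grpc::HealthCheckServiceInterface* health = server.GetHealthCheckService();
  if (health != nullptr) {
    health->SetServingStatus("my.package.MyService", /*serving=*/true);
  }
}
```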
diff --git a/contrib/libs/grpc/src/cpp/server/health/health_check_service_server_builder_option.cc b/contrib/libs/grpc/src/cpp/server/health/health_check_service_server_builder_option.cc
index 3fa384ace9..1d47d67a32 100644
--- a/contrib/libs/grpc/src/cpp/server/health/health_check_service_server_builder_option.cc
+++ b/contrib/libs/grpc/src/cpp/server/health/health_check_service_server_builder_option.cc
@@ -30,6 +30,6 @@ void HealthCheckServiceServerBuilderOption::UpdateArguments(
 }

 void HealthCheckServiceServerBuilderOption::UpdatePlugins(
-    std::vector<std::unique_ptr<ServerBuilderPlugin>>* /*plugins*/) {}
+    std::vector<std::unique_ptr<ServerBuilderPlugin>>* /*plugins*/) {}

 }  // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc b/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc
index 3f33f4e045..1564bf6f4f 100644
--- a/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc
+++ b/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc
@@ -29,7 +29,7 @@ class InsecureServerCredentialsImpl final : public ServerCredentials {
     return grpc_server_add_insecure_http2_port(server, addr.c_str());
   }
   void SetAuthMetadataProcessor(
-      const std::shared_ptr<grpc::AuthMetadataProcessor>& processor) override {
+      const std::shared_ptr<grpc::AuthMetadataProcessor>& processor) override {
     (void)processor;
     GPR_ASSERT(0);  // Should not be called on InsecureServerCredentials.
   }
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc
index 561d4f5048..d5f7f6973b 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc
@@ -32,10 +32,10 @@ std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
   FILE* fp;
   fp = fopen("/proc/stat", "r");
   uint64_t user, nice, system, idle;
-  if (fscanf(fp, "cpu %lu %lu %lu %lu", &user, &nice, &system, &idle) != 4) {
-    // Something bad happened with the information, so assume it's all invalid
-    user = nice = system = idle = 0;
-  }
+  if (fscanf(fp, "cpu %lu %lu %lu %lu", &user, &nice, &system, &idle) != 4) {
+    // Something bad happened with the information, so assume it's all invalid
+    user = nice = system = idle = 0;
+  }
   fclose(fp);
   busy = user + nice + system;
   total = busy + idle;
diff --git a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc
index 732602bcb7..580e00b99b 100644
--- a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc
+++ b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc
@@ -70,18 +70,18 @@ void AuthMetadataProcessorAyncWrapper::InvokeProcessor(
       &response_metadata);

   std::vector<grpc_metadata> consumed_md;
-  for (const auto& consumed : consumed_metadata) {
+  for (const auto& consumed : consumed_metadata) {
     grpc_metadata md_entry;
-    md_entry.key = SliceReferencingString(consumed.first);
-    md_entry.value = SliceReferencingString(consumed.second);
+    md_entry.key = SliceReferencingString(consumed.first);
+    md_entry.value = SliceReferencingString(consumed.second);
     md_entry.flags = 0;
     consumed_md.push_back(md_entry);
   }
   std::vector<grpc_metadata> response_md;
-  for (const auto& response : response_metadata) {
+  for (const auto& response : response_metadata) {
     grpc_metadata md_entry;
-    md_entry.key = SliceReferencingString(response.first);
-    md_entry.value = SliceReferencingString(response.second);
+    md_entry.key = SliceReferencingString(response.first);
+    md_entry.value = SliceReferencingString(response.second);
     md_entry.flags = 0;
     response_md.push_back(md_entry);
   }
@@ -98,19 +98,19 @@ int SecureServerCredentials::AddPortToServer(const TString& addr,
 }

 void SecureServerCredentials::SetAuthMetadataProcessor(
-    const std::shared_ptr<grpc::AuthMetadataProcessor>& processor) {
-  auto* wrapper = new grpc::AuthMetadataProcessorAyncWrapper(processor);
+    const std::shared_ptr<grpc::AuthMetadataProcessor>& processor) {
+  auto* wrapper = new grpc::AuthMetadataProcessorAyncWrapper(processor);
   grpc_server_credentials_set_auth_metadata_processor(
-      creds_, {grpc::AuthMetadataProcessorAyncWrapper::Process,
-               grpc::AuthMetadataProcessorAyncWrapper::Destroy, wrapper});
+      creds_, {grpc::AuthMetadataProcessorAyncWrapper::Process,
+               grpc::AuthMetadataProcessorAyncWrapper::Destroy, wrapper});
 }

 std::shared_ptr<ServerCredentials> SslServerCredentials(
-    const grpc::SslServerCredentialsOptions& options) {
+    const grpc::SslServerCredentialsOptions& options) {
   std::vector<grpc_ssl_pem_key_cert_pair> pem_key_cert_pairs;
-  for (const auto& key_cert_pair : options.pem_key_cert_pairs) {
-    grpc_ssl_pem_key_cert_pair p = {key_cert_pair.private_key.c_str(),
-                                    key_cert_pair.cert_chain.c_str()};
+  for (const auto& key_cert_pair : options.pem_key_cert_pairs) {
+    grpc_ssl_pem_key_cert_pair p = {key_cert_pair.private_key.c_str(),
+                                    key_cert_pair.cert_chain.c_str()};
     pem_key_cert_pairs.push_back(p);
   }
   grpc_server_credentials* c_creds = grpc_ssl_server_credentials_create_ex(
@@ -128,7 +128,7 @@ std::shared_ptr<ServerCredentials> SslServerCredentials(
 namespace experimental {

 std::shared_ptr<ServerCredentials> AltsServerCredentials(
-    const AltsServerCredentialsOptions& /* options */) {
+    const AltsServerCredentialsOptions& /* options */) {
   grpc_alts_credentials_options* c_options =
       grpc_alts_credentials_server_options_create();
   grpc_server_credentials* c_creds =
@@ -144,12 +144,12 @@ std::shared_ptr<ServerCredentials> LocalServerCredentials(
       new SecureServerCredentials(grpc_local_server_credentials_create(type)));
 }

-std::shared_ptr<ServerCredentials> TlsServerCredentials(
+std::shared_ptr<ServerCredentials> TlsServerCredentials(
     const grpc::experimental::TlsCredentialsOptions& options) {
   grpc::GrpcLibraryCodegen init;
   return std::shared_ptr<ServerCredentials>(new SecureServerCredentials(
       grpc_tls_server_credentials_create(options.c_credentials_options())));
-}
-
+}
+
 }  // namespace experimental
 }  // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h
index 9e3fb3f9eb..945bb445cc 100644
--- a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h
+++ b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h
@@ -22,16 +22,16 @@
 #include <memory>

 #include <grpcpp/security/server_credentials.h>
-#include <grpcpp/security/tls_credentials_options.h>
+#include <grpcpp/security/tls_credentials_options.h>

 #include <grpc/grpc_security.h>

 #include "src/cpp/server/thread_pool_interface.h"

 namespace grpc {
-
-class SecureServerCredentials;
-
+
+class SecureServerCredentials;
+
 class AuthMetadataProcessorAyncWrapper final {
  public:
   static void Destroy(void* wrapper);
@@ -42,11 +42,11 @@ class AuthMetadataProcessorAyncWrapper final {

   AuthMetadataProcessorAyncWrapper(
       const std::shared_ptr<AuthMetadataProcessor>& processor)
-      : processor_(processor) {
-    if (processor && processor->IsBlocking()) {
-      thread_pool_.reset(CreateDefaultThreadPool());
-    }
-  }
+      : processor_(processor) {
+    if (processor && processor->IsBlocking()) {
+      thread_pool_.reset(CreateDefaultThreadPool());
+    }
+  }

  private:
   void InvokeProcessor(grpc_auth_context* context, const grpc_metadata* md,
@@ -67,11 +67,11 @@ class SecureServerCredentials final : public ServerCredentials {
   int AddPortToServer(const TString& addr, grpc_server* server) override;

   void SetAuthMetadataProcessor(
-      const std::shared_ptr<grpc::AuthMetadataProcessor>& processor) override;
+      const std::shared_ptr<grpc::AuthMetadataProcessor>& processor) override;

  private:
   grpc_server_credentials* creds_;
-  std::unique_ptr<grpc::AuthMetadataProcessorAyncWrapper> processor_;
+  std::unique_ptr<grpc::AuthMetadataProcessorAyncWrapper> processor_;
 };

 }  // namespace grpc
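Note: the SslServerCredentials factory above is the public entry point for TLS server setup. A sketch with placeholder PEM strings (normally loaded from files; this fork uses TString where upstream uses std::string):

```cpp
#include <grpcpp/security/server_credentials.h>

std::shared_ptr<grpc::ServerCredentials> MakeSslCreds(
    const TString& key_pem, const TString& cert_pem) {
  grpc::SslServerCredentialsOptions options;
  options.pem_key_cert_pairs.push_back({key_pem, cert_pem});
  return grpc::SslServerCredentials(options);
}
```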
" + "Dropping the service %p", + (void*)service); + } else { + builder_->callback_generic_service_ = service; + } + return *builder_; +} #endif - -std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor> -ServerBuilder::experimental_type::AddExternalConnectionAcceptor( - experimental_type::ExternalConnectionType type, - std::shared_ptr<ServerCredentials> creds) { + +std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor> +ServerBuilder::experimental_type::AddExternalConnectionAcceptor( + experimental_type::ExternalConnectionType type, + std::shared_ptr<ServerCredentials> creds) { TString name_prefix("external:"); - char count_str[GPR_LTOA_MIN_BUFSIZE]; - gpr_ltoa(static_cast<long>(builder_->acceptors_.size()), count_str); - builder_->acceptors_.emplace_back( - std::make_shared<grpc::internal::ExternalConnectionAcceptorImpl>( - name_prefix.append(count_str), type, creds)); - return builder_->acceptors_.back()->GetAcceptor(); -} - + char count_str[GPR_LTOA_MIN_BUFSIZE]; + gpr_ltoa(static_cast<long>(builder_->acceptors_.size()), count_str); + builder_->acceptors_.emplace_back( + std::make_shared<grpc::internal::ExternalConnectionAcceptorImpl>( + name_prefix.append(count_str), type, creds)); + return builder_->acceptors_.back()->GetAcceptor(); +} + ServerBuilder& ServerBuilder::SetOption( std::unique_ptr<ServerBuilderOption> option) { options_.push_back(std::move(option)); @@ -217,8 +217,8 @@ ServerBuilder& ServerBuilder::AddListeningPort( return *this; } -std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { - grpc::ChannelArguments args; +std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { + grpc::ChannelArguments args; if (max_receive_message_size_ >= -1) { args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_); } @@ -252,16 +252,16 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { // == Determine if the server has any syncrhonous methods == bool has_sync_methods = false; - for (const auto& value : services_) { - if (value->service->has_synchronous_methods()) { + for (const auto& value : services_) { + if (value->service->has_synchronous_methods()) { has_sync_methods = true; break; } } if (!has_sync_methods) { - for (const auto& value : plugins_) { - if (value->has_sync_methods()) { + for (const auto& value : plugins_) { + if (value->has_sync_methods()) { has_sync_methods = true; break; } @@ -280,26 +280,26 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { std::make_shared< std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>>()); - bool has_frequently_polled_cqs = false; - for (const auto& cq : cqs_) { - if (cq->IsFrequentlyPolled()) { - has_frequently_polled_cqs = true; - break; - } - } - - // == Determine if the server has any callback methods == - bool has_callback_methods = false; - for (const auto& service : services_) { - if (service->service->has_callback_methods()) { - has_callback_methods = true; - has_frequently_polled_cqs = true; - break; + bool has_frequently_polled_cqs = false; + for (const auto& cq : cqs_) { + if (cq->IsFrequentlyPolled()) { + has_frequently_polled_cqs = true; + break; } } - const bool is_hybrid_server = has_sync_methods && has_frequently_polled_cqs; + // == Determine if the server has any callback methods == + bool has_callback_methods = false; + for (const auto& service : services_) { + if (service->service->has_callback_methods()) { + has_callback_methods = true; + has_frequently_polled_cqs = true; + break; + } + } + const bool is_hybrid_server = has_sync_methods 
&& has_frequently_polled_cqs; + if (has_sync_methods) { grpc_cq_polling_type polling_type = is_hybrid_server ? GRPC_CQ_NON_POLLING : GRPC_CQ_DEFAULT_POLLING; @@ -328,7 +328,7 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { gpr_log(GPR_INFO, "Callback server."); } - std::unique_ptr<grpc::Server> server(new grpc::Server( + std::unique_ptr<grpc::Server> server(new grpc::Server( &args, sync_server_cqs, sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, sync_server_settings_.cq_timeout_msec, std::move(acceptors_), resource_quota_, @@ -343,10 +343,10 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { for (const auto& cq : *sync_server_cqs) { grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr); - has_frequently_polled_cqs = true; + has_frequently_polled_cqs = true; } - if (has_callback_methods || callback_generic_service_ != nullptr) { + if (has_callback_methods || callback_generic_service_ != nullptr) { auto* cq = server->CallbackCQ(); grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr); } @@ -363,29 +363,29 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { cq->RegisterServer(server.get()); } - if (!has_frequently_polled_cqs) { + if (!has_frequently_polled_cqs) { gpr_log(GPR_ERROR, "At least one of the completion queues must be frequently polled"); return nullptr; } - for (const auto& value : services_) { - if (!server->RegisterService(value->host.get(), value->service)) { + for (const auto& value : services_) { + if (!server->RegisterService(value->host.get(), value->service)) { return nullptr; } } - for (const auto& value : plugins_) { - value->InitServer(initializer); + for (const auto& value : plugins_) { + value->InitServer(initializer); } if (generic_service_) { server->RegisterAsyncGenericService(generic_service_); - } else if (callback_generic_service_) { - server->RegisterCallbackGenericService(callback_generic_service_); + } else if (callback_generic_service_) { + server->RegisterCallbackGenericService(callback_generic_service_); } else { - for (const auto& value : services_) { - if (value->service->has_generic_methods()) { + for (const auto& value : services_) { + if (value->service->has_generic_methods()) { gpr_log(GPR_ERROR, "Some methods were marked generic but there is no " "generic service registered."); @@ -394,22 +394,22 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { } } - for (auto& port : ports_) { - int r = server->AddListeningPort(port.addr, port.creds.get()); + for (auto& port : ports_) { + int r = server->AddListeningPort(port.addr, port.creds.get()); if (!r) { server->Shutdown(); return nullptr; } - if (port.selected_port != nullptr) { - *port.selected_port = r; + if (port.selected_port != nullptr) { + *port.selected_port = r; } } auto cqs_data = cqs_.empty() ? nullptr : &cqs_[0]; server->Start(cqs_data, cqs_.size()); - for (const auto& value : plugins_) { - value->Finish(initializer); + for (const auto& value : plugins_) { + value->Finish(initializer); } return server; diff --git a/contrib/libs/grpc/src/cpp/server/server_callback.cc b/contrib/libs/grpc/src/cpp/server/server_callback.cc index 40aef8e735..04a776fac1 100644 --- a/contrib/libs/grpc/src/cpp/server/server_callback.cc +++ b/contrib/libs/grpc/src/cpp/server/server_callback.cc @@ -1,29 +1,29 @@ -/* - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
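Note: for contrast with the build logic above, the canonical BuildAndStart() call site looks like this (service stands in for any generated grpc::Service subclass; the address is a placeholder):

```cpp
#include <grpcpp/grpcpp.h>

std::unique_ptr<grpc::Server> StartServer(grpc::Service* service) {
  grpc::ServerBuilder builder;
  builder.AddListeningPort("0.0.0.0:50051", grpc::InsecureServerCredentials());
  builder.RegisterService(service);
  return builder.BuildAndStart();  // returns nullptr on failure, as above
}
```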
diff --git a/contrib/libs/grpc/src/cpp/server/server_callback.cc b/contrib/libs/grpc/src/cpp/server/server_callback.cc
index 40aef8e735..04a776fac1 100644
--- a/contrib/libs/grpc/src/cpp/server/server_callback.cc
+++ b/contrib/libs/grpc/src/cpp/server/server_callback.cc
@@ -1,29 +1,29 @@
-/*
- * Copyright 2019 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
+/*
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
 #include <grpcpp/impl/codegen/server_callback.h>
-
-#include "src/core/lib/iomgr/closure.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
-#include "src/core/lib/iomgr/executor.h"
-
+
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/executor.h"
+
 namespace grpc {
-namespace internal {
-
+namespace internal {
+
 void ServerCallbackCall::ScheduleOnDone(bool inline_ondone) {
   if (inline_ondone) {
     CallOnDone();
@@ -50,18 +50,18 @@ void ServerCallbackCall::ScheduleOnDone(bool inline_ondone) {
   }
 }

-void ServerCallbackCall::CallOnCancel(ServerReactor* reactor) {
-  if (reactor->InternalInlineable()) {
-    reactor->OnCancel();
-  } else {
+void ServerCallbackCall::CallOnCancel(ServerReactor* reactor) {
+  if (reactor->InternalInlineable()) {
+    reactor->OnCancel();
+  } else {
     // Ref to make sure that the closure executes before the whole call gets
     // destructed, and Unref within the closure.
-    Ref();
-    grpc_core::ExecCtx exec_ctx;
+    Ref();
+    grpc_core::ExecCtx exec_ctx;
     struct ClosureWithArg {
       grpc_closure closure;
-      ServerCallbackCall* call;
-      ServerReactor* reactor;
+      ServerCallbackCall* call;
+      ServerReactor* reactor;
       ClosureWithArg(ServerCallbackCall* call_arg, ServerReactor* reactor_arg)
           : call(call_arg), reactor(reactor_arg) {
         GRPC_CLOSURE_INIT(&closure,
@@ -74,11 +74,11 @@ void ServerCallbackCall::CallOnCancel(ServerReactor* reactor) {
                           },
                           this, grpc_schedule_on_exec_ctx);
       }
-    };
+    };
     ClosureWithArg* arg = new ClosureWithArg(this, reactor);
     grpc_core::Executor::Run(&arg->closure, GRPC_ERROR_NONE);
-  }
-}
-
-}  // namespace internal
+  }
+}
+
+}  // namespace internal
 }  // namespace grpc
notification_cq_(notification_cq), - tag_(tag), - delete_on_finalize_(delete_on_finalize), - call_(nullptr), - done_intercepting_(false) { - /* Set up interception state partially for the receive ops. call_wrapper_ is - * not filled at this point, but it will be filled before the interceptors are - * run. */ - interceptor_methods_.SetCall(&call_wrapper_); - interceptor_methods_.SetReverse(); - call_cq_->RegisterAvalanching(); // This op will trigger more ops -} - -ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() { - call_cq_->CompleteAvalanching(); -} - -bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, - bool* status) { - if (done_intercepting_) { - *tag = tag_; - if (delete_on_finalize_) { - delete this; - } - return true; - } - context_->set_call(call_); - context_->cq_ = call_cq_; - if (call_wrapper_.call() == nullptr) { - // Fill it since it is empty. - call_wrapper_ = internal::Call( - call_, server_, call_cq_, server_->max_receive_message_size(), nullptr); - } - - // just the pointers inside call are copied here - stream_->BindCall(&call_wrapper_); - - if (*status && call_ && call_wrapper_.server_rpc_info()) { - done_intercepting_ = true; - // Set interception point for RECV INITIAL METADATA - interceptor_methods_.AddInterceptionHookPoint( - experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); - interceptor_methods_.SetRecvInitialMetadata(&context_->client_metadata_); - if (interceptor_methods_.RunInterceptors( - [this]() { ContinueFinalizeResultAfterInterception(); })) { - // There are no interceptors to run. Continue - } else { - // There were interceptors to be run, so - // ContinueFinalizeResultAfterInterception will be run when interceptors - // are done. - return false; - } - } - if (*status && call_) { - context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr); - } - *tag = tag_; - if (delete_on_finalize_) { - delete this; - } - return true; -} - -void ServerInterface::BaseAsyncRequest:: - ContinueFinalizeResultAfterInterception() { - context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr); - // Queue a tag which will be returned immediately - grpc_core::ExecCtx exec_ctx; - grpc_cq_begin_op(notification_cq_->cq(), this); - grpc_cq_end_op( - notification_cq_->cq(), this, GRPC_ERROR_NONE, - [](void* /*arg*/, grpc_cq_completion* completion) { delete completion; }, - nullptr, new grpc_cq_completion()); -} - -ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( - ServerInterface* server, ServerContext* context, - internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag, const char* name, - internal::RpcMethod::RpcType type) - : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag, - true), - name_(name), - type_(type) {} - -void ServerInterface::RegisteredAsyncRequest::IssueRequest( - void* registered_method, grpc_byte_buffer** payload, - ServerCompletionQueue* notification_cq) { - // The following call_start_batch is internally-generated so no need for an - // explanatory log on failure. 
- GPR_ASSERT(grpc_server_request_registered_call( - server_->server(), registered_method, &call_, - &context_->deadline_, context_->client_metadata_.arr(), - payload, call_cq_->cq(), notification_cq->cq(), - this) == GRPC_CALL_OK); -} - -ServerInterface::GenericAsyncRequest::GenericAsyncRequest( - ServerInterface* server, GenericServerContext* context, - internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) - : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag, - delete_on_finalize) { - grpc_call_details_init(&call_details_); - GPR_ASSERT(notification_cq); - GPR_ASSERT(call_cq); - // The following call_start_batch is internally-generated so no need for an - // explanatory log on failure. - GPR_ASSERT(grpc_server_request_call(server->server(), &call_, &call_details_, - context->client_metadata_.arr(), - call_cq->cq(), notification_cq->cq(), - this) == GRPC_CALL_OK); -} - -bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag, - bool* status) { - // If we are done intercepting, there is nothing more for us to do - if (done_intercepting_) { - return BaseAsyncRequest::FinalizeResult(tag, status); - } - // TODO(yangg) remove the copy here. - if (*status) { - static_cast<GenericServerContext*>(context_)->method_ = - StringFromCopiedSlice(call_details_.method); - static_cast<GenericServerContext*>(context_)->host_ = - StringFromCopiedSlice(call_details_.host); - context_->deadline_ = call_details_.deadline; - } - grpc_slice_unref(call_details_.method); - grpc_slice_unref(call_details_.host); - call_wrapper_ = internal::Call( - call_, server_, call_cq_, server_->max_receive_message_size(), - context_->set_server_rpc_info( - static_cast<GenericServerContext*>(context_)->method_.c_str(), - internal::RpcMethod::BIDI_STREAMING, - *server_->interceptor_creators())); - return BaseAsyncRequest::FinalizeResult(tag, status); -} - -namespace { -class ShutdownCallback : public grpc_experimental_completion_queue_functor { - public: - ShutdownCallback() { - functor_run = &ShutdownCallback::Run; - // Set inlineable to true since this callback is trivial and thus does not - // need to be run from the executor (triggering a thread hop). This should - // only be used by internal callbacks like this and not by user application - // code. - inlineable = true; - } - // TakeCQ takes ownership of the cq into the shutdown callback - // so that the shutdown callback will be responsible for destroying it - void TakeCQ(CompletionQueue* cq) { cq_ = cq; } - - // The Run function will get invoked by the completion queue library - // when the shutdown is actually complete - static void Run(grpc_experimental_completion_queue_functor* cb, int) { - auto* callback = static_cast<ShutdownCallback*>(cb); - delete callback->cq_; - delete callback; - } - - private: - CompletionQueue* cq_ = nullptr; -}; -} // namespace - +ServerInterface::BaseAsyncRequest::BaseAsyncRequest( + ServerInterface* server, ServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) + : server_(server), + context_(context), + stream_(stream), + call_cq_(call_cq), + notification_cq_(notification_cq), + tag_(tag), + delete_on_finalize_(delete_on_finalize), + call_(nullptr), + done_intercepting_(false) { + /* Set up interception state partially for the receive ops. 
call_wrapper_ is + * not filled at this point, but it will be filled before the interceptors are + * run. */ + interceptor_methods_.SetCall(&call_wrapper_); + interceptor_methods_.SetReverse(); + call_cq_->RegisterAvalanching(); // This op will trigger more ops +} + +ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() { + call_cq_->CompleteAvalanching(); +} + +bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, + bool* status) { + if (done_intercepting_) { + *tag = tag_; + if (delete_on_finalize_) { + delete this; + } + return true; + } + context_->set_call(call_); + context_->cq_ = call_cq_; + if (call_wrapper_.call() == nullptr) { + // Fill it since it is empty. + call_wrapper_ = internal::Call( + call_, server_, call_cq_, server_->max_receive_message_size(), nullptr); + } + + // just the pointers inside call are copied here + stream_->BindCall(&call_wrapper_); + + if (*status && call_ && call_wrapper_.server_rpc_info()) { + done_intercepting_ = true; + // Set interception point for RECV INITIAL METADATA + interceptor_methods_.AddInterceptionHookPoint( + experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); + interceptor_methods_.SetRecvInitialMetadata(&context_->client_metadata_); + if (interceptor_methods_.RunInterceptors( + [this]() { ContinueFinalizeResultAfterInterception(); })) { + // There are no interceptors to run. Continue + } else { + // There were interceptors to be run, so + // ContinueFinalizeResultAfterInterception will be run when interceptors + // are done. + return false; + } + } + if (*status && call_) { + context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr); + } + *tag = tag_; + if (delete_on_finalize_) { + delete this; + } + return true; +} + +void ServerInterface::BaseAsyncRequest:: + ContinueFinalizeResultAfterInterception() { + context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr); + // Queue a tag which will be returned immediately + grpc_core::ExecCtx exec_ctx; + grpc_cq_begin_op(notification_cq_->cq(), this); + grpc_cq_end_op( + notification_cq_->cq(), this, GRPC_ERROR_NONE, + [](void* /*arg*/, grpc_cq_completion* completion) { delete completion; }, + nullptr, new grpc_cq_completion()); +} + +ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( + ServerInterface* server, ServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag, const char* name, + internal::RpcMethod::RpcType type) + : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag, + true), + name_(name), + type_(type) {} + +void ServerInterface::RegisteredAsyncRequest::IssueRequest( + void* registered_method, grpc_byte_buffer** payload, + ServerCompletionQueue* notification_cq) { + // The following call_start_batch is internally-generated so no need for an + // explanatory log on failure. 
+ GPR_ASSERT(grpc_server_request_registered_call( + server_->server(), registered_method, &call_, + &context_->deadline_, context_->client_metadata_.arr(), + payload, call_cq_->cq(), notification_cq->cq(), + this) == GRPC_CALL_OK); +} + +ServerInterface::GenericAsyncRequest::GenericAsyncRequest( + ServerInterface* server, GenericServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) + : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag, + delete_on_finalize) { + grpc_call_details_init(&call_details_); + GPR_ASSERT(notification_cq); + GPR_ASSERT(call_cq); + // The following call_start_batch is internally-generated so no need for an + // explanatory log on failure. + GPR_ASSERT(grpc_server_request_call(server->server(), &call_, &call_details_, + context->client_metadata_.arr(), + call_cq->cq(), notification_cq->cq(), + this) == GRPC_CALL_OK); +} + +bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag, + bool* status) { + // If we are done intercepting, there is nothing more for us to do + if (done_intercepting_) { + return BaseAsyncRequest::FinalizeResult(tag, status); + } + // TODO(yangg) remove the copy here. + if (*status) { + static_cast<GenericServerContext*>(context_)->method_ = + StringFromCopiedSlice(call_details_.method); + static_cast<GenericServerContext*>(context_)->host_ = + StringFromCopiedSlice(call_details_.host); + context_->deadline_ = call_details_.deadline; + } + grpc_slice_unref(call_details_.method); + grpc_slice_unref(call_details_.host); + call_wrapper_ = internal::Call( + call_, server_, call_cq_, server_->max_receive_message_size(), + context_->set_server_rpc_info( + static_cast<GenericServerContext*>(context_)->method_.c_str(), + internal::RpcMethod::BIDI_STREAMING, + *server_->interceptor_creators())); + return BaseAsyncRequest::FinalizeResult(tag, status); +} + +namespace { +class ShutdownCallback : public grpc_experimental_completion_queue_functor { + public: + ShutdownCallback() { + functor_run = &ShutdownCallback::Run; + // Set inlineable to true since this callback is trivial and thus does not + // need to be run from the executor (triggering a thread hop). This should + // only be used by internal callbacks like this and not by user application + // code. + inlineable = true; + } + // TakeCQ takes ownership of the cq into the shutdown callback + // so that the shutdown callback will be responsible for destroying it + void TakeCQ(CompletionQueue* cq) { cq_ = cq; } + + // The Run function will get invoked by the completion queue library + // when the shutdown is actually complete + static void Run(grpc_experimental_completion_queue_functor* cb, int) { + auto* callback = static_cast<ShutdownCallback*>(cb); + delete callback->cq_; + delete callback; + } + + private: + CompletionQueue* cq_ = nullptr; +}; +} // namespace + /// Use private inheritance rather than composition only to establish order /// of construction, since the public base class should be constructed after the /// elements belonging to the private base class are constructed. This is not /// possible using true composition. 
class Server::UnimplementedAsyncRequest final - : private grpc::UnimplementedAsyncRequestContext, + : private grpc::UnimplementedAsyncRequestContext, public GenericAsyncRequest { public: UnimplementedAsyncRequest(ServerInterface* server, @@ -300,25 +300,25 @@ class Server::UnimplementedAsyncRequest final bool FinalizeResult(void** tag, bool* status) override; - grpc::ServerContext* context() { return &server_context_; } - grpc::GenericServerAsyncReaderWriter* stream() { return &generic_stream_; } + grpc::ServerContext* context() { return &server_context_; } + grpc::GenericServerAsyncReaderWriter* stream() { return &generic_stream_; } }; /// UnimplementedAsyncResponse should not post user-visible completions to the /// C++ completion queue, but is generated as a CQ event by the core class Server::UnimplementedAsyncResponse final - : public grpc::internal::CallOpSet< - grpc::internal::CallOpSendInitialMetadata, - grpc::internal::CallOpServerSendStatus> { + : public grpc::internal::CallOpSet< + grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpServerSendStatus> { public: UnimplementedAsyncResponse(UnimplementedAsyncRequest* request); ~UnimplementedAsyncResponse() { delete request_; } bool FinalizeResult(void** tag, bool* status) override { - if (grpc::internal::CallOpSet< - grpc::internal::CallOpSendInitialMetadata, - grpc::internal::CallOpServerSendStatus>::FinalizeResult(tag, - status)) { + if (grpc::internal::CallOpSet< + grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpServerSendStatus>::FinalizeResult(tag, + status)) { delete this; } else { // The tag was swallowed due to interception. We will see it again. @@ -330,16 +330,16 @@ class Server::UnimplementedAsyncResponse final UnimplementedAsyncRequest* const request_; }; -class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { +class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { public: - SyncRequest(grpc::internal::RpcServiceMethod* method, void* method_tag) + SyncRequest(grpc::internal::RpcServiceMethod* method, void* method_tag) : method_(method), method_tag_(method_tag), in_flight_(false), - has_request_payload_(method->method_type() == - grpc::internal::RpcMethod::NORMAL_RPC || - method->method_type() == - grpc::internal::RpcMethod::SERVER_STREAMING), + has_request_payload_(method->method_type() == + grpc::internal::RpcMethod::NORMAL_RPC || + method->method_type() == + grpc::internal::RpcMethod::SERVER_STREAMING), call_details_(nullptr), cq_(nullptr) { grpc_metadata_array_init(&request_metadata_); @@ -395,7 +395,7 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { } } - bool FinalizeResult(void** /*tag*/, bool* status) override { + bool FinalizeResult(void** /*tag*/, bool* status) override { if (!*status) { grpc_completion_queue_destroy(cq_); cq_ = nullptr; @@ -450,8 +450,8 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { interceptor_methods_.SetReverse(); // Set interception point for RECV INITIAL METADATA interceptor_methods_.AddInterceptionHookPoint( - grpc::experimental::InterceptionHookPoints:: - POST_RECV_INITIAL_METADATA); + grpc::experimental::InterceptionHookPoints:: + POST_RECV_INITIAL_METADATA); interceptor_methods_.SetRecvInitialMetadata(&ctx_.client_metadata_); if (has_request_payload_) { @@ -459,11 +459,11 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { auto* handler = resources_ ? 
method_->handler() : server_->resource_exhausted_handler_.get(); request_ = handler->Deserialize(call_.call(), request_payload_, - &request_status_, nullptr); + &request_status_, nullptr); request_payload_ = nullptr; interceptor_methods_.AddInterceptionHookPoint( - grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); + grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); interceptor_methods_.SetRecvMessage(request_, nullptr); } @@ -482,40 +482,40 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { global_callbacks_->PreSynchronousRequest(&ctx_); auto* handler = resources_ ? method_->handler() : server_->resource_exhausted_handler_.get(); - handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( - &call_, &ctx_, request_, request_status_, nullptr, nullptr)); + handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( + &call_, &ctx_, request_, request_status_, nullptr, nullptr)); request_ = nullptr; global_callbacks_->PostSynchronousRequest(&ctx_); cq_.Shutdown(); - grpc::internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); + grpc::internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME)); /* Ensure the cq_ is shutdown */ - grpc::DummyTag ignored_tag; + grpc::DummyTag ignored_tag; GPR_ASSERT(cq_.Pluck(&ignored_tag) == false); } delete this; } private: - grpc::CompletionQueue cq_; - grpc::ServerContext ctx_; + grpc::CompletionQueue cq_; + grpc::ServerContext ctx_; const bool has_request_payload_; grpc_byte_buffer* request_payload_; void* request_; - grpc::Status request_status_; - grpc::internal::RpcServiceMethod* const method_; - grpc::internal::Call call_; + grpc::Status request_status_; + grpc::internal::RpcServiceMethod* const method_; + grpc::internal::Call call_; Server* server_; std::shared_ptr<GlobalCallbacks> global_callbacks_; bool resources_; - grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; + grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; }; private: - grpc::internal::RpcServiceMethod* const method_; + grpc::internal::RpcServiceMethod* const method_; void* const method_tag_; bool in_flight_; const bool has_request_payload_; @@ -527,14 +527,14 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { grpc_completion_queue* cq_; }; -template <class ServerContextType> +template <class ServerContextType> class Server::CallbackRequest final : public grpc::internal::CompletionQueueTag { - public: + public: static_assert( std::is_base_of<grpc::CallbackServerContext, ServerContextType>::value, "ServerContextType must be derived from CallbackServerContext"); - + // For codegen services, the value of method represents the defined // characteristics of the method being requested. For generic services, method // is nullptr since these services don't have pre-defined methods. 
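A minimal sketch of the construction-order idiom described in the comment above Server::UnimplementedAsyncRequest: C++ constructs base classes in declaration order, so hoisting a would-be member into a private base guarantees it is alive before the public base that consumes it. All names below are illustrative stand-ins, not gRPC types.

#include <iostream>

struct GenericContext {
  GenericContext() { std::cout << "context constructed first\n"; }
};

// The would-be data member, hoisted into a base class.
class ContextHolder {
 protected:
  GenericContext ctx_;
};

class AsyncRequestBase {
 public:
  explicit AsyncRequestBase(GenericContext* /*ctx*/) {
    std::cout << "public base sees a fully constructed context\n";
  }
};

// Bases are constructed left to right: ContextHolder (and its ctx_)
// before AsyncRequestBase, so passing &ctx_ to the latter is safe.
class UnimplementedRequest : private ContextHolder, public AsyncRequestBase {
 public:
  UnimplementedRequest() : AsyncRequestBase(&ctx_) {}
};

int main() { UnimplementedRequest req; }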
@@ -578,26 +578,26 @@ class Server::CallbackRequest final server_->UnrefWithPossibleNotify(); } - // Needs specialization to account for different processing of metadata - // in generic API - bool FinalizeResult(void** tag, bool* status) override; + // Needs specialization to account for different processing of metadata + // in generic API + bool FinalizeResult(void** tag, bool* status) override; private: - // method_name needs to be specialized between named method and generic - const char* method_name() const; - + // method_name needs to be specialized between named method and generic + const char* method_name() const; + class CallbackCallTag : public grpc_experimental_completion_queue_functor { public: - CallbackCallTag(Server::CallbackRequest<ServerContextType>* req) - : req_(req) { + CallbackCallTag(Server::CallbackRequest<ServerContextType>* req) + : req_(req) { functor_run = &CallbackCallTag::StaticRun; - // Set inlineable to true since this callback is internally-controlled - // without taking any locks, and thus does not need to be run from the - // executor (which triggers a thread hop). This should only be used by - // internal callbacks like this and not by user application code. The work - // here is actually non-trivial, but there is no chance of having user - // locks conflict with each other so it's ok to run inlined. - inlineable = true; + // Set inlineable to true since this callback is internally-controlled + // without taking any locks, and thus does not need to be run from the + // executor (which triggers a thread hop). This should only be used by + // internal callbacks like this and not by user application code. The work + // here is actually non-trivial, but there is no chance of having user + // locks conflict with each other so it's ok to run inlined. + inlineable = true; } // force_run can not be performed on a tag if operations using this tag @@ -606,8 +606,8 @@ class Server::CallbackRequest final void force_run(bool ok) { Run(ok); } private: - Server::CallbackRequest<ServerContextType>* req_; - grpc::internal::Call* call_; + Server::CallbackRequest<ServerContextType>* req_; + grpc::internal::Call* call_; static void StaticRun(grpc_experimental_completion_queue_functor* cb, int ok) { @@ -634,35 +634,35 @@ class Server::CallbackRequest final req_->request_metadata_.count = 0; // Create a C++ Call to control the underlying core call - call_ = - new (grpc_call_arena_alloc(req_->call_, sizeof(grpc::internal::Call))) - grpc::internal::Call( - req_->call_, req_->server_, req_->cq_, - req_->server_->max_receive_message_size(), - req_->ctx_.set_server_rpc_info( - req_->method_name(), - (req_->method_ != nullptr) - ? req_->method_->method_type() - : grpc::internal::RpcMethod::BIDI_STREAMING, - req_->server_->interceptor_creators_)); + call_ = + new (grpc_call_arena_alloc(req_->call_, sizeof(grpc::internal::Call))) + grpc::internal::Call( + req_->call_, req_->server_, req_->cq_, + req_->server_->max_receive_message_size(), + req_->ctx_.set_server_rpc_info( + req_->method_name(), + (req_->method_ != nullptr) + ? 
req_->method_->method_type() + : grpc::internal::RpcMethod::BIDI_STREAMING, + req_->server_->interceptor_creators_)); req_->interceptor_methods_.SetCall(call_); req_->interceptor_methods_.SetReverse(); // Set interception point for RECV INITIAL METADATA req_->interceptor_methods_.AddInterceptionHookPoint( - grpc::experimental::InterceptionHookPoints:: - POST_RECV_INITIAL_METADATA); + grpc::experimental::InterceptionHookPoints:: + POST_RECV_INITIAL_METADATA); req_->interceptor_methods_.SetRecvInitialMetadata( &req_->ctx_.client_metadata_); if (req_->has_request_payload_) { // Set interception point for RECV MESSAGE req_->request_ = req_->method_->handler()->Deserialize( - req_->call_, req_->request_payload_, &req_->request_status_, - &req_->handler_data_); + req_->call_, req_->request_payload_, &req_->request_status_, + &req_->handler_data_); req_->request_payload_ = nullptr; req_->interceptor_methods_.AddInterceptionHookPoint( - grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); + grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); req_->interceptor_methods_.SetRecvMessage(req_->request_, nullptr); } @@ -675,11 +675,11 @@ class Server::CallbackRequest final } } void ContinueRunAfterInterception() { - auto* handler = (req_->method_ != nullptr) - ? req_->method_->handler() - : req_->server_->generic_handler_.get(); - handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( - call_, &req_->ctx_, req_->request_, req_->request_status_, + auto* handler = (req_->method_ != nullptr) + ? req_->method_->handler() + : req_->server_->generic_handler_.get(); + handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( + call_, &req_->ctx_, req_->request_, req_->request_status_, req_->handler_data_, [this] { delete req_; })); } }; @@ -694,61 +694,61 @@ class Server::CallbackRequest final } Server* const server_; - grpc::internal::RpcServiceMethod* const method_; + grpc::internal::RpcServiceMethod* const method_; const bool has_request_payload_; grpc_byte_buffer* request_payload_ = nullptr; void* request_ = nullptr; void* handler_data_ = nullptr; - grpc::Status request_status_; + grpc::Status request_status_; grpc_call_details* const call_details_ = nullptr; grpc_call* call_; gpr_timespec deadline_; grpc_metadata_array request_metadata_; grpc::CompletionQueue* const cq_; CallbackCallTag tag_; - ServerContextType ctx_; - grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; + ServerContextType ctx_; + grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; }; -template <> +template <> bool Server::CallbackRequest<grpc::CallbackServerContext>::FinalizeResult( void** /*tag*/, bool* /*status*/) { - return false; -} - -template <> + return false; +} + +template <> bool Server::CallbackRequest< grpc::GenericCallbackServerContext>::FinalizeResult(void** /*tag*/, bool* status) { - if (*status) { + if (*status) { deadline_ = call_details_->deadline; - // TODO(yangg) remove the copy here - ctx_.method_ = grpc::StringFromCopiedSlice(call_details_->method); - ctx_.host_ = grpc::StringFromCopiedSlice(call_details_->host); - } - grpc_slice_unref(call_details_->method); - grpc_slice_unref(call_details_->host); - return false; -} - -template <> + // TODO(yangg) remove the copy here + ctx_.method_ = grpc::StringFromCopiedSlice(call_details_->method); + ctx_.host_ = grpc::StringFromCopiedSlice(call_details_->host); + } + grpc_slice_unref(call_details_->method); + grpc_slice_unref(call_details_->host); + return false; +} + +template <> const char* 
Server::CallbackRequest<grpc::CallbackServerContext>::method_name() const { - return method_->name(); -} - -template <> -const char* Server::CallbackRequest< + return method_->name(); +} + +template <> +const char* Server::CallbackRequest< grpc::GenericCallbackServerContext>::method_name() const { - return ctx_.method().c_str(); -} - + return ctx_.method().c_str(); +} + // Implementation of ThreadManager. Each instance of SyncRequestThreadManager // manages a pool of threads that poll for incoming Sync RPCs and call the // appropriate RPC handlers -class Server::SyncRequestThreadManager : public grpc::ThreadManager { +class Server::SyncRequestThreadManager : public grpc::ThreadManager { public: - SyncRequestThreadManager(Server* server, grpc::CompletionQueue* server_cq, + SyncRequestThreadManager(Server* server, grpc::CompletionQueue* server_cq, std::shared_ptr<GlobalCallbacks> global_callbacks, grpc_resource_quota* rq, int min_pollers, int max_pollers, int cq_timeout_msec) @@ -767,11 +767,11 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN)); switch (server_cq_->AsyncNext(tag, ok, deadline)) { - case grpc::CompletionQueue::TIMEOUT: + case grpc::CompletionQueue::TIMEOUT: return TIMEOUT; - case grpc::CompletionQueue::SHUTDOWN: + case grpc::CompletionQueue::SHUTDOWN: return SHUTDOWN; - case grpc::CompletionQueue::GOT_EVENT: + case grpc::CompletionQueue::GOT_EVENT: return WORK_FOUND; } @@ -806,15 +806,15 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { // object } - void AddSyncMethod(grpc::internal::RpcServiceMethod* method, void* tag) { + void AddSyncMethod(grpc::internal::RpcServiceMethod* method, void* tag) { sync_requests_.emplace_back(new SyncRequest(method, tag)); } void AddUnknownSyncMethod() { if (!sync_requests_.empty()) { - unknown_method_.reset(new grpc::internal::RpcServiceMethod( - "unknown", grpc::internal::RpcMethod::BIDI_STREAMING, - new grpc::internal::UnknownMethodHandler)); + unknown_method_.reset(new grpc::internal::RpcServiceMethod( + "unknown", grpc::internal::RpcMethod::BIDI_STREAMING, + new grpc::internal::UnknownMethodHandler)); sync_requests_.emplace_back( new SyncRequest(unknown_method_.get(), nullptr)); } @@ -847,9 +847,9 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { void Start() { if (!sync_requests_.empty()) { - for (const auto& value : sync_requests_) { - value->SetupRequest(); - value->Request(server_->c_server(), server_cq_->cq()); + for (const auto& value : sync_requests_) { + value->SetupRequest(); + value->Request(server_->c_server(), server_cq_->cq()); } Initialize(); // ThreadManager's Initialize() @@ -858,27 +858,27 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { private: Server* server_; - grpc::CompletionQueue* server_cq_; + grpc::CompletionQueue* server_cq_; int cq_timeout_msec_; std::vector<std::unique_ptr<SyncRequest>> sync_requests_; - std::unique_ptr<grpc::internal::RpcServiceMethod> unknown_method_; + std::unique_ptr<grpc::internal::RpcServiceMethod> unknown_method_; std::shared_ptr<Server::GlobalCallbacks> global_callbacks_; }; -static grpc::internal::GrpcLibraryInitializer g_gli_initializer; +static grpc::internal::GrpcLibraryInitializer g_gli_initializer; Server::Server( grpc::ChannelArguments* args, - std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>> + std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>> sync_server_cqs, int min_pollers, int 
max_pollers, int sync_cq_timeout_msec, - std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>> - acceptors, + std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>> + acceptors, grpc_resource_quota* server_rq, std::vector< - std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>> + std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>> interceptor_creators) - : acceptors_(std::move(acceptors)), - interceptor_creators_(std::move(interceptor_creators)), + : acceptors_(std::move(acceptors)), + interceptor_creators_(std::move(interceptor_creators)), max_receive_message_size_(INT_MIN), sync_server_cqs_(std::move(sync_server_cqs)), started_(false), @@ -888,8 +888,8 @@ Server::Server( server_initializer_(new ServerInitializer(this)), health_check_service_disabled_(false) { g_gli_initializer.summon(); - gpr_once_init(&grpc::g_once_init_callbacks, grpc::InitGlobalCallbacks); - global_callbacks_ = grpc::g_callbacks; + gpr_once_init(&grpc::g_once_init_callbacks, grpc::InitGlobalCallbacks); + global_callbacks_ = grpc::g_callbacks; global_callbacks_->UpdateArguments(args); if (sync_server_cqs_ != nullptr) { @@ -912,22 +912,22 @@ Server::Server( } } - for (auto& acceptor : acceptors_) { - acceptor->SetToChannelArgs(args); - } - + for (auto& acceptor : acceptors_) { + acceptor->SetToChannelArgs(args); + } + grpc_channel_args channel_args; args->SetChannelArgs(&channel_args); for (size_t i = 0; i < channel_args.num_args; i++) { - if (0 == strcmp(channel_args.args[i].key, - grpc::kHealthCheckServiceInterfaceArg)) { + if (0 == strcmp(channel_args.args[i].key, + grpc::kHealthCheckServiceInterfaceArg)) { if (channel_args.args[i].value.pointer.p == nullptr) { health_check_service_disabled_ = true; } else { - health_check_service_.reset( - static_cast<grpc::HealthCheckServiceInterface*>( - channel_args.args[i].value.pointer.p)); + health_check_service_.reset( + static_cast<grpc::HealthCheckServiceInterface*>( + channel_args.args[i].value.pointer.p)); } } if (0 == @@ -940,19 +940,19 @@ Server::Server( Server::~Server() { { - grpc::internal::ReleasableMutexLock lock(&mu_); + grpc::internal::ReleasableMutexLock lock(&mu_); if (started_ && !shutdown_) { - lock.Unlock(); + lock.Unlock(); Shutdown(); } else if (!started_) { // Shutdown the completion queues - for (const auto& value : sync_req_mgrs_) { - value->Shutdown(); - } - if (callback_cq_ != nullptr) { - callback_cq_->Shutdown(); - callback_cq_ = nullptr; + for (const auto& value : sync_req_mgrs_) { + value->Shutdown(); } + if (callback_cq_ != nullptr) { + callback_cq_->Shutdown(); + callback_cq_ = nullptr; + } } } // Destroy health check service before we destroy the C server so that @@ -963,43 +963,43 @@ Server::~Server() { } void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) { - GPR_ASSERT(!grpc::g_callbacks); + GPR_ASSERT(!grpc::g_callbacks); GPR_ASSERT(callbacks); - grpc::g_callbacks.reset(callbacks); + grpc::g_callbacks.reset(callbacks); } grpc_server* Server::c_server() { return server_; } -std::shared_ptr<grpc::Channel> Server::InProcessChannel( - const grpc::ChannelArguments& args) { +std::shared_ptr<grpc::Channel> Server::InProcessChannel( + const grpc::ChannelArguments& args) { grpc_channel_args channel_args = args.c_channel_args(); - return grpc::CreateChannelInternal( + return grpc::CreateChannelInternal( "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr), - std::vector<std::unique_ptr< - 
grpc::experimental::ClientInterceptorFactoryInterface>>()); + std::vector<std::unique_ptr< + grpc::experimental::ClientInterceptorFactoryInterface>>()); } -std::shared_ptr<grpc::Channel> +std::shared_ptr<grpc::Channel> Server::experimental_type::InProcessChannelWithInterceptors( - const grpc::ChannelArguments& args, + const grpc::ChannelArguments& args, std::vector< - std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> + std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_creators) { grpc_channel_args channel_args = args.c_channel_args(); - return grpc::CreateChannelInternal( + return grpc::CreateChannelInternal( "inproc", grpc_inproc_channel_create(server_->server_, &channel_args, nullptr), std::move(interceptor_creators)); } static grpc_server_register_method_payload_handling PayloadHandlingForMethod( - grpc::internal::RpcServiceMethod* method) { + grpc::internal::RpcServiceMethod* method) { switch (method->method_type()) { - case grpc::internal::RpcMethod::NORMAL_RPC: - case grpc::internal::RpcMethod::SERVER_STREAMING: + case grpc::internal::RpcMethod::NORMAL_RPC: + case grpc::internal::RpcMethod::SERVER_STREAMING: return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER; - case grpc::internal::RpcMethod::CLIENT_STREAMING: - case grpc::internal::RpcMethod::BIDI_STREAMING: + case grpc::internal::RpcMethod::CLIENT_STREAMING: + case grpc::internal::RpcMethod::BIDI_STREAMING: return GRPC_SRM_PAYLOAD_NONE; } GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;); @@ -1015,14 +1015,14 @@ bool Server::RegisterService(const TString* host, grpc::Service* service) { const char* method_name = nullptr; - for (const auto& method : service->methods_) { - if (method.get() == nullptr) { // Handled by generic service if any. + for (const auto& method : service->methods_) { + if (method.get() == nullptr) { // Handled by generic service if any. continue; } void* method_registration_tag = grpc_server_register_method( server_, method->name(), host ? 
host->c_str() : nullptr, - PayloadHandlingForMethod(method.get()), 0); + PayloadHandlingForMethod(method.get()), 0); if (method_registration_tag == nullptr) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name()); @@ -1032,9 +1032,9 @@ bool Server::RegisterService(const TString* host, grpc::Service* service) { if (method->handler() == nullptr) { // Async method without handler method->set_server_tag(method_registration_tag); } else if (method->api_type() == - grpc::internal::RpcServiceMethod::ApiType::SYNC) { - for (const auto& value : sync_req_mgrs_) { - value->AddSyncMethod(method.get(), method_registration_tag); + grpc::internal::RpcServiceMethod::ApiType::SYNC) { + for (const auto& value : sync_req_mgrs_) { + value->AddSyncMethod(method.get(), method_registration_tag); } } else { has_callback_methods_ = true; @@ -1064,32 +1064,32 @@ bool Server::RegisterService(const TString* host, grpc::Service* service) { return true; } -void Server::RegisterAsyncGenericService(grpc::AsyncGenericService* service) { +void Server::RegisterAsyncGenericService(grpc::AsyncGenericService* service) { GPR_ASSERT(service->server_ == nullptr && "Can only register an async generic service against one server."); service->server_ = this; - has_async_generic_service_ = true; + has_async_generic_service_ = true; } -void Server::RegisterCallbackGenericService( +void Server::RegisterCallbackGenericService( grpc::CallbackGenericService* service) { - GPR_ASSERT( - service->server_ == nullptr && - "Can only register a callback generic service against one server."); - service->server_ = this; - has_callback_generic_service_ = true; - generic_handler_.reset(service->Handler()); - + GPR_ASSERT( + service->server_ == nullptr && + "Can only register a callback generic service against one server."); + service->server_ = this; + has_callback_generic_service_ = true; + generic_handler_.reset(service->Handler()); + grpc::CompletionQueue* cq = CallbackCQ(); server_->core_server->SetBatchMethodAllocator(cq->cq(), [this, cq] { grpc_core::Server::BatchCallAllocation result; new CallbackRequest<grpc::GenericCallbackServerContext>(this, cq, &result); return result; }); -} - +} + int Server::AddListeningPort(const TString& addr, - grpc::ServerCredentials* creds) { + grpc::ServerCredentials* creds) { GPR_ASSERT(!started_); int port = creds->AddPortToServer(addr, server_); global_callbacks_->AddPort(this, addr, creds, port); @@ -1121,46 +1121,46 @@ void Server::UnrefAndWaitLocked() { shutdown_done_cv_.WaitUntil(&mu_, [this] { return shutdown_done_; }); } -void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { +void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { GPR_ASSERT(!started_); global_callbacks_->PreServerStart(this); started_ = true; // Only create default health check service when user did not provide an // explicit one. 
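// Editor's aside (hedged, not part of this change): whether the default
// health check service below is created is controlled by the application
// through <grpcpp/ext/health_check_service_interface.h>, e.g. before the
// server is built:
//
//   grpc::EnableDefaultHealthCheckService(true);
//   grpc::ServerBuilder builder;
//   std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
//
// grpc::DefaultHealthCheckServiceEnabled(), consulted in the condition
// below, reflects that setting.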
- grpc::ServerCompletionQueue* health_check_cq = nullptr; - grpc::DefaultHealthCheckService::HealthCheckServiceImpl* + grpc::ServerCompletionQueue* health_check_cq = nullptr; + grpc::DefaultHealthCheckService::HealthCheckServiceImpl* default_health_check_service_impl = nullptr; if (health_check_service_ == nullptr && !health_check_service_disabled_ && - grpc::DefaultHealthCheckServiceEnabled()) { - auto* default_hc_service = new grpc::DefaultHealthCheckService; + grpc::DefaultHealthCheckServiceEnabled()) { + auto* default_hc_service = new grpc::DefaultHealthCheckService; health_check_service_.reset(default_hc_service); // We create a non-polling CQ to avoid impacting application // performance. This ensures that we don't introduce thread hops // for application requests that wind up on this CQ, which is polled // in its own thread. - health_check_cq = new grpc::ServerCompletionQueue( - GRPC_CQ_NEXT, GRPC_CQ_NON_POLLING, nullptr); + health_check_cq = new grpc::ServerCompletionQueue( + GRPC_CQ_NEXT, GRPC_CQ_NON_POLLING, nullptr); grpc_server_register_completion_queue(server_, health_check_cq->cq(), nullptr); default_health_check_service_impl = default_hc_service->GetHealthCheckService( - std::unique_ptr<grpc::ServerCompletionQueue>(health_check_cq)); + std::unique_ptr<grpc::ServerCompletionQueue>(health_check_cq)); RegisterService(nullptr, default_health_check_service_impl); } - for (auto& acceptor : acceptors_) { - acceptor->GetCredentials()->AddPortToServer(acceptor->name(), server_); - } - - // If this server uses callback methods, then create a callback generic - // service to handle any unimplemented methods using the default reactor - // creator + for (auto& acceptor : acceptors_) { + acceptor->GetCredentials()->AddPortToServer(acceptor->name(), server_); + } + + // If this server uses callback methods, then create a callback generic + // service to handle any unimplemented methods using the default reactor + // creator if (has_callback_methods_ && !has_callback_generic_service_) { unimplemented_service_.reset(new grpc::CallbackGenericService); - RegisterCallbackGenericService(unimplemented_service_.get()); - } - + RegisterCallbackGenericService(unimplemented_service_.get()); + } + #ifndef NDEBUG for (size_t i = 0; i < num_cqs; i++) { cq_list_.push_back(cqs[i]); @@ -1169,9 +1169,9 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { grpc_server_start(server_); - if (!has_async_generic_service_ && !has_callback_generic_service_) { - for (const auto& value : sync_req_mgrs_) { - value->AddUnknownSyncMethod(); + if (!has_async_generic_service_ && !has_callback_generic_service_) { + for (const auto& value : sync_req_mgrs_) { + value->AddUnknownSyncMethod(); } for (size_t i = 0; i < num_cqs; i++) { @@ -1188,50 +1188,50 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { // server CQs), make sure that we have a ResourceExhausted handler // to deal with the case of thread exhaustion if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) { - resource_exhausted_handler_.reset( - new grpc::internal::ResourceExhaustedHandler); + resource_exhausted_handler_.reset( + new grpc::internal::ResourceExhaustedHandler); } - for (const auto& value : sync_req_mgrs_) { - value->Start(); + for (const auto& value : sync_req_mgrs_) { + value->Start(); } if (default_health_check_service_impl != nullptr) { default_health_check_service_impl->StartServingThread(); } - - for (auto& acceptor : acceptors_) { - acceptor->Start(); - } + + for (auto& acceptor : 
acceptors_) { + acceptor->Start(); + } } void Server::ShutdownInternal(gpr_timespec deadline) { - grpc::internal::MutexLock lock(&mu_); + grpc::internal::MutexLock lock(&mu_); if (shutdown_) { return; } shutdown_ = true; - for (auto& acceptor : acceptors_) { - acceptor->Shutdown(); - } - + for (auto& acceptor : acceptors_) { + acceptor->Shutdown(); + } + /// The completion queue to use for server shutdown completion notification - grpc::CompletionQueue shutdown_cq; - grpc::ShutdownTag shutdown_tag; // Dummy shutdown tag + grpc::CompletionQueue shutdown_cq; + grpc::ShutdownTag shutdown_tag; // Dummy shutdown tag grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag); shutdown_cq.Shutdown(); void* tag; bool ok; - grpc::CompletionQueue::NextStatus status = + grpc::CompletionQueue::NextStatus status = shutdown_cq.AsyncNext(&tag, &ok, deadline); // If this timed out, it means we are done with the grace period for a clean // shutdown. We should force a shutdown now by cancelling all inflight calls - if (status == grpc::CompletionQueue::NextStatus::TIMEOUT) { + if (status == grpc::CompletionQueue::NextStatus::TIMEOUT) { grpc_server_cancel_all_calls(server_); } // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has @@ -1239,25 +1239,25 @@ void Server::ShutdownInternal(gpr_timespec deadline) { // Shutdown all ThreadManagers. This will try to gracefully stop all the // threads in the ThreadManagers (once they process any inflight requests) - for (const auto& value : sync_req_mgrs_) { - value->Shutdown(); // ThreadManager's Shutdown() + for (const auto& value : sync_req_mgrs_) { + value->Shutdown(); // ThreadManager's Shutdown() } // Wait for threads in all ThreadManagers to terminate - for (const auto& value : sync_req_mgrs_) { - value->Wait(); + for (const auto& value : sync_req_mgrs_) { + value->Wait(); } // Drop the shutdown ref and wait for all other refs to drop as well. UnrefAndWaitLocked(); - // Shutdown the callback CQ. The CQ is owned by its own shutdown tag, so it - // will delete itself at true shutdown. - if (callback_cq_ != nullptr) { - callback_cq_->Shutdown(); - callback_cq_ = nullptr; - } - + // Shutdown the callback CQ. The CQ is owned by its own shutdown tag, so it + // will delete itself at true shutdown. 
+ if (callback_cq_ != nullptr) { + callback_cq_->Shutdown(); + callback_cq_ = nullptr; + } + // Drain the shutdown queue (if the previous call to AsyncNext() timed out // and we didn't remove the tag from the queue yet) while (shutdown_cq.Next(&tag, &ok)) { @@ -1265,7 +1265,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) { } shutdown_notified_ = true; - shutdown_cv_.Broadcast(); + shutdown_cv_.Broadcast(); #ifndef NDEBUG // Unregister this server with the CQs passed into it by the user so that @@ -1278,14 +1278,14 @@ void Server::ShutdownInternal(gpr_timespec deadline) { } void Server::Wait() { - grpc::internal::MutexLock lock(&mu_); + grpc::internal::MutexLock lock(&mu_); while (started_ && !shutdown_notified_) { - shutdown_cv_.Wait(&mu_); + shutdown_cv_.Wait(&mu_); } } -void Server::PerformOpsOnCall(grpc::internal::CallOpSetInterface* ops, - grpc::internal::Call* call) { +void Server::PerformOpsOnCall(grpc::internal::CallOpSetInterface* ops, + grpc::internal::Call* call) { ops->FillOps(call); } @@ -1310,19 +1310,19 @@ bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag, Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( UnimplementedAsyncRequest* request) : request_(request) { - grpc::Status status(grpc::StatusCode::UNIMPLEMENTED, ""); - grpc::internal::UnknownMethodHandler::FillOps(request_->context(), this); + grpc::Status status(grpc::StatusCode::UNIMPLEMENTED, ""); + grpc::internal::UnknownMethodHandler::FillOps(request_->context(), this); request_->stream()->call_.PerformOps(this); } -grpc::ServerInitializer* Server::initializer() { - return server_initializer_.get(); -} +grpc::ServerInitializer* Server::initializer() { + return server_initializer_.get(); +} -grpc::CompletionQueue* Server::CallbackCQ() { +grpc::CompletionQueue* Server::CallbackCQ() { // TODO(vjpai): Consider using a single global CQ for the default CQ // if there is no explicit per-server CQ registered - grpc::internal::MutexLock l(&mu_); + grpc::internal::MutexLock l(&mu_); if (callback_cq_ != nullptr) { return callback_cq_; } @@ -1335,6 +1335,6 @@ grpc::CompletionQueue* Server::CallbackCQ() { shutdown_callback->TakeCQ(callback_cq_); return callback_cq_; -} +} } // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/server/server_context.cc b/contrib/libs/grpc/src/cpp/server/server_context.cc index 458ac20d87..4a4181d39f 100644 --- a/contrib/libs/grpc/src/cpp/server/server_context.cc +++ b/contrib/libs/grpc/src/cpp/server/server_context.cc @@ -28,18 +28,18 @@ #include <grpc/support/log.h> #include <grpcpp/impl/call.h> #include <grpcpp/impl/codegen/completion_queue.h> -#include <grpcpp/support/server_callback.h> +#include <grpcpp/support/server_callback.h> #include <grpcpp/support/time.h> -#include "src/core/lib/gprpp/ref_counted.h" -#include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/gprpp/ref_counted.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/surface/call.h" namespace grpc { // CompletionOp -class ServerContextBase::CompletionOp final +class ServerContextBase::CompletionOp final : public internal::CallOpSetInterface { public: // initial refs: one in the server context, one in the cq @@ -47,7 +47,7 @@ class ServerContextBase::CompletionOp final CompletionOp(internal::Call* call, ::grpc::internal::ServerCallbackCall* callback_controller) : call_(*call), - callback_controller_(callback_controller), + callback_controller_(callback_controller), has_tag_(false), tag_(nullptr), core_cq_tag_(this), @@ -73,10 +73,10 @@ class 
ServerContextBase::CompletionOp final // This should always be arena allocated in the call, so override delete. // But this class is not trivially destructible, so must actually call delete // before allowing the arena to be freed - static void operator delete(void* /*ptr*/, std::size_t size) { - // Use size to avoid unused-parameter warning since assert seems to be - // compiled out and treated as unused in some gcc optimized versions. - (void)size; + static void operator delete(void* /*ptr*/, std::size_t size) { + // Use size to avoid unused-parameter warning since assert seems to be + // compiled out and treated as unused in some gcc optimized versions. + (void)size; assert(size == sizeof(CompletionOp)); } @@ -110,7 +110,7 @@ class ServerContextBase::CompletionOp final // RPC. This should set hijacking state for each of the ops. void SetHijackingState() override { /* Servers don't allow hijacking */ - GPR_ASSERT(false); + GPR_ASSERT(false); } /* Should be called after interceptors are done running */ @@ -122,17 +122,17 @@ class ServerContextBase::CompletionOp final done_intercepting_ = true; if (!has_tag_) { /* We don't have a tag to return. */ - Unref(); + Unref(); return; } /* Start a dummy op so that we can return the tag */ - GPR_ASSERT(grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_, - nullptr) == GRPC_CALL_OK); + GPR_ASSERT(grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_, + nullptr) == GRPC_CALL_OK); } private: bool CheckCancelledNoPluck() { - grpc_core::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&mu_); return finalized_ ? (cancelled_ != 0) : false; } @@ -141,16 +141,16 @@ class ServerContextBase::CompletionOp final bool has_tag_; void* tag_; void* core_cq_tag_; - grpc_core::RefCount refs_; - grpc_core::Mutex mu_; + grpc_core::RefCount refs_; + grpc_core::Mutex mu_; bool finalized_; int cancelled_; // This is an int (not bool) because it is passed to core bool done_intercepting_; internal::InterceptorBatchMethodsImpl interceptor_methods_; }; -void ServerContextBase::CompletionOp::Unref() { - if (refs_.Unref()) { +void ServerContextBase::CompletionOp::Unref() { + if (refs_.Unref()) { grpc_call* call = call_.call(); delete this; grpc_call_unref(call); @@ -166,14 +166,14 @@ void ServerContextBase::CompletionOp::FillOps(internal::Call* call) { interceptor_methods_.SetCall(&call_); interceptor_methods_.SetReverse(); interceptor_methods_.SetCallOpSetInterface(this); - // The following call_start_batch is internally-generated so no need for an - // explanatory log on failure. - GPR_ASSERT(grpc_call_start_batch(call->call(), &ops, 1, core_cq_tag_, - nullptr) == GRPC_CALL_OK); + // The following call_start_batch is internally-generated so no need for an + // explanatory log on failure. + GPR_ASSERT(grpc_call_start_batch(call->call(), &ops, 1, core_cq_tag_, + nullptr) == GRPC_CALL_OK); /* No interceptors to run here */ } -bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) { +bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) { // Decide whether to call the cancel callback within the lock bool call_cancel; @@ -201,8 +201,8 @@ bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) { // Release the lock since we may call a callback and interceptors. 
} - if (call_cancel && callback_controller_ != nullptr) { - callback_controller_->MaybeCallOnCancel(); + if (call_cancel && callback_controller_ != nullptr) { + callback_controller_->MaybeCallOnCancel(); } /* Add interception point and run through interceptors */ interceptor_methods_.AddInterceptionHookPoint( @@ -213,26 +213,26 @@ bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) { if (has_tag) { *tag = tag_; } - Unref(); + Unref(); return has_tag; } // There are interceptors to be run. Return false for now. return false; } -// ServerContextBase body +// ServerContextBase body ServerContextBase::ServerContextBase() : deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)) {} -ServerContextBase::ServerContextBase(gpr_timespec deadline, +ServerContextBase::ServerContextBase(gpr_timespec deadline, grpc_metadata_array* arr) : deadline_(deadline) { std::swap(*client_metadata_.arr(), *arr); } -void ServerContextBase::BindDeadlineAndMetadata(gpr_timespec deadline, - grpc_metadata_array* arr) { +void ServerContextBase::BindDeadlineAndMetadata(gpr_timespec deadline, + grpc_metadata_array* arr) { deadline_ = deadline; std::swap(*client_metadata_.arr(), *arr); } @@ -244,9 +244,9 @@ ServerContextBase::~ServerContextBase() { if (rpc_info_) { rpc_info_->Unref(); } - if (default_reactor_used_.load(std::memory_order_relaxed)) { + if (default_reactor_used_.load(std::memory_order_relaxed)) { reinterpret_cast<Reactor*>(&default_reactor_)->~Reactor(); - } + } } ServerContextBase::CallWrapper::~CallWrapper() { @@ -257,7 +257,7 @@ ServerContextBase::CallWrapper::~CallWrapper() { } } -void ServerContextBase::BeginCompletionOp( +void ServerContextBase::BeginCompletionOp( internal::Call* call, std::function<void(bool)> callback, ::grpc::internal::ServerCallbackCall* callback_controller) { GPR_ASSERT(!completion_op_); @@ -267,10 +267,10 @@ void ServerContextBase::BeginCompletionOp( grpc_call_ref(call->call()); completion_op_ = new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp))) - CompletionOp(call, callback_controller); - if (callback_controller != nullptr) { - completion_tag_.Set(call->call(), std::move(callback), completion_op_, - true); + CompletionOp(call, callback_controller); + if (callback_controller != nullptr) { + completion_tag_.Set(call->call(), std::move(callback), completion_op_, + true); completion_op_->set_core_cq_tag(&completion_tag_); completion_op_->set_tag(completion_op_); } else if (has_notify_when_done_tag_) { @@ -293,7 +293,7 @@ void ServerContextBase::AddTrailingMetadata(const TString& key, trailing_metadata_.insert(std::make_pair(key, value)); } -void ServerContextBase::TryCancel() const { +void ServerContextBase::TryCancel() const { internal::CancelInterceptorBatchMethods cancel_methods; if (rpc_info_) { for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) { @@ -308,7 +308,7 @@ void ServerContextBase::TryCancel() const { } } -bool ServerContextBase::IsCancelled() const { +bool ServerContextBase::IsCancelled() const { if (completion_tag_) { // When using callback API, this result is always valid. 
return completion_op_->CheckCancelledAsync(); @@ -322,7 +322,7 @@ bool ServerContextBase::IsCancelled() const { } } -void ServerContextBase::set_compression_algorithm( +void ServerContextBase::set_compression_algorithm( grpc_compression_algorithm algorithm) { compression_algorithm_ = algorithm; const char* algorithm_name = nullptr; @@ -345,12 +345,12 @@ TString ServerContextBase::peer() const { return peer; } -const struct census_context* ServerContextBase::census_context() const { +const struct census_context* ServerContextBase::census_context() const { return call_.call == nullptr ? nullptr : grpc_census_call_get_context(call_.call); } -void ServerContextBase::SetLoadReportingCosts( +void ServerContextBase::SetLoadReportingCosts( const std::vector<TString>& cost_data) { if (call_.call == nullptr) return; for (const auto& cost_datum : cost_data) { diff --git a/contrib/libs/grpc/src/cpp/server/server_posix.cc b/contrib/libs/grpc/src/cpp/server/server_posix.cc index c3d40d4fa2..85709b0c70 100644 --- a/contrib/libs/grpc/src/cpp/server/server_posix.cc +++ b/contrib/libs/grpc/src/cpp/server/server_posix.cc @@ -24,7 +24,7 @@ namespace grpc { #ifdef GPR_SUPPORT_CHANNELS_FROM_FD -void AddInsecureChannelFromFd(grpc::Server* server, int fd) { +void AddInsecureChannelFromFd(grpc::Server* server, int fd) { grpc_server_add_insecure_channel_from_fd(server->c_server(), nullptr, fd); }
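server_posix.cc above exposes grpc_server_add_insecure_channel_from_fd through the C++ API. A hedged usage sketch follows: the socketpair stands in for a connected fd obtained from some external acceptor, and the snippet assumes a POSIX platform where GPR_SUPPORT_CHANNELS_FROM_FD is defined.

#include <sys/socket.h>

#include <memory>

#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_posix.h>

int main() {
  // Build a server with no listening port; connections arrive as raw fds.
  grpc::ServerBuilder builder;
  std::unique_ptr<grpc::Server> server = builder.BuildAndStart();

  int fds[2];  // illustrative: stands in for an fd accepted elsewhere
  if (server != nullptr && socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == 0) {
    // Hand one end to the server as an insecure (plaintext) channel.
    grpc::AddInsecureChannelFromFd(server.get(), fds[0]);
  }

  if (server != nullptr) server->Shutdown();
  return 0;
}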