diff options
author | neksard <neksard@yandex-team.ru> | 2022-02-10 16:45:23 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:23 +0300 |
commit | 8f7cf138264e0caa318144bf8a2c950e0b0a8593 (patch) | |
tree | 83bf5c8c8047c42d8475e6095df90ccdc3d1b57f /contrib/libs/grpc/src/cpp/server | |
parent | d3a398281c6fd1d3672036cb2d63f842d2cb28c5 (diff) | |
download | ydb-8f7cf138264e0caa318144bf8a2c950e0b0a8593.tar.gz |
Restoring authorship annotation for <neksard@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/libs/grpc/src/cpp/server')
18 files changed, 1730 insertions, 1730 deletions
diff --git a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc index 6dcf84bf40..b707f3c476 100644 --- a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc +++ b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc @@ -1,29 +1,29 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <grpc/support/port_platform.h> - -#include "src/cpp/server/channelz/channelz_service.h" - -#include <grpc/grpc.h> -#include <grpc/support/alloc.h> - -namespace grpc { +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/cpp/server/channelz/channelz_service.h" + +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> + +namespace grpc { namespace { @@ -33,121 +33,121 @@ grpc::protobuf::util::Status ParseJson(const char* json_str, options.case_insensitive_enum_parsing = true; return grpc::protobuf::json::JsonStringToMessage(json_str, message, options); } - + } // namespace -Status ChannelzService::GetTopChannels( +Status ChannelzService::GetTopChannels( ServerContext* /*unused*/, const channelz::v1::GetTopChannelsRequest* request, - channelz::v1::GetTopChannelsResponse* response) { - char* json_str = grpc_channelz_get_top_channels(request->start_channel_id()); - if (json_str == nullptr) { - return Status(StatusCode::INTERNAL, - "grpc_channelz_get_top_channels returned null"); - } + channelz::v1::GetTopChannelsResponse* response) { + char* json_str = grpc_channelz_get_top_channels(request->start_channel_id()); + if (json_str == nullptr) { + return Status(StatusCode::INTERNAL, + "grpc_channelz_get_top_channels returned null"); + } grpc::protobuf::util::Status s = ParseJson(json_str, response); - gpr_free(json_str); - if (!s.ok()) { - return Status(StatusCode::INTERNAL, s.ToString()); - } - return Status::OK; -} - -Status ChannelzService::GetServers( + gpr_free(json_str); + if (!s.ok()) { + return Status(StatusCode::INTERNAL, s.ToString()); + } + return Status::OK; +} + +Status ChannelzService::GetServers( ServerContext* /*unused*/, const channelz::v1::GetServersRequest* request, - channelz::v1::GetServersResponse* response) { - char* json_str = grpc_channelz_get_servers(request->start_server_id()); - if (json_str == nullptr) { - return Status(StatusCode::INTERNAL, - "grpc_channelz_get_servers returned null"); - } + channelz::v1::GetServersResponse* response) { + char* json_str = grpc_channelz_get_servers(request->start_server_id()); + if (json_str == nullptr) { + return Status(StatusCode::INTERNAL, + "grpc_channelz_get_servers returned null"); + } grpc::protobuf::util::Status s = ParseJson(json_str, response); - gpr_free(json_str); - if (!s.ok()) { - return Status(StatusCode::INTERNAL, s.ToString()); - } - return Status::OK; -} - + gpr_free(json_str); + if (!s.ok()) { + return Status(StatusCode::INTERNAL, s.ToString()); + } + return Status::OK; +} + Status ChannelzService::GetServer(ServerContext* /*unused*/, - const channelz::v1::GetServerRequest* request, - channelz::v1::GetServerResponse* response) { - char* json_str = grpc_channelz_get_server(request->server_id()); - if (json_str == nullptr) { - return Status(StatusCode::INTERNAL, - "grpc_channelz_get_server returned null"); - } + const channelz::v1::GetServerRequest* request, + channelz::v1::GetServerResponse* response) { + char* json_str = grpc_channelz_get_server(request->server_id()); + if (json_str == nullptr) { + return Status(StatusCode::INTERNAL, + "grpc_channelz_get_server returned null"); + } grpc::protobuf::util::Status s = ParseJson(json_str, response); - gpr_free(json_str); - if (!s.ok()) { - return Status(StatusCode::INTERNAL, s.ToString()); - } - return Status::OK; -} - -Status ChannelzService::GetServerSockets( + gpr_free(json_str); + if (!s.ok()) { + return Status(StatusCode::INTERNAL, s.ToString()); + } + return Status::OK; +} + +Status ChannelzService::GetServerSockets( ServerContext* /*unused*/, const channelz::v1::GetServerSocketsRequest* request, - channelz::v1::GetServerSocketsResponse* response) { - char* json_str = grpc_channelz_get_server_sockets( - request->server_id(), request->start_socket_id(), request->max_results()); - if (json_str == nullptr) { - return Status(StatusCode::INTERNAL, - "grpc_channelz_get_server_sockets returned null"); - } + channelz::v1::GetServerSocketsResponse* response) { + char* json_str = grpc_channelz_get_server_sockets( + request->server_id(), request->start_socket_id(), request->max_results()); + if (json_str == nullptr) { + return Status(StatusCode::INTERNAL, + "grpc_channelz_get_server_sockets returned null"); + } grpc::protobuf::util::Status s = ParseJson(json_str, response); - gpr_free(json_str); - if (!s.ok()) { - return Status(StatusCode::INTERNAL, s.ToString()); - } - return Status::OK; -} - -Status ChannelzService::GetChannel( + gpr_free(json_str); + if (!s.ok()) { + return Status(StatusCode::INTERNAL, s.ToString()); + } + return Status::OK; +} + +Status ChannelzService::GetChannel( ServerContext* /*unused*/, const channelz::v1::GetChannelRequest* request, - channelz::v1::GetChannelResponse* response) { - char* json_str = grpc_channelz_get_channel(request->channel_id()); - if (json_str == nullptr) { - return Status(StatusCode::NOT_FOUND, "No object found for that ChannelId"); - } + channelz::v1::GetChannelResponse* response) { + char* json_str = grpc_channelz_get_channel(request->channel_id()); + if (json_str == nullptr) { + return Status(StatusCode::NOT_FOUND, "No object found for that ChannelId"); + } grpc::protobuf::util::Status s = ParseJson(json_str, response); - gpr_free(json_str); - if (!s.ok()) { - return Status(StatusCode::INTERNAL, s.ToString()); - } - return Status::OK; -} - -Status ChannelzService::GetSubchannel( + gpr_free(json_str); + if (!s.ok()) { + return Status(StatusCode::INTERNAL, s.ToString()); + } + return Status::OK; +} + +Status ChannelzService::GetSubchannel( ServerContext* /*unused*/, const channelz::v1::GetSubchannelRequest* request, - channelz::v1::GetSubchannelResponse* response) { - char* json_str = grpc_channelz_get_subchannel(request->subchannel_id()); - if (json_str == nullptr) { - return Status(StatusCode::NOT_FOUND, - "No object found for that SubchannelId"); - } + channelz::v1::GetSubchannelResponse* response) { + char* json_str = grpc_channelz_get_subchannel(request->subchannel_id()); + if (json_str == nullptr) { + return Status(StatusCode::NOT_FOUND, + "No object found for that SubchannelId"); + } grpc::protobuf::util::Status s = ParseJson(json_str, response); - gpr_free(json_str); - if (!s.ok()) { - return Status(StatusCode::INTERNAL, s.ToString()); - } - return Status::OK; -} - + gpr_free(json_str); + if (!s.ok()) { + return Status(StatusCode::INTERNAL, s.ToString()); + } + return Status::OK; +} + Status ChannelzService::GetSocket(ServerContext* /*unused*/, - const channelz::v1::GetSocketRequest* request, - channelz::v1::GetSocketResponse* response) { - char* json_str = grpc_channelz_get_socket(request->socket_id()); - if (json_str == nullptr) { - return Status(StatusCode::NOT_FOUND, "No object found for that SocketId"); - } + const channelz::v1::GetSocketRequest* request, + channelz::v1::GetSocketResponse* response) { + char* json_str = grpc_channelz_get_socket(request->socket_id()); + if (json_str == nullptr) { + return Status(StatusCode::NOT_FOUND, "No object found for that SocketId"); + } grpc::protobuf::util::Status s = ParseJson(json_str, response); - gpr_free(json_str); - if (!s.ok()) { - return Status(StatusCode::INTERNAL, s.ToString()); - } - return Status::OK; -} - -} // namespace grpc + gpr_free(json_str); + if (!s.ok()) { + return Status(StatusCode::INTERNAL, s.ToString()); + } + return Status::OK; +} + +} // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.h b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.h index b4a66ba1c6..72818a0d72 100644 --- a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.h +++ b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.h @@ -1,64 +1,64 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H -#define GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H - -#include <grpc/support/port_platform.h> - -#include <grpcpp/grpcpp.h> -#include "src/proto/grpc/channelz/channelz.grpc.pb.h" - -namespace grpc { - -class ChannelzService final : public channelz::v1::Channelz::Service { - private: - // implementation of GetTopChannels rpc - Status GetTopChannels( - ServerContext* unused, const channelz::v1::GetTopChannelsRequest* request, - channelz::v1::GetTopChannelsResponse* response) override; - // implementation of GetServers rpc - Status GetServers(ServerContext* unused, - const channelz::v1::GetServersRequest* request, - channelz::v1::GetServersResponse* response) override; - // implementation of GetServer rpc - Status GetServer(ServerContext* unused, - const channelz::v1::GetServerRequest* request, - channelz::v1::GetServerResponse* response) override; - // implementation of GetServerSockets rpc - Status GetServerSockets( - ServerContext* unused, - const channelz::v1::GetServerSocketsRequest* request, - channelz::v1::GetServerSocketsResponse* response) override; - // implementation of GetChannel rpc - Status GetChannel(ServerContext* unused, - const channelz::v1::GetChannelRequest* request, - channelz::v1::GetChannelResponse* response) override; - // implementation of GetSubchannel rpc - Status GetSubchannel(ServerContext* unused, - const channelz::v1::GetSubchannelRequest* request, - channelz::v1::GetSubchannelResponse* response) override; - // implementation of GetSocket rpc - Status GetSocket(ServerContext* unused, - const channelz::v1::GetSocketRequest* request, - channelz::v1::GetSocketResponse* response) override; -}; - -} // namespace grpc - -#endif // GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H +#define GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H + +#include <grpc/support/port_platform.h> + +#include <grpcpp/grpcpp.h> +#include "src/proto/grpc/channelz/channelz.grpc.pb.h" + +namespace grpc { + +class ChannelzService final : public channelz::v1::Channelz::Service { + private: + // implementation of GetTopChannels rpc + Status GetTopChannels( + ServerContext* unused, const channelz::v1::GetTopChannelsRequest* request, + channelz::v1::GetTopChannelsResponse* response) override; + // implementation of GetServers rpc + Status GetServers(ServerContext* unused, + const channelz::v1::GetServersRequest* request, + channelz::v1::GetServersResponse* response) override; + // implementation of GetServer rpc + Status GetServer(ServerContext* unused, + const channelz::v1::GetServerRequest* request, + channelz::v1::GetServerResponse* response) override; + // implementation of GetServerSockets rpc + Status GetServerSockets( + ServerContext* unused, + const channelz::v1::GetServerSocketsRequest* request, + channelz::v1::GetServerSocketsResponse* response) override; + // implementation of GetChannel rpc + Status GetChannel(ServerContext* unused, + const channelz::v1::GetChannelRequest* request, + channelz::v1::GetChannelResponse* response) override; + // implementation of GetSubchannel rpc + Status GetSubchannel(ServerContext* unused, + const channelz::v1::GetSubchannelRequest* request, + channelz::v1::GetSubchannelResponse* response) override; + // implementation of GetSocket rpc + Status GetSocket(ServerContext* unused, + const channelz::v1::GetSocketRequest* request, + channelz::v1::GetSocketResponse* response) override; +}; + +} // namespace grpc + +#endif // GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H diff --git a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc index ae26a447ab..35ecd08125 100644 --- a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc +++ b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc @@ -1,72 +1,72 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <grpc/support/port_platform.h> - -#include <grpcpp/ext/channelz_service_plugin.h> -#include <grpcpp/impl/server_builder_plugin.h> -#include <grpcpp/impl/server_initializer.h> -#include <grpcpp/server.h> - -#include "src/cpp/server/channelz/channelz_service.h" - -namespace grpc { -namespace channelz { -namespace experimental { - -class ChannelzServicePlugin : public ::grpc::ServerBuilderPlugin { - public: - ChannelzServicePlugin() : channelz_service_(new grpc::ChannelzService()) {} - +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include <grpcpp/ext/channelz_service_plugin.h> +#include <grpcpp/impl/server_builder_plugin.h> +#include <grpcpp/impl/server_initializer.h> +#include <grpcpp/server.h> + +#include "src/cpp/server/channelz/channelz_service.h" + +namespace grpc { +namespace channelz { +namespace experimental { + +class ChannelzServicePlugin : public ::grpc::ServerBuilderPlugin { + public: + ChannelzServicePlugin() : channelz_service_(new grpc::ChannelzService()) {} + TString name() override { return "channelz_service"; } - - void InitServer(grpc::ServerInitializer* si) override { - si->RegisterService(channelz_service_); - } - + + void InitServer(grpc::ServerInitializer* si) override { + si->RegisterService(channelz_service_); + } + void Finish(grpc::ServerInitializer* /*si*/) override {} - + void ChangeArguments(const TString& /*name*/, void* /*value*/) override {} - - bool has_sync_methods() const override { - if (channelz_service_) { - return channelz_service_->has_synchronous_methods(); - } - return false; - } - - bool has_async_methods() const override { - if (channelz_service_) { - return channelz_service_->has_async_methods(); - } - return false; - } - - private: - std::shared_ptr<grpc::ChannelzService> channelz_service_; -}; - -static std::unique_ptr< ::grpc::ServerBuilderPlugin> -CreateChannelzServicePlugin() { - return std::unique_ptr< ::grpc::ServerBuilderPlugin>( - new ChannelzServicePlugin()); -} - + + bool has_sync_methods() const override { + if (channelz_service_) { + return channelz_service_->has_synchronous_methods(); + } + return false; + } + + bool has_async_methods() const override { + if (channelz_service_) { + return channelz_service_->has_async_methods(); + } + return false; + } + + private: + std::shared_ptr<grpc::ChannelzService> channelz_service_; +}; + +static std::unique_ptr< ::grpc::ServerBuilderPlugin> +CreateChannelzServicePlugin() { + return std::unique_ptr< ::grpc::ServerBuilderPlugin>( + new ChannelzServicePlugin()); +} + } // namespace experimental } // namespace channelz } // namespace grpc @@ -74,15 +74,15 @@ namespace grpc_impl { namespace channelz { namespace experimental { -void InitChannelzService() { +void InitChannelzService() { static struct Initializer { Initializer() { ::grpc::ServerBuilder::InternalAddPluginFactory( &grpc::channelz::experimental::CreateChannelzServicePlugin); } } initialize; -} - -} // namespace experimental -} // namespace channelz +} + +} // namespace experimental +} // namespace channelz } // namespace grpc_impl diff --git a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc index 3cc508d0cb..5f70ce0540 100644 --- a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc +++ b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc @@ -25,191 +25,191 @@ #include <grpc/support/log.h> #include <grpcpp/impl/codegen/method_handler.h> -#include "src/cpp/server/health/default_health_check_service.h" +#include "src/cpp/server/health/default_health_check_service.h" #include "src/proto/grpc/health/v1/health.upb.h" #include "upb/upb.hpp" #define MAX_SERVICE_NAME_LENGTH 200 namespace grpc { - -// -// DefaultHealthCheckService -// - -DefaultHealthCheckService::DefaultHealthCheckService() { - services_map_[""].SetServingStatus(SERVING); -} - -void DefaultHealthCheckService::SetServingStatus( + +// +// DefaultHealthCheckService +// + +DefaultHealthCheckService::DefaultHealthCheckService() { + services_map_[""].SetServingStatus(SERVING); +} + +void DefaultHealthCheckService::SetServingStatus( const TString& service_name, bool serving) { grpc_core::MutexLock lock(&mu_); - if (shutdown_) { - // Set to NOT_SERVING in case service_name is not in the map. - serving = false; - } - services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING); -} - -void DefaultHealthCheckService::SetServingStatus(bool serving) { - const ServingStatus status = serving ? SERVING : NOT_SERVING; + if (shutdown_) { + // Set to NOT_SERVING in case service_name is not in the map. + serving = false; + } + services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING); +} + +void DefaultHealthCheckService::SetServingStatus(bool serving) { + const ServingStatus status = serving ? SERVING : NOT_SERVING; grpc_core::MutexLock lock(&mu_); - if (shutdown_) { - return; - } - for (auto& p : services_map_) { - ServiceData& service_data = p.second; - service_data.SetServingStatus(status); - } -} - -void DefaultHealthCheckService::Shutdown() { + if (shutdown_) { + return; + } + for (auto& p : services_map_) { + ServiceData& service_data = p.second; + service_data.SetServingStatus(status); + } +} + +void DefaultHealthCheckService::Shutdown() { grpc_core::MutexLock lock(&mu_); - if (shutdown_) { - return; - } - shutdown_ = true; - for (auto& p : services_map_) { - ServiceData& service_data = p.second; - service_data.SetServingStatus(NOT_SERVING); - } -} - -DefaultHealthCheckService::ServingStatus -DefaultHealthCheckService::GetServingStatus( + if (shutdown_) { + return; + } + shutdown_ = true; + for (auto& p : services_map_) { + ServiceData& service_data = p.second; + service_data.SetServingStatus(NOT_SERVING); + } +} + +DefaultHealthCheckService::ServingStatus +DefaultHealthCheckService::GetServingStatus( const TString& service_name) const { grpc_core::MutexLock lock(&mu_); - auto it = services_map_.find(service_name); - if (it == services_map_.end()) { - return NOT_FOUND; - } - const ServiceData& service_data = it->second; - return service_data.GetServingStatus(); -} - -void DefaultHealthCheckService::RegisterCallHandler( + auto it = services_map_.find(service_name); + if (it == services_map_.end()) { + return NOT_FOUND; + } + const ServiceData& service_data = it->second; + return service_data.GetServingStatus(); +} + +void DefaultHealthCheckService::RegisterCallHandler( const TString& service_name, - std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) { + std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) { grpc_core::MutexLock lock(&mu_); - ServiceData& service_data = services_map_[service_name]; - service_data.AddCallHandler(handler /* copies ref */); - HealthCheckServiceImpl::CallHandler* h = handler.get(); - h->SendHealth(std::move(handler), service_data.GetServingStatus()); -} - -void DefaultHealthCheckService::UnregisterCallHandler( + ServiceData& service_data = services_map_[service_name]; + service_data.AddCallHandler(handler /* copies ref */); + HealthCheckServiceImpl::CallHandler* h = handler.get(); + h->SendHealth(std::move(handler), service_data.GetServingStatus()); +} + +void DefaultHealthCheckService::UnregisterCallHandler( const TString& service_name, - const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) { + const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) { grpc_core::MutexLock lock(&mu_); - auto it = services_map_.find(service_name); - if (it == services_map_.end()) return; - ServiceData& service_data = it->second; - service_data.RemoveCallHandler(handler); - if (service_data.Unused()) { - services_map_.erase(it); - } -} - -DefaultHealthCheckService::HealthCheckServiceImpl* -DefaultHealthCheckService::GetHealthCheckService( - std::unique_ptr<ServerCompletionQueue> cq) { - GPR_ASSERT(impl_ == nullptr); - impl_.reset(new HealthCheckServiceImpl(this, std::move(cq))); - return impl_.get(); -} - -// -// DefaultHealthCheckService::ServiceData -// - -void DefaultHealthCheckService::ServiceData::SetServingStatus( - ServingStatus status) { - status_ = status; - for (auto& call_handler : call_handlers_) { - call_handler->SendHealth(call_handler /* copies ref */, status); - } -} - -void DefaultHealthCheckService::ServiceData::AddCallHandler( - std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) { - call_handlers_.insert(std::move(handler)); -} - -void DefaultHealthCheckService::ServiceData::RemoveCallHandler( - const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) { - call_handlers_.erase(handler); -} - -// -// DefaultHealthCheckService::HealthCheckServiceImpl -// - + auto it = services_map_.find(service_name); + if (it == services_map_.end()) return; + ServiceData& service_data = it->second; + service_data.RemoveCallHandler(handler); + if (service_data.Unused()) { + services_map_.erase(it); + } +} + +DefaultHealthCheckService::HealthCheckServiceImpl* +DefaultHealthCheckService::GetHealthCheckService( + std::unique_ptr<ServerCompletionQueue> cq) { + GPR_ASSERT(impl_ == nullptr); + impl_.reset(new HealthCheckServiceImpl(this, std::move(cq))); + return impl_.get(); +} + +// +// DefaultHealthCheckService::ServiceData +// + +void DefaultHealthCheckService::ServiceData::SetServingStatus( + ServingStatus status) { + status_ = status; + for (auto& call_handler : call_handlers_) { + call_handler->SendHealth(call_handler /* copies ref */, status); + } +} + +void DefaultHealthCheckService::ServiceData::AddCallHandler( + std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) { + call_handlers_.insert(std::move(handler)); +} + +void DefaultHealthCheckService::ServiceData::RemoveCallHandler( + const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) { + call_handlers_.erase(handler); +} + +// +// DefaultHealthCheckService::HealthCheckServiceImpl +// + namespace { const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check"; -const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch"; +const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch"; } // namespace DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl( - DefaultHealthCheckService* database, - std::unique_ptr<ServerCompletionQueue> cq) - : database_(database), cq_(std::move(cq)) { - // Add Check() method. - AddMethod(new internal::RpcServiceMethod( - kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr)); - // Add Watch() method. - AddMethod(new internal::RpcServiceMethod( - kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr)); - // Create serving thread. - thread_ = std::unique_ptr<::grpc_core::Thread>( - new ::grpc_core::Thread("grpc_health_check_service", Serve, this)); + DefaultHealthCheckService* database, + std::unique_ptr<ServerCompletionQueue> cq) + : database_(database), cq_(std::move(cq)) { + // Add Check() method. + AddMethod(new internal::RpcServiceMethod( + kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr)); + // Add Watch() method. + AddMethod(new internal::RpcServiceMethod( + kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr)); + // Create serving thread. + thread_ = std::unique_ptr<::grpc_core::Thread>( + new ::grpc_core::Thread("grpc_health_check_service", Serve, this)); } -DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() { - // We will reach here after the server starts shutting down. - shutdown_ = true; - { +DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() { + // We will reach here after the server starts shutting down. + shutdown_ = true; + { grpc_core::MutexLock lock(&cq_shutdown_mu_); - cq_->Shutdown(); - } - thread_->Join(); -} - -void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() { - // Request the calls we're interested in. - // We do this before starting the serving thread, so that we know it's - // done before server startup is complete. - CheckCallHandler::CreateAndStart(cq_.get(), database_, this); - WatchCallHandler::CreateAndStart(cq_.get(), database_, this); - // Start serving thread. - thread_->Start(); -} - -void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) { + cq_->Shutdown(); + } + thread_->Join(); +} + +void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() { + // Request the calls we're interested in. + // We do this before starting the serving thread, so that we know it's + // done before server startup is complete. + CheckCallHandler::CreateAndStart(cq_.get(), database_, this); + WatchCallHandler::CreateAndStart(cq_.get(), database_, this); + // Start serving thread. + thread_->Start(); +} + +void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) { HealthCheckServiceImpl* service = static_cast<HealthCheckServiceImpl*>(arg); - void* tag; - bool ok; - while (true) { - if (!service->cq_->Next(&tag, &ok)) { - // The completion queue is shutting down. - GPR_ASSERT(service->shutdown_); - break; - } - auto* next_step = static_cast<CallableTag*>(tag); - next_step->Run(ok); - } -} - -bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest( + void* tag; + bool ok; + while (true) { + if (!service->cq_->Next(&tag, &ok)) { + // The completion queue is shutting down. + GPR_ASSERT(service->shutdown_); + break; + } + auto* next_step = static_cast<CallableTag*>(tag); + next_step->Run(ok); + } +} + +bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest( const ByteBuffer& request, TString* service_name) { std::vector<Slice> slices; - if (!request.Dump(&slices).ok()) return false; + if (!request.Dump(&slices).ok()) return false; uint8_t* request_bytes = nullptr; size_t request_size = 0; - if (slices.size() == 1) { + if (slices.size() == 1) { request_bytes = const_cast<uint8_t*>(slices[0].begin()); request_size = slices[0].size(); - } else if (slices.size() > 1) { - request_bytes = static_cast<uint8_t*>(gpr_malloc(request.Length())); + } else if (slices.size() > 1) { + request_bytes = static_cast<uint8_t*>(gpr_malloc(request.Length())); uint8_t* copy_to = request_bytes; for (size_t i = 0; i < slices.size(); i++) { memcpy(copy_to, slices[i].begin(), slices[i].size()); @@ -220,8 +220,8 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest( grpc_health_v1_HealthCheckRequest* request_struct = grpc_health_v1_HealthCheckRequest_parse( reinterpret_cast<char*>(request_bytes), request_size, arena.ptr()); - if (slices.size() > 1) { - gpr_free(request_bytes); + if (slices.size() > 1) { + gpr_free(request_bytes); } if (request_struct == nullptr) { return false; @@ -232,17 +232,17 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest( return false; } service_name->assign(service.data, service.size); - return true; -} + return true; +} -bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse( - ServingStatus status, ByteBuffer* response) { +bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse( + ServingStatus status, ByteBuffer* response) { upb::Arena arena; grpc_health_v1_HealthCheckResponse* response_struct = grpc_health_v1_HealthCheckResponse_new(arena.ptr()); grpc_health_v1_HealthCheckResponse_set_status( response_struct, - status == NOT_FOUND + status == NOT_FOUND ? grpc_health_v1_HealthCheckResponse_SERVICE_UNKNOWN : status == SERVING ? grpc_health_v1_HealthCheckResponse_SERVING : grpc_health_v1_HealthCheckResponse_NOT_SERVING); @@ -256,249 +256,249 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse( Slice encoded_response(response_slice, Slice::STEAL_REF); ByteBuffer response_buffer(&encoded_response, 1); response->Swap(&response_buffer); - return true; + return true; } -// -// DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler -// - -void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: - CreateAndStart(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service) { - std::shared_ptr<CallHandler> self = - std::make_shared<CheckCallHandler>(cq, database, service); - CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get()); - { +// +// DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler +// + +void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: + CreateAndStart(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service) { + std::shared_ptr<CallHandler> self = + std::make_shared<CheckCallHandler>(cq, database, service); + CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get()); + { grpc_core::MutexLock lock(&service->cq_shutdown_mu_); - if (service->shutdown_) return; - // Request a Check() call. - handler->next_ = - CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_, - &handler->writer_, cq, cq, &handler->next_); - } + if (service->shutdown_) return; + // Request a Check() call. + handler->next_ = + CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_, + &handler->writer_, cq, cq, &handler->next_); + } } -DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: - CheckCallHandler(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service) - : cq_(cq), database_(database), service_(service), writer_(&ctx_) {} - -void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: - OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) { - if (!ok) { - // The value of ok being false means that the server is shutting down. - return; - } - // Spawn a new handler instance to serve the next new client. Every handler - // instance will deallocate itself when it's done. - CreateAndStart(cq_, database_, service_); - // Process request. - gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_, - this); +DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: + CheckCallHandler(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service) + : cq_(cq), database_(database), service_(service), writer_(&ctx_) {} + +void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: + OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) { + if (!ok) { + // The value of ok being false means that the server is shutting down. + return; + } + // Spawn a new handler instance to serve the next new client. Every handler + // instance will deallocate itself when it's done. + CreateAndStart(cq_, database_, service_); + // Process request. + gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_, + this); TString service_name; - grpc::Status status = Status::OK; - ByteBuffer response; - if (!service_->DecodeRequest(request_, &service_name)) { - status = Status(StatusCode::INVALID_ARGUMENT, "could not parse request"); - } else { - ServingStatus serving_status = database_->GetServingStatus(service_name); - if (serving_status == NOT_FOUND) { - status = Status(StatusCode::NOT_FOUND, "service name unknown"); - } else if (!service_->EncodeResponse(serving_status, &response)) { - status = Status(StatusCode::INTERNAL, "could not encode response"); - } - } - // Send response. - { + grpc::Status status = Status::OK; + ByteBuffer response; + if (!service_->DecodeRequest(request_, &service_name)) { + status = Status(StatusCode::INVALID_ARGUMENT, "could not parse request"); + } else { + ServingStatus serving_status = database_->GetServingStatus(service_name); + if (serving_status == NOT_FOUND) { + status = Status(StatusCode::NOT_FOUND, "service name unknown"); + } else if (!service_->EncodeResponse(serving_status, &response)) { + status = Status(StatusCode::INTERNAL, "could not encode response"); + } + } + // Send response. + { grpc_core::MutexLock lock(&service_->cq_shutdown_mu_); - if (!service_->shutdown_) { - next_ = - CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - if (status.ok()) { - writer_.Finish(response, status, &next_); - } else { - writer_.FinishWithError(status, &next_); - } - } - } + if (!service_->shutdown_) { + next_ = + CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + if (status.ok()) { + writer_.Finish(response, status, &next_); + } else { + writer_.FinishWithError(status, &next_); + } + } + } } -void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: - OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) { - if (ok) { - gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p", - service_, this); +void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: + OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) { + if (ok) { + gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p", + service_, this); } - self.reset(); // To appease clang-tidy. + self.reset(); // To appease clang-tidy. } -// -// DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler -// - -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - CreateAndStart(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service) { - std::shared_ptr<CallHandler> self = - std::make_shared<WatchCallHandler>(cq, database, service); - WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get()); - { +// +// DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler +// + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + CreateAndStart(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service) { + std::shared_ptr<CallHandler> self = + std::make_shared<WatchCallHandler>(cq, database, service); + WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get()); + { grpc_core::MutexLock lock(&service->cq_shutdown_mu_); - if (service->shutdown_) return; - // Request AsyncNotifyWhenDone(). - handler->on_done_notified_ = - CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler, - std::placeholders::_1, std::placeholders::_2), - self /* copies ref */); - handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_); - // Request a Watch() call. - handler->next_ = - CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_, - &handler->stream_, cq, cq, - &handler->next_); + if (service->shutdown_) return; + // Request AsyncNotifyWhenDone(). + handler->on_done_notified_ = + CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler, + std::placeholders::_1, std::placeholders::_2), + self /* copies ref */); + handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_); + // Request a Watch() call. + handler->next_ = + CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_, + &handler->stream_, cq, cq, + &handler->next_); } } -DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - WatchCallHandler(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service) - : cq_(cq), database_(database), service_(service), stream_(&ctx_) {} - -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) { - if (!ok) { - // Server shutting down. - // - // AsyncNotifyWhenDone() needs to be called before the call starts, but the - // tag will not pop out if the call never starts ( - // https://github.com/grpc/grpc/issues/10136). So we need to manually - // release the ownership of the handler in this case. - GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr); - return; - } - // Spawn a new handler instance to serve the next new client. Every handler - // instance will deallocate itself when it's done. - CreateAndStart(cq_, database_, service_); - // Parse request. - if (!service_->DecodeRequest(request_, &service_name_)) { - SendFinish(std::move(self), - Status(StatusCode::INVALID_ARGUMENT, "could not parse request")); - return; - } - // Register the call for updates to the service. - gpr_log(GPR_DEBUG, - "[HCS %p] Health watch started for service \"%s\" (handler: %p)", - service_, service_name_.c_str(), this); - database_->RegisterCallHandler(service_name_, std::move(self)); +DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + WatchCallHandler(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service) + : cq_(cq), database_(database), service_(service), stream_(&ctx_) {} + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) { + if (!ok) { + // Server shutting down. + // + // AsyncNotifyWhenDone() needs to be called before the call starts, but the + // tag will not pop out if the call never starts ( + // https://github.com/grpc/grpc/issues/10136). So we need to manually + // release the ownership of the handler in this case. + GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr); + return; + } + // Spawn a new handler instance to serve the next new client. Every handler + // instance will deallocate itself when it's done. + CreateAndStart(cq_, database_, service_); + // Parse request. + if (!service_->DecodeRequest(request_, &service_name_)) { + SendFinish(std::move(self), + Status(StatusCode::INVALID_ARGUMENT, "could not parse request")); + return; + } + // Register the call for updates to the service. + gpr_log(GPR_DEBUG, + "[HCS %p] Health watch started for service \"%s\" (handler: %p)", + service_, service_name_.c_str(), this); + database_->RegisterCallHandler(service_name_, std::move(self)); } -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) { +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) { grpc_core::MutexLock lock(&send_mu_); - // If there's already a send in flight, cache the new status, and - // we'll start a new send for it when the one in flight completes. - if (send_in_flight_) { - pending_status_ = status; - return; - } - // Start a send. - SendHealthLocked(std::move(self), status); -} - -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - SendHealthLocked(std::shared_ptr<CallHandler> self, ServingStatus status) { - send_in_flight_ = true; - // Construct response. - ByteBuffer response; - bool success = service_->EncodeResponse(status, &response); - // Grab shutdown lock and send response. + // If there's already a send in flight, cache the new status, and + // we'll start a new send for it when the one in flight completes. + if (send_in_flight_) { + pending_status_ = status; + return; + } + // Start a send. + SendHealthLocked(std::move(self), status); +} + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + SendHealthLocked(std::shared_ptr<CallHandler> self, ServingStatus status) { + send_in_flight_ = true; + // Construct response. + ByteBuffer response; + bool success = service_->EncodeResponse(status, &response); + // Grab shutdown lock and send response. grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_); - if (service_->shutdown_) { - SendFinishLocked(std::move(self), Status::CANCELLED); - return; - } - if (!success) { - SendFinishLocked(std::move(self), - Status(StatusCode::INTERNAL, "could not encode response")); - return; - } - next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - stream_.Write(response, &next_); -} - -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok) { - if (!ok) { - SendFinish(std::move(self), Status::CANCELLED); - return; - } + if (service_->shutdown_) { + SendFinishLocked(std::move(self), Status::CANCELLED); + return; + } + if (!success) { + SendFinishLocked(std::move(self), + Status(StatusCode::INTERNAL, "could not encode response")); + return; + } + next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + stream_.Write(response, &next_); +} + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok) { + if (!ok) { + SendFinish(std::move(self), Status::CANCELLED); + return; + } grpc_core::MutexLock lock(&send_mu_); - send_in_flight_ = false; - // If we got a new status since we started the last send, start a - // new send for it. - if (pending_status_ != NOT_FOUND) { - auto status = pending_status_; - pending_status_ = NOT_FOUND; - SendHealthLocked(std::move(self), status); - } -} - -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - SendFinish(std::shared_ptr<CallHandler> self, const Status& status) { - if (finish_called_) return; + send_in_flight_ = false; + // If we got a new status since we started the last send, start a + // new send for it. + if (pending_status_ != NOT_FOUND) { + auto status = pending_status_; + pending_status_ = NOT_FOUND; + SendHealthLocked(std::move(self), status); + } +} + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + SendFinish(std::shared_ptr<CallHandler> self, const Status& status) { + if (finish_called_) return; grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_); - if (service_->shutdown_) return; - SendFinishLocked(std::move(self), status); -} - -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - SendFinishLocked(std::shared_ptr<CallHandler> self, const Status& status) { - on_finish_done_ = - CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - stream_.Finish(status, &on_finish_done_); - finish_called_ = true; -} - -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) { - if (ok) { - gpr_log(GPR_DEBUG, - "[HCS %p] Health watch call finished (service_name: \"%s\", " - "handler: %p).", - service_, service_name_.c_str(), this); - } - self.reset(); // To appease clang-tidy. -} - -// TODO(roth): This method currently assumes that there will be only one -// thread polling the cq and invoking the corresponding callbacks. If -// that changes, we will need to add synchronization here. -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) { - GPR_ASSERT(ok); - gpr_log(GPR_DEBUG, - "[HCS %p] Health watch call is notified done (handler: %p, " - "is_cancelled: %d).", - service_, this, static_cast<int>(ctx_.IsCancelled())); - database_->UnregisterCallHandler(service_name_, self); - SendFinish(std::move(self), Status::CANCELLED); -} - + if (service_->shutdown_) return; + SendFinishLocked(std::move(self), status); +} + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + SendFinishLocked(std::shared_ptr<CallHandler> self, const Status& status) { + on_finish_done_ = + CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + stream_.Finish(status, &on_finish_done_); + finish_called_ = true; +} + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) { + if (ok) { + gpr_log(GPR_DEBUG, + "[HCS %p] Health watch call finished (service_name: \"%s\", " + "handler: %p).", + service_, service_name_.c_str(), this); + } + self.reset(); // To appease clang-tidy. +} + +// TODO(roth): This method currently assumes that there will be only one +// thread polling the cq and invoking the corresponding callbacks. If +// that changes, we will need to add synchronization here. +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) { + GPR_ASSERT(ok); + gpr_log(GPR_DEBUG, + "[HCS %p] Health watch call is notified done (handler: %p, " + "is_cancelled: %d).", + service_, this, static_cast<int>(ctx_.IsCancelled())); + database_->UnregisterCallHandler(service_name_, self); + SendFinish(std::move(self), Status::CANCELLED); +} + } // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h index 9da1dfc15f..5da0ef935a 100644 --- a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h +++ b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h @@ -19,260 +19,260 @@ #ifndef GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H #define GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H -#include <atomic> -#include <set> +#include <atomic> +#include <set> -#include <grpc/support/log.h> -#include <grpcpp/grpcpp.h> +#include <grpc/support/log.h> +#include <grpcpp/grpcpp.h> #include <grpcpp/health_check_service_interface.h> -#include <grpcpp/impl/codegen/async_generic_service.h> -#include <grpcpp/impl/codegen/async_unary_call.h> +#include <grpcpp/impl/codegen/async_generic_service.h> +#include <grpcpp/impl/codegen/async_unary_call.h> #include <grpcpp/impl/codegen/completion_queue.h> #include <grpcpp/impl/codegen/service_type.h> #include <grpcpp/support/byte_buffer.h> #include "src/core/lib/gprpp/sync.h" -#include "src/core/lib/gprpp/thd.h" - +#include "src/core/lib/gprpp/thd.h" + namespace grpc { // Default implementation of HealthCheckServiceInterface. Server will create and // own it. class DefaultHealthCheckService final : public HealthCheckServiceInterface { public: - enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING }; - + enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING }; + // The service impl to register with the server. class HealthCheckServiceImpl : public Service { public: - // Base class for call handlers. - class CallHandler { - public: - virtual ~CallHandler() = default; - virtual void SendHealth(std::shared_ptr<CallHandler> self, - ServingStatus status) = 0; - }; - - HealthCheckServiceImpl(DefaultHealthCheckService* database, - std::unique_ptr<ServerCompletionQueue> cq); - - ~HealthCheckServiceImpl(); - - void StartServingThread(); - + // Base class for call handlers. + class CallHandler { + public: + virtual ~CallHandler() = default; + virtual void SendHealth(std::shared_ptr<CallHandler> self, + ServingStatus status) = 0; + }; + + HealthCheckServiceImpl(DefaultHealthCheckService* database, + std::unique_ptr<ServerCompletionQueue> cq); + + ~HealthCheckServiceImpl(); + + void StartServingThread(); + private: - // A tag that can be called with a bool argument. It's tailored for - // CallHandler's use. Before being used, it should be constructed with a - // method of CallHandler and a shared pointer to the handler. The - // shared pointer will be moved to the invoked function and the function - // can only be invoked once. That makes ref counting of the handler easier, - // because the shared pointer is not bound to the function and can be gone - // once the invoked function returns (if not used any more). - class CallableTag { - public: - using HandlerFunction = - std::function<void(std::shared_ptr<CallHandler>, bool)>; - - CallableTag() {} - - CallableTag(HandlerFunction func, std::shared_ptr<CallHandler> handler) - : handler_function_(std::move(func)), handler_(std::move(handler)) { - GPR_ASSERT(handler_function_ != nullptr); - GPR_ASSERT(handler_ != nullptr); - } - - // Runs the tag. This should be called only once. The handler is no - // longer owned by this tag after this method is invoked. - void Run(bool ok) { - GPR_ASSERT(handler_function_ != nullptr); - GPR_ASSERT(handler_ != nullptr); - handler_function_(std::move(handler_), ok); - } - - // Releases and returns the shared pointer to the handler. - std::shared_ptr<CallHandler> ReleaseHandler() { - return std::move(handler_); - } - - private: - HandlerFunction handler_function_ = nullptr; - std::shared_ptr<CallHandler> handler_; - }; - - // Call handler for Check method. - // Each handler takes care of one call. It contains per-call data and it - // will access the members of the parent class (i.e., - // DefaultHealthCheckService) for per-service health data. - class CheckCallHandler : public CallHandler { - public: - // Instantiates a CheckCallHandler and requests the next health check - // call. The handler object will manage its own lifetime, so no action is - // needed from the caller any more regarding that object. - static void CreateAndStart(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service); - - // This ctor is public because we want to use std::make_shared<> in - // CreateAndStart(). This ctor shouldn't be used elsewhere. - CheckCallHandler(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service); - - // Not used for Check. + // A tag that can be called with a bool argument. It's tailored for + // CallHandler's use. Before being used, it should be constructed with a + // method of CallHandler and a shared pointer to the handler. The + // shared pointer will be moved to the invoked function and the function + // can only be invoked once. That makes ref counting of the handler easier, + // because the shared pointer is not bound to the function and can be gone + // once the invoked function returns (if not used any more). + class CallableTag { + public: + using HandlerFunction = + std::function<void(std::shared_ptr<CallHandler>, bool)>; + + CallableTag() {} + + CallableTag(HandlerFunction func, std::shared_ptr<CallHandler> handler) + : handler_function_(std::move(func)), handler_(std::move(handler)) { + GPR_ASSERT(handler_function_ != nullptr); + GPR_ASSERT(handler_ != nullptr); + } + + // Runs the tag. This should be called only once. The handler is no + // longer owned by this tag after this method is invoked. + void Run(bool ok) { + GPR_ASSERT(handler_function_ != nullptr); + GPR_ASSERT(handler_ != nullptr); + handler_function_(std::move(handler_), ok); + } + + // Releases and returns the shared pointer to the handler. + std::shared_ptr<CallHandler> ReleaseHandler() { + return std::move(handler_); + } + + private: + HandlerFunction handler_function_ = nullptr; + std::shared_ptr<CallHandler> handler_; + }; + + // Call handler for Check method. + // Each handler takes care of one call. It contains per-call data and it + // will access the members of the parent class (i.e., + // DefaultHealthCheckService) for per-service health data. + class CheckCallHandler : public CallHandler { + public: + // Instantiates a CheckCallHandler and requests the next health check + // call. The handler object will manage its own lifetime, so no action is + // needed from the caller any more regarding that object. + static void CreateAndStart(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service); + + // This ctor is public because we want to use std::make_shared<> in + // CreateAndStart(). This ctor shouldn't be used elsewhere. + CheckCallHandler(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service); + + // Not used for Check. void SendHealth(std::shared_ptr<CallHandler> /*self*/, ServingStatus /*status*/) override {} - - private: - // Called when we receive a call. - // Spawns a new handler so that we can keep servicing future calls. - void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok); - - // Called when Finish() is done. - void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok); - - // The members passed down from HealthCheckServiceImpl. - ServerCompletionQueue* cq_; - DefaultHealthCheckService* database_; - HealthCheckServiceImpl* service_; - - ByteBuffer request_; - GenericServerAsyncResponseWriter writer_; - ServerContext ctx_; - - CallableTag next_; - }; - - // Call handler for Watch method. - // Each handler takes care of one call. It contains per-call data and it - // will access the members of the parent class (i.e., - // DefaultHealthCheckService) for per-service health data. - class WatchCallHandler : public CallHandler { - public: - // Instantiates a WatchCallHandler and requests the next health check - // call. The handler object will manage its own lifetime, so no action is - // needed from the caller any more regarding that object. - static void CreateAndStart(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service); - - // This ctor is public because we want to use std::make_shared<> in - // CreateAndStart(). This ctor shouldn't be used elsewhere. - WatchCallHandler(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service); - - void SendHealth(std::shared_ptr<CallHandler> self, - ServingStatus status) override; - - private: - // Called when we receive a call. - // Spawns a new handler so that we can keep servicing future calls. - void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok); - - // Requires holding send_mu_. - void SendHealthLocked(std::shared_ptr<CallHandler> self, - ServingStatus status); - - // When sending a health result finishes. - void OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok); - - void SendFinish(std::shared_ptr<CallHandler> self, const Status& status); - - // Requires holding service_->cq_shutdown_mu_. - void SendFinishLocked(std::shared_ptr<CallHandler> self, - const Status& status); - - // Called when Finish() is done. - void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok); - - // Called when AsyncNotifyWhenDone() notifies us. - void OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok); - - // The members passed down from HealthCheckServiceImpl. - ServerCompletionQueue* cq_; - DefaultHealthCheckService* database_; - HealthCheckServiceImpl* service_; - - ByteBuffer request_; + + private: + // Called when we receive a call. + // Spawns a new handler so that we can keep servicing future calls. + void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok); + + // Called when Finish() is done. + void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok); + + // The members passed down from HealthCheckServiceImpl. + ServerCompletionQueue* cq_; + DefaultHealthCheckService* database_; + HealthCheckServiceImpl* service_; + + ByteBuffer request_; + GenericServerAsyncResponseWriter writer_; + ServerContext ctx_; + + CallableTag next_; + }; + + // Call handler for Watch method. + // Each handler takes care of one call. It contains per-call data and it + // will access the members of the parent class (i.e., + // DefaultHealthCheckService) for per-service health data. + class WatchCallHandler : public CallHandler { + public: + // Instantiates a WatchCallHandler and requests the next health check + // call. The handler object will manage its own lifetime, so no action is + // needed from the caller any more regarding that object. + static void CreateAndStart(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service); + + // This ctor is public because we want to use std::make_shared<> in + // CreateAndStart(). This ctor shouldn't be used elsewhere. + WatchCallHandler(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service); + + void SendHealth(std::shared_ptr<CallHandler> self, + ServingStatus status) override; + + private: + // Called when we receive a call. + // Spawns a new handler so that we can keep servicing future calls. + void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok); + + // Requires holding send_mu_. + void SendHealthLocked(std::shared_ptr<CallHandler> self, + ServingStatus status); + + // When sending a health result finishes. + void OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok); + + void SendFinish(std::shared_ptr<CallHandler> self, const Status& status); + + // Requires holding service_->cq_shutdown_mu_. + void SendFinishLocked(std::shared_ptr<CallHandler> self, + const Status& status); + + // Called when Finish() is done. + void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok); + + // Called when AsyncNotifyWhenDone() notifies us. + void OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok); + + // The members passed down from HealthCheckServiceImpl. + ServerCompletionQueue* cq_; + DefaultHealthCheckService* database_; + HealthCheckServiceImpl* service_; + + ByteBuffer request_; TString service_name_; - GenericServerAsyncWriter stream_; - ServerContext ctx_; - + GenericServerAsyncWriter stream_; + ServerContext ctx_; + grpc_core::Mutex send_mu_; - bool send_in_flight_ = false; // Guarded by mu_. - ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_. - - bool finish_called_ = false; - CallableTag next_; - CallableTag on_done_notified_; - CallableTag on_finish_done_; - }; - - // Handles the incoming requests and drives the completion queue in a loop. - static void Serve(void* arg); - - // Returns true on success. - static bool DecodeRequest(const ByteBuffer& request, + bool send_in_flight_ = false; // Guarded by mu_. + ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_. + + bool finish_called_ = false; + CallableTag next_; + CallableTag on_done_notified_; + CallableTag on_finish_done_; + }; + + // Handles the incoming requests and drives the completion queue in a loop. + static void Serve(void* arg); + + // Returns true on success. + static bool DecodeRequest(const ByteBuffer& request, TString* service_name); - static bool EncodeResponse(ServingStatus status, ByteBuffer* response); - - // Needed to appease Windows compilers, which don't seem to allow - // nested classes to access protected members in the parent's - // superclass. - using Service::RequestAsyncServerStreaming; - using Service::RequestAsyncUnary; - - DefaultHealthCheckService* database_; - std::unique_ptr<ServerCompletionQueue> cq_; - - // To synchronize the operations related to shutdown state of cq_, so that - // we don't enqueue new tags into cq_ after it is already shut down. + static bool EncodeResponse(ServingStatus status, ByteBuffer* response); + + // Needed to appease Windows compilers, which don't seem to allow + // nested classes to access protected members in the parent's + // superclass. + using Service::RequestAsyncServerStreaming; + using Service::RequestAsyncUnary; + + DefaultHealthCheckService* database_; + std::unique_ptr<ServerCompletionQueue> cq_; + + // To synchronize the operations related to shutdown state of cq_, so that + // we don't enqueue new tags into cq_ after it is already shut down. grpc_core::Mutex cq_shutdown_mu_; - std::atomic_bool shutdown_{false}; - std::unique_ptr<::grpc_core::Thread> thread_; + std::atomic_bool shutdown_{false}; + std::unique_ptr<::grpc_core::Thread> thread_; }; DefaultHealthCheckService(); - + void SetServingStatus(const TString& service_name, bool serving) override; void SetServingStatus(bool serving) override; - - void Shutdown() override; - + + void Shutdown() override; + ServingStatus GetServingStatus(const TString& service_name) const; - HealthCheckServiceImpl* GetHealthCheckService( - std::unique_ptr<ServerCompletionQueue> cq); - + HealthCheckServiceImpl* GetHealthCheckService( + std::unique_ptr<ServerCompletionQueue> cq); + private: - // Stores the current serving status of a service and any call - // handlers registered for updates when the service's status changes. - class ServiceData { - public: - void SetServingStatus(ServingStatus status); - ServingStatus GetServingStatus() const { return status_; } - void AddCallHandler( - std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); - void RemoveCallHandler( - const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler); - bool Unused() const { - return call_handlers_.empty() && status_ == NOT_FOUND; - } - - private: - ServingStatus status_ = NOT_FOUND; - std::set<std::shared_ptr<HealthCheckServiceImpl::CallHandler>> - call_handlers_; - }; - - void RegisterCallHandler( + // Stores the current serving status of a service and any call + // handlers registered for updates when the service's status changes. + class ServiceData { + public: + void SetServingStatus(ServingStatus status); + ServingStatus GetServingStatus() const { return status_; } + void AddCallHandler( + std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); + void RemoveCallHandler( + const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler); + bool Unused() const { + return call_handlers_.empty() && status_ == NOT_FOUND; + } + + private: + ServingStatus status_ = NOT_FOUND; + std::set<std::shared_ptr<HealthCheckServiceImpl::CallHandler>> + call_handlers_; + }; + + void RegisterCallHandler( const TString& service_name, - std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); - - void UnregisterCallHandler( + std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); + + void UnregisterCallHandler( const TString& service_name, - const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler); - + const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler); + mutable grpc_core::Mutex mu_; bool shutdown_ = false; // Guarded by mu_. std::map<TString, ServiceData> services_map_; // Guarded by mu_. diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/constants.h b/contrib/libs/grpc/src/cpp/server/load_reporter/constants.h index 00ad794a04..eabed9711e 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/constants.h +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/constants.h @@ -1,81 +1,81 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H -#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H - -#include <grpc/impl/codegen/port_platform.h> - -namespace grpc { -namespace load_reporter { - -// TODO(juanlishen): Update the version number with the PR number every time -// there is any change to the server load reporter. -constexpr uint32_t kVersion = 15853; - -// TODO(juanlishen): This window size is from the internal spec for the load -// reporter. Need to ask the gRPC LB team whether we should make this and the -// fetching interval configurable. -constexpr uint32_t kFeedbackSampleWindowSeconds = 10; -constexpr uint32_t kFetchAndSampleIntervalSeconds = 1; - -constexpr size_t kLbIdLength = 8; -constexpr size_t kIpv4AddressLength = 8; -constexpr size_t kIpv6AddressLength = 32; - -constexpr char kInvalidLbId[] = "<INVALID_LBID_238dsb234890rb>"; - -// Call statuses. - -constexpr char kCallStatusOk[] = "OK"; -constexpr char kCallStatusServerError[] = "5XX"; -constexpr char kCallStatusClientError[] = "4XX"; - -// Tag keys. - -constexpr char kTagKeyToken[] = "token"; -constexpr char kTagKeyHost[] = "host"; -constexpr char kTagKeyUserId[] = "user_id"; -constexpr char kTagKeyStatus[] = "status"; -constexpr char kTagKeyMetricName[] = "metric_name"; - -// Measure names. - -constexpr char kMeasureStartCount[] = "grpc.io/lb/start_count"; -constexpr char kMeasureEndCount[] = "grpc.io/lb/end_count"; -constexpr char kMeasureEndBytesSent[] = "grpc.io/lb/bytes_sent"; -constexpr char kMeasureEndBytesReceived[] = "grpc.io/lb/bytes_received"; -constexpr char kMeasureEndLatencyMs[] = "grpc.io/lb/latency_ms"; -constexpr char kMeasureOtherCallMetric[] = "grpc.io/lb/other_call_metric"; - -// View names. - -constexpr char kViewStartCount[] = "grpc.io/lb_view/start_count"; -constexpr char kViewEndCount[] = "grpc.io/lb_view/end_count"; -constexpr char kViewEndBytesSent[] = "grpc.io/lb_view/bytes_sent"; -constexpr char kViewEndBytesReceived[] = "grpc.io/lb_view/bytes_received"; -constexpr char kViewEndLatencyMs[] = "grpc.io/lb_view/latency_ms"; -constexpr char kViewOtherCallMetricCount[] = - "grpc.io/lb_view/other_call_metric_count"; -constexpr char kViewOtherCallMetricValue[] = - "grpc.io/lb_view/other_call_metric_value"; - -} // namespace load_reporter -} // namespace grpc - -#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H +#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H + +#include <grpc/impl/codegen/port_platform.h> + +namespace grpc { +namespace load_reporter { + +// TODO(juanlishen): Update the version number with the PR number every time +// there is any change to the server load reporter. +constexpr uint32_t kVersion = 15853; + +// TODO(juanlishen): This window size is from the internal spec for the load +// reporter. Need to ask the gRPC LB team whether we should make this and the +// fetching interval configurable. +constexpr uint32_t kFeedbackSampleWindowSeconds = 10; +constexpr uint32_t kFetchAndSampleIntervalSeconds = 1; + +constexpr size_t kLbIdLength = 8; +constexpr size_t kIpv4AddressLength = 8; +constexpr size_t kIpv6AddressLength = 32; + +constexpr char kInvalidLbId[] = "<INVALID_LBID_238dsb234890rb>"; + +// Call statuses. + +constexpr char kCallStatusOk[] = "OK"; +constexpr char kCallStatusServerError[] = "5XX"; +constexpr char kCallStatusClientError[] = "4XX"; + +// Tag keys. + +constexpr char kTagKeyToken[] = "token"; +constexpr char kTagKeyHost[] = "host"; +constexpr char kTagKeyUserId[] = "user_id"; +constexpr char kTagKeyStatus[] = "status"; +constexpr char kTagKeyMetricName[] = "metric_name"; + +// Measure names. + +constexpr char kMeasureStartCount[] = "grpc.io/lb/start_count"; +constexpr char kMeasureEndCount[] = "grpc.io/lb/end_count"; +constexpr char kMeasureEndBytesSent[] = "grpc.io/lb/bytes_sent"; +constexpr char kMeasureEndBytesReceived[] = "grpc.io/lb/bytes_received"; +constexpr char kMeasureEndLatencyMs[] = "grpc.io/lb/latency_ms"; +constexpr char kMeasureOtherCallMetric[] = "grpc.io/lb/other_call_metric"; + +// View names. + +constexpr char kViewStartCount[] = "grpc.io/lb_view/start_count"; +constexpr char kViewEndCount[] = "grpc.io/lb_view/end_count"; +constexpr char kViewEndBytesSent[] = "grpc.io/lb_view/bytes_sent"; +constexpr char kViewEndBytesReceived[] = "grpc.io/lb_view/bytes_received"; +constexpr char kViewEndLatencyMs[] = "grpc.io/lb_view/latency_ms"; +constexpr char kViewOtherCallMetricCount[] = + "grpc.io/lb_view/other_call_metric_count"; +constexpr char kViewOtherCallMetricValue[] = + "grpc.io/lb_view/other_call_metric_value"; + +} // namespace load_reporter +} // namespace grpc + +#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats.h b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats.h index f514b0752f..8544b05417 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats.h +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats.h @@ -1,36 +1,36 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H -#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H - -#include <grpc/impl/codegen/port_platform.h> - -#include <utility> - -namespace grpc { -namespace load_reporter { - -// Reads the CPU stats (in a pair of busy and total numbers) from the system. -// The units of the stats should be the same. -std::pair<uint64_t, uint64_t> GetCpuStatsImpl(); - -} // namespace load_reporter -} // namespace grpc - -#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H +#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H + +#include <grpc/impl/codegen/port_platform.h> + +#include <utility> + +namespace grpc { +namespace load_reporter { + +// Reads the CPU stats (in a pair of busy and total numbers) from the system. +// The units of the stats should be the same. +std::pair<uint64_t, uint64_t> GetCpuStatsImpl(); + +} // namespace load_reporter +} // namespace grpc + +#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc index 561d4f5048..8565d384a8 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc @@ -1,48 +1,48 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <grpc/support/port_platform.h> - -#ifdef GPR_LINUX - -#include <cstdio> - -#include "src/cpp/server/load_reporter/get_cpu_stats.h" - -namespace grpc { -namespace load_reporter { - -std::pair<uint64_t, uint64_t> GetCpuStatsImpl() { - uint64_t busy = 0, total = 0; - FILE* fp; - fp = fopen("/proc/stat", "r"); - uint64_t user, nice, system, idle; +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#ifdef GPR_LINUX + +#include <cstdio> + +#include "src/cpp/server/load_reporter/get_cpu_stats.h" + +namespace grpc { +namespace load_reporter { + +std::pair<uint64_t, uint64_t> GetCpuStatsImpl() { + uint64_t busy = 0, total = 0; + FILE* fp; + fp = fopen("/proc/stat", "r"); + uint64_t user, nice, system, idle; if (fscanf(fp, "cpu %lu %lu %lu %lu", &user, &nice, &system, &idle) != 4) { // Something bad happened with the information, so assume it's all invalid user = nice = system = idle = 0; } - fclose(fp); - busy = user + nice + system; - total = busy + idle; - return std::make_pair(busy, total); -} - -} // namespace load_reporter -} // namespace grpc - -#endif // GPR_LINUX + fclose(fp); + busy = user + nice + system; + total = busy + idle; + return std::make_pair(busy, total); +} + +} // namespace load_reporter +} // namespace grpc + +#endif // GPR_LINUX diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_macos.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_macos.cc index dbdde304c2..125631a3d1 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_macos.cc +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_macos.cc @@ -1,45 +1,45 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <grpc/support/port_platform.h> - -#ifdef GPR_APPLE - -#include <mach/mach.h> - -#include "src/cpp/server/load_reporter/get_cpu_stats.h" - -namespace grpc { -namespace load_reporter { - -std::pair<uint64_t, uint64_t> GetCpuStatsImpl() { - uint64_t busy = 0, total = 0; - host_cpu_load_info_data_t cpuinfo; - mach_msg_type_number_t count = HOST_CPU_LOAD_INFO_COUNT; - if (host_statistics(mach_host_self(), HOST_CPU_LOAD_INFO, - (host_info_t)&cpuinfo, &count) == KERN_SUCCESS) { - for (int i = 0; i < CPU_STATE_MAX; i++) total += cpuinfo.cpu_ticks[i]; - busy = total - cpuinfo.cpu_ticks[CPU_STATE_IDLE]; - } - return std::make_pair(busy, total); -} - -} // namespace load_reporter -} // namespace grpc - -#endif // GPR_APPLE +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#ifdef GPR_APPLE + +#include <mach/mach.h> + +#include "src/cpp/server/load_reporter/get_cpu_stats.h" + +namespace grpc { +namespace load_reporter { + +std::pair<uint64_t, uint64_t> GetCpuStatsImpl() { + uint64_t busy = 0, total = 0; + host_cpu_load_info_data_t cpuinfo; + mach_msg_type_number_t count = HOST_CPU_LOAD_INFO_COUNT; + if (host_statistics(mach_host_self(), HOST_CPU_LOAD_INFO, + (host_info_t)&cpuinfo, &count) == KERN_SUCCESS) { + for (int i = 0; i < CPU_STATE_MAX; i++) total += cpuinfo.cpu_ticks[i]; + busy = total - cpuinfo.cpu_ticks[CPU_STATE_IDLE]; + } + return std::make_pair(busy, total); +} + +} // namespace load_reporter +} // namespace grpc + +#endif // GPR_APPLE diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_unsupported.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_unsupported.cc index 80fb8b6da1..e2d61859c8 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_unsupported.cc +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_unsupported.cc @@ -1,40 +1,40 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <grpc/support/port_platform.h> - -#if !defined(GPR_LINUX) && !defined(GPR_WINDOWS) && !defined(GPR_APPLE) - -#include <grpc/support/log.h> - -#include "src/cpp/server/load_reporter/get_cpu_stats.h" - -namespace grpc { -namespace load_reporter { - -std::pair<uint64_t, uint64_t> GetCpuStatsImpl() { - uint64_t busy = 0, total = 0; - gpr_log(GPR_ERROR, - "Platforms other than Linux, Windows, and MacOS are not supported."); - return std::make_pair(busy, total); -} - -} // namespace load_reporter -} // namespace grpc - -#endif // !defined(GPR_LINUX) && !defined(GPR_WINDOWS) && !defined(GPR_APPLE) +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#if !defined(GPR_LINUX) && !defined(GPR_WINDOWS) && !defined(GPR_APPLE) + +#include <grpc/support/log.h> + +#include "src/cpp/server/load_reporter/get_cpu_stats.h" + +namespace grpc { +namespace load_reporter { + +std::pair<uint64_t, uint64_t> GetCpuStatsImpl() { + uint64_t busy = 0, total = 0; + gpr_log(GPR_ERROR, + "Platforms other than Linux, Windows, and MacOS are not supported."); + return std::make_pair(busy, total); +} + +} // namespace load_reporter +} // namespace grpc + +#endif // !defined(GPR_LINUX) && !defined(GPR_WINDOWS) && !defined(GPR_APPLE) diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc index 0a98e848a2..bc5718a056 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc @@ -1,55 +1,55 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <grpc/support/port_platform.h> - -#ifdef GPR_WINDOWS - -#include <windows.h> -#include <cstdint> - -#include "src/cpp/server/load_reporter/get_cpu_stats.h" - -namespace grpc { -namespace load_reporter { - -namespace { - -uint64_t FiletimeToInt(const FILETIME& ft) { - ULARGE_INTEGER i; - i.LowPart = ft.dwLowDateTime; - i.HighPart = ft.dwHighDateTime; - return i.QuadPart; -} - -} // namespace - -std::pair<uint64_t, uint64_t> GetCpuStatsImpl() { - uint64_t busy = 0, total = 0; - FILETIME idle, kernel, user; - if (GetSystemTimes(&idle, &kernel, &user) != 0) { - total = FiletimeToInt(kernel) + FiletimeToInt(user); - busy = total - FiletimeToInt(idle); - } - return std::make_pair(busy, total); -} - -} // namespace load_reporter -} // namespace grpc - -#endif // GPR_WINDOWS +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#ifdef GPR_WINDOWS + +#include <windows.h> +#include <cstdint> + +#include "src/cpp/server/load_reporter/get_cpu_stats.h" + +namespace grpc { +namespace load_reporter { + +namespace { + +uint64_t FiletimeToInt(const FILETIME& ft) { + ULARGE_INTEGER i; + i.LowPart = ft.dwLowDateTime; + i.HighPart = ft.dwHighDateTime; + return i.QuadPart; +} + +} // namespace + +std::pair<uint64_t, uint64_t> GetCpuStatsImpl() { + uint64_t busy = 0, total = 0; + FILETIME idle, kernel, user; + if (GetSystemTimes(&idle, &kernel, &user) != 0) { + total = FiletimeToInt(kernel) + FiletimeToInt(user); + busy = total - FiletimeToInt(idle); + } + return std::make_pair(busy, total); +} + +} // namespace load_reporter +} // namespace grpc + +#endif // GPR_WINDOWS diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc index f07fa812a7..12e8203fe1 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc @@ -16,15 +16,15 @@ * */ -#include <grpc/impl/codegen/port_platform.h> - -#include <stdio.h> +#include <grpc/impl/codegen/port_platform.h> + +#include <stdio.h> #include <cstdlib> #include <set> #include <unordered_map> #include <vector> -#include "src/core/lib/iomgr/socket_utils.h" +#include "src/core/lib/iomgr/socket_utils.h" #include "src/cpp/server/load_reporter/load_data_store.h" namespace grpc { @@ -79,65 +79,65 @@ const typename C::value_type* RandomElement(const C& container) { LoadRecordKey::LoadRecordKey(const TString& client_ip_and_token, TString user_id) - : user_id_(std::move(user_id)) { - GPR_ASSERT(client_ip_and_token.size() >= 2); - int ip_hex_size; - GPR_ASSERT(sscanf(client_ip_and_token.substr(0, 2).c_str(), "%d", - &ip_hex_size) == 1); - GPR_ASSERT(ip_hex_size == 0 || ip_hex_size == kIpv4AddressLength || - ip_hex_size == kIpv6AddressLength); - size_t cur_pos = 2; - client_ip_hex_ = client_ip_and_token.substr(cur_pos, ip_hex_size); - cur_pos += ip_hex_size; - if (client_ip_and_token.size() - cur_pos < kLbIdLength) { - lb_id_ = kInvalidLbId; - lb_tag_ = ""; - } else { - lb_id_ = client_ip_and_token.substr(cur_pos, kLbIdLength); - lb_tag_ = client_ip_and_token.substr(cur_pos + kLbIdLength); - } -} - + : user_id_(std::move(user_id)) { + GPR_ASSERT(client_ip_and_token.size() >= 2); + int ip_hex_size; + GPR_ASSERT(sscanf(client_ip_and_token.substr(0, 2).c_str(), "%d", + &ip_hex_size) == 1); + GPR_ASSERT(ip_hex_size == 0 || ip_hex_size == kIpv4AddressLength || + ip_hex_size == kIpv6AddressLength); + size_t cur_pos = 2; + client_ip_hex_ = client_ip_and_token.substr(cur_pos, ip_hex_size); + cur_pos += ip_hex_size; + if (client_ip_and_token.size() - cur_pos < kLbIdLength) { + lb_id_ = kInvalidLbId; + lb_tag_ = ""; + } else { + lb_id_ = client_ip_and_token.substr(cur_pos, kLbIdLength); + lb_tag_ = client_ip_and_token.substr(cur_pos + kLbIdLength); + } +} + TString LoadRecordKey::GetClientIpBytes() const { - if (client_ip_hex_.empty()) { - return ""; - } else if (client_ip_hex_.size() == kIpv4AddressLength) { - uint32_t ip_bytes; - if (sscanf(client_ip_hex_.c_str(), "%x", &ip_bytes) != 1) { - gpr_log(GPR_ERROR, - "Can't parse client IP (%s) from a hex string to an integer.", - client_ip_hex_.c_str()); - return ""; - } - ip_bytes = grpc_htonl(ip_bytes); + if (client_ip_hex_.empty()) { + return ""; + } else if (client_ip_hex_.size() == kIpv4AddressLength) { + uint32_t ip_bytes; + if (sscanf(client_ip_hex_.c_str(), "%x", &ip_bytes) != 1) { + gpr_log(GPR_ERROR, + "Can't parse client IP (%s) from a hex string to an integer.", + client_ip_hex_.c_str()); + return ""; + } + ip_bytes = grpc_htonl(ip_bytes); return TString(reinterpret_cast<const char*>(&ip_bytes), sizeof(ip_bytes)); - } else if (client_ip_hex_.size() == kIpv6AddressLength) { - uint32_t ip_bytes[4]; - for (size_t i = 0; i < 4; ++i) { - if (sscanf(client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str(), "%x", - ip_bytes + i) != 1) { - gpr_log( - GPR_ERROR, - "Can't parse client IP part (%s) from a hex string to an integer.", - client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str()); - return ""; - } - ip_bytes[i] = grpc_htonl(ip_bytes[i]); - } + } else if (client_ip_hex_.size() == kIpv6AddressLength) { + uint32_t ip_bytes[4]; + for (size_t i = 0; i < 4; ++i) { + if (sscanf(client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str(), "%x", + ip_bytes + i) != 1) { + gpr_log( + GPR_ERROR, + "Can't parse client IP part (%s) from a hex string to an integer.", + client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str()); + return ""; + } + ip_bytes[i] = grpc_htonl(ip_bytes[i]); + } return TString(reinterpret_cast<const char*>(ip_bytes), sizeof(ip_bytes)); - } else { - GPR_UNREACHABLE_CODE(return ""); - } -} - + } else { + GPR_UNREACHABLE_CODE(return ""); + } +} + LoadRecordValue::LoadRecordValue(TString metric_name, uint64_t num_calls, - double total_metric_value) { - call_metrics_.emplace(std::move(metric_name), - CallMetricValue(num_calls, total_metric_value)); -} - + double total_metric_value) { + call_metrics_.emplace(std::move(metric_name), + CallMetricValue(num_calls, total_metric_value)); +} + void PerBalancerStore::MergeRow(const LoadRecordKey& key, const LoadRecordValue& value) { // During suspension, the load data received will be dropped. diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.h b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.h index 61ba618331..7047488c0e 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.h +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.h @@ -28,8 +28,8 @@ #include <grpc/support/log.h> #include <grpcpp/impl/codegen/config.h> -#include "src/cpp/server/load_reporter/constants.h" - +#include "src/cpp/server/load_reporter/constants.h" + #include <util/string/cast.h> namespace grpc { @@ -76,9 +76,9 @@ class LoadRecordKey { user_id_(std::move(user_id)), client_ip_hex_(std::move(client_ip_hex)) {} - // Parses the input client_ip_and_token to set client IP, LB ID, and LB tag. + // Parses the input client_ip_and_token to set client IP, LB ID, and LB tag. LoadRecordKey(const TString& client_ip_and_token, TString user_id); - + TString ToString() const { return "[lb_id_=" + lb_id_ + ", lb_tag_=" + lb_tag_ + ", user_id_=" + user_id_ + ", client_ip_hex_=" + client_ip_hex_ + @@ -90,9 +90,9 @@ class LoadRecordKey { user_id_ == other.user_id_ && client_ip_hex_ == other.client_ip_hex_; } - // Gets the client IP bytes in network order (i.e., big-endian). + // Gets the client IP bytes in network order (i.e., big-endian). TString GetClientIpBytes() const; - + // Getters. const TString& lb_id() const { return lb_id_; } const TString& lb_tag() const { return lb_tag_; } @@ -126,8 +126,8 @@ class LoadRecordKey { class LoadRecordValue { public: explicit LoadRecordValue(uint64_t start_count = 0, uint64_t ok_count = 0, - uint64_t error_count = 0, uint64_t bytes_sent = 0, - uint64_t bytes_recv = 0, uint64_t latency_ms = 0) + uint64_t error_count = 0, uint64_t bytes_sent = 0, + uint64_t bytes_recv = 0, uint64_t latency_ms = 0) : start_count_(start_count), ok_count_(ok_count), error_count_(error_count), @@ -136,8 +136,8 @@ class LoadRecordValue { latency_ms_(latency_ms) {} LoadRecordValue(TString metric_name, uint64_t num_calls, - double total_metric_value); - + double total_metric_value); + void MergeFrom(const LoadRecordValue& other) { start_count_ += other.start_count_; ok_count_ += other.ok_count_; @@ -175,9 +175,9 @@ class LoadRecordValue { uint64_t start_count() const { return start_count_; } uint64_t ok_count() const { return ok_count_; } uint64_t error_count() const { return error_count_; } - uint64_t bytes_sent() const { return bytes_sent_; } - uint64_t bytes_recv() const { return bytes_recv_; } - uint64_t latency_ms() const { return latency_ms_; } + uint64_t bytes_sent() const { return bytes_sent_; } + uint64_t bytes_recv() const { return bytes_recv_; } + uint64_t latency_ms() const { return latency_ms_; } const std::unordered_map<TString, CallMetricValue>& call_metrics() const { return call_metrics_; } @@ -186,9 +186,9 @@ class LoadRecordValue { uint64_t start_count_ = 0; uint64_t ok_count_ = 0; uint64_t error_count_ = 0; - uint64_t bytes_sent_ = 0; - uint64_t bytes_recv_ = 0; - uint64_t latency_ms_ = 0; + uint64_t bytes_sent_ = 0; + uint64_t bytes_recv_ = 0; + uint64_t latency_ms_ = 0; std::unordered_map<TString, CallMetricValue> call_metrics_; }; diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc index 24ad9f3f24..a57ddc4715 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc @@ -1,47 +1,47 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <grpc/impl/codegen/port_platform.h> - -#include <grpcpp/ext/server_load_reporting.h> - -#include <cmath> - -#include <grpc/support/log.h> - +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/impl/codegen/port_platform.h> + +#include <grpcpp/ext/server_load_reporting.h> + +#include <cmath> + +#include <grpc/support/log.h> + namespace grpc { -namespace load_reporter { -namespace experimental { - -void AddLoadReportingCost(grpc::ServerContext* ctx, +namespace load_reporter { +namespace experimental { + +void AddLoadReportingCost(grpc::ServerContext* ctx, const TString& cost_name, double cost_value) { - if (std::isnormal(cost_value)) { + if (std::isnormal(cost_value)) { TString buf; - buf.resize(sizeof(cost_value) + cost_name.size()); - memcpy(&(*buf.begin()), &cost_value, sizeof(cost_value)); - memcpy(&(*buf.begin()) + sizeof(cost_value), cost_name.data(), - cost_name.size()); - ctx->AddTrailingMetadata(GRPC_LB_COST_MD_KEY, buf); - } else { - gpr_log(GPR_ERROR, "Call metric value is not normal."); - } -} - -} // namespace experimental -} // namespace load_reporter + buf.resize(sizeof(cost_value) + cost_name.size()); + memcpy(&(*buf.begin()), &cost_value, sizeof(cost_value)); + memcpy(&(*buf.begin()) + sizeof(cost_value), cost_name.data(), + cost_name.size()); + ctx->AddTrailingMetadata(GRPC_LB_COST_MD_KEY, buf); + } else { + gpr_log(GPR_ERROR, "Call metric value is not normal."); + } +} + +} // namespace experimental +} // namespace load_reporter } // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc index 732602bcb7..3b73453515 100644 --- a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc +++ b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc @@ -44,10 +44,10 @@ void AuthMetadataProcessorAyncWrapper::Process( return; } if (w->processor_->IsBlocking()) { - w->thread_pool_->Add([w, context, md, num_md, cb, user_data] { - w->AuthMetadataProcessorAyncWrapper::InvokeProcessor(context, md, num_md, - cb, user_data); - }); + w->thread_pool_->Add([w, context, md, num_md, cb, user_data] { + w->AuthMetadataProcessorAyncWrapper::InvokeProcessor(context, md, num_md, + cb, user_data); + }); } else { // invoke directly. w->InvokeProcessor(context, md, num_md, cb, user_data); @@ -62,7 +62,7 @@ void AuthMetadataProcessorAyncWrapper::InvokeProcessor( metadata.insert(std::make_pair(StringRefFromSlice(&md[i].key), StringRefFromSlice(&md[i].value))); } - SecureAuthContext context(ctx); + SecureAuthContext context(ctx); AuthMetadataProcessor::OutputMetadata consumed_metadata; AuthMetadataProcessor::OutputMetadata response_metadata; @@ -138,12 +138,12 @@ std::shared_ptr<ServerCredentials> AltsServerCredentials( new SecureServerCredentials(c_creds)); } -std::shared_ptr<ServerCredentials> LocalServerCredentials( - grpc_local_connect_type type) { - return std::shared_ptr<ServerCredentials>( - new SecureServerCredentials(grpc_local_server_credentials_create(type))); -} - +std::shared_ptr<ServerCredentials> LocalServerCredentials( + grpc_local_connect_type type) { + return std::shared_ptr<ServerCredentials>( + new SecureServerCredentials(grpc_local_server_credentials_create(type))); +} + std::shared_ptr<ServerCredentials> TlsServerCredentials( const grpc::experimental::TlsCredentialsOptions& options) { grpc::GrpcLibraryCodegen init; diff --git a/contrib/libs/grpc/src/cpp/server/server_builder.cc b/contrib/libs/grpc/src/cpp/server/server_builder.cc index 0cc00b365f..ec5d4eec8c 100644 --- a/contrib/libs/grpc/src/cpp/server/server_builder.cc +++ b/contrib/libs/grpc/src/cpp/server/server_builder.cc @@ -24,8 +24,8 @@ #include <grpcpp/resource_quota.h> #include <grpcpp/server.h> -#include <utility> - +#include <utility> + #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/useful.h" @@ -44,8 +44,8 @@ static void do_plugin_list_init(void) { } ServerBuilder::ServerBuilder() - : max_receive_message_size_(INT_MIN), - max_send_message_size_(INT_MIN), + : max_receive_message_size_(INT_MIN), + max_send_message_size_(INT_MIN), sync_server_settings_(SyncServerSettings()), resource_quota_(nullptr) { gpr_once_init(&once_init_plugin_list, do_plugin_list_init); @@ -71,9 +71,9 @@ ServerBuilder::~ServerBuilder() { std::unique_ptr<grpc::ServerCompletionQueue> ServerBuilder::AddCompletionQueue( bool is_frequently_polled) { grpc::ServerCompletionQueue* cq = new grpc::ServerCompletionQueue( - GRPC_CQ_NEXT, - is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING, - nullptr); + GRPC_CQ_NEXT, + is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING, + nullptr); cqs_.push_back(cq); return std::unique_ptr<grpc::ServerCompletionQueue>(cq); } @@ -180,7 +180,7 @@ ServerBuilder& ServerBuilder::SetCompressionAlgorithmSupportStatus( ServerBuilder& ServerBuilder::SetDefaultCompressionLevel( grpc_compression_level level) { - maybe_default_compression_level_.is_set = true; + maybe_default_compression_level_.is_set = true; maybe_default_compression_level_.level = level; return *this; } @@ -212,14 +212,14 @@ ServerBuilder& ServerBuilder::AddListeningPort( while (addr_uri[pos] == '/') ++pos; // Skip slashes. addr = addr_uri.substr(pos); } - Port port = {addr, std::move(creds), selected_port}; + Port port = {addr, std::move(creds), selected_port}; ports_.push_back(port); return *this; } std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { grpc::ChannelArguments args; - if (max_receive_message_size_ >= -1) { + if (max_receive_message_size_ >= -1) { args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_); } if (max_send_message_size_ >= -1) { @@ -306,14 +306,14 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { // Create completion queues to listen to incoming rpc requests for (int i = 0; i < sync_server_settings_.num_cqs; i++) { - sync_server_cqs->emplace_back( + sync_server_cqs->emplace_back( new grpc::ServerCompletionQueue(GRPC_CQ_NEXT, polling_type, nullptr)); } } - // TODO(vjpai): Add a section here for plugins once they can support callback - // methods - + // TODO(vjpai): Add a section here for plugins once they can support callback + // methods + if (has_sync_methods) { // This is a Sync server gpr_log(GPR_INFO, @@ -324,16 +324,16 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { sync_server_settings_.cq_timeout_msec); } - if (has_callback_methods) { - gpr_log(GPR_INFO, "Callback server."); - } - + if (has_callback_methods) { + gpr_log(GPR_INFO, "Callback server."); + } + std::unique_ptr<grpc::Server> server(new grpc::Server( &args, sync_server_cqs, sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, sync_server_settings_.cq_timeout_msec, std::move(acceptors_), resource_quota_, std::move(interceptor_creators_))); - + ServerInitializer* initializer = server->initializer(); // Register all the completion queues with the server. i.e @@ -347,10 +347,10 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { } if (has_callback_methods || callback_generic_service_ != nullptr) { - auto* cq = server->CallbackCQ(); - grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr); - } - + auto* cq = server->CallbackCQ(); + grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr); + } + // cqs_ contains the completion queue added by calling the ServerBuilder's // AddCompletionQueue() API. Some of them may not be frequently polled (i.e by // calling Next() or AsyncNext()) and hence are not safe to be used for diff --git a/contrib/libs/grpc/src/cpp/server/server_cc.cc b/contrib/libs/grpc/src/cpp/server/server_cc.cc index c2a911c7f7..56bf75d730 100644 --- a/contrib/libs/grpc/src/cpp/server/server_cc.cc +++ b/contrib/libs/grpc/src/cpp/server/server_cc.cc @@ -30,10 +30,10 @@ #include <grpcpp/generic/async_generic_service.h> #include <grpcpp/impl/codegen/async_unary_call.h> #include <grpcpp/impl/codegen/byte_buffer.h> -#include <grpcpp/impl/codegen/call.h> +#include <grpcpp/impl/codegen/call.h> #include <grpcpp/impl/codegen/completion_queue_tag.h> #include <grpcpp/impl/codegen/method_handler.h> -#include <grpcpp/impl/codegen/server_interceptor.h> +#include <grpcpp/impl/codegen/server_interceptor.h> #include <grpcpp/impl/grpc_library.h> #include <grpcpp/impl/rpc_service_method.h> #include <grpcpp/impl/server_initializer.h> @@ -43,10 +43,10 @@ #include <grpcpp/support/time.h> #include "src/core/ext/transport/inproc/inproc_transport.h" -#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/surface/call.h" -#include "src/core/lib/surface/completion_queue.h" +#include "src/core/lib/surface/completion_queue.h" #include "src/core/lib/surface/server.h" #include "src/cpp/client/create_channel_internal.h" #include "src/cpp/server/external_connection_acceptor_impl.h" @@ -58,13 +58,13 @@ namespace grpc { namespace { -// The default value for maximum number of threads that can be created in the -// sync server. This value of INT_MAX is chosen to match the default behavior if -// no ResourceQuota is set. To modify the max number of threads in a sync -// server, pass a custom ResourceQuota object (with the desired number of -// max-threads set) to the server builder. -#define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX - +// The default value for maximum number of threads that can be created in the +// sync server. This value of INT_MAX is chosen to match the default behavior if +// no ResourceQuota is set. To modify the max number of threads in a sync +// server, pass a custom ResourceQuota object (with the desired number of +// max-threads set) to the server builder. +#define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX + class DefaultGlobalCallbacks final : public Server::GlobalCallbacks { public: ~DefaultGlobalCallbacks() override {} @@ -319,10 +319,10 @@ class Server::UnimplementedAsyncResponse final grpc::internal::CallOpSendInitialMetadata, grpc::internal::CallOpServerSendStatus>::FinalizeResult(tag, status)) { - delete this; - } else { - // The tag was swallowed due to interception. We will see it again. - } + delete this; + } else { + // The tag was swallowed due to interception. We will see it again. + } return false; } @@ -334,7 +334,7 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { public: SyncRequest(grpc::internal::RpcServiceMethod* method, void* method_tag) : method_(method), - method_tag_(method_tag), + method_tag_(method_tag), in_flight_(false), has_request_payload_(method->method_type() == grpc::internal::RpcMethod::NORMAL_RPC || @@ -362,11 +362,11 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { void Request(grpc_server* server, grpc_completion_queue* notify_cq) { GPR_ASSERT(cq_ && !in_flight_); in_flight_ = true; - if (method_tag_) { - if (grpc_server_request_registered_call( - server, method_tag_, &call_, &deadline_, &request_metadata_, + if (method_tag_) { + if (grpc_server_request_registered_call( + server, method_tag_, &call_, &deadline_, &request_metadata_, has_request_payload_ ? &request_payload_ : nullptr, cq_, - notify_cq, this) != GRPC_CALL_OK) { + notify_cq, this) != GRPC_CALL_OK) { TeardownRequest(); return; } @@ -384,21 +384,21 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { } } - void PostShutdownCleanup() { - if (call_) { - grpc_call_unref(call_); - call_ = nullptr; - } - if (cq_) { - grpc_completion_queue_destroy(cq_); - cq_ = nullptr; - } - } - + void PostShutdownCleanup() { + if (call_) { + grpc_call_unref(call_); + call_ = nullptr; + } + if (cq_) { + grpc_completion_queue_destroy(cq_); + cq_ = nullptr; + } + } + bool FinalizeResult(void** /*tag*/, bool* status) override { if (!*status) { grpc_completion_queue_destroy(cq_); - cq_ = nullptr; + cq_ = nullptr; } if (call_details_) { deadline_ = call_details_->deadline; @@ -408,26 +408,26 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { return true; } - // The CallData class represents a call that is "active" as opposed - // to just being requested. It wraps and takes ownership of the cq from - // the call request + // The CallData class represents a call that is "active" as opposed + // to just being requested. It wraps and takes ownership of the cq from + // the call request class CallData final { public: explicit CallData(Server* server, SyncRequest* mrd) : cq_(mrd->cq_), ctx_(mrd->deadline_, &mrd->request_metadata_), has_request_payload_(mrd->has_request_payload_), - request_payload_(has_request_payload_ ? mrd->request_payload_ - : nullptr), - request_(nullptr), - method_(mrd->method_), - call_( - mrd->call_, server, &cq_, server->max_receive_message_size(), - ctx_.set_server_rpc_info(method_->name(), method_->method_type(), - server->interceptor_creators_)), - server_(server), - global_callbacks_(nullptr), - resources_(false) { + request_payload_(has_request_payload_ ? mrd->request_payload_ + : nullptr), + request_(nullptr), + method_(mrd->method_), + call_( + mrd->call_, server, &cq_, server->max_receive_message_size(), + ctx_.set_server_rpc_info(method_->name(), method_->method_type(), + server->interceptor_creators_)), + server_(server), + global_callbacks_(nullptr), + resources_(false) { ctx_.set_call(mrd->call_); ctx_.cq_ = &cq_; GPR_ASSERT(mrd->in_flight_); @@ -441,82 +441,82 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { } } - void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks, - bool resources) { - global_callbacks_ = global_callbacks; - resources_ = resources; + void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks, + bool resources) { + global_callbacks_ = global_callbacks; + resources_ = resources; - interceptor_methods_.SetCall(&call_); - interceptor_methods_.SetReverse(); - // Set interception point for RECV INITIAL METADATA - interceptor_methods_.AddInterceptionHookPoint( + interceptor_methods_.SetCall(&call_); + interceptor_methods_.SetReverse(); + // Set interception point for RECV INITIAL METADATA + interceptor_methods_.AddInterceptionHookPoint( grpc::experimental::InterceptionHookPoints:: POST_RECV_INITIAL_METADATA); - interceptor_methods_.SetRecvInitialMetadata(&ctx_.client_metadata_); + interceptor_methods_.SetRecvInitialMetadata(&ctx_.client_metadata_); - if (has_request_payload_) { - // Set interception point for RECV MESSAGE - auto* handler = resources_ ? method_->handler() - : server_->resource_exhausted_handler_.get(); - request_ = handler->Deserialize(call_.call(), request_payload_, + if (has_request_payload_) { + // Set interception point for RECV MESSAGE + auto* handler = resources_ ? method_->handler() + : server_->resource_exhausted_handler_.get(); + request_ = handler->Deserialize(call_.call(), request_payload_, &request_status_, nullptr); - request_payload_ = nullptr; - interceptor_methods_.AddInterceptionHookPoint( + request_payload_ = nullptr; + interceptor_methods_.AddInterceptionHookPoint( grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); - interceptor_methods_.SetRecvMessage(request_, nullptr); - } - - if (interceptor_methods_.RunInterceptors( - [this]() { ContinueRunAfterInterception(); })) { - ContinueRunAfterInterception(); - } else { - // There were interceptors to be run, so ContinueRunAfterInterception - // will be run when interceptors are done. - } + interceptor_methods_.SetRecvMessage(request_, nullptr); + } + + if (interceptor_methods_.RunInterceptors( + [this]() { ContinueRunAfterInterception(); })) { + ContinueRunAfterInterception(); + } else { + // There were interceptors to be run, so ContinueRunAfterInterception + // will be run when interceptors are done. + } } - void ContinueRunAfterInterception() { - { - ctx_.BeginCompletionOp(&call_, nullptr, nullptr); - global_callbacks_->PreSynchronousRequest(&ctx_); - auto* handler = resources_ ? method_->handler() - : server_->resource_exhausted_handler_.get(); + void ContinueRunAfterInterception() { + { + ctx_.BeginCompletionOp(&call_, nullptr, nullptr); + global_callbacks_->PreSynchronousRequest(&ctx_); + auto* handler = resources_ ? method_->handler() + : server_->resource_exhausted_handler_.get(); handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( &call_, &ctx_, request_, request_status_, nullptr, nullptr)); - request_ = nullptr; - global_callbacks_->PostSynchronousRequest(&ctx_); - - cq_.Shutdown(); - + request_ = nullptr; + global_callbacks_->PostSynchronousRequest(&ctx_); + + cq_.Shutdown(); + grpc::internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); - cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME)); - - /* Ensure the cq_ is shutdown */ + cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME)); + + /* Ensure the cq_ is shutdown */ grpc::DummyTag ignored_tag; - GPR_ASSERT(cq_.Pluck(&ignored_tag) == false); - } - delete this; - } - + GPR_ASSERT(cq_.Pluck(&ignored_tag) == false); + } + delete this; + } + private: grpc::CompletionQueue cq_; grpc::ServerContext ctx_; const bool has_request_payload_; grpc_byte_buffer* request_payload_; - void* request_; + void* request_; grpc::Status request_status_; grpc::internal::RpcServiceMethod* const method_; grpc::internal::Call call_; - Server* server_; - std::shared_ptr<GlobalCallbacks> global_callbacks_; - bool resources_; + Server* server_; + std::shared_ptr<GlobalCallbacks> global_callbacks_; + bool resources_; grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; }; private: grpc::internal::RpcServiceMethod* const method_; - void* const method_tag_; + void* const method_tag_; bool in_flight_; const bool has_request_payload_; grpc_call* call_; @@ -541,19 +541,19 @@ class Server::CallbackRequest final CallbackRequest(Server* server, grpc::internal::RpcServiceMethod* method, grpc::CompletionQueue* cq, grpc_core::Server::RegisteredCallAllocation* data) - : server_(server), - method_(method), + : server_(server), + method_(method), has_request_payload_(method->method_type() == grpc::internal::RpcMethod::NORMAL_RPC || method->method_type() == grpc::internal::RpcMethod::SERVER_STREAMING), cq_(cq), - tag_(this) { + tag_(this) { CommonSetup(server, data); data->deadline = &deadline_; data->optional_payload = has_request_payload_ ? &request_payload_ : nullptr; - } - + } + // For generic services, method is nullptr since these services don't have // pre-defined methods. CallbackRequest(Server* server, grpc::CompletionQueue* cq, @@ -567,8 +567,8 @@ class Server::CallbackRequest final CommonSetup(server, data); grpc_call_details_init(call_details_); data->details = call_details_; - } - + } + ~CallbackRequest() { delete call_details_; grpc_metadata_array_destroy(&request_metadata_); @@ -576,21 +576,21 @@ class Server::CallbackRequest final grpc_byte_buffer_destroy(request_payload_); } server_->UnrefWithPossibleNotify(); - } - + } + // Needs specialization to account for different processing of metadata // in generic API bool FinalizeResult(void** tag, bool* status) override; - - private: + + private: // method_name needs to be specialized between named method and generic const char* method_name() const; - class CallbackCallTag : public grpc_experimental_completion_queue_functor { - public: + class CallbackCallTag : public grpc_experimental_completion_queue_functor { + public: CallbackCallTag(Server::CallbackRequest<ServerContextType>* req) : req_(req) { - functor_run = &CallbackCallTag::StaticRun; + functor_run = &CallbackCallTag::StaticRun; // Set inlineable to true since this callback is internally-controlled // without taking any locks, and thus does not need to be run from the // executor (which triggers a thread hop). This should only be used by @@ -598,42 +598,42 @@ class Server::CallbackRequest final // here is actually non-trivial, but there is no chance of having user // locks conflict with each other so it's ok to run inlined. inlineable = true; - } - - // force_run can not be performed on a tag if operations using this tag - // have been sent to PerformOpsOnCall. It is intended for error conditions - // that are detected before the operations are internally processed. - void force_run(bool ok) { Run(ok); } - - private: + } + + // force_run can not be performed on a tag if operations using this tag + // have been sent to PerformOpsOnCall. It is intended for error conditions + // that are detected before the operations are internally processed. + void force_run(bool ok) { Run(ok); } + + private: Server::CallbackRequest<ServerContextType>* req_; grpc::internal::Call* call_; - - static void StaticRun(grpc_experimental_completion_queue_functor* cb, - int ok) { - static_cast<CallbackCallTag*>(cb)->Run(static_cast<bool>(ok)); - } - void Run(bool ok) { - void* ignored = req_; - bool new_ok = ok; - GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok)); - GPR_ASSERT(ignored == req_); - - if (!ok) { - // The call has been shutdown. - // Delete its contents to free up the request. - delete req_; - return; - } - - // Bind the call, deadline, and metadata from what we got - req_->ctx_.set_call(req_->call_); - req_->ctx_.cq_ = req_->cq_; - req_->ctx_.BindDeadlineAndMetadata(req_->deadline_, - &req_->request_metadata_); - req_->request_metadata_.count = 0; - - // Create a C++ Call to control the underlying core call + + static void StaticRun(grpc_experimental_completion_queue_functor* cb, + int ok) { + static_cast<CallbackCallTag*>(cb)->Run(static_cast<bool>(ok)); + } + void Run(bool ok) { + void* ignored = req_; + bool new_ok = ok; + GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok)); + GPR_ASSERT(ignored == req_); + + if (!ok) { + // The call has been shutdown. + // Delete its contents to free up the request. + delete req_; + return; + } + + // Bind the call, deadline, and metadata from what we got + req_->ctx_.set_call(req_->call_); + req_->ctx_.cq_ = req_->cq_; + req_->ctx_.BindDeadlineAndMetadata(req_->deadline_, + &req_->request_metadata_); + req_->request_metadata_.count = 0; + + // Create a C++ Call to control the underlying core call call_ = new (grpc_call_arena_alloc(req_->call_, sizeof(grpc::internal::Call))) grpc::internal::Call( @@ -645,71 +645,71 @@ class Server::CallbackRequest final ? req_->method_->method_type() : grpc::internal::RpcMethod::BIDI_STREAMING, req_->server_->interceptor_creators_)); - - req_->interceptor_methods_.SetCall(call_); - req_->interceptor_methods_.SetReverse(); - // Set interception point for RECV INITIAL METADATA - req_->interceptor_methods_.AddInterceptionHookPoint( + + req_->interceptor_methods_.SetCall(call_); + req_->interceptor_methods_.SetReverse(); + // Set interception point for RECV INITIAL METADATA + req_->interceptor_methods_.AddInterceptionHookPoint( grpc::experimental::InterceptionHookPoints:: POST_RECV_INITIAL_METADATA); - req_->interceptor_methods_.SetRecvInitialMetadata( - &req_->ctx_.client_metadata_); - - if (req_->has_request_payload_) { - // Set interception point for RECV MESSAGE - req_->request_ = req_->method_->handler()->Deserialize( + req_->interceptor_methods_.SetRecvInitialMetadata( + &req_->ctx_.client_metadata_); + + if (req_->has_request_payload_) { + // Set interception point for RECV MESSAGE + req_->request_ = req_->method_->handler()->Deserialize( req_->call_, req_->request_payload_, &req_->request_status_, &req_->handler_data_); - req_->request_payload_ = nullptr; - req_->interceptor_methods_.AddInterceptionHookPoint( + req_->request_payload_ = nullptr; + req_->interceptor_methods_.AddInterceptionHookPoint( grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); - req_->interceptor_methods_.SetRecvMessage(req_->request_, nullptr); - } - - if (req_->interceptor_methods_.RunInterceptors( - [this] { ContinueRunAfterInterception(); })) { - ContinueRunAfterInterception(); - } else { - // There were interceptors to be run, so ContinueRunAfterInterception - // will be run when interceptors are done. - } - } - void ContinueRunAfterInterception() { + req_->interceptor_methods_.SetRecvMessage(req_->request_, nullptr); + } + + if (req_->interceptor_methods_.RunInterceptors( + [this] { ContinueRunAfterInterception(); })) { + ContinueRunAfterInterception(); + } else { + // There were interceptors to be run, so ContinueRunAfterInterception + // will be run when interceptors are done. + } + } + void ContinueRunAfterInterception() { auto* handler = (req_->method_ != nullptr) ? req_->method_->handler() : req_->server_->generic_handler_.get(); handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( call_, &req_->ctx_, req_->request_, req_->request_status_, req_->handler_data_, [this] { delete req_; })); - } - }; - + } + }; + template <class CallAllocation> void CommonSetup(Server* server, CallAllocation* data) { server->Ref(); - grpc_metadata_array_init(&request_metadata_); + grpc_metadata_array_init(&request_metadata_); data->tag = &tag_; data->call = &call_; data->initial_metadata = &request_metadata_; - } - - Server* const server_; + } + + Server* const server_; grpc::internal::RpcServiceMethod* const method_; - const bool has_request_payload_; + const bool has_request_payload_; grpc_byte_buffer* request_payload_ = nullptr; void* request_ = nullptr; void* handler_data_ = nullptr; grpc::Status request_status_; grpc_call_details* const call_details_ = nullptr; - grpc_call* call_; - gpr_timespec deadline_; - grpc_metadata_array request_metadata_; + grpc_call* call_; + gpr_timespec deadline_; + grpc_metadata_array request_metadata_; grpc::CompletionQueue* const cq_; - CallbackCallTag tag_; + CallbackCallTag tag_; ServerContextType ctx_; grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; -}; - +}; + template <> bool Server::CallbackRequest<grpc::CallbackServerContext>::FinalizeResult( void** /*tag*/, bool* /*status*/) { @@ -750,13 +750,13 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { public: SyncRequestThreadManager(Server* server, grpc::CompletionQueue* server_cq, std::shared_ptr<GlobalCallbacks> global_callbacks, - grpc_resource_quota* rq, int min_pollers, - int max_pollers, int cq_timeout_msec) - : ThreadManager("SyncServer", rq, min_pollers, max_pollers), + grpc_resource_quota* rq, int min_pollers, + int max_pollers, int cq_timeout_msec) + : ThreadManager("SyncServer", rq, min_pollers, max_pollers), server_(server), server_cq_(server_cq), cq_timeout_msec_(cq_timeout_msec), - global_callbacks_(std::move(global_callbacks)) {} + global_callbacks_(std::move(global_callbacks)) {} WorkStatus PollForWork(void** tag, bool* ok) override { *tag = nullptr; @@ -778,7 +778,7 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { GPR_UNREACHABLE_CODE(return TIMEOUT); } - void DoWork(void* tag, bool ok, bool resources) override { + void DoWork(void* tag, bool ok, bool resources) override { SyncRequest* sync_req = static_cast<SyncRequest*>(tag); if (!sync_req) { @@ -789,9 +789,9 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { } if (ok) { - // Calldata takes ownership of the completion queue and interceptors - // inside sync_req - auto* cd = new SyncRequest::CallData(server_, sync_req); + // Calldata takes ownership of the completion queue and interceptors + // inside sync_req + auto* cd = new SyncRequest::CallData(server_, sync_req); // Prepare for the next request if (!IsShutdown()) { sync_req->SetupRequest(); // Create new completion queue for sync_req @@ -799,7 +799,7 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { } GPR_TIMER_SCOPE("cd.Run()", 0); - cd->Run(global_callbacks_, resources); + cd->Run(global_callbacks_, resources); } // TODO (sreek) If ok is false here (which it isn't in case of // grpc_request_registered_call), we should still re-queue the request @@ -831,17 +831,17 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { void* tag; bool ok; while (server_cq_->Next(&tag, &ok)) { - if (ok) { - // If a request was pulled off the queue, it means that the thread - // handling the request added it to the completion queue after shutdown - // was called - because the thread had already started and checked the - // shutdown flag before shutdown was called. In this case, we simply - // clean it up here, *after* calling wait on all the worker threads, at - // which point we are certain no in-flight requests will add more to the - // queue. This fixes an intermittent memory leak on shutdown. - SyncRequest* sync_req = static_cast<SyncRequest*>(tag); - sync_req->PostShutdownCleanup(); - } + if (ok) { + // If a request was pulled off the queue, it means that the thread + // handling the request added it to the completion queue after shutdown + // was called - because the thread had already started and checked the + // shutdown flag before shutdown was called. In this case, we simply + // clean it up here, *after* calling wait on all the worker threads, at + // which point we are certain no in-flight requests will add more to the + // queue. This fixes an intermittent memory leak on shutdown. + SyncRequest* sync_req = static_cast<SyncRequest*>(tag); + sync_req->PostShutdownCleanup(); + } } } @@ -870,17 +870,17 @@ Server::Server( grpc::ChannelArguments* args, std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>> sync_server_cqs, - int min_pollers, int max_pollers, int sync_cq_timeout_msec, + int min_pollers, int max_pollers, int sync_cq_timeout_msec, std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>> acceptors, - grpc_resource_quota* server_rq, - std::vector< + grpc_resource_quota* server_rq, + std::vector< std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>> - interceptor_creators) + interceptor_creators) : acceptors_(std::move(acceptors)), interceptor_creators_(std::move(interceptor_creators)), max_receive_message_size_(INT_MIN), - sync_server_cqs_(std::move(sync_server_cqs)), + sync_server_cqs_(std::move(sync_server_cqs)), started_(false), shutdown_(false), shutdown_notified_(false), @@ -893,23 +893,23 @@ Server::Server( global_callbacks_->UpdateArguments(args); if (sync_server_cqs_ != nullptr) { - bool default_rq_created = false; - if (server_rq == nullptr) { - server_rq = grpc_resource_quota_create("SyncServer-default-rq"); - grpc_resource_quota_set_max_threads(server_rq, - DEFAULT_MAX_SYNC_SERVER_THREADS); - default_rq_created = true; - } - + bool default_rq_created = false; + if (server_rq == nullptr) { + server_rq = grpc_resource_quota_create("SyncServer-default-rq"); + grpc_resource_quota_set_max_threads(server_rq, + DEFAULT_MAX_SYNC_SERVER_THREADS); + default_rq_created = true; + } + for (const auto& it : *sync_server_cqs_) { sync_req_mgrs_.emplace_back(new SyncRequestThreadManager( - this, it.get(), global_callbacks_, server_rq, min_pollers, - max_pollers, sync_cq_timeout_msec)); - } - - if (default_rq_created) { - grpc_resource_quota_unref(server_rq); + this, it.get(), global_callbacks_, server_rq, min_pollers, + max_pollers, sync_cq_timeout_msec)); } + + if (default_rq_created) { + grpc_resource_quota_unref(server_rq); + } } for (auto& acceptor : acceptors_) { @@ -974,24 +974,24 @@ std::shared_ptr<grpc::Channel> Server::InProcessChannel( const grpc::ChannelArguments& args) { grpc_channel_args channel_args = args.c_channel_args(); return grpc::CreateChannelInternal( - "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr), + "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr), std::vector<std::unique_ptr< grpc::experimental::ClientInterceptorFactoryInterface>>()); } std::shared_ptr<grpc::Channel> -Server::experimental_type::InProcessChannelWithInterceptors( +Server::experimental_type::InProcessChannelWithInterceptors( const grpc::ChannelArguments& args, - std::vector< + std::vector< std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> - interceptor_creators) { - grpc_channel_args channel_args = args.c_channel_args(); + interceptor_creators) { + grpc_channel_args channel_args = args.c_channel_args(); return grpc::CreateChannelInternal( - "inproc", - grpc_inproc_channel_create(server_->server_, &channel_args, nullptr), - std::move(interceptor_creators)); -} - + "inproc", + grpc_inproc_channel_create(server_->server_, &channel_args, nullptr), + std::move(interceptor_creators)); +} + static grpc_server_register_method_payload_handling PayloadHandlingForMethod( grpc::internal::RpcServiceMethod* method) { switch (method->method_type()) { @@ -1014,29 +1014,29 @@ bool Server::RegisterService(const TString* host, grpc::Service* service) { } const char* method_name = nullptr; - + for (const auto& method : service->methods_) { if (method.get() == nullptr) { // Handled by generic service if any. continue; } - void* method_registration_tag = grpc_server_register_method( + void* method_registration_tag = grpc_server_register_method( server_, method->name(), host ? host->c_str() : nullptr, PayloadHandlingForMethod(method.get()), 0); - if (method_registration_tag == nullptr) { + if (method_registration_tag == nullptr) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name()); return false; } - if (method->handler() == nullptr) { // Async method without handler - method->set_server_tag(method_registration_tag); - } else if (method->api_type() == + if (method->handler() == nullptr) { // Async method without handler + method->set_server_tag(method_registration_tag); + } else if (method->api_type() == grpc::internal::RpcServiceMethod::ApiType::SYNC) { for (const auto& value : sync_req_mgrs_) { value->AddSyncMethod(method.get(), method_registration_tag); } - } else { + } else { has_callback_methods_ = true; grpc::internal::RpcServiceMethod* method_value = method.get(); grpc::CompletionQueue* cq = CallbackCQ(); @@ -1130,23 +1130,23 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { // explicit one. grpc::ServerCompletionQueue* health_check_cq = nullptr; grpc::DefaultHealthCheckService::HealthCheckServiceImpl* - default_health_check_service_impl = nullptr; + default_health_check_service_impl = nullptr; if (health_check_service_ == nullptr && !health_check_service_disabled_ && grpc::DefaultHealthCheckServiceEnabled()) { auto* default_hc_service = new grpc::DefaultHealthCheckService; - health_check_service_.reset(default_hc_service); - // We create a non-polling CQ to avoid impacting application - // performance. This ensures that we don't introduce thread hops - // for application requests that wind up on this CQ, which is polled - // in its own thread. + health_check_service_.reset(default_hc_service); + // We create a non-polling CQ to avoid impacting application + // performance. This ensures that we don't introduce thread hops + // for application requests that wind up on this CQ, which is polled + // in its own thread. health_check_cq = new grpc::ServerCompletionQueue( GRPC_CQ_NEXT, GRPC_CQ_NON_POLLING, nullptr); - grpc_server_register_completion_queue(server_, health_check_cq->cq(), - nullptr); - default_health_check_service_impl = - default_hc_service->GetHealthCheckService( + grpc_server_register_completion_queue(server_, health_check_cq->cq(), + nullptr); + default_health_check_service_impl = + default_hc_service->GetHealthCheckService( std::unique_ptr<grpc::ServerCompletionQueue>(health_check_cq)); - RegisterService(nullptr, default_health_check_service_impl); + RegisterService(nullptr, default_health_check_service_impl); } for (auto& acceptor : acceptors_) { @@ -1179,26 +1179,26 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { new UnimplementedAsyncRequest(this, cqs[i]); } } - if (health_check_cq != nullptr) { - new UnimplementedAsyncRequest(this, health_check_cq); - } + if (health_check_cq != nullptr) { + new UnimplementedAsyncRequest(this, health_check_cq); + } } - // If this server has any support for synchronous methods (has any sync - // server CQs), make sure that we have a ResourceExhausted handler - // to deal with the case of thread exhaustion - if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) { + // If this server has any support for synchronous methods (has any sync + // server CQs), make sure that we have a ResourceExhausted handler + // to deal with the case of thread exhaustion + if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) { resource_exhausted_handler_.reset( new grpc::internal::ResourceExhaustedHandler); - } - + } + for (const auto& value : sync_req_mgrs_) { value->Start(); } - - if (default_health_check_service_impl != nullptr) { - default_health_check_service_impl->StartServingThread(); - } + + if (default_health_check_service_impl != nullptr) { + default_health_check_service_impl->StartServingThread(); + } for (auto& acceptor : acceptors_) { acceptor->Start(); @@ -1207,50 +1207,50 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { void Server::ShutdownInternal(gpr_timespec deadline) { grpc::internal::MutexLock lock(&mu_); - if (shutdown_) { - return; - } + if (shutdown_) { + return; + } - shutdown_ = true; + shutdown_ = true; for (auto& acceptor : acceptors_) { acceptor->Shutdown(); } - /// The completion queue to use for server shutdown completion notification + /// The completion queue to use for server shutdown completion notification grpc::CompletionQueue shutdown_cq; grpc::ShutdownTag shutdown_tag; // Dummy shutdown tag - grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag); + grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag); - shutdown_cq.Shutdown(); + shutdown_cq.Shutdown(); - void* tag; - bool ok; + void* tag; + bool ok; grpc::CompletionQueue::NextStatus status = - shutdown_cq.AsyncNext(&tag, &ok, deadline); + shutdown_cq.AsyncNext(&tag, &ok, deadline); - // If this timed out, it means we are done with the grace period for a clean - // shutdown. We should force a shutdown now by cancelling all inflight calls + // If this timed out, it means we are done with the grace period for a clean + // shutdown. We should force a shutdown now by cancelling all inflight calls if (status == grpc::CompletionQueue::NextStatus::TIMEOUT) { - grpc_server_cancel_all_calls(server_); - } - // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has - // successfully shutdown + grpc_server_cancel_all_calls(server_); + } + // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has + // successfully shutdown - // Shutdown all ThreadManagers. This will try to gracefully stop all the - // threads in the ThreadManagers (once they process any inflight requests) + // Shutdown all ThreadManagers. This will try to gracefully stop all the + // threads in the ThreadManagers (once they process any inflight requests) for (const auto& value : sync_req_mgrs_) { value->Shutdown(); // ThreadManager's Shutdown() - } + } - // Wait for threads in all ThreadManagers to terminate + // Wait for threads in all ThreadManagers to terminate for (const auto& value : sync_req_mgrs_) { value->Wait(); - } + } // Drop the shutdown ref and wait for all other refs to drop as well. UnrefAndWaitLocked(); - + // Shutdown the callback CQ. The CQ is owned by its own shutdown tag, so it // will delete itself at true shutdown. if (callback_cq_ != nullptr) { @@ -1258,13 +1258,13 @@ void Server::ShutdownInternal(gpr_timespec deadline) { callback_cq_ = nullptr; } - // Drain the shutdown queue (if the previous call to AsyncNext() timed out - // and we didn't remove the tag from the queue yet) - while (shutdown_cq.Next(&tag, &ok)) { - // Nothing to be done here. Just ignore ok and tag values - } - - shutdown_notified_ = true; + // Drain the shutdown queue (if the previous call to AsyncNext() timed out + // and we didn't remove the tag from the queue yet) + while (shutdown_cq.Next(&tag, &ok)) { + // Nothing to be done here. Just ignore ok and tag values + } + + shutdown_notified_ = true; shutdown_cv_.Broadcast(); #ifndef NDEBUG @@ -1286,23 +1286,23 @@ void Server::Wait() { void Server::PerformOpsOnCall(grpc::internal::CallOpSetInterface* ops, grpc::internal::Call* call) { - ops->FillOps(call); + ops->FillOps(call); } bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag, bool* status) { - if (GenericAsyncRequest::FinalizeResult(tag, status)) { - // We either had no interceptors run or we are done intercepting - if (*status) { + if (GenericAsyncRequest::FinalizeResult(tag, status)) { + // We either had no interceptors run or we are done intercepting + if (*status) { // Create a new request/response pair using the server and CQ values // stored in this object's base class. new UnimplementedAsyncRequest(server_, notification_cq_); - new UnimplementedAsyncResponse(this); - } else { - delete this; - } + new UnimplementedAsyncResponse(this); + } else { + delete this; + } } else { - // The tag was swallowed due to interception. We will see it again. + // The tag was swallowed due to interception. We will see it again. } return false; } @@ -1320,8 +1320,8 @@ grpc::ServerInitializer* Server::initializer() { } grpc::CompletionQueue* Server::CallbackCQ() { - // TODO(vjpai): Consider using a single global CQ for the default CQ - // if there is no explicit per-server CQ registered + // TODO(vjpai): Consider using a single global CQ for the default CQ + // if there is no explicit per-server CQ registered grpc::internal::MutexLock l(&mu_); if (callback_cq_ != nullptr) { return callback_cq_; @@ -1330,11 +1330,11 @@ grpc::CompletionQueue* Server::CallbackCQ() { callback_cq_ = new grpc::CompletionQueue(grpc_completion_queue_attributes{ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, shutdown_callback}); - + // Transfer ownership of the new cq to its own shutdown callback shutdown_callback->TakeCQ(callback_cq_); - return callback_cq_; + return callback_cq_; } - + } // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/server/server_context.cc b/contrib/libs/grpc/src/cpp/server/server_context.cc index 458ac20d87..fea258e6e7 100644 --- a/contrib/libs/grpc/src/cpp/server/server_context.cc +++ b/contrib/libs/grpc/src/cpp/server/server_context.cc @@ -43,50 +43,50 @@ class ServerContextBase::CompletionOp final : public internal::CallOpSetInterface { public: // initial refs: one in the server context, one in the cq - // must ref the call before calling constructor and after deleting this + // must ref the call before calling constructor and after deleting this CompletionOp(internal::Call* call, ::grpc::internal::ServerCallbackCall* callback_controller) - : call_(*call), + : call_(*call), callback_controller_(callback_controller), - has_tag_(false), + has_tag_(false), tag_(nullptr), - core_cq_tag_(this), + core_cq_tag_(this), refs_(2), finalized_(false), - cancelled_(0), - done_intercepting_(false) {} - - // CompletionOp isn't copyable or movable - CompletionOp(const CompletionOp&) = delete; - CompletionOp& operator=(const CompletionOp&) = delete; - CompletionOp(CompletionOp&&) = delete; - CompletionOp& operator=(CompletionOp&&) = delete; - - ~CompletionOp() { - if (call_.server_rpc_info()) { - call_.server_rpc_info()->Unref(); - } - } - + cancelled_(0), + done_intercepting_(false) {} + + // CompletionOp isn't copyable or movable + CompletionOp(const CompletionOp&) = delete; + CompletionOp& operator=(const CompletionOp&) = delete; + CompletionOp(CompletionOp&&) = delete; + CompletionOp& operator=(CompletionOp&&) = delete; + + ~CompletionOp() { + if (call_.server_rpc_info()) { + call_.server_rpc_info()->Unref(); + } + } + void FillOps(internal::Call* call) override; - - // This should always be arena allocated in the call, so override delete. - // But this class is not trivially destructible, so must actually call delete - // before allowing the arena to be freed + + // This should always be arena allocated in the call, so override delete. + // But this class is not trivially destructible, so must actually call delete + // before allowing the arena to be freed static void operator delete(void* /*ptr*/, std::size_t size) { // Use size to avoid unused-parameter warning since assert seems to be // compiled out and treated as unused in some gcc optimized versions. (void)size; - assert(size == sizeof(CompletionOp)); - } - - // This operator should never be called as the memory should be freed as part - // of the arena destruction. It only exists to provide a matching operator - // delete to the operator new so that some compilers will not complain (see - // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this - // there are no tests catching the compiler warning. - static void operator delete(void*, void*) { assert(0); } - + assert(size == sizeof(CompletionOp)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { assert(0); } + bool FinalizeResult(void** tag, bool* status) override; bool CheckCancelled(CompletionQueue* cq) { @@ -100,36 +100,36 @@ class ServerContextBase::CompletionOp final tag_ = tag; } - void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; } - - void* core_cq_tag() override { return core_cq_tag_; } - + void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; } + + void* core_cq_tag() override { return core_cq_tag_; } + void Unref(); - // This will be called while interceptors are run if the RPC is a hijacked - // RPC. This should set hijacking state for each of the ops. - void SetHijackingState() override { - /* Servers don't allow hijacking */ + // This will be called while interceptors are run if the RPC is a hijacked + // RPC. This should set hijacking state for each of the ops. + void SetHijackingState() override { + /* Servers don't allow hijacking */ GPR_ASSERT(false); - } - - /* Should be called after interceptors are done running */ - void ContinueFillOpsAfterInterception() override {} - - /* Should be called after interceptors are done running on the finalize result - * path */ - void ContinueFinalizeResultAfterInterception() override { - done_intercepting_ = true; - if (!has_tag_) { - /* We don't have a tag to return. */ + } + + /* Should be called after interceptors are done running */ + void ContinueFillOpsAfterInterception() override {} + + /* Should be called after interceptors are done running on the finalize result + * path */ + void ContinueFinalizeResultAfterInterception() override { + done_intercepting_ = true; + if (!has_tag_) { + /* We don't have a tag to return. */ Unref(); - return; - } - /* Start a dummy op so that we can return the tag */ + return; + } + /* Start a dummy op so that we can return the tag */ GPR_ASSERT(grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_, nullptr) == GRPC_CALL_OK); - } - + } + private: bool CheckCancelledNoPluck() { grpc_core::MutexLock lock(&mu_); @@ -140,37 +140,37 @@ class ServerContextBase::CompletionOp final ::grpc::internal::ServerCallbackCall* const callback_controller_; bool has_tag_; void* tag_; - void* core_cq_tag_; + void* core_cq_tag_; grpc_core::RefCount refs_; grpc_core::Mutex mu_; bool finalized_; - int cancelled_; // This is an int (not bool) because it is passed to core - bool done_intercepting_; + int cancelled_; // This is an int (not bool) because it is passed to core + bool done_intercepting_; internal::InterceptorBatchMethodsImpl interceptor_methods_; }; void ServerContextBase::CompletionOp::Unref() { if (refs_.Unref()) { - grpc_call* call = call_.call(); + grpc_call* call = call_.call(); delete this; - grpc_call_unref(call); + grpc_call_unref(call); } } void ServerContextBase::CompletionOp::FillOps(internal::Call* call) { - grpc_op ops; - ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER; - ops.data.recv_close_on_server.cancelled = &cancelled_; - ops.flags = 0; - ops.reserved = nullptr; - interceptor_methods_.SetCall(&call_); - interceptor_methods_.SetReverse(); - interceptor_methods_.SetCallOpSetInterface(this); + grpc_op ops; + ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER; + ops.data.recv_close_on_server.cancelled = &cancelled_; + ops.flags = 0; + ops.reserved = nullptr; + interceptor_methods_.SetCall(&call_); + interceptor_methods_.SetReverse(); + interceptor_methods_.SetCallOpSetInterface(this); // The following call_start_batch is internally-generated so no need for an // explanatory log on failure. GPR_ASSERT(grpc_call_start_batch(call->call(), &ops, 1, core_cq_tag_, nullptr) == GRPC_CALL_OK); - /* No interceptors to run here */ + /* No interceptors to run here */ } bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) { @@ -187,9 +187,9 @@ bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) { } Unref(); return has_tag; - } + } finalized_ = true; - + // If for some reason the incoming status is false, mark that as a // cancellation. // TODO(vjpai): does this ever happen? @@ -200,24 +200,24 @@ bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) { call_cancel = (cancelled_ != 0); // Release the lock since we may call a callback and interceptors. } - + if (call_cancel && callback_controller_ != nullptr) { callback_controller_->MaybeCallOnCancel(); } - /* Add interception point and run through interceptors */ - interceptor_methods_.AddInterceptionHookPoint( + /* Add interception point and run through interceptors */ + interceptor_methods_.AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_CLOSE); - if (interceptor_methods_.RunInterceptors()) { + if (interceptor_methods_.RunInterceptors()) { // No interceptors were run bool has_tag = has_tag_; if (has_tag) { - *tag = tag_; - } + *tag = tag_; + } Unref(); return has_tag; - } + } // There are interceptors to be run. Return false for now. - return false; + return false; } // ServerContextBase body @@ -233,17 +233,17 @@ ServerContextBase::ServerContextBase(gpr_timespec deadline, void ServerContextBase::BindDeadlineAndMetadata(gpr_timespec deadline, grpc_metadata_array* arr) { - deadline_ = deadline; - std::swap(*client_metadata_.arr(), *arr); -} - + deadline_ = deadline; + std::swap(*client_metadata_.arr(), *arr); +} + ServerContextBase::~ServerContextBase() { if (completion_op_) { completion_op_->Unref(); } - if (rpc_info_) { - rpc_info_->Unref(); - } + if (rpc_info_) { + rpc_info_->Unref(); + } if (default_reactor_used_.load(std::memory_order_relaxed)) { reinterpret_cast<Reactor*>(&default_reactor_)->~Reactor(); } @@ -261,19 +261,19 @@ void ServerContextBase::BeginCompletionOp( internal::Call* call, std::function<void(bool)> callback, ::grpc::internal::ServerCallbackCall* callback_controller) { GPR_ASSERT(!completion_op_); - if (rpc_info_) { - rpc_info_->Ref(); - } - grpc_call_ref(call->call()); - completion_op_ = - new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp))) + if (rpc_info_) { + rpc_info_->Ref(); + } + grpc_call_ref(call->call()); + completion_op_ = + new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp))) CompletionOp(call, callback_controller); if (callback_controller != nullptr) { completion_tag_.Set(call->call(), std::move(callback), completion_op_, true); - completion_op_->set_core_cq_tag(&completion_tag_); - completion_op_->set_tag(completion_op_); - } else if (has_notify_when_done_tag_) { + completion_op_->set_core_cq_tag(&completion_tag_); + completion_op_->set_tag(completion_op_); + } else if (has_notify_when_done_tag_) { completion_op_->set_tag(async_notify_when_done_tag_); } call->PerformOps(completion_op_); @@ -295,11 +295,11 @@ void ServerContextBase::AddTrailingMetadata(const TString& key, void ServerContextBase::TryCancel() const { internal::CancelInterceptorBatchMethods cancel_methods; - if (rpc_info_) { - for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) { - rpc_info_->RunInterceptor(&cancel_methods, i); - } - } + if (rpc_info_) { + for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) { + rpc_info_->RunInterceptor(&cancel_methods, i); + } + } grpc_call_error err = grpc_call_cancel_with_status(call_.call, GRPC_STATUS_CANCELLED, "Cancelled on the server side", nullptr); @@ -309,15 +309,15 @@ void ServerContextBase::TryCancel() const { } bool ServerContextBase::IsCancelled() const { - if (completion_tag_) { - // When using callback API, this result is always valid. - return completion_op_->CheckCancelledAsync(); - } else if (has_notify_when_done_tag_) { - // When using async API, the result is only valid + if (completion_tag_) { + // When using callback API, this result is always valid. + return completion_op_->CheckCancelledAsync(); + } else if (has_notify_when_done_tag_) { + // When using async API, the result is only valid // if the tag has already been delivered at the completion queue return completion_op_ && completion_op_->CheckCancelledAsync(); } else { - // when using sync API, the result is always valid + // when using sync API, the result is always valid return completion_op_ && completion_op_->CheckCancelled(cq_); } } |