diff options
author | heretic <heretic@yandex-team.ru> | 2022-02-10 16:45:43 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:43 +0300 |
commit | 397cbe258b9e064f49c4ca575279f02f39fef76e (patch) | |
tree | a0b0eb3cca6a14e4e8ea715393637672fa651284 /contrib/libs/grpc/src/cpp/server | |
parent | 43f5a35593ebc9f6bcea619bb170394ea7ae468e (diff) | |
download | ydb-397cbe258b9e064f49c4ca575279f02f39fef76e.tar.gz |
Restoring authorship annotation for <heretic@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/libs/grpc/src/cpp/server')
21 files changed, 507 insertions, 507 deletions
diff --git a/contrib/libs/grpc/src/cpp/server/async_generic_service.cc b/contrib/libs/grpc/src/cpp/server/async_generic_service.cc index 07697a52d1..0e4e95465f 100644 --- a/contrib/libs/grpc/src/cpp/server/async_generic_service.cc +++ b/contrib/libs/grpc/src/cpp/server/async_generic_service.cc @@ -24,8 +24,8 @@ namespace grpc { void AsyncGenericService::RequestCall( GenericServerContext* ctx, GenericServerAsyncReaderWriter* reader_writer, - ::grpc::CompletionQueue* call_cq, - ::grpc::ServerCompletionQueue* notification_cq, void* tag) { + ::grpc::CompletionQueue* call_cq, + ::grpc::ServerCompletionQueue* notification_cq, void* tag) { server_->RequestAsyncGenericCall(ctx, reader_writer, call_cq, notification_cq, tag); } diff --git a/contrib/libs/grpc/src/cpp/server/channel_argument_option.cc b/contrib/libs/grpc/src/cpp/server/channel_argument_option.cc index 9aad932429..69ee6eed68 100644 --- a/contrib/libs/grpc/src/cpp/server/channel_argument_option.cc +++ b/contrib/libs/grpc/src/cpp/server/channel_argument_option.cc @@ -21,10 +21,10 @@ namespace grpc { std::unique_ptr<ServerBuilderOption> MakeChannelArgumentOption( - const TString& name, const TString& value) { + const TString& name, const TString& value) { class StringOption final : public ServerBuilderOption { public: - StringOption(const TString& name, const TString& value) + StringOption(const TString& name, const TString& value) : name_(name), value_(value) {} virtual void UpdateArguments(ChannelArguments* args) override { @@ -35,17 +35,17 @@ std::unique_ptr<ServerBuilderOption> MakeChannelArgumentOption( override {} private: - const TString name_; - const TString value_; + const TString name_; + const TString value_; }; return std::unique_ptr<ServerBuilderOption>(new StringOption(name, value)); } std::unique_ptr<ServerBuilderOption> MakeChannelArgumentOption( - const TString& name, int value) { + const TString& name, int value) { class IntOption final : public ServerBuilderOption { public: - IntOption(const TString& name, int value) + IntOption(const TString& name, int value) : name_(name), value_(value) {} virtual void UpdateArguments(ChannelArguments* args) override { @@ -56,7 +56,7 @@ std::unique_ptr<ServerBuilderOption> MakeChannelArgumentOption( override {} private: - const TString name_; + const TString name_; const int value_; }; return std::unique_ptr<ServerBuilderOption>(new IntOption(name, value)); diff --git a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc index 6dcf84bf40..1f402ef53d 100644 --- a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc +++ b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc @@ -24,9 +24,9 @@ #include <grpc/support/alloc.h> namespace grpc { - -namespace { - + +namespace { + grpc::protobuf::util::Status ParseJson(const char* json_str, grpc::protobuf::Message* message) { grpc::protobuf::json::JsonParseOptions options; @@ -34,8 +34,8 @@ grpc::protobuf::util::Status ParseJson(const char* json_str, return grpc::protobuf::json::JsonStringToMessage(json_str, message, options); } -} // namespace - +} // namespace + Status ChannelzService::GetTopChannels( ServerContext* /*unused*/, const channelz::v1::GetTopChannelsRequest* request, diff --git a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc index ae26a447ab..bae1b59544 100644 --- a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc +++ b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc @@ -33,7 +33,7 @@ class ChannelzServicePlugin : public ::grpc::ServerBuilderPlugin { public: ChannelzServicePlugin() : channelz_service_(new grpc::ChannelzService()) {} - TString name() override { return "channelz_service"; } + TString name() override { return "channelz_service"; } void InitServer(grpc::ServerInitializer* si) override { si->RegisterService(channelz_service_); @@ -41,7 +41,7 @@ class ChannelzServicePlugin : public ::grpc::ServerBuilderPlugin { void Finish(grpc::ServerInitializer* /*si*/) override {} - void ChangeArguments(const TString& /*name*/, void* /*value*/) override {} + void ChangeArguments(const TString& /*name*/, void* /*value*/) override {} bool has_sync_methods() const override { if (channelz_service_) { @@ -75,12 +75,12 @@ namespace channelz { namespace experimental { void InitChannelzService() { - static struct Initializer { - Initializer() { - ::grpc::ServerBuilder::InternalAddPluginFactory( - &grpc::channelz::experimental::CreateChannelzServicePlugin); - } - } initialize; + static struct Initializer { + Initializer() { + ::grpc::ServerBuilder::InternalAddPluginFactory( + &grpc::channelz::experimental::CreateChannelzServicePlugin); + } + } initialize; } } // namespace experimental diff --git a/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.cc b/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.cc index 09d2a9d3b5..9641301ab7 100644 --- a/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.cc +++ b/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.cc @@ -20,7 +20,7 @@ #include <memory> -#include <grpcpp/server_builder.h> +#include <grpcpp/server_builder.h> #include <grpcpp/support/channel_arguments.h> namespace grpc { @@ -42,7 +42,7 @@ class AcceptorWrapper : public experimental::ExternalConnectionAcceptor { } // namespace ExternalConnectionAcceptorImpl::ExternalConnectionAcceptorImpl( - const TString& name, + const TString& name, ServerBuilder::experimental_type::ExternalConnectionType type, std::shared_ptr<ServerCredentials> creds) : name_(name), creds_(std::move(creds)) { diff --git a/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.h b/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.h index 430c72862e..8a1179bef2 100644 --- a/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.h +++ b/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.h @@ -36,7 +36,7 @@ class ExternalConnectionAcceptorImpl : public std::enable_shared_from_this<ExternalConnectionAcceptorImpl> { public: ExternalConnectionAcceptorImpl( - const TString& name, + const TString& name, ServerBuilder::experimental_type::ExternalConnectionType type, std::shared_ptr<ServerCredentials> creds); // Should only be called once. @@ -56,7 +56,7 @@ class ExternalConnectionAcceptorImpl void SetToChannelArgs(::grpc::ChannelArguments* args); private: - const TString name_; + const TString name_; std::shared_ptr<ServerCredentials> creds_; grpc_core::TcpServerFdHandler* handler_ = nullptr; // not owned grpc_core::Mutex mu_; diff --git a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc index 3cc508d0cb..059c97a414 100644 --- a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc +++ b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc @@ -18,16 +18,16 @@ #include <memory> -#include "upb/upb.hpp" - +#include "upb/upb.hpp" + #include <grpc/slice.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <grpcpp/impl/codegen/method_handler.h> +#include <grpcpp/impl/codegen/method_handler.h> #include "src/cpp/server/health/default_health_check_service.h" #include "src/proto/grpc/health/v1/health.upb.h" -#include "upb/upb.hpp" +#include "upb/upb.hpp" #define MAX_SERVICE_NAME_LENGTH 200 @@ -42,7 +42,7 @@ DefaultHealthCheckService::DefaultHealthCheckService() { } void DefaultHealthCheckService::SetServingStatus( - const TString& service_name, bool serving) { + const TString& service_name, bool serving) { grpc_core::MutexLock lock(&mu_); if (shutdown_) { // Set to NOT_SERVING in case service_name is not in the map. @@ -77,7 +77,7 @@ void DefaultHealthCheckService::Shutdown() { DefaultHealthCheckService::ServingStatus DefaultHealthCheckService::GetServingStatus( - const TString& service_name) const { + const TString& service_name) const { grpc_core::MutexLock lock(&mu_); auto it = services_map_.find(service_name); if (it == services_map_.end()) { @@ -88,7 +88,7 @@ DefaultHealthCheckService::GetServingStatus( } void DefaultHealthCheckService::RegisterCallHandler( - const TString& service_name, + const TString& service_name, std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) { grpc_core::MutexLock lock(&mu_); ServiceData& service_data = services_map_[service_name]; @@ -98,7 +98,7 @@ void DefaultHealthCheckService::RegisterCallHandler( } void DefaultHealthCheckService::UnregisterCallHandler( - const TString& service_name, + const TString& service_name, const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) { grpc_core::MutexLock lock(&mu_); auto it = services_map_.find(service_name); @@ -185,7 +185,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() { } void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) { - HealthCheckServiceImpl* service = static_cast<HealthCheckServiceImpl*>(arg); + HealthCheckServiceImpl* service = static_cast<HealthCheckServiceImpl*>(arg); void* tag; bool ok; while (true) { @@ -200,7 +200,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) { } bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest( - const ByteBuffer& request, TString* service_name) { + const ByteBuffer& request, TString* service_name) { std::vector<Slice> slices; if (!request.Dump(&slices).ok()) return false; uint8_t* request_bytes = nullptr; @@ -301,7 +301,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: // Process request. gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_, this); - TString service_name; + TString service_name; grpc::Status status = Status::OK; ByteBuffer response; if (!service_->DecodeRequest(request_, &service_name)) { diff --git a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h index 9da1dfc15f..6d7479b016 100644 --- a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h +++ b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h @@ -194,7 +194,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { HealthCheckServiceImpl* service_; ByteBuffer request_; - TString service_name_; + TString service_name_; GenericServerAsyncWriter stream_; ServerContext ctx_; @@ -213,7 +213,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { // Returns true on success. static bool DecodeRequest(const ByteBuffer& request, - TString* service_name); + TString* service_name); static bool EncodeResponse(ServingStatus status, ByteBuffer* response); // Needed to appease Windows compilers, which don't seem to allow @@ -234,12 +234,12 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { DefaultHealthCheckService(); - void SetServingStatus(const TString& service_name, bool serving) override; + void SetServingStatus(const TString& service_name, bool serving) override; void SetServingStatus(bool serving) override; void Shutdown() override; - ServingStatus GetServingStatus(const TString& service_name) const; + ServingStatus GetServingStatus(const TString& service_name) const; HealthCheckServiceImpl* GetHealthCheckService( std::unique_ptr<ServerCompletionQueue> cq); @@ -266,16 +266,16 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { }; void RegisterCallHandler( - const TString& service_name, + const TString& service_name, std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); void UnregisterCallHandler( - const TString& service_name, + const TString& service_name, const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler); mutable grpc_core::Mutex mu_; - bool shutdown_ = false; // Guarded by mu_. - std::map<TString, ServiceData> services_map_; // Guarded by mu_. + bool shutdown_ = false; // Guarded by mu_. + std::map<TString, ServiceData> services_map_; // Guarded by mu_. std::unique_ptr<HealthCheckServiceImpl> impl_; }; diff --git a/contrib/libs/grpc/src/cpp/server/health/health_check_service.cc b/contrib/libs/grpc/src/cpp/server/health/health_check_service.cc index a0fa2d62f5..35c194cb1b 100644 --- a/contrib/libs/grpc/src/cpp/server/health/health_check_service.cc +++ b/contrib/libs/grpc/src/cpp/server/health/health_check_service.cc @@ -16,9 +16,9 @@ * */ -#include <grpcpp/health_check_service_interface.h> +#include <grpcpp/health_check_service_interface.h> -namespace grpc { +namespace grpc { namespace { bool g_grpc_default_health_check_service_enabled = false; } // namespace @@ -31,4 +31,4 @@ void EnableDefaultHealthCheckService(bool enable) { g_grpc_default_health_check_service_enabled = enable; } -} // namespace grpc +} // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc b/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc index 3f33f4e045..f78a4daf07 100644 --- a/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc +++ b/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc @@ -21,11 +21,11 @@ #include <grpc/grpc.h> #include <grpc/support/log.h> -namespace grpc { +namespace grpc { namespace { class InsecureServerCredentialsImpl final : public ServerCredentials { public: - int AddPortToServer(const TString& addr, grpc_server* server) override { + int AddPortToServer(const TString& addr, grpc_server* server) override { return grpc_server_add_insecure_http2_port(server, addr.c_str()); } void SetAuthMetadataProcessor( @@ -41,4 +41,4 @@ std::shared_ptr<ServerCredentials> InsecureServerCredentials() { new InsecureServerCredentialsImpl()); } -} // namespace grpc +} // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc index f07fa812a7..46a368da91 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc @@ -77,8 +77,8 @@ const typename C::value_type* RandomElement(const C& container) { } // namespace -LoadRecordKey::LoadRecordKey(const TString& client_ip_and_token, - TString user_id) +LoadRecordKey::LoadRecordKey(const TString& client_ip_and_token, + TString user_id) : user_id_(std::move(user_id)) { GPR_ASSERT(client_ip_and_token.size() >= 2); int ip_hex_size; @@ -98,7 +98,7 @@ LoadRecordKey::LoadRecordKey(const TString& client_ip_and_token, } } -TString LoadRecordKey::GetClientIpBytes() const { +TString LoadRecordKey::GetClientIpBytes() const { if (client_ip_hex_.empty()) { return ""; } else if (client_ip_hex_.size() == kIpv4AddressLength) { @@ -110,8 +110,8 @@ TString LoadRecordKey::GetClientIpBytes() const { return ""; } ip_bytes = grpc_htonl(ip_bytes); - return TString(reinterpret_cast<const char*>(&ip_bytes), - sizeof(ip_bytes)); + return TString(reinterpret_cast<const char*>(&ip_bytes), + sizeof(ip_bytes)); } else if (client_ip_hex_.size() == kIpv6AddressLength) { uint32_t ip_bytes[4]; for (size_t i = 0; i < 4; ++i) { @@ -125,14 +125,14 @@ TString LoadRecordKey::GetClientIpBytes() const { } ip_bytes[i] = grpc_htonl(ip_bytes[i]); } - return TString(reinterpret_cast<const char*>(ip_bytes), - sizeof(ip_bytes)); + return TString(reinterpret_cast<const char*>(ip_bytes), + sizeof(ip_bytes)); } else { GPR_UNREACHABLE_CODE(return ""); } } -LoadRecordValue::LoadRecordValue(TString metric_name, uint64_t num_calls, +LoadRecordValue::LoadRecordValue(TString metric_name, uint64_t num_calls, double total_metric_value) { call_metrics_.emplace(std::move(metric_name), CallMetricValue(num_calls, total_metric_value)); @@ -177,8 +177,8 @@ uint64_t PerBalancerStore::GetNumCallsInProgressForReport() { return num_calls_in_progress_; } -void PerHostStore::ReportStreamCreated(const TString& lb_id, - const TString& load_key) { +void PerHostStore::ReportStreamCreated(const TString& lb_id, + const TString& load_key) { GPR_ASSERT(lb_id != kInvalidLbId); SetUpForNewLbId(lb_id, load_key); // Prior to this one, there was no load balancer receiving report, so we may @@ -188,7 +188,7 @@ void PerHostStore::ReportStreamCreated(const TString& lb_id, // this stream. Need to discuss with LB team. if (assigned_stores_.size() == 1) { for (const auto& p : per_balancer_stores_) { - const TString& other_lb_id = p.first; + const TString& other_lb_id = p.first; const std::unique_ptr<PerBalancerStore>& orphaned_store = p.second; if (other_lb_id != lb_id) { orphaned_store->Resume(); @@ -203,7 +203,7 @@ void PerHostStore::ReportStreamCreated(const TString& lb_id, } } -void PerHostStore::ReportStreamClosed(const TString& lb_id) { +void PerHostStore::ReportStreamClosed(const TString& lb_id) { auto it_store_for_gone_lb = per_balancer_stores_.find(lb_id); GPR_ASSERT(it_store_for_gone_lb != per_balancer_stores_.end()); // Remove this closed stream from our records. @@ -215,7 +215,7 @@ void PerHostStore::ReportStreamClosed(const TString& lb_id) { // The stores that were assigned to this balancer are orphaned now. They // should be re-assigned to other balancers which are still receiving reports. for (PerBalancerStore* orphaned_store : orphaned_stores) { - const TString* new_receiver = nullptr; + const TString* new_receiver = nullptr; auto it = load_key_to_receiving_lb_ids_.find(orphaned_store->load_key()); if (it != load_key_to_receiving_lb_ids_.end()) { // First, try to pick from the active balancers with the same load key. @@ -235,21 +235,21 @@ void PerHostStore::ReportStreamClosed(const TString& lb_id) { } PerBalancerStore* PerHostStore::FindPerBalancerStore( - const TString& lb_id) const { + const TString& lb_id) const { return per_balancer_stores_.find(lb_id) != per_balancer_stores_.end() ? per_balancer_stores_.find(lb_id)->second.get() : nullptr; } const std::set<PerBalancerStore*>* PerHostStore::GetAssignedStores( - const TString& lb_id) const { + const TString& lb_id) const { auto it = assigned_stores_.find(lb_id); if (it == assigned_stores_.end()) return nullptr; return &(it->second); } void PerHostStore::AssignOrphanedStore(PerBalancerStore* orphaned_store, - const TString& new_receiver) { + const TString& new_receiver) { auto it = assigned_stores_.find(new_receiver); GPR_ASSERT(it != assigned_stores_.end()); it->second.insert(orphaned_store); @@ -260,8 +260,8 @@ void PerHostStore::AssignOrphanedStore(PerBalancerStore* orphaned_store, new_receiver.c_str()); } -void PerHostStore::SetUpForNewLbId(const TString& lb_id, - const TString& load_key) { +void PerHostStore::SetUpForNewLbId(const TString& lb_id, + const TString& load_key) { // The top-level caller (i.e., LoadReportService) should guarantee the // lb_id is unique for each reporting stream. GPR_ASSERT(per_balancer_stores_.find(lb_id) == per_balancer_stores_.end()); @@ -284,7 +284,7 @@ PerBalancerStore* LoadDataStore::FindPerBalancerStore( } } -void LoadDataStore::MergeRow(const TString& hostname, +void LoadDataStore::MergeRow(const TString& hostname, const LoadRecordKey& key, const LoadRecordValue& value) { PerBalancerStore* per_balancer_store = @@ -315,20 +315,20 @@ void LoadDataStore::MergeRow(const TString& hostname, } const std::set<PerBalancerStore*>* LoadDataStore::GetAssignedStores( - const TString& hostname, const TString& lb_id) { + const TString& hostname, const TString& lb_id) { auto it = per_host_stores_.find(hostname); if (it == per_host_stores_.end()) return nullptr; return it->second.GetAssignedStores(lb_id); } -void LoadDataStore::ReportStreamCreated(const TString& hostname, - const TString& lb_id, - const TString& load_key) { +void LoadDataStore::ReportStreamCreated(const TString& hostname, + const TString& lb_id, + const TString& load_key) { per_host_stores_[hostname].ReportStreamCreated(lb_id, load_key); } -void LoadDataStore::ReportStreamClosed(const TString& hostname, - const TString& lb_id) { +void LoadDataStore::ReportStreamClosed(const TString& hostname, + const TString& lb_id) { auto it_per_host_store = per_host_stores_.find(hostname); GPR_ASSERT(it_per_host_store != per_host_stores_.end()); it_per_host_store->second.ReportStreamClosed(lb_id); diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.h b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.h index 61ba618331..f6a31b87bc 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.h +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.h @@ -30,8 +30,8 @@ #include "src/cpp/server/load_reporter/constants.h" -#include <util/string/cast.h> - +#include <util/string/cast.h> + namespace grpc { namespace load_reporter { @@ -69,17 +69,17 @@ class CallMetricValue { // The key of a load record. class LoadRecordKey { public: - LoadRecordKey(TString lb_id, TString lb_tag, TString user_id, - TString client_ip_hex) + LoadRecordKey(TString lb_id, TString lb_tag, TString user_id, + TString client_ip_hex) : lb_id_(std::move(lb_id)), lb_tag_(std::move(lb_tag)), user_id_(std::move(user_id)), client_ip_hex_(std::move(client_ip_hex)) {} // Parses the input client_ip_and_token to set client IP, LB ID, and LB tag. - LoadRecordKey(const TString& client_ip_and_token, TString user_id); + LoadRecordKey(const TString& client_ip_and_token, TString user_id); - TString ToString() const { + TString ToString() const { return "[lb_id_=" + lb_id_ + ", lb_tag_=" + lb_tag_ + ", user_id_=" + user_id_ + ", client_ip_hex_=" + client_ip_hex_ + "]"; @@ -91,17 +91,17 @@ class LoadRecordKey { } // Gets the client IP bytes in network order (i.e., big-endian). - TString GetClientIpBytes() const; + TString GetClientIpBytes() const; // Getters. - const TString& lb_id() const { return lb_id_; } - const TString& lb_tag() const { return lb_tag_; } - const TString& user_id() const { return user_id_; } - const TString& client_ip_hex() const { return client_ip_hex_; } + const TString& lb_id() const { return lb_id_; } + const TString& lb_tag() const { return lb_tag_; } + const TString& user_id() const { return user_id_; } + const TString& client_ip_hex() const { return client_ip_hex_; } struct Hasher { - void hash_combine(size_t* seed, const TString& k) const { - *seed ^= std::hash<TString>()(k) + 0x9e3779b9 + (*seed << 6) + + void hash_combine(size_t* seed, const TString& k) const { + *seed ^= std::hash<TString>()(k) + 0x9e3779b9 + (*seed << 6) + (*seed >> 2); } @@ -116,10 +116,10 @@ class LoadRecordKey { }; private: - TString lb_id_; - TString lb_tag_; - TString user_id_; - TString client_ip_hex_; + TString lb_id_; + TString lb_tag_; + TString user_id_; + TString client_ip_hex_; }; // The value of a load record. @@ -135,7 +135,7 @@ class LoadRecordValue { bytes_recv_(bytes_recv), latency_ms_(latency_ms) {} - LoadRecordValue(TString metric_name, uint64_t num_calls, + LoadRecordValue(TString metric_name, uint64_t num_calls, double total_metric_value); void MergeFrom(const LoadRecordValue& other) { @@ -146,7 +146,7 @@ class LoadRecordValue { bytes_recv_ += other.bytes_recv_; latency_ms_ += other.latency_ms_; for (const auto& p : other.call_metrics_) { - const TString& key = p.first; + const TString& key = p.first; const CallMetricValue& value = p.second; call_metrics_[key].MergeFrom(value); } @@ -156,17 +156,17 @@ class LoadRecordValue { return static_cast<int64_t>(start_count_ - ok_count_ - error_count_); } - TString ToString() const { - return "[start_count_=" + ::ToString(start_count_) + - ", ok_count_=" + ::ToString(ok_count_) + - ", error_count_=" + ::ToString(error_count_) + - ", bytes_sent_=" + ::ToString(bytes_sent_) + - ", bytes_recv_=" + ::ToString(bytes_recv_) + - ", latency_ms_=" + ::ToString(latency_ms_) + ", " + - ::ToString(call_metrics_.size()) + " other call metric(s)]"; + TString ToString() const { + return "[start_count_=" + ::ToString(start_count_) + + ", ok_count_=" + ::ToString(ok_count_) + + ", error_count_=" + ::ToString(error_count_) + + ", bytes_sent_=" + ::ToString(bytes_sent_) + + ", bytes_recv_=" + ::ToString(bytes_recv_) + + ", latency_ms_=" + ::ToString(latency_ms_) + ", " + + ::ToString(call_metrics_.size()) + " other call metric(s)]"; } - bool InsertCallMetric(const TString& metric_name, + bool InsertCallMetric(const TString& metric_name, const CallMetricValue& metric_value) { return call_metrics_.insert({metric_name, metric_value}).second; } @@ -178,7 +178,7 @@ class LoadRecordValue { uint64_t bytes_sent() const { return bytes_sent_; } uint64_t bytes_recv() const { return bytes_recv_; } uint64_t latency_ms() const { return latency_ms_; } - const std::unordered_map<TString, CallMetricValue>& call_metrics() const { + const std::unordered_map<TString, CallMetricValue>& call_metrics() const { return call_metrics_; } @@ -189,7 +189,7 @@ class LoadRecordValue { uint64_t bytes_sent_ = 0; uint64_t bytes_recv_ = 0; uint64_t latency_ms_ = 0; - std::unordered_map<TString, CallMetricValue> call_metrics_; + std::unordered_map<TString, CallMetricValue> call_metrics_; }; // Stores the data associated with a particular LB ID. @@ -198,7 +198,7 @@ class PerBalancerStore { using LoadRecordMap = std::unordered_map<LoadRecordKey, LoadRecordValue, LoadRecordKey::Hasher>; - PerBalancerStore(TString lb_id, TString load_key) + PerBalancerStore(TString lb_id, TString load_key) : lb_id_(std::move(lb_id)), load_key_(std::move(load_key)) {} // Merge a load record with the given key and value if the store is not @@ -218,7 +218,7 @@ class PerBalancerStore { uint64_t GetNumCallsInProgressForReport(); - TString ToString() { + TString ToString() { return "[PerBalancerStore lb_id_=" + lb_id_ + " load_key_=" + load_key_ + "]"; } @@ -226,14 +226,14 @@ class PerBalancerStore { void ClearLoadRecordMap() { load_record_map_.clear(); } // Getters. - const TString& lb_id() const { return lb_id_; } - const TString& load_key() const { return load_key_; } + const TString& lb_id() const { return lb_id_; } + const TString& load_key() const { return load_key_; } const LoadRecordMap& load_record_map() const { return load_record_map_; } private: - TString lb_id_; + TString lb_id_; // TODO(juanlishen): Use bytestring protobuf type? - TString load_key_; + TString load_key_; LoadRecordMap load_record_map_; uint64_t num_calls_in_progress_ = 0; uint64_t last_reported_num_calls_in_progress_ = 0; @@ -247,39 +247,39 @@ class PerHostStore { // LB ID (guaranteed unique) associated with that stream. If it is the only // active store, adopt all the orphaned stores. If it is the first created // store, adopt the store of kInvalidLbId. - void ReportStreamCreated(const TString& lb_id, - const TString& load_key); + void ReportStreamCreated(const TString& lb_id, + const TString& load_key); // When a report stream is closed, the PerBalancerStores assigned to the // associate LB ID need to be re-assigned to other active balancers, // ideally with the same load key. If there is no active balancer, we have // to suspend those stores and drop the incoming load data until they are // resumed. - void ReportStreamClosed(const TString& lb_id); + void ReportStreamClosed(const TString& lb_id); // Returns null if not found. Caller doesn't own the returned store. - PerBalancerStore* FindPerBalancerStore(const TString& lb_id) const; + PerBalancerStore* FindPerBalancerStore(const TString& lb_id) const; // Returns null if lb_id is not found. The returned pointer points to the // underlying data structure, which is not owned by the caller. const std::set<PerBalancerStore*>* GetAssignedStores( - const TString& lb_id) const; + const TString& lb_id) const; private: // Creates a PerBalancerStore for the given LB ID, assigns the store to // itself, and records the LB ID to the load key. - void SetUpForNewLbId(const TString& lb_id, const TString& load_key); + void SetUpForNewLbId(const TString& lb_id, const TString& load_key); void AssignOrphanedStore(PerBalancerStore* orphaned_store, - const TString& new_receiver); + const TString& new_receiver); - std::unordered_map<TString, std::set<TString>> + std::unordered_map<TString, std::set<TString>> load_key_to_receiving_lb_ids_; // Key: LB ID. The key set includes all the LB IDs that have been // allocated for reporting streams so far. // Value: the unique pointer to the PerBalancerStore of the LB ID. - std::unordered_map<TString, std::unique_ptr<PerBalancerStore>> + std::unordered_map<TString, std::unique_ptr<PerBalancerStore>> per_balancer_stores_; // Key: LB ID. The key set includes the LB IDs of the balancers that are @@ -287,7 +287,7 @@ class PerHostStore { // Value: the set of raw pointers to the PerBalancerStores assigned to the LB // ID. Note that the sets in assigned_stores_ form a division of the value set // of per_balancer_stores_. - std::unordered_map<TString, std::set<PerBalancerStore*>> assigned_stores_; + std::unordered_map<TString, std::set<PerBalancerStore*>> assigned_stores_; }; // Thread-unsafe two-level bookkeeper of all the load data. @@ -302,8 +302,8 @@ class PerHostStore { class LoadDataStore { public: // Returns null if not found. Caller doesn't own the returned store. - PerBalancerStore* FindPerBalancerStore(const TString& hostname, - const TString& lb_id) const; + PerBalancerStore* FindPerBalancerStore(const TString& hostname, + const TString& lb_id) const; // Returns null if hostname or lb_id is not found. The returned pointer points // to the underlying data structure, which is not owned by the caller. @@ -313,33 +313,33 @@ class LoadDataStore { // If a PerBalancerStore can be found by the hostname and LB ID in // LoadRecordKey, the load data will be merged to that store. Otherwise, // only track the number of the in-progress calls for this unknown LB ID. - void MergeRow(const TString& hostname, const LoadRecordKey& key, + void MergeRow(const TString& hostname, const LoadRecordKey& key, const LoadRecordValue& value); // Is the given lb_id a tracked unknown LB ID (i.e., the LB ID was associated // with some received load data but unknown to this load data store)? - bool IsTrackedUnknownBalancerId(const TString& lb_id) const { + bool IsTrackedUnknownBalancerId(const TString& lb_id) const { return unknown_balancer_id_trackers_.find(lb_id) != unknown_balancer_id_trackers_.end(); } // Wrapper around PerHostStore::ReportStreamCreated. - void ReportStreamCreated(const TString& hostname, - const TString& lb_id, - const TString& load_key); + void ReportStreamCreated(const TString& hostname, + const TString& lb_id, + const TString& load_key); // Wrapper around PerHostStore::ReportStreamClosed. - void ReportStreamClosed(const TString& hostname, - const TString& lb_id); + void ReportStreamClosed(const TString& hostname, + const TString& lb_id); private: // Buffered data that was fetched from Census but hasn't been sent to // balancer. We need to keep this data ourselves because Census will // delete the data once it's returned. - std::unordered_map<TString, PerHostStore> per_host_stores_; + std::unordered_map<TString, PerHostStore> per_host_stores_; // Tracks the number of in-progress calls for each unknown LB ID. - std::unordered_map<TString, uint64_t> unknown_balancer_id_trackers_; + std::unordered_map<TString, uint64_t> unknown_balancer_id_trackers_; }; } // namespace load_reporter diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc index 24ad9f3f24..b1acde1010 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc @@ -24,14 +24,14 @@ #include <grpc/support/log.h> -namespace grpc { +namespace grpc { namespace load_reporter { namespace experimental { void AddLoadReportingCost(grpc::ServerContext* ctx, - const TString& cost_name, double cost_value) { + const TString& cost_name, double cost_value) { if (std::isnormal(cost_value)) { - TString buf; + TString buf; buf.resize(sizeof(cost_value) + cost_name.size()); memcpy(&(*buf.begin()), &cost_value, sizeof(cost_value)); memcpy(&(*buf.begin()) + sizeof(cost_value), cost_name.data(), @@ -44,4 +44,4 @@ void AddLoadReportingCost(grpc::ServerContext* ctx, } // namespace experimental } // namespace load_reporter -} // namespace grpc +} // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc index 732602bcb7..b084b01836 100644 --- a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc +++ b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc @@ -21,7 +21,7 @@ #include <memory> #include <grpcpp/impl/codegen/slice.h> -#include <grpcpp/impl/grpc_library.h> +#include <grpcpp/impl/grpc_library.h> #include <grpcpp/security/auth_metadata_processor.h> #include "src/cpp/common/secure_auth_context.h" @@ -92,7 +92,7 @@ void AuthMetadataProcessorAyncWrapper::InvokeProcessor( status.error_message().c_str()); } -int SecureServerCredentials::AddPortToServer(const TString& addr, +int SecureServerCredentials::AddPortToServer(const TString& addr, grpc_server* server) { return grpc_server_add_secure_http2_port(server, addr.c_str(), creds_); } @@ -145,11 +145,11 @@ std::shared_ptr<ServerCredentials> LocalServerCredentials( } std::shared_ptr<ServerCredentials> TlsServerCredentials( - const grpc::experimental::TlsCredentialsOptions& options) { - grpc::GrpcLibraryCodegen init; - return std::shared_ptr<ServerCredentials>(new SecureServerCredentials( - grpc_tls_server_credentials_create(options.c_credentials_options()))); + const grpc::experimental::TlsCredentialsOptions& options) { + grpc::GrpcLibraryCodegen init; + return std::shared_ptr<ServerCredentials>(new SecureServerCredentials( + grpc_tls_server_credentials_create(options.c_credentials_options()))); } } // namespace experimental -} // namespace grpc +} // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h index 9e3fb3f9eb..9a09c2314f 100644 --- a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h +++ b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h @@ -28,7 +28,7 @@ #include "src/cpp/server/thread_pool_interface.h" -namespace grpc { +namespace grpc { class SecureServerCredentials; @@ -64,7 +64,7 @@ class SecureServerCredentials final : public ServerCredentials { grpc_server_credentials_release(creds_); } - int AddPortToServer(const TString& addr, grpc_server* server) override; + int AddPortToServer(const TString& addr, grpc_server* server) override; void SetAuthMetadataProcessor( const std::shared_ptr<grpc::AuthMetadataProcessor>& processor) override; @@ -74,6 +74,6 @@ class SecureServerCredentials final : public ServerCredentials { std::unique_ptr<grpc::AuthMetadataProcessorAyncWrapper> processor_; }; -} // namespace grpc +} // namespace grpc #endif // GRPC_INTERNAL_CPP_SERVER_SECURE_SERVER_CREDENTIALS_H diff --git a/contrib/libs/grpc/src/cpp/server/server_builder.cc b/contrib/libs/grpc/src/cpp/server/server_builder.cc index 0cc00b365f..e3a9f4e4a6 100644 --- a/contrib/libs/grpc/src/cpp/server/server_builder.cc +++ b/contrib/libs/grpc/src/cpp/server/server_builder.cc @@ -26,21 +26,21 @@ #include <utility> -#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/useful.h" #include "src/cpp/server/external_connection_acceptor_impl.h" #include "src/cpp/server/thread_pool_interface.h" -namespace grpc { +namespace grpc { -static std::vector<std::unique_ptr<ServerBuilderPlugin> (*)()>* +static std::vector<std::unique_ptr<ServerBuilderPlugin> (*)()>* g_plugin_factory_list; static gpr_once once_init_plugin_list = GPR_ONCE_INIT; static void do_plugin_list_init(void) { g_plugin_factory_list = - new std::vector<std::unique_ptr<ServerBuilderPlugin> (*)()>(); + new std::vector<std::unique_ptr<ServerBuilderPlugin> (*)()>(); } ServerBuilder::ServerBuilder() @@ -68,29 +68,29 @@ ServerBuilder::~ServerBuilder() { } } -std::unique_ptr<grpc::ServerCompletionQueue> ServerBuilder::AddCompletionQueue( +std::unique_ptr<grpc::ServerCompletionQueue> ServerBuilder::AddCompletionQueue( bool is_frequently_polled) { - grpc::ServerCompletionQueue* cq = new grpc::ServerCompletionQueue( + grpc::ServerCompletionQueue* cq = new grpc::ServerCompletionQueue( GRPC_CQ_NEXT, is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING, nullptr); cqs_.push_back(cq); - return std::unique_ptr<grpc::ServerCompletionQueue>(cq); + return std::unique_ptr<grpc::ServerCompletionQueue>(cq); } -ServerBuilder& ServerBuilder::RegisterService(Service* service) { +ServerBuilder& ServerBuilder::RegisterService(Service* service) { services_.emplace_back(new NamedService(service)); return *this; } -ServerBuilder& ServerBuilder::RegisterService(const TString& addr, - Service* service) { +ServerBuilder& ServerBuilder::RegisterService(const TString& addr, + Service* service) { services_.emplace_back(new NamedService(addr, service)); return *this; } ServerBuilder& ServerBuilder::RegisterAsyncGenericService( - AsyncGenericService* service) { + AsyncGenericService* service) { if (generic_service_ || callback_generic_service_) { gpr_log(GPR_ERROR, "Adding multiple generic services is unsupported for now. " @@ -102,22 +102,22 @@ ServerBuilder& ServerBuilder::RegisterAsyncGenericService( return *this; } -#ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL -ServerBuilder& ServerBuilder::RegisterCallbackGenericService( - CallbackGenericService* service) { - if (generic_service_ || callback_generic_service_) { - gpr_log(GPR_ERROR, - "Adding multiple generic services is unsupported for now. " - "Dropping the service %p", - (void*)service); - } else { - callback_generic_service_ = service; - } - return *this; -} -#else +#ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL +ServerBuilder& ServerBuilder::RegisterCallbackGenericService( + CallbackGenericService* service) { + if (generic_service_ || callback_generic_service_) { + gpr_log(GPR_ERROR, + "Adding multiple generic services is unsupported for now. " + "Dropping the service %p", + (void*)service); + } else { + callback_generic_service_ = service; + } + return *this; +} +#else ServerBuilder& ServerBuilder::experimental_type::RegisterCallbackGenericService( - experimental::CallbackGenericService* service) { + experimental::CallbackGenericService* service) { if (builder_->generic_service_ || builder_->callback_generic_service_) { gpr_log(GPR_ERROR, "Adding multiple generic services is unsupported for now. " @@ -128,13 +128,13 @@ ServerBuilder& ServerBuilder::experimental_type::RegisterCallbackGenericService( } return *builder_; } -#endif +#endif std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor> ServerBuilder::experimental_type::AddExternalConnectionAcceptor( experimental_type::ExternalConnectionType type, std::shared_ptr<ServerCredentials> creds) { - TString name_prefix("external:"); + TString name_prefix("external:"); char count_str[GPR_LTOA_MIN_BUFSIZE]; gpr_ltoa(static_cast<long>(builder_->acceptors_.size()), count_str); builder_->acceptors_.emplace_back( @@ -144,7 +144,7 @@ ServerBuilder::experimental_type::AddExternalConnectionAcceptor( } ServerBuilder& ServerBuilder::SetOption( - std::unique_ptr<ServerBuilderOption> option) { + std::unique_ptr<ServerBuilderOption> option) { options_.push_back(std::move(option)); return *this; } @@ -193,7 +193,7 @@ ServerBuilder& ServerBuilder::SetDefaultCompressionAlgorithm( } ServerBuilder& ServerBuilder::SetResourceQuota( - const grpc::ResourceQuota& resource_quota) { + const grpc::ResourceQuota& resource_quota) { if (resource_quota_ != nullptr) { grpc_resource_quota_unref(resource_quota_); } @@ -203,10 +203,10 @@ ServerBuilder& ServerBuilder::SetResourceQuota( } ServerBuilder& ServerBuilder::AddListeningPort( - const TString& addr_uri, std::shared_ptr<ServerCredentials> creds, - int* selected_port) { - const TString uri_scheme = "dns:"; - TString addr = addr_uri; + const TString& addr_uri, std::shared_ptr<ServerCredentials> creds, + int* selected_port) { + const TString uri_scheme = "dns:"; + TString addr = addr_uri; if (addr_uri.compare(0, uri_scheme.size(), uri_scheme) == 0) { size_t pos = uri_scheme.size(); while (addr_uri[pos] == '/') ++pos; // Skip slashes. @@ -222,13 +222,13 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { if (max_receive_message_size_ >= -1) { args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_); } - if (max_send_message_size_ >= -1) { + if (max_send_message_size_ >= -1) { args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, max_send_message_size_); } - for (const auto& option : options_) { - option->UpdateArguments(&args); - option->UpdatePlugins(&plugins_); - } + for (const auto& option : options_) { + option->UpdateArguments(&args); + option->UpdatePlugins(&plugins_); + } args.SetInt(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET, enabled_compression_algorithms_bitset_); if (maybe_default_compression_level_.is_set) { @@ -245,11 +245,11 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { grpc_resource_quota_arg_vtable()); } - for (const auto& plugin : plugins_) { - plugin->UpdateServerBuilder(this); - plugin->UpdateChannelArguments(&args); - } - + for (const auto& plugin : plugins_) { + plugin->UpdateServerBuilder(this); + plugin->UpdateChannelArguments(&args); + } + // == Determine if the server has any syncrhonous methods == bool has_sync_methods = false; for (const auto& value : services_) { @@ -275,10 +275,10 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { // This is different from the completion queues added to the server via // ServerBuilder's AddCompletionQueue() method (those completion queues // are in 'cqs_' member variable of ServerBuilder object) - std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>> - sync_server_cqs( - std::make_shared< - std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>>()); + std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>> + sync_server_cqs( + std::make_shared< + std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>>()); bool has_frequently_polled_cqs = false; for (const auto& cq : cqs_) { @@ -307,7 +307,7 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { // Create completion queues to listen to incoming rpc requests for (int i = 0; i < sync_server_settings_.num_cqs; i++) { sync_server_cqs->emplace_back( - new grpc::ServerCompletionQueue(GRPC_CQ_NEXT, polling_type, nullptr)); + new grpc::ServerCompletionQueue(GRPC_CQ_NEXT, polling_type, nullptr)); } } @@ -329,20 +329,20 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { } std::unique_ptr<grpc::Server> server(new grpc::Server( - &args, sync_server_cqs, sync_server_settings_.min_pollers, - sync_server_settings_.max_pollers, sync_server_settings_.cq_timeout_msec, - std::move(acceptors_), resource_quota_, - std::move(interceptor_creators_))); + &args, sync_server_cqs, sync_server_settings_.min_pollers, + sync_server_settings_.max_pollers, sync_server_settings_.cq_timeout_msec, + std::move(acceptors_), resource_quota_, + std::move(interceptor_creators_))); - ServerInitializer* initializer = server->initializer(); + ServerInitializer* initializer = server->initializer(); // Register all the completion queues with the server. i.e // 1. sync_server_cqs: internal completion queues created IF this is a sync // server // 2. cqs_: Completion queues added via AddCompletionQueue() call - for (const auto& cq : *sync_server_cqs) { - grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr); + for (const auto& cq : *sync_server_cqs) { + grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr); has_frequently_polled_cqs = true; } @@ -355,12 +355,12 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { // AddCompletionQueue() API. Some of them may not be frequently polled (i.e by // calling Next() or AsyncNext()) and hence are not safe to be used for // listening to incoming channels. Such completion queues must be registered - // as non-listening queues. In debug mode, these should have their server list - // tracked since these are provided the user and must be Shutdown by the user - // after the server is shutdown. - for (const auto& cq : cqs_) { - grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr); - cq->RegisterServer(server.get()); + // as non-listening queues. In debug mode, these should have their server list + // tracked since these are provided the user and must be Shutdown by the user + // after the server is shutdown. + for (const auto& cq : cqs_) { + grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr); + cq->RegisterServer(server.get()); } if (!has_frequently_polled_cqs) { @@ -416,7 +416,7 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { } void ServerBuilder::InternalAddPluginFactory( - std::unique_ptr<ServerBuilderPlugin> (*CreatePlugin)()) { + std::unique_ptr<ServerBuilderPlugin> (*CreatePlugin)()) { gpr_once_init(&once_init_plugin_list, do_plugin_list_init); (*g_plugin_factory_list).push_back(CreatePlugin); } @@ -431,4 +431,4 @@ ServerBuilder& ServerBuilder::EnableWorkaround(grpc_workaround_list id) { } } -} // namespace grpc +} // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/server/server_callback.cc b/contrib/libs/grpc/src/cpp/server/server_callback.cc index 40aef8e735..d2963851dc 100644 --- a/contrib/libs/grpc/src/cpp/server/server_callback.cc +++ b/contrib/libs/grpc/src/cpp/server/server_callback.cc @@ -15,70 +15,70 @@ * */ -#include <grpcpp/impl/codegen/server_callback.h> +#include <grpcpp/impl/codegen/server_callback.h> #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/executor.h" -namespace grpc { +namespace grpc { namespace internal { -void ServerCallbackCall::ScheduleOnDone(bool inline_ondone) { - if (inline_ondone) { - CallOnDone(); - } else { - // Unlike other uses of closure, do not Ref or Unref here since at this - // point, all the Ref'fing and Unref'fing is done for this call. - grpc_core::ExecCtx exec_ctx; - struct ClosureWithArg { - grpc_closure closure; - ServerCallbackCall* call; - explicit ClosureWithArg(ServerCallbackCall* call_arg) : call(call_arg) { - GRPC_CLOSURE_INIT(&closure, - [](void* void_arg, grpc_error*) { - ClosureWithArg* arg = - static_cast<ClosureWithArg*>(void_arg); - arg->call->CallOnDone(); - delete arg; - }, - this, grpc_schedule_on_exec_ctx); - } - }; - ClosureWithArg* arg = new ClosureWithArg(this); - grpc_core::Executor::Run(&arg->closure, GRPC_ERROR_NONE); - } -} - +void ServerCallbackCall::ScheduleOnDone(bool inline_ondone) { + if (inline_ondone) { + CallOnDone(); + } else { + // Unlike other uses of closure, do not Ref or Unref here since at this + // point, all the Ref'fing and Unref'fing is done for this call. + grpc_core::ExecCtx exec_ctx; + struct ClosureWithArg { + grpc_closure closure; + ServerCallbackCall* call; + explicit ClosureWithArg(ServerCallbackCall* call_arg) : call(call_arg) { + GRPC_CLOSURE_INIT(&closure, + [](void* void_arg, grpc_error*) { + ClosureWithArg* arg = + static_cast<ClosureWithArg*>(void_arg); + arg->call->CallOnDone(); + delete arg; + }, + this, grpc_schedule_on_exec_ctx); + } + }; + ClosureWithArg* arg = new ClosureWithArg(this); + grpc_core::Executor::Run(&arg->closure, GRPC_ERROR_NONE); + } +} + void ServerCallbackCall::CallOnCancel(ServerReactor* reactor) { if (reactor->InternalInlineable()) { reactor->OnCancel(); } else { - // Ref to make sure that the closure executes before the whole call gets - // destructed, and Unref within the closure. + // Ref to make sure that the closure executes before the whole call gets + // destructed, and Unref within the closure. Ref(); grpc_core::ExecCtx exec_ctx; - struct ClosureWithArg { - grpc_closure closure; + struct ClosureWithArg { + grpc_closure closure; ServerCallbackCall* call; ServerReactor* reactor; - ClosureWithArg(ServerCallbackCall* call_arg, ServerReactor* reactor_arg) - : call(call_arg), reactor(reactor_arg) { - GRPC_CLOSURE_INIT(&closure, - [](void* void_arg, grpc_error*) { - ClosureWithArg* arg = - static_cast<ClosureWithArg*>(void_arg); - arg->reactor->OnCancel(); - arg->call->MaybeDone(); - delete arg; - }, - this, grpc_schedule_on_exec_ctx); - } + ClosureWithArg(ServerCallbackCall* call_arg, ServerReactor* reactor_arg) + : call(call_arg), reactor(reactor_arg) { + GRPC_CLOSURE_INIT(&closure, + [](void* void_arg, grpc_error*) { + ClosureWithArg* arg = + static_cast<ClosureWithArg*>(void_arg); + arg->reactor->OnCancel(); + arg->call->MaybeDone(); + delete arg; + }, + this, grpc_schedule_on_exec_ctx); + } }; - ClosureWithArg* arg = new ClosureWithArg(this, reactor); - grpc_core::Executor::Run(&arg->closure, GRPC_ERROR_NONE); + ClosureWithArg* arg = new ClosureWithArg(this, reactor); + grpc_core::Executor::Run(&arg->closure, GRPC_ERROR_NONE); } } } // namespace internal -} // namespace grpc +} // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/server/server_cc.cc b/contrib/libs/grpc/src/cpp/server/server_cc.cc index c2a911c7f7..d8725fe0eb 100644 --- a/contrib/libs/grpc/src/cpp/server/server_cc.cc +++ b/contrib/libs/grpc/src/cpp/server/server_cc.cc @@ -23,7 +23,7 @@ #include <utility> #include <grpc/grpc.h> -#include <grpc/impl/codegen/grpc_types.h> +#include <grpc/impl/codegen/grpc_types.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpcpp/completion_queue.h> @@ -47,14 +47,14 @@ #include "src/core/lib/profiling/timers.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/completion_queue.h" -#include "src/core/lib/surface/server.h" +#include "src/core/lib/surface/server.h" #include "src/cpp/client/create_channel_internal.h" #include "src/cpp/server/external_connection_acceptor_impl.h" #include "src/cpp/server/health/default_health_check_service.h" #include "src/cpp/thread_manager/thread_manager.h" -#include <util/stream/str.h> - +#include <util/stream/str.h> + namespace grpc { namespace { @@ -99,15 +99,15 @@ class UnimplementedAsyncRequestContext { GenericServerAsyncReaderWriter generic_stream_; }; -// TODO(vjpai): Just for this file, use some contents of the experimental -// namespace here to make the code easier to read below. Remove this when -// de-experimentalized fully. -#ifndef GRPC_CALLBACK_API_NONEXPERIMENTAL -using ::grpc::experimental::CallbackGenericService; -using ::grpc::experimental::CallbackServerContext; -using ::grpc::experimental::GenericCallbackServerContext; -#endif - +// TODO(vjpai): Just for this file, use some contents of the experimental +// namespace here to make the code easier to read below. Remove this when +// de-experimentalized fully. +#ifndef GRPC_CALLBACK_API_NONEXPERIMENTAL +using ::grpc::experimental::CallbackGenericService; +using ::grpc::experimental::CallbackServerContext; +using ::grpc::experimental::GenericCallbackServerContext; +#endif + } // namespace ServerInterface::BaseAsyncRequest::BaseAsyncRequest( @@ -293,10 +293,10 @@ class Server::UnimplementedAsyncRequest final : private grpc::UnimplementedAsyncRequestContext, public GenericAsyncRequest { public: - UnimplementedAsyncRequest(ServerInterface* server, - grpc::ServerCompletionQueue* cq) + UnimplementedAsyncRequest(ServerInterface* server, + grpc::ServerCompletionQueue* cq) : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq, - nullptr, false) {} + nullptr, false) {} bool FinalizeResult(void** tag, bool* status) override; @@ -528,54 +528,54 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { }; template <class ServerContextType> -class Server::CallbackRequest final - : public grpc::internal::CompletionQueueTag { +class Server::CallbackRequest final + : public grpc::internal::CompletionQueueTag { public: - static_assert( - std::is_base_of<grpc::CallbackServerContext, ServerContextType>::value, - "ServerContextType must be derived from CallbackServerContext"); - - // For codegen services, the value of method represents the defined - // characteristics of the method being requested. For generic services, method - // is nullptr since these services don't have pre-defined methods. - CallbackRequest(Server* server, grpc::internal::RpcServiceMethod* method, - grpc::CompletionQueue* cq, - grpc_core::Server::RegisteredCallAllocation* data) + static_assert( + std::is_base_of<grpc::CallbackServerContext, ServerContextType>::value, + "ServerContextType must be derived from CallbackServerContext"); + + // For codegen services, the value of method represents the defined + // characteristics of the method being requested. For generic services, method + // is nullptr since these services don't have pre-defined methods. + CallbackRequest(Server* server, grpc::internal::RpcServiceMethod* method, + grpc::CompletionQueue* cq, + grpc_core::Server::RegisteredCallAllocation* data) : server_(server), method_(method), - has_request_payload_(method->method_type() == - grpc::internal::RpcMethod::NORMAL_RPC || - method->method_type() == - grpc::internal::RpcMethod::SERVER_STREAMING), - cq_(cq), + has_request_payload_(method->method_type() == + grpc::internal::RpcMethod::NORMAL_RPC || + method->method_type() == + grpc::internal::RpcMethod::SERVER_STREAMING), + cq_(cq), tag_(this) { - CommonSetup(server, data); - data->deadline = &deadline_; - data->optional_payload = has_request_payload_ ? &request_payload_ : nullptr; + CommonSetup(server, data); + data->deadline = &deadline_; + data->optional_payload = has_request_payload_ ? &request_payload_ : nullptr; } - // For generic services, method is nullptr since these services don't have - // pre-defined methods. - CallbackRequest(Server* server, grpc::CompletionQueue* cq, - grpc_core::Server::BatchCallAllocation* data) - : server_(server), - method_(nullptr), - has_request_payload_(false), - call_details_(new grpc_call_details), - cq_(cq), - tag_(this) { - CommonSetup(server, data); - grpc_call_details_init(call_details_); - data->details = call_details_; + // For generic services, method is nullptr since these services don't have + // pre-defined methods. + CallbackRequest(Server* server, grpc::CompletionQueue* cq, + grpc_core::Server::BatchCallAllocation* data) + : server_(server), + method_(nullptr), + has_request_payload_(false), + call_details_(new grpc_call_details), + cq_(cq), + tag_(this) { + CommonSetup(server, data); + grpc_call_details_init(call_details_); + data->details = call_details_; } - ~CallbackRequest() { - delete call_details_; - grpc_metadata_array_destroy(&request_metadata_); - if (has_request_payload_ && request_payload_) { - grpc_byte_buffer_destroy(request_payload_); - } - server_->UnrefWithPossibleNotify(); + ~CallbackRequest() { + delete call_details_; + grpc_metadata_array_destroy(&request_metadata_); + if (has_request_payload_ && request_payload_) { + grpc_byte_buffer_destroy(request_payload_); + } + server_->UnrefWithPossibleNotify(); } // Needs specialization to account for different processing of metadata @@ -680,48 +680,48 @@ class Server::CallbackRequest final : req_->server_->generic_handler_.get(); handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( call_, &req_->ctx_, req_->request_, req_->request_status_, - req_->handler_data_, [this] { delete req_; })); + req_->handler_data_, [this] { delete req_; })); } }; - template <class CallAllocation> - void CommonSetup(Server* server, CallAllocation* data) { - server->Ref(); + template <class CallAllocation> + void CommonSetup(Server* server, CallAllocation* data) { + server->Ref(); grpc_metadata_array_init(&request_metadata_); - data->tag = &tag_; - data->call = &call_; - data->initial_metadata = &request_metadata_; + data->tag = &tag_; + data->call = &call_; + data->initial_metadata = &request_metadata_; } Server* const server_; grpc::internal::RpcServiceMethod* const method_; const bool has_request_payload_; - grpc_byte_buffer* request_payload_ = nullptr; - void* request_ = nullptr; - void* handler_data_ = nullptr; + grpc_byte_buffer* request_payload_ = nullptr; + void* request_ = nullptr; + void* handler_data_ = nullptr; grpc::Status request_status_; - grpc_call_details* const call_details_ = nullptr; + grpc_call_details* const call_details_ = nullptr; grpc_call* call_; gpr_timespec deadline_; grpc_metadata_array request_metadata_; - grpc::CompletionQueue* const cq_; + grpc::CompletionQueue* const cq_; CallbackCallTag tag_; ServerContextType ctx_; grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; }; template <> -bool Server::CallbackRequest<grpc::CallbackServerContext>::FinalizeResult( - void** /*tag*/, bool* /*status*/) { +bool Server::CallbackRequest<grpc::CallbackServerContext>::FinalizeResult( + void** /*tag*/, bool* /*status*/) { return false; } template <> -bool Server::CallbackRequest< - grpc::GenericCallbackServerContext>::FinalizeResult(void** /*tag*/, - bool* status) { +bool Server::CallbackRequest< + grpc::GenericCallbackServerContext>::FinalizeResult(void** /*tag*/, + bool* status) { if (*status) { - deadline_ = call_details_->deadline; + deadline_ = call_details_->deadline; // TODO(yangg) remove the copy here ctx_.method_ = grpc::StringFromCopiedSlice(call_details_->method); ctx_.host_ = grpc::StringFromCopiedSlice(call_details_->host); @@ -732,14 +732,14 @@ bool Server::CallbackRequest< } template <> -const char* Server::CallbackRequest<grpc::CallbackServerContext>::method_name() - const { +const char* Server::CallbackRequest<grpc::CallbackServerContext>::method_name() + const { return method_->name(); } template <> const char* Server::CallbackRequest< - grpc::GenericCallbackServerContext>::method_name() const { + grpc::GenericCallbackServerContext>::method_name() const { return ctx_.method().c_str(); } @@ -867,7 +867,7 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { static grpc::internal::GrpcLibraryInitializer g_gli_initializer; Server::Server( - grpc::ChannelArguments* args, + grpc::ChannelArguments* args, std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>> sync_server_cqs, int min_pollers, int max_pollers, int sync_cq_timeout_msec, @@ -879,13 +879,13 @@ Server::Server( interceptor_creators) : acceptors_(std::move(acceptors)), interceptor_creators_(std::move(interceptor_creators)), - max_receive_message_size_(INT_MIN), + max_receive_message_size_(INT_MIN), sync_server_cqs_(std::move(sync_server_cqs)), started_(false), shutdown_(false), shutdown_notified_(false), server_(nullptr), - server_initializer_(new ServerInitializer(this)), + server_initializer_(new ServerInitializer(this)), health_check_service_disabled_(false) { g_gli_initializer.summon(); gpr_once_init(&grpc::g_once_init_callbacks, grpc::InitGlobalCallbacks); @@ -930,10 +930,10 @@ Server::Server( channel_args.args[i].value.pointer.p)); } } - if (0 == - strcmp(channel_args.args[i].key, GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH)) { - max_receive_message_size_ = channel_args.args[i].value.integer; - } + if (0 == + strcmp(channel_args.args[i].key, GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH)) { + max_receive_message_size_ = channel_args.args[i].value.integer; + } } server_ = grpc_server_create(&channel_args, nullptr); } @@ -955,10 +955,10 @@ Server::~Server() { } } } - // Destroy health check service before we destroy the C server so that - // it does not call grpc_server_request_registered_call() after the C - // server has been destroyed. - health_check_service_.reset(); + // Destroy health check service before we destroy the C server so that + // it does not call grpc_server_request_registered_call() after the C + // server has been destroyed. + health_check_service_.reset(); grpc_server_destroy(server_); } @@ -1005,7 +1005,7 @@ static grpc_server_register_method_payload_handling PayloadHandlingForMethod( GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;); } -bool Server::RegisterService(const TString* host, grpc::Service* service) { +bool Server::RegisterService(const TString* host, grpc::Service* service) { bool has_async_methods = service->has_async_methods(); if (has_async_methods) { GPR_ASSERT(service->server_ == nullptr && @@ -1037,16 +1037,16 @@ bool Server::RegisterService(const TString* host, grpc::Service* service) { value->AddSyncMethod(method.get(), method_registration_tag); } } else { - has_callback_methods_ = true; - grpc::internal::RpcServiceMethod* method_value = method.get(); - grpc::CompletionQueue* cq = CallbackCQ(); - server_->core_server->SetRegisteredMethodAllocator( - cq->cq(), method_registration_tag, [this, cq, method_value] { - grpc_core::Server::RegisteredCallAllocation result; - new CallbackRequest<grpc::CallbackServerContext>(this, method_value, - cq, &result); - return result; - }); + has_callback_methods_ = true; + grpc::internal::RpcServiceMethod* method_value = method.get(); + grpc::CompletionQueue* cq = CallbackCQ(); + server_->core_server->SetRegisteredMethodAllocator( + cq->cq(), method_registration_tag, [this, cq, method_value] { + grpc_core::Server::RegisteredCallAllocation result; + new CallbackRequest<grpc::CallbackServerContext>(this, method_value, + cq, &result); + return result; + }); } method_name = method->name(); @@ -1055,10 +1055,10 @@ bool Server::RegisterService(const TString* host, grpc::Service* service) { // Parse service name. if (method_name != nullptr) { std::stringstream ss(method_name); - std::string service_name; + std::string service_name; if (std::getline(ss, service_name, '/') && std::getline(ss, service_name, '/')) { - services_.push_back(service_name.c_str()); + services_.push_back(service_name.c_str()); } } return true; @@ -1072,7 +1072,7 @@ void Server::RegisterAsyncGenericService(grpc::AsyncGenericService* service) { } void Server::RegisterCallbackGenericService( - grpc::CallbackGenericService* service) { + grpc::CallbackGenericService* service) { GPR_ASSERT( service->server_ == nullptr && "Can only register a callback generic service against one server."); @@ -1080,15 +1080,15 @@ void Server::RegisterCallbackGenericService( has_callback_generic_service_ = true; generic_handler_.reset(service->Handler()); - grpc::CompletionQueue* cq = CallbackCQ(); - server_->core_server->SetBatchMethodAllocator(cq->cq(), [this, cq] { - grpc_core::Server::BatchCallAllocation result; - new CallbackRequest<grpc::GenericCallbackServerContext>(this, cq, &result); - return result; - }); + grpc::CompletionQueue* cq = CallbackCQ(); + server_->core_server->SetBatchMethodAllocator(cq->cq(), [this, cq] { + grpc_core::Server::BatchCallAllocation result; + new CallbackRequest<grpc::GenericCallbackServerContext>(this, cq, &result); + return result; + }); } -int Server::AddListeningPort(const TString& addr, +int Server::AddListeningPort(const TString& addr, grpc::ServerCredentials* creds) { GPR_ASSERT(!started_); int port = creds->AddPortToServer(addr, server_); @@ -1096,31 +1096,31 @@ int Server::AddListeningPort(const TString& addr, return port; } -void Server::Ref() { - shutdown_refs_outstanding_.fetch_add(1, std::memory_order_relaxed); -} - -void Server::UnrefWithPossibleNotify() { - if (GPR_UNLIKELY(shutdown_refs_outstanding_.fetch_sub( - 1, std::memory_order_acq_rel) == 1)) { - // No refs outstanding means that shutdown has been initiated and no more - // callback requests are outstanding. - grpc::internal::MutexLock lock(&mu_); - GPR_ASSERT(shutdown_); - shutdown_done_ = true; - shutdown_done_cv_.Signal(); - } -} - -void Server::UnrefAndWaitLocked() { - if (GPR_UNLIKELY(shutdown_refs_outstanding_.fetch_sub( - 1, std::memory_order_acq_rel) == 1)) { - shutdown_done_ = true; - return; // no need to wait on CV since done condition already set - } - shutdown_done_cv_.WaitUntil(&mu_, [this] { return shutdown_done_; }); -} - +void Server::Ref() { + shutdown_refs_outstanding_.fetch_add(1, std::memory_order_relaxed); +} + +void Server::UnrefWithPossibleNotify() { + if (GPR_UNLIKELY(shutdown_refs_outstanding_.fetch_sub( + 1, std::memory_order_acq_rel) == 1)) { + // No refs outstanding means that shutdown has been initiated and no more + // callback requests are outstanding. + grpc::internal::MutexLock lock(&mu_); + GPR_ASSERT(shutdown_); + shutdown_done_ = true; + shutdown_done_cv_.Signal(); + } +} + +void Server::UnrefAndWaitLocked() { + if (GPR_UNLIKELY(shutdown_refs_outstanding_.fetch_sub( + 1, std::memory_order_acq_rel) == 1)) { + shutdown_done_ = true; + return; // no need to wait on CV since done condition already set + } + shutdown_done_cv_.WaitUntil(&mu_, [this] { return shutdown_done_; }); +} + void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { GPR_ASSERT(!started_); global_callbacks_->PreServerStart(this); @@ -1156,17 +1156,17 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { // If this server uses callback methods, then create a callback generic // service to handle any unimplemented methods using the default reactor // creator - if (has_callback_methods_ && !has_callback_generic_service_) { - unimplemented_service_.reset(new grpc::CallbackGenericService); + if (has_callback_methods_ && !has_callback_generic_service_) { + unimplemented_service_.reset(new grpc::CallbackGenericService); RegisterCallbackGenericService(unimplemented_service_.get()); } -#ifndef NDEBUG - for (size_t i = 0; i < num_cqs; i++) { - cq_list_.push_back(cqs[i]); - } -#endif - +#ifndef NDEBUG + for (size_t i = 0; i < num_cqs; i++) { + cq_list_.push_back(cqs[i]); + } +#endif + grpc_server_start(server_); if (!has_async_generic_service_ && !has_callback_generic_service_) { @@ -1248,8 +1248,8 @@ void Server::ShutdownInternal(gpr_timespec deadline) { value->Wait(); } - // Drop the shutdown ref and wait for all other refs to drop as well. - UnrefAndWaitLocked(); + // Drop the shutdown ref and wait for all other refs to drop as well. + UnrefAndWaitLocked(); // Shutdown the callback CQ. The CQ is owned by its own shutdown tag, so it // will delete itself at true shutdown. @@ -1266,15 +1266,15 @@ void Server::ShutdownInternal(gpr_timespec deadline) { shutdown_notified_ = true; shutdown_cv_.Broadcast(); - -#ifndef NDEBUG - // Unregister this server with the CQs passed into it by the user so that - // those can be checked for properly-ordered shutdown. - for (auto* cq : cq_list_) { - cq->UnregisterServer(this); - } - cq_list_.clear(); -#endif + +#ifndef NDEBUG + // Unregister this server with the CQs passed into it by the user so that + // those can be checked for properly-ordered shutdown. + for (auto* cq : cq_list_) { + cq->UnregisterServer(this); + } + cq_list_.clear(); +#endif } void Server::Wait() { @@ -1294,9 +1294,9 @@ bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag, if (GenericAsyncRequest::FinalizeResult(tag, status)) { // We either had no interceptors run or we are done intercepting if (*status) { - // Create a new request/response pair using the server and CQ values - // stored in this object's base class. - new UnimplementedAsyncRequest(server_, notification_cq_); + // Create a new request/response pair using the server and CQ values + // stored in this object's base class. + new UnimplementedAsyncRequest(server_, notification_cq_); new UnimplementedAsyncResponse(this); } else { delete this; @@ -1323,18 +1323,18 @@ grpc::CompletionQueue* Server::CallbackCQ() { // TODO(vjpai): Consider using a single global CQ for the default CQ // if there is no explicit per-server CQ registered grpc::internal::MutexLock l(&mu_); - if (callback_cq_ != nullptr) { - return callback_cq_; - } - auto* shutdown_callback = new grpc::ShutdownCallback; - callback_cq_ = new grpc::CompletionQueue(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, - shutdown_callback}); - - // Transfer ownership of the new cq to its own shutdown callback - shutdown_callback->TakeCQ(callback_cq_); - + if (callback_cq_ != nullptr) { + return callback_cq_; + } + auto* shutdown_callback = new grpc::ShutdownCallback; + callback_cq_ = new grpc::CompletionQueue(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, + shutdown_callback}); + + // Transfer ownership of the new cq to its own shutdown callback + shutdown_callback->TakeCQ(callback_cq_); + return callback_cq_; } -} // namespace grpc +} // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/server/server_context.cc b/contrib/libs/grpc/src/cpp/server/server_context.cc index 458ac20d87..1b93e3229a 100644 --- a/contrib/libs/grpc/src/cpp/server/server_context.cc +++ b/contrib/libs/grpc/src/cpp/server/server_context.cc @@ -16,7 +16,7 @@ * */ -#include <grpcpp/impl/codegen/server_context.h> +#include <grpcpp/impl/codegen/server_context.h> #include <algorithm> #include <utility> @@ -27,7 +27,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpcpp/impl/call.h> -#include <grpcpp/impl/codegen/completion_queue.h> +#include <grpcpp/impl/codegen/completion_queue.h> #include <grpcpp/support/server_callback.h> #include <grpcpp/support/time.h> @@ -35,17 +35,17 @@ #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/surface/call.h" -namespace grpc { +namespace grpc { // CompletionOp class ServerContextBase::CompletionOp final - : public internal::CallOpSetInterface { + : public internal::CallOpSetInterface { public: // initial refs: one in the server context, one in the cq // must ref the call before calling constructor and after deleting this - CompletionOp(internal::Call* call, - ::grpc::internal::ServerCallbackCall* callback_controller) + CompletionOp(internal::Call* call, + ::grpc::internal::ServerCallbackCall* callback_controller) : call_(*call), callback_controller_(callback_controller), has_tag_(false), @@ -68,7 +68,7 @@ class ServerContextBase::CompletionOp final } } - void FillOps(internal::Call* call) override; + void FillOps(internal::Call* call) override; // This should always be arena allocated in the call, so override delete. // But this class is not trivially destructible, so must actually call delete @@ -136,8 +136,8 @@ class ServerContextBase::CompletionOp final return finalized_ ? (cancelled_ != 0) : false; } - internal::Call call_; - ::grpc::internal::ServerCallbackCall* const callback_controller_; + internal::Call call_; + ::grpc::internal::ServerCallbackCall* const callback_controller_; bool has_tag_; void* tag_; void* core_cq_tag_; @@ -146,7 +146,7 @@ class ServerContextBase::CompletionOp final bool finalized_; int cancelled_; // This is an int (not bool) because it is passed to core bool done_intercepting_; - internal::InterceptorBatchMethodsImpl interceptor_methods_; + internal::InterceptorBatchMethodsImpl interceptor_methods_; }; void ServerContextBase::CompletionOp::Unref() { @@ -157,7 +157,7 @@ void ServerContextBase::CompletionOp::Unref() { } } -void ServerContextBase::CompletionOp::FillOps(internal::Call* call) { +void ServerContextBase::CompletionOp::FillOps(internal::Call* call) { grpc_op ops; ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER; ops.data.recv_close_on_server.cancelled = &cancelled_; @@ -174,31 +174,31 @@ void ServerContextBase::CompletionOp::FillOps(internal::Call* call) { } bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) { - // Decide whether to call the cancel callback within the lock - bool call_cancel; - - { - grpc_core::MutexLock lock(&mu_); - if (done_intercepting_) { - // We are done intercepting. - bool has_tag = has_tag_; - if (has_tag) { - *tag = tag_; - } - Unref(); - return has_tag; - } - finalized_ = true; - - // If for some reason the incoming status is false, mark that as a - // cancellation. - // TODO(vjpai): does this ever happen? - if (!*status) { - cancelled_ = 1; + // Decide whether to call the cancel callback within the lock + bool call_cancel; + + { + grpc_core::MutexLock lock(&mu_); + if (done_intercepting_) { + // We are done intercepting. + bool has_tag = has_tag_; + if (has_tag) { + *tag = tag_; + } + Unref(); + return has_tag; } - - call_cancel = (cancelled_ != 0); - // Release the lock since we may call a callback and interceptors. + finalized_ = true; + + // If for some reason the incoming status is false, mark that as a + // cancellation. + // TODO(vjpai): does this ever happen? + if (!*status) { + cancelled_ = 1; + } + + call_cancel = (cancelled_ != 0); + // Release the lock since we may call a callback and interceptors. } if (call_cancel && callback_controller_ != nullptr) { @@ -206,28 +206,28 @@ bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) { } /* Add interception point and run through interceptors */ interceptor_methods_.AddInterceptionHookPoint( - experimental::InterceptionHookPoints::POST_RECV_CLOSE); + experimental::InterceptionHookPoints::POST_RECV_CLOSE); if (interceptor_methods_.RunInterceptors()) { - // No interceptors were run - bool has_tag = has_tag_; - if (has_tag) { + // No interceptors were run + bool has_tag = has_tag_; + if (has_tag) { *tag = tag_; } Unref(); - return has_tag; + return has_tag; } - // There are interceptors to be run. Return false for now. + // There are interceptors to be run. Return false for now. return false; } // ServerContextBase body -ServerContextBase::ServerContextBase() - : deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)) {} +ServerContextBase::ServerContextBase() + : deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)) {} ServerContextBase::ServerContextBase(gpr_timespec deadline, - grpc_metadata_array* arr) - : deadline_(deadline) { + grpc_metadata_array* arr) + : deadline_(deadline) { std::swap(*client_metadata_.arr(), *arr); } @@ -237,7 +237,7 @@ void ServerContextBase::BindDeadlineAndMetadata(gpr_timespec deadline, std::swap(*client_metadata_.arr(), *arr); } -ServerContextBase::~ServerContextBase() { +ServerContextBase::~ServerContextBase() { if (completion_op_) { completion_op_->Unref(); } @@ -245,21 +245,21 @@ ServerContextBase::~ServerContextBase() { rpc_info_->Unref(); } if (default_reactor_used_.load(std::memory_order_relaxed)) { - reinterpret_cast<Reactor*>(&default_reactor_)->~Reactor(); - } -} - -ServerContextBase::CallWrapper::~CallWrapper() { - if (call) { - // If the ServerContext is part of the call's arena, this could free the - // object itself. - grpc_call_unref(call); + reinterpret_cast<Reactor*>(&default_reactor_)->~Reactor(); } } +ServerContextBase::CallWrapper::~CallWrapper() { + if (call) { + // If the ServerContext is part of the call's arena, this could free the + // object itself. + grpc_call_unref(call); + } +} + void ServerContextBase::BeginCompletionOp( - internal::Call* call, std::function<void(bool)> callback, - ::grpc::internal::ServerCallbackCall* callback_controller) { + internal::Call* call, std::function<void(bool)> callback, + ::grpc::internal::ServerCallbackCall* callback_controller) { GPR_ASSERT(!completion_op_); if (rpc_info_) { rpc_info_->Ref(); @@ -279,30 +279,30 @@ void ServerContextBase::BeginCompletionOp( call->PerformOps(completion_op_); } -internal::CompletionQueueTag* ServerContextBase::GetCompletionOpTag() { - return static_cast<internal::CompletionQueueTag*>(completion_op_); +internal::CompletionQueueTag* ServerContextBase::GetCompletionOpTag() { + return static_cast<internal::CompletionQueueTag*>(completion_op_); } -void ServerContextBase::AddInitialMetadata(const TString& key, - const TString& value) { +void ServerContextBase::AddInitialMetadata(const TString& key, + const TString& value) { initial_metadata_.insert(std::make_pair(key, value)); } -void ServerContextBase::AddTrailingMetadata(const TString& key, - const TString& value) { +void ServerContextBase::AddTrailingMetadata(const TString& key, + const TString& value) { trailing_metadata_.insert(std::make_pair(key, value)); } void ServerContextBase::TryCancel() const { - internal::CancelInterceptorBatchMethods cancel_methods; + internal::CancelInterceptorBatchMethods cancel_methods; if (rpc_info_) { for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) { rpc_info_->RunInterceptor(&cancel_methods, i); } } - grpc_call_error err = - grpc_call_cancel_with_status(call_.call, GRPC_STATUS_CANCELLED, - "Cancelled on the server side", nullptr); + grpc_call_error err = + grpc_call_cancel_with_status(call_.call, GRPC_STATUS_CANCELLED, + "Cancelled on the server side", nullptr); if (err != GRPC_CALL_OK) { gpr_log(GPR_ERROR, "TryCancel failed with: %d", err); } @@ -335,10 +335,10 @@ void ServerContextBase::set_compression_algorithm( AddInitialMetadata(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, algorithm_name); } -TString ServerContextBase::peer() const { - TString peer; - if (call_.call) { - char* c_peer = grpc_call_get_peer(call_.call); +TString ServerContextBase::peer() const { + TString peer; + if (call_.call) { + char* c_peer = grpc_call_get_peer(call_.call); peer = c_peer; gpr_free(c_peer); } @@ -346,16 +346,16 @@ TString ServerContextBase::peer() const { } const struct census_context* ServerContextBase::census_context() const { - return call_.call == nullptr ? nullptr - : grpc_census_call_get_context(call_.call); + return call_.call == nullptr ? nullptr + : grpc_census_call_get_context(call_.call); } void ServerContextBase::SetLoadReportingCosts( - const std::vector<TString>& cost_data) { - if (call_.call == nullptr) return; + const std::vector<TString>& cost_data) { + if (call_.call == nullptr) return; for (const auto& cost_datum : cost_data) { AddTrailingMetadata(GRPC_LB_COST_MD_KEY, cost_datum); } } -} // namespace grpc +} // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/server/server_credentials.cc b/contrib/libs/grpc/src/cpp/server/server_credentials.cc index c3b3a8b379..449e583abd 100644 --- a/contrib/libs/grpc/src/cpp/server/server_credentials.cc +++ b/contrib/libs/grpc/src/cpp/server/server_credentials.cc @@ -16,10 +16,10 @@ * */ -#include <grpcpp/security/server_credentials.h> +#include <grpcpp/security/server_credentials.h> -namespace grpc { +namespace grpc { ServerCredentials::~ServerCredentials() {} -} // namespace grpc +} // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/server/server_posix.cc b/contrib/libs/grpc/src/cpp/server/server_posix.cc index c3d40d4fa2..3aaa6db0ea 100644 --- a/contrib/libs/grpc/src/cpp/server/server_posix.cc +++ b/contrib/libs/grpc/src/cpp/server/server_posix.cc @@ -20,7 +20,7 @@ #include <grpc/grpc_posix.h> -namespace grpc { +namespace grpc { #ifdef GPR_SUPPORT_CHANNELS_FROM_FD @@ -30,4 +30,4 @@ void AddInsecureChannelFromFd(grpc::Server* server, int fd) { #endif // GPR_SUPPORT_CHANNELS_FROM_FD -} // namespace grpc +} // namespace grpc |