| field | value | |
|---|---|---|
| author | neksard <neksard@yandex-team.ru> | 2022-02-10 16:45:33 +0300 |
| committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:33 +0300 |
| commit | 1d9c550e7c38e051d7961f576013a482003a70d9 (patch) | |
| tree | b2cc84ee7850122e7ccf51d0ea21e4fa7e7a5685 /contrib/libs/grpc/src/cpp | |
| parent | 8f7cf138264e0caa318144bf8a2c950e0b0a8593 (diff) | |
| download | ydb-1d9c550e7c38e051d7961f576013a482003a70d9.tar.gz | |
Restoring authorship annotation for <neksard@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/libs/grpc/src/cpp')
40 files changed, 2184 insertions, 2184 deletions
````diff
diff --git a/contrib/libs/grpc/src/cpp/README.md b/contrib/libs/grpc/src/cpp/README.md
index 9405d38d927..967a0a43b7f 100755
--- a/contrib/libs/grpc/src/cpp/README.md
+++ b/contrib/libs/grpc/src/cpp/README.md
@@ -2,44 +2,44 @@ This directory contains the C++ implementation of gRPC.
 
-# To start using gRPC C++
+# To start using gRPC C++
 
 This section describes how to add gRPC as a dependency to your C++ project.
 
-In the C++ world, there's no universally accepted standard for managing project dependencies.
-Therefore, gRPC supports several major build systems, which should satisfy most users.
+In the C++ world, there's no universally accepted standard for managing project dependencies.
+Therefore, gRPC supports several major build systems, which should satisfy most users.
 
 ## Bazel
-
+
 Bazel is the primary build system used by the core gRPC development team. Bazel
 provides fast builds and it easily handles dependencies that support bazel.
-
-To add gRPC as a dependency in bazel:
-1. determine commit SHA for the grpc release you want to use
+
+To add gRPC as a dependency in bazel:
+1. determine commit SHA for the grpc release you want to use
 2. Use the [http_archive](https://docs.bazel.build/versions/master/repo/http.html#http_archive) bazel rule to include gRPC source
-   ```
-   http_archive(
-       name = "com_github_grpc_grpc",
-       urls = [
-           "https://github.com/grpc/grpc/archive/YOUR_GRPC_COMMIT_SHA.tar.gz",
-       ],
-       strip_prefix = "grpc-YOUR_GRPC_COMMIT_SHA",
-   )
-
-   load("@com_github_grpc_grpc//bazel:grpc_deps.bzl", "grpc_deps")
-
-   grpc_deps()
-   ```
-
+   ```
+   http_archive(
+       name = "com_github_grpc_grpc",
+       urls = [
+           "https://github.com/grpc/grpc/archive/YOUR_GRPC_COMMIT_SHA.tar.gz",
+       ],
+       strip_prefix = "grpc-YOUR_GRPC_COMMIT_SHA",
+   )
+
+   load("@com_github_grpc_grpc//bazel:grpc_deps.bzl", "grpc_deps")
+
+   grpc_deps()
+   ```
+
 ## CMake
-
+
 `cmake` is your best option if you cannot use bazel. It supports building on
 Linux, MacOS and Windows (official support) but also has a good chance of
 working on other platforms (no promises!). `cmake` has good support for
 crosscompiling and can be used for targeting the Android platform.
-
+
 To build gRPC C++ from source, follow the [BUILDING guide](../../BUILDING.md).
-
+
 ### find_package
 
 The canonical way to discover dependencies in CMake is the
@@ -127,22 +127,22 @@ first install gRPC C++ using CMake, and have your non-CMake project rely on the
 The default choice for building on UNIX based systems used to be `make`, but we are no longer recommending it.
 You should use `bazel` or `cmake` instead.
 
-To install gRPC for C++ on your system using `make`, follow the [Building gRPC C++](../../BUILDING.md)
-instructions to build from source and then install locally using `make install`.
-This also installs the protocol buffer compiler `protoc` (if you don't have it already),
-and the C++ gRPC plugin for `protoc`.
-
-WARNING: After installing with `make install` there is no easy way to uninstall, which can cause issues
-if you later want to remove the grpc and/or protobuf installation or upgrade to a newer version.
-
-## Packaging systems
-
+To install gRPC for C++ on your system using `make`, follow the [Building gRPC C++](../../BUILDING.md)
+instructions to build from source and then install locally using `make install`.
+This also installs the protocol buffer compiler `protoc` (if you don't have it already),
+and the C++ gRPC plugin for `protoc`.
+
+WARNING: After installing with `make install` there is no easy way to uninstall, which can cause issues
+if you later want to remove the grpc and/or protobuf installation or upgrade to a newer version.
+
+## Packaging systems
+
 We do not officially support any packaging system for C++, but there are some community-maintained
 packages that are kept up-to-date and are known to work well.
 More contributions and support for popular packaging systems are welcome!
-
+
 ### Install using vcpkg package
 gRPC is available using the [vcpkg](https://github.com/Microsoft/vcpkg) dependency manager:
-
+
 ```
 # install vcpkg package manager on your system using the official instructions
 git clone https://github.com/Microsoft/vcpkg.git
@@ -157,8 +157,8 @@ vcpkg install grpc
 The gRPC port in vcpkg is kept up to date by Microsoft team members and community contributors.
 If the version is out of date, please [create an issue or pull request](https://github.com/Microsoft/vcpkg) on the vcpkg repository.
 
-## Examples & Additional Documentation
-
+## Examples & Additional Documentation
+
 You can find out how to build and run our simplest gRPC C++ example in our
 [C++ quick start](../../examples/cpp).
@@ -175,6 +175,6 @@ documentation site at [grpc.io](https://grpc.io), specifically:
   APIs.
 
-# To start developing gRPC C++
+# To start developing gRPC C++
 
-For instructions on how to build gRPC C++ from source, follow the [Building gRPC C++](../../BUILDING.md) instructions.
+For instructions on how to build gRPC C++ from source, follow the [Building gRPC C++](../../BUILDING.md) instructions.
diff --git a/contrib/libs/grpc/src/cpp/client/channel_cc.cc b/contrib/libs/grpc/src/cpp/client/channel_cc.cc
index fab05f2ac47..ac95c29efcd 100644
--- a/contrib/libs/grpc/src/cpp/client/channel_cc.cc
+++ b/contrib/libs/grpc/src/cpp/client/channel_cc.cc
@@ -18,7 +18,7 @@
 
 #include <grpcpp/channel.h>
 
-#include <cstring>
+#include <cstring>
 #include <memory>
 
 #include <grpc/grpc.h>
@@ -30,7 +30,7 @@
 #include <grpcpp/client_context.h>
 #include <grpcpp/completion_queue.h>
 #include <grpcpp/impl/call.h>
-#include <grpcpp/impl/codegen/call_op_set.h>
+#include <grpcpp/impl/codegen/call_op_set.h>
 #include <grpcpp/impl/codegen/completion_queue_tag.h>
 #include <grpcpp/impl/grpc_library.h>
 #include <grpcpp/impl/rpc_method.h>
@@ -39,7 +39,7 @@
 #include <grpcpp/support/config.h>
 #include <grpcpp/support/status.h>
 #include "src/core/lib/gpr/string.h"
-#include "src/core/lib/surface/completion_queue.h"
+#include "src/core/lib/surface/completion_queue.h"
 
 namespace grpc {
 
@@ -49,23 +49,23 @@ Channel::Channel(const TString& host, grpc_channel* channel,
                      ::grpc::experimental::ClientInterceptorFactoryInterface>>
                      interceptor_creators)
     : host_(host), c_channel_(channel) {
-  interceptor_creators_ = std::move(interceptor_creators);
+  interceptor_creators_ = std::move(interceptor_creators);
   g_gli_initializer.summon();
 }
 
-Channel::~Channel() {
-  grpc_channel_destroy(c_channel_);
-  if (callback_cq_ != nullptr) {
-    callback_cq_->Shutdown();
-  }
-}
+Channel::~Channel() {
+  grpc_channel_destroy(c_channel_);
+  if (callback_cq_ != nullptr) {
+    callback_cq_->Shutdown();
+  }
+}
 
 namespace {
 
-inline grpc_slice SliceFromArray(const char* arr, size_t len) {
+inline grpc_slice SliceFromArray(const char* arr, size_t len) {
   return g_core_codegen_interface->grpc_slice_from_copied_buffer(arr, len);
-}
-
+}
+
 TString GetChannelInfoField(grpc_channel* channel,
                             grpc_channel_info* channel_info,
                             char*** channel_info_field) {
@@ -93,14 +93,14 @@ TString Channel::GetServiceConfigJSON() const {
                              &channel_info.service_config_json);
 }
 
-namespace experimental {
-
-void ChannelResetConnectionBackoff(Channel* channel) {
-  grpc_channel_reset_connect_backoff(channel->c_channel_);
-}
-
-}  // namespace experimental
-
+namespace experimental {
+
+void ChannelResetConnectionBackoff(Channel* channel) {
+  grpc_channel_reset_connect_backoff(channel->c_channel_);
+}
+
+}  // namespace experimental
+
 ::grpc::internal::Call Channel::CreateCallInternal(
     const ::grpc::internal::RpcMethod& method, ::grpc::ClientContext* context,
     ::grpc::CompletionQueue* cq, size_t interceptor_pos) {
@@ -113,13 +113,13 @@ void ChannelResetConnectionBackoff(Channel* channel) {
         method.channel_tag(), context->raw_deadline(), nullptr);
   } else {
     const ::TString* host_str = nullptr;
-    if (!context->authority_.empty()) {
-      host_str = &context->authority_;
+    if (!context->authority_.empty()) {
+      host_str = &context->authority_;
     } else if (!host_.empty()) {
-      host_str = &host_;
+      host_str = &host_;
     }
-    grpc_slice method_slice =
-        SliceFromArray(method.name(), strlen(method.name()));
+    grpc_slice method_slice =
+        SliceFromArray(method.name(), strlen(method.name()));
     grpc_slice host_slice;
     if (host_str != nullptr) {
       host_slice = ::grpc::SliceFromCopiedString(*host_str);
@@ -135,28 +135,28 @@ void ChannelResetConnectionBackoff(Channel* channel) {
     }
   }
   grpc_census_call_set_context(c_call, context->census_context());
-
-  // ClientRpcInfo should be set before call because set_call also checks
-  // whether the call has been cancelled, and if the call was cancelled, we
+
+  // ClientRpcInfo should be set before call because set_call also checks
+  // whether the call has been cancelled, and if the call was cancelled, we
   // should notify the interceptors too.
-  auto* info =
-      context->set_client_rpc_info(method.name(), method.method_type(), this,
-                                   interceptor_creators_, interceptor_pos);
+  auto* info =
+      context->set_client_rpc_info(method.name(), method.method_type(), this,
+                                   interceptor_creators_, interceptor_pos);
   context->set_call(c_call, shared_from_this());
-
+
   return ::grpc::internal::Call(c_call, this, cq, info);
 }
 
 ::grpc::internal::Call Channel::CreateCall(
     const ::grpc::internal::RpcMethod& method, ::grpc::ClientContext* context,
     CompletionQueue* cq) {
-  return CreateCallInternal(method, context, cq, 0);
-}
-
+  return CreateCallInternal(method, context, cq, 0);
+}
+
 void Channel::PerformOpsOnCall(::grpc::internal::CallOpSetInterface* ops,
                                ::grpc::internal::Call* call) {
-  ops->FillOps(
-      call);  // Make a copy of call. It's fine since Call just has pointers
+  ops->FillOps(
+      call);  // Make a copy of call. It's fine since Call just has pointers
 }
 
 void* Channel::RegisterMethod(const char* method) {
@@ -205,9 +205,9 @@ bool Channel::WaitForStateChangeImpl(grpc_connectivity_state last_observed,
   return ok;
 }
 
-namespace {
-class ShutdownCallback : public grpc_experimental_completion_queue_functor {
- public:
+namespace {
+class ShutdownCallback : public grpc_experimental_completion_queue_functor {
+ public:
   ShutdownCallback() {
     functor_run = &ShutdownCallback::Run;
     // Set inlineable to true since this callback is trivial and thus does not
@@ -216,37 +216,37 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor {
     // code.
     inlineable = true;
   }
-  // TakeCQ takes ownership of the cq into the shutdown callback
-  // so that the shutdown callback will be responsible for destroying it
+  // TakeCQ takes ownership of the cq into the shutdown callback
+  // so that the shutdown callback will be responsible for destroying it
   void TakeCQ(::grpc::CompletionQueue* cq) { cq_ = cq; }
-
-  // The Run function will get invoked by the completion queue library
-  // when the shutdown is actually complete
-  static void Run(grpc_experimental_completion_queue_functor* cb, int) {
-    auto* callback = static_cast<ShutdownCallback*>(cb);
-    delete callback->cq_;
-    delete callback;
-  }
-
- private:
+
+  // The Run function will get invoked by the completion queue library
+  // when the shutdown is actually complete
+  static void Run(grpc_experimental_completion_queue_functor* cb, int) {
+    auto* callback = static_cast<ShutdownCallback*>(cb);
+    delete callback->cq_;
+    delete callback;
+  }
+
+ private:
   ::grpc::CompletionQueue* cq_ = nullptr;
-};
-}  // namespace
-
+};
+}  // namespace
+
 ::grpc::CompletionQueue* Channel::CallbackCQ() {
-  // TODO(vjpai): Consider using a single global CQ for the default CQ
-  // if there is no explicit per-channel CQ registered
+  // TODO(vjpai): Consider using a single global CQ for the default CQ
+  // if there is no explicit per-channel CQ registered
   grpc::internal::MutexLock l(&mu_);
-  if (callback_cq_ == nullptr) {
-    auto* shutdown_callback = new ShutdownCallback;
+  if (callback_cq_ == nullptr) {
+    auto* shutdown_callback = new ShutdownCallback;
     callback_cq_ = new ::grpc::CompletionQueue(grpc_completion_queue_attributes{
-        GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
-        shutdown_callback});
-
-    // Transfer ownership of the new cq to its own shutdown callback
-    shutdown_callback->TakeCQ(callback_cq_);
-  }
-  return callback_cq_;
-}
-
+        GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
+        shutdown_callback});
+
+    // Transfer ownership of the new cq to its own shutdown callback
+    shutdown_callback->TakeCQ(callback_cq_);
+  }
+  return callback_cq_;
+}
+
 }  // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/client/client_context.cc b/contrib/libs/grpc/src/cpp/client/client_context.cc
index b8b123ef379..b75343d0895 100644
--- a/contrib/libs/grpc/src/cpp/client/client_context.cc
+++ b/contrib/libs/grpc/src/cpp/client/client_context.cc
@@ -24,7 +24,7 @@
 #include <grpc/support/log.h>
 #include <grpc/support/string_util.h>
 
-#include <grpcpp/impl/codegen/interceptor_common.h>
+#include <grpcpp/impl/codegen/interceptor_common.h>
 #include <grpcpp/impl/codegen/sync.h>
 #include <grpcpp/impl/grpc_library.h>
 #include <grpcpp/security/credentials.h>
@@ -44,10 +44,10 @@ class DefaultGlobalClientCallbacks final
 };
 
 static internal::GrpcLibraryInitializer g_gli_initializer;
-static DefaultGlobalClientCallbacks* g_default_client_callbacks =
-    new DefaultGlobalClientCallbacks();
+static DefaultGlobalClientCallbacks* g_default_client_callbacks =
+    new DefaultGlobalClientCallbacks();
 static ClientContext::GlobalCallbacks* g_client_callbacks =
-    g_default_client_callbacks;
+    g_default_client_callbacks;
 
 ClientContext::ClientContext()
     : initial_metadata_received_(false),
@@ -60,7 +60,7 @@ ClientContext::ClientContext()
       deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)),
       census_context_(nullptr),
       propagate_from_call_(nullptr),
-      compression_algorithm_(GRPC_COMPRESS_NONE),
+      compression_algorithm_(GRPC_COMPRESS_NONE),
       initial_metadata_corked_(false) {
   g_client_callbacks->DefaultConstructor(this);
 }
@@ -119,13 +119,13 @@ void ClientContext::set_call(grpc_call* call,
   call_ = call;
   channel_ = channel;
   if (creds_ && !creds_->ApplyToCall(call_)) {
-    // TODO(yashykt): should interceptors also see this status?
-    SendCancelToInterceptors();
+    // TODO(yashykt): should interceptors also see this status?
+    SendCancelToInterceptors();
     grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED,
                                  "Failed to set credentials to rpc.", nullptr);
   }
   if (call_canceled_) {
-    SendCancelToInterceptors();
+    SendCancelToInterceptors();
     grpc_call_cancel(call_, nullptr);
   }
 }
@@ -146,20 +146,20 @@ void ClientContext::set_compression_algorithm(
 
 void ClientContext::TryCancel() {
   internal::MutexLock lock(&mu_);
   if (call_) {
-    SendCancelToInterceptors();
+    SendCancelToInterceptors();
     grpc_call_cancel(call_, nullptr);
   } else {
     call_canceled_ = true;
   }
 }
 
-void ClientContext::SendCancelToInterceptors() {
+void ClientContext::SendCancelToInterceptors() {
   internal::CancelInterceptorBatchMethods cancel_methods;
-  for (size_t i = 0; i < rpc_info_.interceptors_.size(); i++) {
-    rpc_info_.RunInterceptor(&cancel_methods, i);
-  }
-}
-
+  for (size_t i = 0; i < rpc_info_.interceptors_.size(); i++) {
+    rpc_info_.RunInterceptor(&cancel_methods, i);
+  }
+}
+
 TString ClientContext::peer() const {
   TString peer;
   if (call_) {
@@ -171,9 +171,9 @@ TString ClientContext::peer() const {
 }
 
 void ClientContext::SetGlobalCallbacks(GlobalCallbacks* client_callbacks) {
-  GPR_ASSERT(g_client_callbacks == g_default_client_callbacks);
+  GPR_ASSERT(g_client_callbacks == g_default_client_callbacks);
   GPR_ASSERT(client_callbacks != nullptr);
-  GPR_ASSERT(client_callbacks != g_default_client_callbacks);
+  GPR_ASSERT(client_callbacks != g_default_client_callbacks);
   g_client_callbacks = client_callbacks;
 }
diff --git a/contrib/libs/grpc/src/cpp/client/client_interceptor.cc b/contrib/libs/grpc/src/cpp/client/client_interceptor.cc
index 9d98a60171b..a91950cae2d 100644
--- a/contrib/libs/grpc/src/cpp/client/client_interceptor.cc
+++ b/contrib/libs/grpc/src/cpp/client/client_interceptor.cc
@@ -1,44 +1,44 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpcpp/impl/codegen/client_interceptor.h>
-
-namespace grpc {
-
-namespace internal {
-experimental::ClientInterceptorFactoryInterface*
-    g_global_client_interceptor_factory = nullptr;
-}
-
-namespace experimental {
-void RegisterGlobalClientInterceptorFactory(
-    ClientInterceptorFactoryInterface* factory) {
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpcpp/impl/codegen/client_interceptor.h>
+
+namespace grpc {
+
+namespace internal {
+experimental::ClientInterceptorFactoryInterface*
+    g_global_client_interceptor_factory = nullptr;
+}
+
+namespace experimental {
+void RegisterGlobalClientInterceptorFactory(
+    ClientInterceptorFactoryInterface* factory) {
   if (internal::g_global_client_interceptor_factory != nullptr) {
     GPR_ASSERT(false &&
                "It is illegal to call RegisterGlobalClientInterceptorFactory "
                "multiple times.");
   }
-  internal::g_global_client_interceptor_factory = factory;
-}
+  internal::g_global_client_interceptor_factory = factory;
+}
 
 // For testing purposes only.
 void TestOnlyResetGlobalClientInterceptorFactory() {
   internal::g_global_client_interceptor_factory = nullptr;
 }
-}  // namespace experimental
-}  // namespace grpc
+}  // namespace experimental
+}  // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/client/create_channel.cc b/contrib/libs/grpc/src/cpp/client/create_channel.cc
index 941c2e92902..97327490ed2 100644
--- a/contrib/libs/grpc/src/cpp/client/create_channel.cc
+++ b/contrib/libs/grpc/src/cpp/client/create_channel.cc
@@ -41,45 +41,45 @@ std::shared_ptr<grpc::Channel> CreateCustomChannel(
       init_lib;  // We need to call init in case of bad creds.
   return creds ? creds->CreateChannelImpl(target, args)
                : grpc::CreateChannelInternal(
-                     "",
-                     grpc_lame_client_channel_create(
-                         nullptr, GRPC_STATUS_INVALID_ARGUMENT,
-                         "Invalid credentials."),
-                     std::vector<std::unique_ptr<
+                     "",
+                     grpc_lame_client_channel_create(
+                         nullptr, GRPC_STATUS_INVALID_ARGUMENT,
+                         "Invalid credentials."),
+                     std::vector<std::unique_ptr<
                          grpc::experimental::
                              ClientInterceptorFactoryInterface>>());
 }
 
-namespace experimental {
-/// Create a new \em custom \a Channel pointing to \a target with \a
-/// interceptors being invoked per call.
-///
-/// \warning For advanced use and testing ONLY. Override default channel
-/// arguments only if necessary.
-///
-/// \param target The URI of the endpoint to connect to.
-/// \param creds Credentials to use for the created channel. If it does not
-/// hold an object or is invalid, a lame channel (one on which all operations
-/// fail) is returned.
-/// \param args Options for channel creation.
+namespace experimental {
+/// Create a new \em custom \a Channel pointing to \a target with \a
+/// interceptors being invoked per call.
+///
+/// \warning For advanced use and testing ONLY. Override default channel
+/// arguments only if necessary.
+///
+/// \param target The URI of the endpoint to connect to.
+/// \param creds Credentials to use for the created channel. If it does not
+/// hold an object or is invalid, a lame channel (one on which all operations
+/// fail) is returned.
+/// \param args Options for channel creation.
 std::shared_ptr<grpc::Channel> CreateCustomChannelWithInterceptors(
     const TString& target,
     const std::shared_ptr<grpc::ChannelCredentials>& creds,
     const grpc::ChannelArguments& args,
-    std::vector<
+    std::vector<
         std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
-        interceptor_creators) {
+        interceptor_creators) {
   grpc::GrpcLibraryCodegen
       init_lib;  // We need to call init in case of bad creds.
-  return creds ? creds->CreateChannelWithInterceptors(
-                     target, args, std::move(interceptor_creators))
+  return creds ? creds->CreateChannelWithInterceptors(
+                     target, args, std::move(interceptor_creators))
                : grpc::CreateChannelInternal(
-                     "",
-                     grpc_lame_client_channel_create(
-                         nullptr, GRPC_STATUS_INVALID_ARGUMENT,
-                         "Invalid credentials."),
+                     "",
+                     grpc_lame_client_channel_create(
+                         nullptr, GRPC_STATUS_INVALID_ARGUMENT,
+                         "Invalid credentials."),
                      std::move(interceptor_creators));
-}
-}  // namespace experimental
-
+}
+}  // namespace experimental
+
 }  // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/client/create_channel_internal.cc b/contrib/libs/grpc/src/cpp/client/create_channel_internal.cc
index 8e85dbc7abb..da2a878a227 100644
--- a/contrib/libs/grpc/src/cpp/client/create_channel_internal.cc
+++ b/contrib/libs/grpc/src/cpp/client/create_channel_internal.cc
@@ -24,13 +24,13 @@ struct grpc_channel;
 
 namespace grpc {
 
-std::shared_ptr<Channel> CreateChannelInternal(
+std::shared_ptr<Channel> CreateChannelInternal(
     const TString& host, grpc_channel* c_channel,
     std::vector<std::unique_ptr<
         ::grpc::experimental::ClientInterceptorFactoryInterface>>
-        interceptor_creators) {
-  return std::shared_ptr<Channel>(
-      new Channel(host, c_channel, std::move(interceptor_creators)));
+        interceptor_creators) {
+  return std::shared_ptr<Channel>(
+      new Channel(host, c_channel, std::move(interceptor_creators)));
 }
 
 }  // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/client/create_channel_internal.h b/contrib/libs/grpc/src/cpp/client/create_channel_internal.h
index d19a63b3c78..09d4e56b023 100644
--- a/contrib/libs/grpc/src/cpp/client/create_channel_internal.h
+++ b/contrib/libs/grpc/src/cpp/client/create_channel_internal.h
@@ -22,18 +22,18 @@
 #include <memory>
 
 #include <grpcpp/channel.h>
-#include <grpcpp/impl/codegen/client_interceptor.h>
+#include <grpcpp/impl/codegen/client_interceptor.h>
 #include <grpcpp/support/config.h>
 
 struct grpc_channel;
 
 namespace grpc {
 
-std::shared_ptr<Channel> CreateChannelInternal(
+std::shared_ptr<Channel> CreateChannelInternal(
     const TString& host, grpc_channel* c_channel,
     std::vector<std::unique_ptr<
         ::grpc::experimental::ClientInterceptorFactoryInterface>>
-        interceptor_creators);
+        interceptor_creators);
 
 }  // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/client/create_channel_posix.cc b/contrib/libs/grpc/src/cpp/client/create_channel_posix.cc
index a10abb759e8..db09eda8a66 100644
--- a/contrib/libs/grpc/src/cpp/client/create_channel_posix.cc
+++ b/contrib/libs/grpc/src/cpp/client/create_channel_posix.cc
@@ -35,7 +35,7 @@ std::shared_ptr<Channel> CreateInsecureChannelFromFd(const TString& target,
   grpc::internal::GrpcLibrary init_lib;
   init_lib.init();
   return CreateChannelInternal(
-      "", grpc_insecure_channel_create_from_fd(target.c_str(), fd, nullptr),
+      "", grpc_insecure_channel_create_from_fd(target.c_str(), fd, nullptr),
       std::vector<
           std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>());
 }
@@ -47,31 +47,31 @@ std::shared_ptr<Channel> CreateCustomInsecureChannelFromFd(
   grpc_channel_args channel_args;
   args.SetChannelArgs(&channel_args);
   return CreateChannelInternal(
-      "",
-      grpc_insecure_channel_create_from_fd(target.c_str(), fd, &channel_args),
+      "",
+      grpc_insecure_channel_create_from_fd(target.c_str(), fd, &channel_args),
       std::vector<
          std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>());
 }
 
-namespace experimental {
-
+namespace experimental {
+
 std::shared_ptr<Channel> CreateCustomInsecureChannelWithInterceptorsFromFd(
     const TString& target, int fd, const ChannelArguments& args,
-    std::vector<
+    std::vector<
         std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
-        interceptor_creators) {
+        interceptor_creators) {
   grpc::internal::GrpcLibrary init_lib;
-  init_lib.init();
-  grpc_channel_args channel_args;
-  args.SetChannelArgs(&channel_args);
+  init_lib.init();
+  grpc_channel_args channel_args;
+  args.SetChannelArgs(&channel_args);
   return CreateChannelInternal(
-      "",
-      grpc_insecure_channel_create_from_fd(target.c_str(), fd, &channel_args),
-      std::move(interceptor_creators));
-}
-
-}  // namespace experimental
-
+      "",
+      grpc_insecure_channel_create_from_fd(target.c_str(), fd, &channel_args),
+      std::move(interceptor_creators));
+}
+
+}  // namespace experimental
+
 #endif  // GPR_SUPPORT_CHANNELS_FROM_FD
 
 }  // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/client/insecure_credentials.cc b/contrib/libs/grpc/src/cpp/client/insecure_credentials.cc
index ac429445200..e5bafff70af 100644
--- a/contrib/libs/grpc/src/cpp/client/insecure_credentials.cc
+++ b/contrib/libs/grpc/src/cpp/client/insecure_credentials.cc
@@ -31,23 +31,23 @@ class InsecureChannelCredentialsImpl final : public ChannelCredentials {
  public:
   std::shared_ptr<Channel> CreateChannelImpl(
       const TString& target, const ChannelArguments& args) override {
-    return CreateChannelWithInterceptors(
-        target, args,
-        std::vector<std::unique_ptr<
+    return CreateChannelWithInterceptors(
+        target, args,
+        std::vector<std::unique_ptr<
             grpc::experimental::ClientInterceptorFactoryInterface>>());
-  }
-
+  }
+
   std::shared_ptr<Channel> CreateChannelWithInterceptors(
       const TString& target, const ChannelArguments& args,
       std::vector<std::unique_ptr<
           grpc::experimental::ClientInterceptorFactoryInterface>>
-          interceptor_creators) override {
+          interceptor_creators) override {
     grpc_channel_args channel_args;
     args.SetChannelArgs(&channel_args);
     return ::grpc::CreateChannelInternal(
         "",
-        grpc_insecure_channel_create(target.c_str(), &channel_args, nullptr),
-        std::move(interceptor_creators));
+        grpc_insecure_channel_create(target.c_str(), &channel_args, nullptr),
+        std::move(interceptor_creators));
   }
 
   SecureChannelCredentials* AsSecureCredentials() override { return nullptr; }
diff --git a/contrib/libs/grpc/src/cpp/client/secure_credentials.cc b/contrib/libs/grpc/src/cpp/client/secure_credentials.cc
index cb9ffd3e846..0f6db3caa53 100644
--- a/contrib/libs/grpc/src/cpp/client/secure_credentials.cc
+++ b/contrib/libs/grpc/src/cpp/client/secure_credentials.cc
@@ -49,25 +49,25 @@ SecureChannelCredentials::SecureChannelCredentials(
 
 std::shared_ptr<Channel> SecureChannelCredentials::CreateChannelImpl(
     const TString& target, const ChannelArguments& args) {
-  return CreateChannelWithInterceptors(
-      target, args,
+  return CreateChannelWithInterceptors(
+      target, args,
       std::vector<std::unique_ptr<
           grpc::experimental::ClientInterceptorFactoryInterface>>());
-}
-
+}
+
 std::shared_ptr<Channel>
-SecureChannelCredentials::CreateChannelWithInterceptors(
+SecureChannelCredentials::CreateChannelWithInterceptors(
     const TString& target, const ChannelArguments& args,
-    std::vector<
+    std::vector<
         std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
-        interceptor_creators) {
+        interceptor_creators) {
   grpc_channel_args channel_args;
   args.SetChannelArgs(&channel_args);
   return ::grpc::CreateChannelInternal(
       args.GetSslTargetNameOverride(),
       grpc_secure_channel_create(c_creds_, target.c_str(), &channel_args,
-                                 nullptr),
-      std::move(interceptor_creators));
+                                 nullptr),
+      std::move(interceptor_creators));
 }
 
 SecureCallCredentials::SecureCallCredentials(grpc_call_credentials* c_creds)
@@ -110,8 +110,8 @@ std::shared_ptr<ChannelCredentials> SslCredentials(
   grpc_channel_credentials* c_creds = grpc_ssl_credentials_create(
       options.pem_root_certs.empty() ? nullptr
                                      : options.pem_root_certs.c_str(),
-      options.pem_private_key.empty() ? nullptr : &pem_key_cert_pair, nullptr,
-      nullptr);
+      options.pem_private_key.empty() ? nullptr : &pem_key_cert_pair, nullptr,
+      nullptr);
   return WrapChannelCredentials(c_creds);
 }
 
@@ -280,13 +280,13 @@ std::shared_ptr<ChannelCredentials> AltsCredentials(
   return WrapChannelCredentials(c_creds);
 }
 
-// Builds Local Credentials
-std::shared_ptr<ChannelCredentials> LocalCredentials(
-    grpc_local_connect_type type) {
+// Builds Local Credentials
+std::shared_ptr<ChannelCredentials> LocalCredentials(
+    grpc_local_connect_type type) {
   grpc::GrpcLibraryCodegen init;  // To call grpc_init().
-  return WrapChannelCredentials(grpc_local_credentials_create(type));
-}
-
+  return WrapChannelCredentials(grpc_local_credentials_create(type));
+}
+
 // Builds TLS Credentials given TLS options.
 std::shared_ptr<ChannelCredentials> TlsCredentials(
     const TlsCredentialsOptions& options) {
@@ -433,10 +433,10 @@ int MetadataCredentialsPluginWrapper::GetMetadata(
     grpc_auth_metadata_context_copy(&context, &context_copy);
     // Asynchronous return.
     w->thread_pool_->Add([w, context_copy, cb, user_data]() mutable {
-      w->MetadataCredentialsPluginWrapper::InvokePlugin(
+      w->MetadataCredentialsPluginWrapper::InvokePlugin(
           context_copy, cb, user_data, nullptr, nullptr, nullptr, nullptr);
       grpc_auth_metadata_context_reset(&context_copy);
-    });
+    });
     return 0;
   } else {
     // Synchronous return.
@@ -463,10 +463,10 @@ void MetadataCredentialsPluginWrapper::InvokePlugin(
     grpc_status_code* status_code, const char** error_details) {
   std::multimap<TString, TString> metadata;
 
-  // const_cast is safe since the SecureAuthContext only inc/dec the refcount
-  // and the object is passed as a const ref to plugin_->GetMetadata.
+  // const_cast is safe since the SecureAuthContext only inc/dec the refcount
+  // and the object is passed as a const ref to plugin_->GetMetadata.
   SecureAuthContext cpp_channel_auth_context(
-      const_cast<grpc_auth_context*>(context.channel_auth_context));
+      const_cast<grpc_auth_context*>(context.channel_auth_context));
   Status status = plugin_->GetMetadata(context.service_url, context.method_name,
                                        cpp_channel_auth_context, &metadata);
diff --git a/contrib/libs/grpc/src/cpp/client/secure_credentials.h b/contrib/libs/grpc/src/cpp/client/secure_credentials.h
index 329c003474f..4fc79346bf4 100644
--- a/contrib/libs/grpc/src/cpp/client/secure_credentials.h
+++ b/contrib/libs/grpc/src/cpp/client/secure_credentials.h
@@ -26,7 +26,7 @@
 #include <grpcpp/support/config.h>
 
 #include "y_absl/strings/str_cat.h"
-#include "src/core/lib/security/credentials/credentials.h"
+#include "src/core/lib/security/credentials/credentials.h"
 #include "src/cpp/server/thread_pool_interface.h"
 
 namespace grpc {
@@ -36,14 +36,14 @@ class Channel;
 class SecureChannelCredentials final : public ChannelCredentials {
  public:
   explicit SecureChannelCredentials(grpc_channel_credentials* c_creds);
-  ~SecureChannelCredentials() {
-    if (c_creds_ != nullptr) c_creds_->Unref();
-  }
+  ~SecureChannelCredentials() {
+    if (c_creds_ != nullptr) c_creds_->Unref();
+  }
   grpc_channel_credentials* GetRawCreds() { return c_creds_; }
 
   std::shared_ptr<Channel> CreateChannelImpl(
       const TString& target, const ChannelArguments& args) override;
-
+
   SecureChannelCredentials* AsSecureCredentials() override { return this; }
 
  private:
@@ -51,16 +51,16 @@ class SecureChannelCredentials final : public ChannelCredentials {
       const TString& target, const ChannelArguments& args,
       std::vector<std::unique_ptr<
           ::grpc::experimental::ClientInterceptorFactoryInterface>>
-          interceptor_creators) override;
+          interceptor_creators) override;
   grpc_channel_credentials* const c_creds_;
 };
 
 class SecureCallCredentials final : public CallCredentials {
  public:
   explicit SecureCallCredentials(grpc_call_credentials* c_creds);
-  ~SecureCallCredentials() {
-    if (c_creds_ != nullptr) c_creds_->Unref();
-  }
+  ~SecureCallCredentials() {
+    if (c_creds_ != nullptr) c_creds_->Unref();
+  }
   grpc_call_credentials* GetRawCreds() { return c_creds_; }
 
   bool ApplyToCall(grpc_call* call) override;
diff --git a/contrib/libs/grpc/src/cpp/common/alarm.cc b/contrib/libs/grpc/src/cpp/common/alarm.cc
index 83de2f2d3f1..a2612874b20 100644
--- a/contrib/libs/grpc/src/cpp/common/alarm.cc
+++ b/contrib/libs/grpc/src/cpp/common/alarm.cc
@@ -35,25 +35,25 @@ namespace grpc {
 
 namespace internal {
 
-class AlarmImpl : public ::grpc::internal::CompletionQueueTag {
+class AlarmImpl : public ::grpc::internal::CompletionQueueTag {
  public:
   AlarmImpl() : cq_(nullptr), tag_(nullptr) {
     gpr_ref_init(&refs_, 1);
     grpc_timer_init_unset(&timer_);
-  }
-  ~AlarmImpl() {}
+  }
+  ~AlarmImpl() {}
   bool FinalizeResult(void** tag, bool* /*status*/) override {
-    *tag = tag_;
-    Unref();
-    return true;
-  }
-  void Set(::grpc::CompletionQueue* cq, gpr_timespec deadline, void* tag) {
-    grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
-    grpc_core::ExecCtx exec_ctx;
-    GRPC_CQ_INTERNAL_REF(cq->cq(), "alarm");
-    cq_ = cq->cq();
-    tag_ = tag;
-    GPR_ASSERT(grpc_cq_begin_op(cq_, this));
+    *tag = tag_;
+    Unref();
+    return true;
+  }
+  void Set(::grpc::CompletionQueue* cq, gpr_timespec deadline, void* tag) {
+    grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
+    grpc_core::ExecCtx exec_ctx;
+    GRPC_CQ_INTERNAL_REF(cq->cq(), "alarm");
+    cq_ = cq->cq();
+    tag_ = tag;
+    GPR_ASSERT(grpc_cq_begin_op(cq_, this));
     GRPC_CLOSURE_INIT(
         &on_alarm_,
         [](void* arg, grpc_error* error) {
@@ -71,17 +71,17 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag {
           GRPC_CQ_INTERNAL_UNREF(cq, "alarm");
         },
         this, grpc_schedule_on_exec_ctx);
-    grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline),
-                    &on_alarm_);
+    grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline),
+                    &on_alarm_);
   }
-  void Set(gpr_timespec deadline, std::function<void(bool)> f) {
-    grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
+  void Set(gpr_timespec deadline, std::function<void(bool)> f) {
+    grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
     grpc_core::ExecCtx exec_ctx;
-    // Don't use any CQ at all. Instead just use the timer to fire the function
-    callback_ = std::move(f);
-    Ref();
-    GRPC_CLOSURE_INIT(&on_alarm_,
-                      [](void* arg, grpc_error* error) {
+    // Don't use any CQ at all. Instead just use the timer to fire the function
+    callback_ = std::move(f);
+    Ref();
+    GRPC_CLOSURE_INIT(&on_alarm_,
+                      [](void* arg, grpc_error* error) {
                         grpc_core::Executor::Run(
                             GRPC_CLOSURE_CREATE(
                                 [](void* arg, grpc_error* error) {
@@ -92,13 +92,13 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag {
                                 },
                                 arg, nullptr),
                             error);
-                      },
-                      this, grpc_schedule_on_exec_ctx);
+                      },
+                      this, grpc_schedule_on_exec_ctx);
     grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline),
                     &on_alarm_);
   }
   void Cancel() {
-    grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
+    grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
     grpc_core::ExecCtx exec_ctx;
     grpc_timer_cancel(&timer_);
   }
@@ -122,18 +122,18 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag {
   // completion queue where events about this alarm will be posted
   grpc_completion_queue* cq_;
   void* tag_;
-  std::function<void(bool)> callback_;
+  std::function<void(bool)> callback_;
 };
 }  // namespace internal
 
-static ::grpc::internal::GrpcLibraryInitializer g_gli_initializer;
+static ::grpc::internal::GrpcLibraryInitializer g_gli_initializer;
 
 Alarm::Alarm() : alarm_(new internal::AlarmImpl()) {
   g_gli_initializer.summon();
 }
 
-void Alarm::SetInternal(::grpc::CompletionQueue* cq, gpr_timespec deadline,
-                        void* tag) {
+void Alarm::SetInternal(::grpc::CompletionQueue* cq, gpr_timespec deadline,
+                        void* tag) {
   // Note that we know that alarm_ is actually an internal::AlarmImpl
   // but we declared it as the base pointer to avoid a forward declaration
   // or exposing core data structures in the C++ public headers.
@@ -142,15 +142,15 @@ void Alarm::SetInternal(::grpc::CompletionQueue* cq, gpr_timespec deadline,
   static_cast<internal::AlarmImpl*>(alarm_)->Set(cq, deadline, tag);
 }
 
-void Alarm::SetInternal(gpr_timespec deadline, std::function<void(bool)> f) {
-  // Note that we know that alarm_ is actually an internal::AlarmImpl
-  // but we declared it as the base pointer to avoid a forward declaration
-  // or exposing core data structures in the C++ public headers.
-  // Thus it is safe to use a static_cast to the subclass here, and the
-  // C++ style guide allows us to do so in this case
-  static_cast<internal::AlarmImpl*>(alarm_)->Set(deadline, std::move(f));
-}
-
+void Alarm::SetInternal(gpr_timespec deadline, std::function<void(bool)> f) {
+  // Note that we know that alarm_ is actually an internal::AlarmImpl
+  // but we declared it as the base pointer to avoid a forward declaration
+  // or exposing core data structures in the C++ public headers.
+  // Thus it is safe to use a static_cast to the subclass here, and the
+  // C++ style guide allows us to do so in this case
+  static_cast<internal::AlarmImpl*>(alarm_)->Set(deadline, std::move(f));
+}
+
 Alarm::~Alarm() {
   if (alarm_ != nullptr) {
     static_cast<internal::AlarmImpl*>(alarm_)->Destroy();
diff --git a/contrib/libs/grpc/src/cpp/common/channel_arguments.cc b/contrib/libs/grpc/src/cpp/common/channel_arguments.cc
index 23be69f72a6..5a5dd91b5ec 100644
--- a/contrib/libs/grpc/src/cpp/common/channel_arguments.cc
+++ b/contrib/libs/grpc/src/cpp/common/channel_arguments.cc
@@ -108,7 +108,7 @@ void ChannelArguments::SetSocketMutator(grpc_socket_mutator* mutator) {
   if (!replaced) {
     strings_.push_back(TString(mutator_arg.key));
     args_.push_back(mutator_arg);
-    args_.back().key = const_cast<char*>(strings_.back().c_str());
+    args_.back().key = const_cast<char*>(strings_.back().c_str());
   }
 }
 
diff --git a/contrib/libs/grpc/src/cpp/common/channel_filter.h b/contrib/libs/grpc/src/cpp/common/channel_filter.h
index fab1c7744fe..5ce720b3075 100644
--- a/contrib/libs/grpc/src/cpp/common/channel_filter.h
+++ b/contrib/libs/grpc/src/cpp/common/channel_filter.h
@@ -207,18 +207,18 @@ class TransportStreamOpBatch {
                    op_->payload->context[GRPC_CONTEXT_TRACING].value);
   }
 
-  const gpr_atm* get_peer_string() const {
-    if (op_->send_initial_metadata &&
-        op_->payload->send_initial_metadata.peer_string != nullptr) {
-      return op_->payload->send_initial_metadata.peer_string;
-    } else if (op_->recv_initial_metadata &&
-               op_->payload->recv_initial_metadata.peer_string != nullptr) {
-      return op_->payload->recv_initial_metadata.peer_string;
-    } else {
-      return nullptr;
-    }
-  }
-
+  const gpr_atm* get_peer_string() const {
+    if (op_->send_initial_metadata &&
+        op_->payload->send_initial_metadata.peer_string != nullptr) {
+      return op_->payload->send_initial_metadata.peer_string;
+    } else if (op_->recv_initial_metadata &&
+               op_->payload->recv_initial_metadata.peer_string != nullptr) {
+      return op_->payload->recv_initial_metadata.peer_string;
+    } else {
+      return nullptr;
+    }
+  }
+
  private:
   grpc_transport_stream_op_batch* op_;  // Not owned.
   MetadataBatch send_initial_metadata_;
@@ -366,11 +366,11 @@ void ChannelFilterPluginShutdown();
 /// The \a include_filter argument specifies a function that will be called
 /// to determine at run-time whether or not to add the filter. If the
 /// value is nullptr, the filter will be added unconditionally.
-/// If the channel stack type is GRPC_CLIENT_SUBCHANNEL, the caller should
-/// ensure that subchannels with different filter lists will always have
-/// different channel args. This requires setting a channel arg in case the
-/// registration function relies on some condition other than channel args to
-/// decide whether to add a filter or not.
+/// If the channel stack type is GRPC_CLIENT_SUBCHANNEL, the caller should
+/// ensure that subchannels with different filter lists will always have
+/// different channel args. This requires setting a channel arg in case the
+/// registration function relies on some condition other than channel args to
+/// decide whether to add a filter or not.
 template <typename ChannelDataType, typename CallDataType>
 void RegisterChannelFilter(
     const char* name, grpc_channel_stack_type stack_type, int priority,
diff --git a/contrib/libs/grpc/src/cpp/common/completion_queue_cc.cc b/contrib/libs/grpc/src/cpp/common/completion_queue_cc.cc
index 109b2ceb02b..96a7105eaf4 100644
--- a/contrib/libs/grpc/src/cpp/common/completion_queue_cc.cc
+++ b/contrib/libs/grpc/src/cpp/common/completion_queue_cc.cc
@@ -61,8 +61,8 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
       auto core_cq_tag =
           static_cast<::grpc::internal::CompletionQueueTag*>(ev.tag);
       *ok = ev.success != 0;
-      *tag = core_cq_tag;
-      if (core_cq_tag->FinalizeResult(tag, ok)) {
+      *tag = core_cq_tag;
+      if (core_cq_tag->FinalizeResult(tag, ok)) {
         return GOT_EVENT;
       }
       break;
@@ -89,7 +89,7 @@ bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) {
     auto core_cq_tag =
         static_cast<::grpc::internal::CompletionQueueTag*>(res_tag);
     *ok = res == 1;
-    if (core_cq_tag->FinalizeResult(tag, ok)) {
+    if (core_cq_tag->FinalizeResult(tag, ok)) {
       return true;
     }
   }
diff --git a/contrib/libs/grpc/src/cpp/common/core_codegen.cc b/contrib/libs/grpc/src/cpp/common/core_codegen.cc
index 794154f79c5..75383ed5110 100644
--- a/contrib/libs/grpc/src/cpp/common/core_codegen.cc
+++ b/contrib/libs/grpc/src/cpp/common/core_codegen.cc
@@ -59,10 +59,10 @@ grpc_completion_queue* CoreCodegen::grpc_completion_queue_create_for_pluck(
   return ::grpc_completion_queue_create_for_pluck(reserved);
 }
 
-void CoreCodegen::grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
-  ::grpc_completion_queue_shutdown(cq);
-}
-
+void CoreCodegen::grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
+  ::grpc_completion_queue_shutdown(cq);
+}
+
 void CoreCodegen::grpc_completion_queue_destroy(grpc_completion_queue* cq) {
   ::grpc_completion_queue_destroy(cq);
 }
@@ -106,13 +106,13 @@ size_t CoreCodegen::grpc_byte_buffer_length(grpc_byte_buffer* bb) {
   return ::grpc_byte_buffer_length(bb);
 }
 
-grpc_call_error CoreCodegen::grpc_call_start_batch(grpc_call* call,
-                                                   const grpc_op* ops,
-                                                   size_t nops, void* tag,
-                                                   void* reserved) {
-  return ::grpc_call_start_batch(call, ops, nops, tag, reserved);
-}
-
+grpc_call_error CoreCodegen::grpc_call_start_batch(grpc_call* call,
+                                                   const grpc_op* ops,
+                                                   size_t nops, void* tag,
+                                                   void* reserved) {
+  return ::grpc_call_start_batch(call, ops, nops, tag, reserved);
+}
+
 grpc_call_error CoreCodegen::grpc_call_cancel_with_status(
     grpc_call* call, grpc_status_code status, const char* description,
     void* reserved) {
diff --git a/contrib/libs/grpc/src/cpp/common/resource_quota_cc.cc b/contrib/libs/grpc/src/cpp/common/resource_quota_cc.cc
index a4afbae45a8..64abff96338 100644
--- a/contrib/libs/grpc/src/cpp/common/resource_quota_cc.cc
+++ b/contrib/libs/grpc/src/cpp/common/resource_quota_cc.cc
@@ -33,8 +33,8 @@ ResourceQuota& ResourceQuota::Resize(size_t new_size) {
   return *this;
 }
 
-ResourceQuota& ResourceQuota::SetMaxThreads(int new_max_threads) {
-  grpc_resource_quota_set_max_threads(impl_, new_max_threads);
-  return *this;
-}
+ResourceQuota& ResourceQuota::SetMaxThreads(int new_max_threads) {
+  grpc_resource_quota_set_max_threads(impl_, new_max_threads);
+  return *this;
+}
 }  // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/common/secure_auth_context.cc b/contrib/libs/grpc/src/cpp/common/secure_auth_context.cc
index 96903583434..e1f97889c8e 100644
--- a/contrib/libs/grpc/src/cpp/common/secure_auth_context.cc
+++ b/contrib/libs/grpc/src/cpp/common/secure_auth_context.cc
@@ -23,11 +23,11 @@ namespace grpc {
 
 std::vector<grpc::string_ref> SecureAuthContext::GetPeerIdentity() const {
-  if (ctx_ == nullptr) {
+  if (ctx_ == nullptr) {
     return std::vector<grpc::string_ref>();
   }
-  grpc_auth_property_iterator iter =
-      grpc_auth_context_peer_identity(ctx_.get());
+  grpc_auth_property_iterator iter =
+      grpc_auth_context_peer_identity(ctx_.get());
   std::vector<grpc::string_ref> identity;
   const grpc_auth_property* property = nullptr;
   while ((property = grpc_auth_property_iterator_next(&iter))) {
@@ -38,20 +38,20 @@ std::vector<grpc::string_ref> SecureAuthContext::GetPeerIdentity() const {
 }
 
 TString SecureAuthContext::GetPeerIdentityPropertyName() const {
-  if (ctx_ == nullptr) {
+  if (ctx_ == nullptr) {
     return "";
   }
-  const char* name = grpc_auth_context_peer_identity_property_name(ctx_.get());
+  const char* name = grpc_auth_context_peer_identity_property_name(ctx_.get());
   return name == nullptr ? "" : name;
 }
 
 std::vector<grpc::string_ref> SecureAuthContext::FindPropertyValues(
     const TString& name) const {
-  if (ctx_ == nullptr) {
+  if (ctx_ == nullptr) {
     return std::vector<grpc::string_ref>();
   }
   grpc_auth_property_iterator iter =
-      grpc_auth_context_find_properties_by_name(ctx_.get(), name.c_str());
+      grpc_auth_context_find_properties_by_name(ctx_.get(), name.c_str());
   const grpc_auth_property* property = nullptr;
   std::vector<grpc::string_ref> values;
   while ((property = grpc_auth_property_iterator_next(&iter))) {
@@ -61,9 +61,9 @@ std::vector<grpc::string_ref> SecureAuthContext::FindPropertyValues(
 }
 
 AuthPropertyIterator SecureAuthContext::begin() const {
-  if (ctx_ != nullptr) {
+  if (ctx_ != nullptr) {
     grpc_auth_property_iterator iter =
-        grpc_auth_context_property_iterator(ctx_.get());
+        grpc_auth_context_property_iterator(ctx_.get());
     const grpc_auth_property* property =
         grpc_auth_property_iterator_next(&iter);
     return AuthPropertyIterator(property, &iter);
@@ -78,20 +78,20 @@ AuthPropertyIterator SecureAuthContext::end() const {
 
 void SecureAuthContext::AddProperty(const TString& key,
                                     const grpc::string_ref& value) {
-  if (ctx_ == nullptr) return;
-  grpc_auth_context_add_property(ctx_.get(), key.c_str(), value.data(),
-                                 value.size());
+  if (ctx_ == nullptr) return;
+  grpc_auth_context_add_property(ctx_.get(), key.c_str(), value.data(),
+                                 value.size());
 }
 
 bool SecureAuthContext::SetPeerIdentityPropertyName(const TString& name) {
-  if (ctx_ == nullptr) return false;
-  return grpc_auth_context_set_peer_identity_property_name(ctx_.get(),
+  if (ctx_ == nullptr) return false;
+  return grpc_auth_context_set_peer_identity_property_name(ctx_.get(),
                                                            name.c_str()) != 0;
 }
 
 bool SecureAuthContext::IsPeerAuthenticated() const {
-  if (ctx_ == nullptr) return false;
-  return grpc_auth_context_peer_is_authenticated(ctx_.get()) != 0;
+  if (ctx_ == nullptr) return false;
+  return grpc_auth_context_peer_is_authenticated(ctx_.get()) != 0;
 }
 
 }  // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/common/secure_auth_context.h b/contrib/libs/grpc/src/cpp/common/secure_auth_context.h
index 40d1007dd60..51013efac70 100644
--- a/contrib/libs/grpc/src/cpp/common/secure_auth_context.h
+++ b/contrib/libs/grpc/src/cpp/common/secure_auth_context.h
@@ -21,17 +21,17 @@
 
 #include <grpcpp/security/auth_context.h>
 
-#include "src/core/lib/gprpp/ref_counted_ptr.h"
-#include "src/core/lib/security/context/security_context.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/security/context/security_context.h"
 
 namespace grpc {
 
 class SecureAuthContext final : public AuthContext {
  public:
-  explicit SecureAuthContext(grpc_auth_context* ctx)
-      : ctx_(ctx != nullptr ? ctx->Ref() : nullptr) {}
+  explicit SecureAuthContext(grpc_auth_context* ctx)
+      : ctx_(ctx != nullptr ? ctx->Ref() : nullptr) {}
 
-  ~SecureAuthContext() override = default;
+  ~SecureAuthContext() override = default;
 
   bool IsPeerAuthenticated() const override;
 
@@ -52,7 +52,7 @@ class SecureAuthContext final : public AuthContext {
   virtual bool SetPeerIdentityPropertyName(const TString& name) override;
 
  private:
-  grpc_core::RefCountedPtr<grpc_auth_context> ctx_;
+  grpc_core::RefCountedPtr<grpc_auth_context> ctx_;
 };
 
 }  // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/common/secure_create_auth_context.cc b/contrib/libs/grpc/src/cpp/common/secure_create_auth_context.cc
index 5883d5c3a7d..908c46629e6 100644
--- a/contrib/libs/grpc/src/cpp/common/secure_create_auth_context.cc
+++ b/contrib/libs/grpc/src/cpp/common/secure_create_auth_context.cc
@@ -20,7 +20,7 @@
 #include <grpc/grpc.h>
 #include <grpc/grpc_security.h>
 #include <grpcpp/security/auth_context.h>
-#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
 #include "src/cpp/common/secure_auth_context.h"
 
 namespace grpc {
@@ -29,8 +29,8 @@ std::shared_ptr<const AuthContext> CreateAuthContext(grpc_call* call) {
   if (call == nullptr) {
     return std::shared_ptr<const AuthContext>();
   }
-  grpc_core::RefCountedPtr<grpc_auth_context> ctx(grpc_call_auth_context(call));
-  return std::make_shared<SecureAuthContext>(ctx.get());
+  grpc_core::RefCountedPtr<grpc_auth_context> ctx(grpc_call_auth_context(call));
+  return std::make_shared<SecureAuthContext>(ctx.get());
 }
 
 }  // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc
index b707f3c4766..6dcf84bf40d 100644
--- a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc
+++ b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc
@@ -1,29 +1,29 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#include "src/cpp/server/channelz/channelz_service.h"
-
-#include <grpc/grpc.h>
-#include <grpc/support/alloc.h>
-
-namespace grpc {
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/cpp/server/channelz/channelz_service.h"
+
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+
+namespace grpc {
 
 namespace {
 
@@ -33,121 +33,121 @@ grpc::protobuf::util::Status ParseJson(const char* json_str,
   options.case_insensitive_enum_parsing = true;
   return grpc::protobuf::json::JsonStringToMessage(json_str, message, options);
 }
-
+
 }  // namespace
 
-Status ChannelzService::GetTopChannels(
+Status ChannelzService::GetTopChannels(
     ServerContext* /*unused*/,
     const channelz::v1::GetTopChannelsRequest* request,
-    channelz::v1::GetTopChannelsResponse* response) {
-  char* json_str = grpc_channelz_get_top_channels(request->start_channel_id());
-  if (json_str == nullptr) {
-    return Status(StatusCode::INTERNAL,
-                  "grpc_channelz_get_top_channels returned null");
-  }
+    channelz::v1::GetTopChannelsResponse* response) {
+  char* json_str = grpc_channelz_get_top_channels(request->start_channel_id());
+  if (json_str == nullptr) {
+    return Status(StatusCode::INTERNAL,
+                  "grpc_channelz_get_top_channels returned null");
+  }
   grpc::protobuf::util::Status s = ParseJson(json_str, response);
-  gpr_free(json_str);
-  if (!s.ok()) {
-    return Status(StatusCode::INTERNAL, s.ToString());
-  }
-  return Status::OK;
-}
-
-Status ChannelzService::GetServers(
+  gpr_free(json_str);
+  if (!s.ok()) {
+    return Status(StatusCode::INTERNAL, s.ToString());
+  }
+  return Status::OK;
+}
+
+Status ChannelzService::GetServers(
     ServerContext* /*unused*/, const channelz::v1::GetServersRequest* request,
-    channelz::v1::GetServersResponse* response) {
-  char* json_str = grpc_channelz_get_servers(request->start_server_id());
-  if (json_str == nullptr) {
-    return Status(StatusCode::INTERNAL,
-                  "grpc_channelz_get_servers returned null");
-  }
+    channelz::v1::GetServersResponse* response) {
+  char* json_str = grpc_channelz_get_servers(request->start_server_id());
+  if (json_str == nullptr) {
+    return Status(StatusCode::INTERNAL,
+                  "grpc_channelz_get_servers returned null");
+  }
   grpc::protobuf::util::Status s = ParseJson(json_str, response);
-  gpr_free(json_str);
-  if (!s.ok()) {
-    return Status(StatusCode::INTERNAL, s.ToString());
-  }
-  return Status::OK;
-}
-
+  gpr_free(json_str);
+  if (!s.ok()) {
+    return Status(StatusCode::INTERNAL, s.ToString());
+  }
+  return Status::OK;
+}
+
 Status ChannelzService::GetServer(ServerContext* /*unused*/,
-                                  const channelz::v1::GetServerRequest* request,
-                                  channelz::v1::GetServerResponse* response) {
-  char* json_str = grpc_channelz_get_server(request->server_id());
-  if (json_str == nullptr) {
-    return Status(StatusCode::INTERNAL,
-                  "grpc_channelz_get_server returned null");
-  }
+                                  const channelz::v1::GetServerRequest* request,
+                                  channelz::v1::GetServerResponse* response) {
+  char* json_str = grpc_channelz_get_server(request->server_id());
+  if (json_str == nullptr) {
+    return Status(StatusCode::INTERNAL,
+                  "grpc_channelz_get_server returned null");
+  }
   grpc::protobuf::util::Status s = ParseJson(json_str, response);
-  gpr_free(json_str);
-  if (!s.ok()) {
-    return Status(StatusCode::INTERNAL, s.ToString());
-  }
-  return Status::OK;
-}
-
-Status ChannelzService::GetServerSockets(
+  gpr_free(json_str);
+  if (!s.ok()) {
+    return Status(StatusCode::INTERNAL, s.ToString());
+  }
+  return Status::OK;
+}
+
+Status ChannelzService::GetServerSockets(
     ServerContext* /*unused*/,
     const channelz::v1::GetServerSocketsRequest* request,
-    channelz::v1::GetServerSocketsResponse* response) {
-  char* json_str = grpc_channelz_get_server_sockets(
-      request->server_id(), request->start_socket_id(), request->max_results());
-  if (json_str == nullptr) {
-    return Status(StatusCode::INTERNAL,
-                  "grpc_channelz_get_server_sockets returned null");
-  }
+    channelz::v1::GetServerSocketsResponse* response) {
+  char* json_str = grpc_channelz_get_server_sockets(
+      request->server_id(), request->start_socket_id(), request->max_results());
+  if (json_str == nullptr) {
+    return Status(StatusCode::INTERNAL,
+                  "grpc_channelz_get_server_sockets returned null");
+  }
   grpc::protobuf::util::Status s = ParseJson(json_str, response);
-  gpr_free(json_str);
-  if (!s.ok()) {
-    return Status(StatusCode::INTERNAL, s.ToString());
-  }
-  return Status::OK;
-}
-
-Status ChannelzService::GetChannel(
+  gpr_free(json_str);
+  if (!s.ok()) {
+    return Status(StatusCode::INTERNAL, s.ToString());
+  }
+  return Status::OK;
+}
+
+Status ChannelzService::GetChannel(
     ServerContext* /*unused*/, const channelz::v1::GetChannelRequest* request,
-    channelz::v1::GetChannelResponse* response) {
-  char* json_str = grpc_channelz_get_channel(request->channel_id());
-  if (json_str == nullptr) {
-    return Status(StatusCode::NOT_FOUND, "No object found for that ChannelId");
-  }
+    channelz::v1::GetChannelResponse* response) {
+  char* json_str = grpc_channelz_get_channel(request->channel_id());
+  if (json_str == nullptr) {
+    return Status(StatusCode::NOT_FOUND, "No object found for that ChannelId");
+  }
   grpc::protobuf::util::Status s = ParseJson(json_str, response);
-  gpr_free(json_str);
-  if (!s.ok()) {
-    return Status(StatusCode::INTERNAL, s.ToString());
-  }
-  return Status::OK;
-}
-
-Status ChannelzService::GetSubchannel(
+  gpr_free(json_str);
+  if (!s.ok()) {
+    return Status(StatusCode::INTERNAL, s.ToString());
+  }
+  return Status::OK;
+}
+
+Status ChannelzService::GetSubchannel(
    ServerContext* /*unused*/,
    const channelz::v1::GetSubchannelRequest* request,
-    channelz::v1::GetSubchannelResponse* response) {
-  char* json_str = grpc_channelz_get_subchannel(request->subchannel_id());
-  if (json_str == nullptr) {
-    return Status(StatusCode::NOT_FOUND,
-                  "No object found for that SubchannelId");
-  }
+    channelz::v1::GetSubchannelResponse* response) {
+  char* json_str = grpc_channelz_get_subchannel(request->subchannel_id());
+  if (json_str == nullptr) {
+    return Status(StatusCode::NOT_FOUND,
+                  "No object found for that SubchannelId");
+  }
   grpc::protobuf::util::Status s = ParseJson(json_str, response);
-  gpr_free(json_str);
-  if (!s.ok()) {
-    return Status(StatusCode::INTERNAL, s.ToString());
-  }
-  return Status::OK;
-}
-
+  gpr_free(json_str);
+  if (!s.ok()) {
+    return Status(StatusCode::INTERNAL, s.ToString());
+  }
+  return Status::OK;
+}
+
 Status ChannelzService::GetSocket(ServerContext* /*unused*/,
-                                  const channelz::v1::GetSocketRequest* request,
-                                  channelz::v1::GetSocketResponse* response) {
-  char* json_str = grpc_channelz_get_socket(request->socket_id());
-  if (json_str == nullptr) {
-    return Status(StatusCode::NOT_FOUND, "No object found for that SocketId");
-  }
+                                  const channelz::v1::GetSocketRequest* request,
+                                  channelz::v1::GetSocketResponse* response) {
+  char* json_str = grpc_channelz_get_socket(request->socket_id());
+  if (json_str == nullptr) {
+    return Status(StatusCode::NOT_FOUND, "No object found for that SocketId");
+  }
   grpc::protobuf::util::Status s = ParseJson(json_str, response);
-  gpr_free(json_str);
-  if (!s.ok()) {
-    return Status(StatusCode::INTERNAL, s.ToString());
-  }
-  return Status::OK;
-}
-
-}  // namespace grpc
+  gpr_free(json_str);
+  if (!s.ok()) {
+    return Status(StatusCode::INTERNAL, s.ToString());
+  }
+  return Status::OK;
+}
+
+}  // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.h b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.h
index 72818a0d726..b4a66ba1c66 100644
--- a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.h
+++ b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.h
@@ -1,64 +1,64 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#ifndef GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H
-#define GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H
-
-#include <grpc/support/port_platform.h>
-
-#include <grpcpp/grpcpp.h>
-#include "src/proto/grpc/channelz/channelz.grpc.pb.h"
-
-namespace grpc {
-
-class ChannelzService final : public channelz::v1::Channelz::Service {
- private:
-  // implementation of GetTopChannels rpc
-  Status GetTopChannels(
-      ServerContext* unused, const channelz::v1::GetTopChannelsRequest* request,
-      channelz::v1::GetTopChannelsResponse* response) override;
-  // implementation of GetServers rpc
-  Status GetServers(ServerContext* unused,
-                    const channelz::v1::GetServersRequest* request,
-                    channelz::v1::GetServersResponse* response) override;
-  // implementation of GetServer rpc
-  Status GetServer(ServerContext* unused,
-                   const channelz::v1::GetServerRequest* request,
-                   channelz::v1::GetServerResponse* response) override;
-  // implementation of GetServerSockets rpc
-  Status GetServerSockets(
-      ServerContext* unused,
-      const channelz::v1::GetServerSocketsRequest* request,
-      channelz::v1::GetServerSocketsResponse* response) override;
-  // implementation of GetChannel rpc
-  Status GetChannel(ServerContext* unused,
-                    const channelz::v1::GetChannelRequest* request,
-                    channelz::v1::GetChannelResponse* response) override;
-  // implementation of GetSubchannel rpc
-  Status GetSubchannel(ServerContext* unused,
-                       const channelz::v1::GetSubchannelRequest* request,
-                       channelz::v1::GetSubchannelResponse* response) override;
-  // implementation of GetSocket rpc
-  Status GetSocket(ServerContext* unused,
-                   const channelz::v1::GetSocketRequest* request,
-                   channelz::v1::GetSocketResponse* response) override;
-};
-
-}  // namespace grpc
-
-#endif  // GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H
+#define GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H
+
+#include <grpc/support/port_platform.h>
+
+#include <grpcpp/grpcpp.h>
+#include "src/proto/grpc/channelz/channelz.grpc.pb.h"
+
+namespace grpc {
+
+class ChannelzService final : public channelz::v1::Channelz::Service {
+ private:
+  // implementation of GetTopChannels rpc
+  Status GetTopChannels(
+      ServerContext* unused, const channelz::v1::GetTopChannelsRequest* request,
+      channelz::v1::GetTopChannelsResponse* response) override;
+  // implementation of GetServers rpc
+  Status GetServers(ServerContext* unused,
+                    const channelz::v1::GetServersRequest* request,
+                    channelz::v1::GetServersResponse* response) override;
+  // implementation of GetServer rpc
+  Status GetServer(ServerContext* unused,
+                   const channelz::v1::GetServerRequest* request,
+                   channelz::v1::GetServerResponse* response) override;
+  // implementation of GetServerSockets rpc
+  Status GetServerSockets(
+      ServerContext* unused,
+      const channelz::v1::GetServerSocketsRequest* request,
+      channelz::v1::GetServerSocketsResponse* response) override;
+  // implementation of GetChannel rpc
+  Status GetChannel(ServerContext* unused,
+                    const channelz::v1::GetChannelRequest* request,
+                    channelz::v1::GetChannelResponse* response) override;
+  // implementation of GetSubchannel rpc
+  Status GetSubchannel(ServerContext* unused,
+                       const channelz::v1::GetSubchannelRequest* request,
+                       channelz::v1::GetSubchannelResponse* response) override;
+  // implementation of GetSocket rpc
+  Status GetSocket(ServerContext* unused,
+                   const channelz::v1::GetSocketRequest* request,
+                   channelz::v1::GetSocketResponse* response) override;
+};
+
+}  // namespace grpc
+
+#endif  // GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H
diff --git a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc
index 35ecd081255..ae26a447ab3 100644
--- a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc
+++ b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc
@@ -1,72 +1,72 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#include <grpcpp/ext/channelz_service_plugin.h>
-#include <grpcpp/impl/server_builder_plugin.h>
-#include <grpcpp/impl/server_initializer.h>
-#include <grpcpp/server.h>
-
-#include "src/cpp/server/channelz/channelz_service.h"
-
-namespace grpc {
-namespace channelz {
-namespace experimental {
-
-class ChannelzServicePlugin : public ::grpc::ServerBuilderPlugin {
- public:
-  ChannelzServicePlugin() : channelz_service_(new grpc::ChannelzService()) {}
-
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
````
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include <grpcpp/ext/channelz_service_plugin.h> +#include <grpcpp/impl/server_builder_plugin.h> +#include <grpcpp/impl/server_initializer.h> +#include <grpcpp/server.h> + +#include "src/cpp/server/channelz/channelz_service.h" + +namespace grpc { +namespace channelz { +namespace experimental { + +class ChannelzServicePlugin : public ::grpc::ServerBuilderPlugin { + public: + ChannelzServicePlugin() : channelz_service_(new grpc::ChannelzService()) {} + TString name() override { return "channelz_service"; } - - void InitServer(grpc::ServerInitializer* si) override { - si->RegisterService(channelz_service_); - } - + + void InitServer(grpc::ServerInitializer* si) override { + si->RegisterService(channelz_service_); + } + void Finish(grpc::ServerInitializer* /*si*/) override {} - + void ChangeArguments(const TString& /*name*/, void* /*value*/) override {} - - bool has_sync_methods() const override { - if (channelz_service_) { - return channelz_service_->has_synchronous_methods(); - } - return false; - } - - bool has_async_methods() const override { - if (channelz_service_) { - return channelz_service_->has_async_methods(); - } - return false; - } - - private: - std::shared_ptr<grpc::ChannelzService> channelz_service_; -}; - -static std::unique_ptr< ::grpc::ServerBuilderPlugin> -CreateChannelzServicePlugin() { - return std::unique_ptr< ::grpc::ServerBuilderPlugin>( - new ChannelzServicePlugin()); -} - + + bool has_sync_methods() const override { + if (channelz_service_) { + return channelz_service_->has_synchronous_methods(); + } + return false; + } + + bool has_async_methods() const override { + if (channelz_service_) { + return channelz_service_->has_async_methods(); + } + return false; + } + + private: + std::shared_ptr<grpc::ChannelzService> channelz_service_; +}; + +static std::unique_ptr< ::grpc::ServerBuilderPlugin> +CreateChannelzServicePlugin() { + return std::unique_ptr< ::grpc::ServerBuilderPlugin>( + new ChannelzServicePlugin()); +} + } // namespace experimental } // namespace channelz } // namespace grpc @@ -74,15 +74,15 @@ namespace grpc_impl { namespace channelz { namespace experimental { -void InitChannelzService() { +void InitChannelzService() { static struct Initializer { Initializer() { ::grpc::ServerBuilder::InternalAddPluginFactory( &grpc::channelz::experimental::CreateChannelzServicePlugin); } } initialize; -} - -} // namespace experimental -} // namespace channelz +} + +} // namespace experimental +} // namespace channelz } // namespace grpc_impl diff --git a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc index 5f70ce05406..3cc508d0cbf 100644 --- a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc +++ b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc @@ -25,191 +25,191 @@ #include <grpc/support/log.h> #include <grpcpp/impl/codegen/method_handler.h> -#include "src/cpp/server/health/default_health_check_service.h" +#include 
"src/cpp/server/health/default_health_check_service.h" #include "src/proto/grpc/health/v1/health.upb.h" #include "upb/upb.hpp" #define MAX_SERVICE_NAME_LENGTH 200 namespace grpc { - -// -// DefaultHealthCheckService -// - -DefaultHealthCheckService::DefaultHealthCheckService() { - services_map_[""].SetServingStatus(SERVING); -} - -void DefaultHealthCheckService::SetServingStatus( + +// +// DefaultHealthCheckService +// + +DefaultHealthCheckService::DefaultHealthCheckService() { + services_map_[""].SetServingStatus(SERVING); +} + +void DefaultHealthCheckService::SetServingStatus( const TString& service_name, bool serving) { grpc_core::MutexLock lock(&mu_); - if (shutdown_) { - // Set to NOT_SERVING in case service_name is not in the map. - serving = false; - } - services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING); -} - -void DefaultHealthCheckService::SetServingStatus(bool serving) { - const ServingStatus status = serving ? SERVING : NOT_SERVING; + if (shutdown_) { + // Set to NOT_SERVING in case service_name is not in the map. + serving = false; + } + services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING); +} + +void DefaultHealthCheckService::SetServingStatus(bool serving) { + const ServingStatus status = serving ? SERVING : NOT_SERVING; grpc_core::MutexLock lock(&mu_); - if (shutdown_) { - return; - } - for (auto& p : services_map_) { - ServiceData& service_data = p.second; - service_data.SetServingStatus(status); - } -} - -void DefaultHealthCheckService::Shutdown() { + if (shutdown_) { + return; + } + for (auto& p : services_map_) { + ServiceData& service_data = p.second; + service_data.SetServingStatus(status); + } +} + +void DefaultHealthCheckService::Shutdown() { grpc_core::MutexLock lock(&mu_); - if (shutdown_) { - return; - } - shutdown_ = true; - for (auto& p : services_map_) { - ServiceData& service_data = p.second; - service_data.SetServingStatus(NOT_SERVING); - } -} - -DefaultHealthCheckService::ServingStatus -DefaultHealthCheckService::GetServingStatus( + if (shutdown_) { + return; + } + shutdown_ = true; + for (auto& p : services_map_) { + ServiceData& service_data = p.second; + service_data.SetServingStatus(NOT_SERVING); + } +} + +DefaultHealthCheckService::ServingStatus +DefaultHealthCheckService::GetServingStatus( const TString& service_name) const { grpc_core::MutexLock lock(&mu_); - auto it = services_map_.find(service_name); - if (it == services_map_.end()) { - return NOT_FOUND; - } - const ServiceData& service_data = it->second; - return service_data.GetServingStatus(); -} - -void DefaultHealthCheckService::RegisterCallHandler( + auto it = services_map_.find(service_name); + if (it == services_map_.end()) { + return NOT_FOUND; + } + const ServiceData& service_data = it->second; + return service_data.GetServingStatus(); +} + +void DefaultHealthCheckService::RegisterCallHandler( const TString& service_name, - std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) { + std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) { grpc_core::MutexLock lock(&mu_); - ServiceData& service_data = services_map_[service_name]; - service_data.AddCallHandler(handler /* copies ref */); - HealthCheckServiceImpl::CallHandler* h = handler.get(); - h->SendHealth(std::move(handler), service_data.GetServingStatus()); -} - -void DefaultHealthCheckService::UnregisterCallHandler( + ServiceData& service_data = services_map_[service_name]; + service_data.AddCallHandler(handler /* copies ref */); + HealthCheckServiceImpl::CallHandler* h 
= handler.get(); + h->SendHealth(std::move(handler), service_data.GetServingStatus()); +} + +void DefaultHealthCheckService::UnregisterCallHandler( const TString& service_name, - const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) { + const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) { grpc_core::MutexLock lock(&mu_); - auto it = services_map_.find(service_name); - if (it == services_map_.end()) return; - ServiceData& service_data = it->second; - service_data.RemoveCallHandler(handler); - if (service_data.Unused()) { - services_map_.erase(it); - } -} - -DefaultHealthCheckService::HealthCheckServiceImpl* -DefaultHealthCheckService::GetHealthCheckService( - std::unique_ptr<ServerCompletionQueue> cq) { - GPR_ASSERT(impl_ == nullptr); - impl_.reset(new HealthCheckServiceImpl(this, std::move(cq))); - return impl_.get(); -} - -// -// DefaultHealthCheckService::ServiceData -// - -void DefaultHealthCheckService::ServiceData::SetServingStatus( - ServingStatus status) { - status_ = status; - for (auto& call_handler : call_handlers_) { - call_handler->SendHealth(call_handler /* copies ref */, status); - } -} - -void DefaultHealthCheckService::ServiceData::AddCallHandler( - std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) { - call_handlers_.insert(std::move(handler)); -} - -void DefaultHealthCheckService::ServiceData::RemoveCallHandler( - const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) { - call_handlers_.erase(handler); -} - -// -// DefaultHealthCheckService::HealthCheckServiceImpl -// - + auto it = services_map_.find(service_name); + if (it == services_map_.end()) return; + ServiceData& service_data = it->second; + service_data.RemoveCallHandler(handler); + if (service_data.Unused()) { + services_map_.erase(it); + } +} + +DefaultHealthCheckService::HealthCheckServiceImpl* +DefaultHealthCheckService::GetHealthCheckService( + std::unique_ptr<ServerCompletionQueue> cq) { + GPR_ASSERT(impl_ == nullptr); + impl_.reset(new HealthCheckServiceImpl(this, std::move(cq))); + return impl_.get(); +} + +// +// DefaultHealthCheckService::ServiceData +// + +void DefaultHealthCheckService::ServiceData::SetServingStatus( + ServingStatus status) { + status_ = status; + for (auto& call_handler : call_handlers_) { + call_handler->SendHealth(call_handler /* copies ref */, status); + } +} + +void DefaultHealthCheckService::ServiceData::AddCallHandler( + std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) { + call_handlers_.insert(std::move(handler)); +} + +void DefaultHealthCheckService::ServiceData::RemoveCallHandler( + const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) { + call_handlers_.erase(handler); +} + +// +// DefaultHealthCheckService::HealthCheckServiceImpl +// + namespace { const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check"; -const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch"; +const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch"; } // namespace DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl( - DefaultHealthCheckService* database, - std::unique_ptr<ServerCompletionQueue> cq) - : database_(database), cq_(std::move(cq)) { - // Add Check() method. - AddMethod(new internal::RpcServiceMethod( - kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr)); - // Add Watch() method. - AddMethod(new internal::RpcServiceMethod( - kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr)); - // Create serving thread. 
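      // (Illustrative context, not part of this diff: the serving thread
      // created below drives cq_ for the two generic methods registered
      // above. Applications opt into this default implementation through
      // the public grpcpp/health_check_service_interface.h API, e.g.
      //
      //   grpc::EnableDefaultHealthCheckService(true);  // before ServerBuilder
      //   grpc::ServerBuilder builder;
      //   auto server = builder.BuildAndStart();
      // )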
- thread_ = std::unique_ptr<::grpc_core::Thread>( - new ::grpc_core::Thread("grpc_health_check_service", Serve, this)); + DefaultHealthCheckService* database, + std::unique_ptr<ServerCompletionQueue> cq) + : database_(database), cq_(std::move(cq)) { + // Add Check() method. + AddMethod(new internal::RpcServiceMethod( + kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr)); + // Add Watch() method. + AddMethod(new internal::RpcServiceMethod( + kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr)); + // Create serving thread. + thread_ = std::unique_ptr<::grpc_core::Thread>( + new ::grpc_core::Thread("grpc_health_check_service", Serve, this)); } -DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() { - // We will reach here after the server starts shutting down. - shutdown_ = true; - { +DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() { + // We will reach here after the server starts shutting down. + shutdown_ = true; + { grpc_core::MutexLock lock(&cq_shutdown_mu_); - cq_->Shutdown(); - } - thread_->Join(); -} - -void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() { - // Request the calls we're interested in. - // We do this before starting the serving thread, so that we know it's - // done before server startup is complete. - CheckCallHandler::CreateAndStart(cq_.get(), database_, this); - WatchCallHandler::CreateAndStart(cq_.get(), database_, this); - // Start serving thread. - thread_->Start(); -} - -void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) { + cq_->Shutdown(); + } + thread_->Join(); +} + +void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() { + // Request the calls we're interested in. + // We do this before starting the serving thread, so that we know it's + // done before server startup is complete. + CheckCallHandler::CreateAndStart(cq_.get(), database_, this); + WatchCallHandler::CreateAndStart(cq_.get(), database_, this); + // Start serving thread. + thread_->Start(); +} + +void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) { HealthCheckServiceImpl* service = static_cast<HealthCheckServiceImpl*>(arg); - void* tag; - bool ok; - while (true) { - if (!service->cq_->Next(&tag, &ok)) { - // The completion queue is shutting down. - GPR_ASSERT(service->shutdown_); - break; - } - auto* next_step = static_cast<CallableTag*>(tag); - next_step->Run(ok); - } -} - -bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest( + void* tag; + bool ok; + while (true) { + if (!service->cq_->Next(&tag, &ok)) { + // The completion queue is shutting down. 
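      // (Clarifying note: cq_->Next() returns false only after the queue has
      // been shut down and drained of all pending tags, so shutdown_ must
      // already have been set by the destructor, which is exactly what the
      // assertion below verifies.)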
+ GPR_ASSERT(service->shutdown_); + break; + } + auto* next_step = static_cast<CallableTag*>(tag); + next_step->Run(ok); + } +} + +bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest( const ByteBuffer& request, TString* service_name) { std::vector<Slice> slices; - if (!request.Dump(&slices).ok()) return false; + if (!request.Dump(&slices).ok()) return false; uint8_t* request_bytes = nullptr; size_t request_size = 0; - if (slices.size() == 1) { + if (slices.size() == 1) { request_bytes = const_cast<uint8_t*>(slices[0].begin()); request_size = slices[0].size(); - } else if (slices.size() > 1) { - request_bytes = static_cast<uint8_t*>(gpr_malloc(request.Length())); + } else if (slices.size() > 1) { + request_bytes = static_cast<uint8_t*>(gpr_malloc(request.Length())); uint8_t* copy_to = request_bytes; for (size_t i = 0; i < slices.size(); i++) { memcpy(copy_to, slices[i].begin(), slices[i].size()); @@ -220,8 +220,8 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest( grpc_health_v1_HealthCheckRequest* request_struct = grpc_health_v1_HealthCheckRequest_parse( reinterpret_cast<char*>(request_bytes), request_size, arena.ptr()); - if (slices.size() > 1) { - gpr_free(request_bytes); + if (slices.size() > 1) { + gpr_free(request_bytes); } if (request_struct == nullptr) { return false; @@ -232,17 +232,17 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest( return false; } service_name->assign(service.data, service.size); - return true; -} + return true; +} -bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse( - ServingStatus status, ByteBuffer* response) { +bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse( + ServingStatus status, ByteBuffer* response) { upb::Arena arena; grpc_health_v1_HealthCheckResponse* response_struct = grpc_health_v1_HealthCheckResponse_new(arena.ptr()); grpc_health_v1_HealthCheckResponse_set_status( response_struct, - status == NOT_FOUND + status == NOT_FOUND ? grpc_health_v1_HealthCheckResponse_SERVICE_UNKNOWN : status == SERVING ? grpc_health_v1_HealthCheckResponse_SERVING : grpc_health_v1_HealthCheckResponse_NOT_SERVING); @@ -256,249 +256,249 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse( Slice encoded_response(response_slice, Slice::STEAL_REF); ByteBuffer response_buffer(&encoded_response, 1); response->Swap(&response_buffer); - return true; + return true; } -// -// DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler -// - -void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: - CreateAndStart(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service) { - std::shared_ptr<CallHandler> self = - std::make_shared<CheckCallHandler>(cq, database, service); - CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get()); - { +// +// DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler +// + +void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: + CreateAndStart(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service) { + std::shared_ptr<CallHandler> self = + std::make_shared<CheckCallHandler>(cq, database, service); + CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get()); + { grpc_core::MutexLock lock(&service->cq_shutdown_mu_); - if (service->shutdown_) return; - // Request a Check() call. 
- handler->next_ = - CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_, - &handler->writer_, cq, cq, &handler->next_); - } + if (service->shutdown_) return; + // Request a Check() call. + handler->next_ = + CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_, + &handler->writer_, cq, cq, &handler->next_); + } } -DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: - CheckCallHandler(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service) - : cq_(cq), database_(database), service_(service), writer_(&ctx_) {} - -void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: - OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) { - if (!ok) { - // The value of ok being false means that the server is shutting down. - return; - } - // Spawn a new handler instance to serve the next new client. Every handler - // instance will deallocate itself when it's done. - CreateAndStart(cq_, database_, service_); - // Process request. - gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_, - this); +DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: + CheckCallHandler(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service) + : cq_(cq), database_(database), service_(service), writer_(&ctx_) {} + +void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: + OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) { + if (!ok) { + // The value of ok being false means that the server is shutting down. + return; + } + // Spawn a new handler instance to serve the next new client. Every handler + // instance will deallocate itself when it's done. + CreateAndStart(cq_, database_, service_); + // Process request. + gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_, + this); TString service_name; - grpc::Status status = Status::OK; - ByteBuffer response; - if (!service_->DecodeRequest(request_, &service_name)) { - status = Status(StatusCode::INVALID_ARGUMENT, "could not parse request"); - } else { - ServingStatus serving_status = database_->GetServingStatus(service_name); - if (serving_status == NOT_FOUND) { - status = Status(StatusCode::NOT_FOUND, "service name unknown"); - } else if (!service_->EncodeResponse(serving_status, &response)) { - status = Status(StatusCode::INTERNAL, "could not encode response"); - } - } - // Send response. - { + grpc::Status status = Status::OK; + ByteBuffer response; + if (!service_->DecodeRequest(request_, &service_name)) { + status = Status(StatusCode::INVALID_ARGUMENT, "could not parse request"); + } else { + ServingStatus serving_status = database_->GetServingStatus(service_name); + if (serving_status == NOT_FOUND) { + status = Status(StatusCode::NOT_FOUND, "service name unknown"); + } else if (!service_->EncodeResponse(serving_status, &response)) { + status = Status(StatusCode::INTERNAL, "could not encode response"); + } + } + // Send response. 
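  // (Illustrative counterpart, not from this diff: a client exercising this
  // unary Check path, assuming the stub generated from health.proto and an
  // existing `channel`:
  //
  //   grpc::health::v1::HealthCheckRequest req;
  //   req.set_service("my.package.MyService");
  //   grpc::health::v1::HealthCheckResponse resp;
  //   grpc::ClientContext ctx;
  //   grpc::Status s =
  //       grpc::health::v1::Health::NewStub(channel)->Check(&ctx, req, &resp);
  // )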
+ { grpc_core::MutexLock lock(&service_->cq_shutdown_mu_); - if (!service_->shutdown_) { - next_ = - CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - if (status.ok()) { - writer_.Finish(response, status, &next_); - } else { - writer_.FinishWithError(status, &next_); - } - } - } + if (!service_->shutdown_) { + next_ = + CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + if (status.ok()) { + writer_.Finish(response, status, &next_); + } else { + writer_.FinishWithError(status, &next_); + } + } + } } -void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: - OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) { - if (ok) { - gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p", - service_, this); +void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: + OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) { + if (ok) { + gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p", + service_, this); } - self.reset(); // To appease clang-tidy. + self.reset(); // To appease clang-tidy. } -// -// DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler -// - -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - CreateAndStart(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service) { - std::shared_ptr<CallHandler> self = - std::make_shared<WatchCallHandler>(cq, database, service); - WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get()); - { +// +// DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler +// + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + CreateAndStart(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service) { + std::shared_ptr<CallHandler> self = + std::make_shared<WatchCallHandler>(cq, database, service); + WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get()); + { grpc_core::MutexLock lock(&service->cq_shutdown_mu_); - if (service->shutdown_) return; - // Request AsyncNotifyWhenDone(). - handler->on_done_notified_ = - CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler, - std::placeholders::_1, std::placeholders::_2), - self /* copies ref */); - handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_); - // Request a Watch() call. - handler->next_ = - CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_, - &handler->stream_, cq, cq, - &handler->next_); + if (service->shutdown_) return; + // Request AsyncNotifyWhenDone(). + handler->on_done_notified_ = + CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler, + std::placeholders::_1, std::placeholders::_2), + self /* copies ref */); + handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_); + // Request a Watch() call. 
+ handler->next_ = + CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_, + &handler->stream_, cq, cq, + &handler->next_); } } -DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - WatchCallHandler(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service) - : cq_(cq), database_(database), service_(service), stream_(&ctx_) {} - -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) { - if (!ok) { - // Server shutting down. - // - // AsyncNotifyWhenDone() needs to be called before the call starts, but the - // tag will not pop out if the call never starts ( - // https://github.com/grpc/grpc/issues/10136). So we need to manually - // release the ownership of the handler in this case. - GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr); - return; - } - // Spawn a new handler instance to serve the next new client. Every handler - // instance will deallocate itself when it's done. - CreateAndStart(cq_, database_, service_); - // Parse request. - if (!service_->DecodeRequest(request_, &service_name_)) { - SendFinish(std::move(self), - Status(StatusCode::INVALID_ARGUMENT, "could not parse request")); - return; - } - // Register the call for updates to the service. - gpr_log(GPR_DEBUG, - "[HCS %p] Health watch started for service \"%s\" (handler: %p)", - service_, service_name_.c_str(), this); - database_->RegisterCallHandler(service_name_, std::move(self)); +DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + WatchCallHandler(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service) + : cq_(cq), database_(database), service_(service), stream_(&ctx_) {} + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) { + if (!ok) { + // Server shutting down. + // + // AsyncNotifyWhenDone() needs to be called before the call starts, but the + // tag will not pop out if the call never starts ( + // https://github.com/grpc/grpc/issues/10136). So we need to manually + // release the ownership of the handler in this case. + GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr); + return; + } + // Spawn a new handler instance to serve the next new client. Every handler + // instance will deallocate itself when it's done. + CreateAndStart(cq_, database_, service_); + // Parse request. + if (!service_->DecodeRequest(request_, &service_name_)) { + SendFinish(std::move(self), + Status(StatusCode::INVALID_ARGUMENT, "could not parse request")); + return; + } + // Register the call for updates to the service. + gpr_log(GPR_DEBUG, + "[HCS %p] Health watch started for service \"%s\" (handler: %p)", + service_, service_name_.c_str(), this); + database_->RegisterCallHandler(service_name_, std::move(self)); } -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) { +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) { grpc_core::MutexLock lock(&send_mu_); - // If there's already a send in flight, cache the new status, and - // we'll start a new send for it when the one in flight completes. 
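  // (In other words, updates are coalesced: if several status changes arrive
  // while a Write() is still pending, each one overwrites pending_status_,
  // and only the most recent value is sent once OnSendHealthDone() fires.)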
- if (send_in_flight_) { - pending_status_ = status; - return; - } - // Start a send. - SendHealthLocked(std::move(self), status); -} - -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - SendHealthLocked(std::shared_ptr<CallHandler> self, ServingStatus status) { - send_in_flight_ = true; - // Construct response. - ByteBuffer response; - bool success = service_->EncodeResponse(status, &response); - // Grab shutdown lock and send response. + // If there's already a send in flight, cache the new status, and + // we'll start a new send for it when the one in flight completes. + if (send_in_flight_) { + pending_status_ = status; + return; + } + // Start a send. + SendHealthLocked(std::move(self), status); +} + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + SendHealthLocked(std::shared_ptr<CallHandler> self, ServingStatus status) { + send_in_flight_ = true; + // Construct response. + ByteBuffer response; + bool success = service_->EncodeResponse(status, &response); + // Grab shutdown lock and send response. grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_); - if (service_->shutdown_) { - SendFinishLocked(std::move(self), Status::CANCELLED); - return; - } - if (!success) { - SendFinishLocked(std::move(self), - Status(StatusCode::INTERNAL, "could not encode response")); - return; - } - next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - stream_.Write(response, &next_); -} - -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok) { - if (!ok) { - SendFinish(std::move(self), Status::CANCELLED); - return; - } + if (service_->shutdown_) { + SendFinishLocked(std::move(self), Status::CANCELLED); + return; + } + if (!success) { + SendFinishLocked(std::move(self), + Status(StatusCode::INTERNAL, "could not encode response")); + return; + } + next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + stream_.Write(response, &next_); +} + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok) { + if (!ok) { + SendFinish(std::move(self), Status::CANCELLED); + return; + } grpc_core::MutexLock lock(&send_mu_); - send_in_flight_ = false; - // If we got a new status since we started the last send, start a - // new send for it. - if (pending_status_ != NOT_FOUND) { - auto status = pending_status_; - pending_status_ = NOT_FOUND; - SendHealthLocked(std::move(self), status); - } -} - -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - SendFinish(std::shared_ptr<CallHandler> self, const Status& status) { - if (finish_called_) return; + send_in_flight_ = false; + // If we got a new status since we started the last send, start a + // new send for it. 
+ if (pending_status_ != NOT_FOUND) { + auto status = pending_status_; + pending_status_ = NOT_FOUND; + SendHealthLocked(std::move(self), status); + } +} + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + SendFinish(std::shared_ptr<CallHandler> self, const Status& status) { + if (finish_called_) return; grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_); - if (service_->shutdown_) return; - SendFinishLocked(std::move(self), status); -} - -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - SendFinishLocked(std::shared_ptr<CallHandler> self, const Status& status) { - on_finish_done_ = - CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - stream_.Finish(status, &on_finish_done_); - finish_called_ = true; -} - -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) { - if (ok) { - gpr_log(GPR_DEBUG, - "[HCS %p] Health watch call finished (service_name: \"%s\", " - "handler: %p).", - service_, service_name_.c_str(), this); - } - self.reset(); // To appease clang-tidy. -} - -// TODO(roth): This method currently assumes that there will be only one -// thread polling the cq and invoking the corresponding callbacks. If -// that changes, we will need to add synchronization here. -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) { - GPR_ASSERT(ok); - gpr_log(GPR_DEBUG, - "[HCS %p] Health watch call is notified done (handler: %p, " - "is_cancelled: %d).", - service_, this, static_cast<int>(ctx_.IsCancelled())); - database_->UnregisterCallHandler(service_name_, self); - SendFinish(std::move(self), Status::CANCELLED); -} - + if (service_->shutdown_) return; + SendFinishLocked(std::move(self), status); +} + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + SendFinishLocked(std::shared_ptr<CallHandler> self, const Status& status) { + on_finish_done_ = + CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + stream_.Finish(status, &on_finish_done_); + finish_called_ = true; +} + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) { + if (ok) { + gpr_log(GPR_DEBUG, + "[HCS %p] Health watch call finished (service_name: \"%s\", " + "handler: %p).", + service_, service_name_.c_str(), this); + } + self.reset(); // To appease clang-tidy. +} + +// TODO(roth): This method currently assumes that there will be only one +// thread polling the cq and invoking the corresponding callbacks. If +// that changes, we will need to add synchronization here. 
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) { + GPR_ASSERT(ok); + gpr_log(GPR_DEBUG, + "[HCS %p] Health watch call is notified done (handler: %p, " + "is_cancelled: %d).", + service_, this, static_cast<int>(ctx_.IsCancelled())); + database_->UnregisterCallHandler(service_name_, self); + SendFinish(std::move(self), Status::CANCELLED); +} + } // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h index 5da0ef935a4..9da1dfc15fa 100644 --- a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h +++ b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h @@ -19,260 +19,260 @@ #ifndef GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H #define GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H -#include <atomic> -#include <set> +#include <atomic> +#include <set> -#include <grpc/support/log.h> -#include <grpcpp/grpcpp.h> +#include <grpc/support/log.h> +#include <grpcpp/grpcpp.h> #include <grpcpp/health_check_service_interface.h> -#include <grpcpp/impl/codegen/async_generic_service.h> -#include <grpcpp/impl/codegen/async_unary_call.h> +#include <grpcpp/impl/codegen/async_generic_service.h> +#include <grpcpp/impl/codegen/async_unary_call.h> #include <grpcpp/impl/codegen/completion_queue.h> #include <grpcpp/impl/codegen/service_type.h> #include <grpcpp/support/byte_buffer.h> #include "src/core/lib/gprpp/sync.h" -#include "src/core/lib/gprpp/thd.h" - +#include "src/core/lib/gprpp/thd.h" + namespace grpc { // Default implementation of HealthCheckServiceInterface. Server will create and // own it. class DefaultHealthCheckService final : public HealthCheckServiceInterface { public: - enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING }; - + enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING }; + // The service impl to register with the server. class HealthCheckServiceImpl : public Service { public: - // Base class for call handlers. - class CallHandler { - public: - virtual ~CallHandler() = default; - virtual void SendHealth(std::shared_ptr<CallHandler> self, - ServingStatus status) = 0; - }; - - HealthCheckServiceImpl(DefaultHealthCheckService* database, - std::unique_ptr<ServerCompletionQueue> cq); - - ~HealthCheckServiceImpl(); - - void StartServingThread(); - + // Base class for call handlers. + class CallHandler { + public: + virtual ~CallHandler() = default; + virtual void SendHealth(std::shared_ptr<CallHandler> self, + ServingStatus status) = 0; + }; + + HealthCheckServiceImpl(DefaultHealthCheckService* database, + std::unique_ptr<ServerCompletionQueue> cq); + + ~HealthCheckServiceImpl(); + + void StartServingThread(); + private: - // A tag that can be called with a bool argument. It's tailored for - // CallHandler's use. Before being used, it should be constructed with a - // method of CallHandler and a shared pointer to the handler. The - // shared pointer will be moved to the invoked function and the function - // can only be invoked once. That makes ref counting of the handler easier, - // because the shared pointer is not bound to the function and can be gone - // once the invoked function returns (if not used any more). 
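      // (Illustrative usage, mirroring the .cc above: a handler arms a tag
      // before an async operation so that the tag keeps the handler alive
      // until the completion queue runs it, e.g.
      //
      //   next_ = CallableTag(std::bind(&CheckCallHandler::OnCallReceived,
      //                                 handler, std::placeholders::_1,
      //                                 std::placeholders::_2),
      //                       std::move(self));
      //
      // Serve() later pops the tag and calls next_.Run(ok), which moves the
      // shared_ptr into OnCallReceived().)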
- class CallableTag { - public: - using HandlerFunction = - std::function<void(std::shared_ptr<CallHandler>, bool)>; - - CallableTag() {} - - CallableTag(HandlerFunction func, std::shared_ptr<CallHandler> handler) - : handler_function_(std::move(func)), handler_(std::move(handler)) { - GPR_ASSERT(handler_function_ != nullptr); - GPR_ASSERT(handler_ != nullptr); - } - - // Runs the tag. This should be called only once. The handler is no - // longer owned by this tag after this method is invoked. - void Run(bool ok) { - GPR_ASSERT(handler_function_ != nullptr); - GPR_ASSERT(handler_ != nullptr); - handler_function_(std::move(handler_), ok); - } - - // Releases and returns the shared pointer to the handler. - std::shared_ptr<CallHandler> ReleaseHandler() { - return std::move(handler_); - } - - private: - HandlerFunction handler_function_ = nullptr; - std::shared_ptr<CallHandler> handler_; - }; - - // Call handler for Check method. - // Each handler takes care of one call. It contains per-call data and it - // will access the members of the parent class (i.e., - // DefaultHealthCheckService) for per-service health data. - class CheckCallHandler : public CallHandler { - public: - // Instantiates a CheckCallHandler and requests the next health check - // call. The handler object will manage its own lifetime, so no action is - // needed from the caller any more regarding that object. - static void CreateAndStart(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service); - - // This ctor is public because we want to use std::make_shared<> in - // CreateAndStart(). This ctor shouldn't be used elsewhere. - CheckCallHandler(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service); - - // Not used for Check. + // A tag that can be called with a bool argument. It's tailored for + // CallHandler's use. Before being used, it should be constructed with a + // method of CallHandler and a shared pointer to the handler. The + // shared pointer will be moved to the invoked function and the function + // can only be invoked once. That makes ref counting of the handler easier, + // because the shared pointer is not bound to the function and can be gone + // once the invoked function returns (if not used any more). + class CallableTag { + public: + using HandlerFunction = + std::function<void(std::shared_ptr<CallHandler>, bool)>; + + CallableTag() {} + + CallableTag(HandlerFunction func, std::shared_ptr<CallHandler> handler) + : handler_function_(std::move(func)), handler_(std::move(handler)) { + GPR_ASSERT(handler_function_ != nullptr); + GPR_ASSERT(handler_ != nullptr); + } + + // Runs the tag. This should be called only once. The handler is no + // longer owned by this tag after this method is invoked. + void Run(bool ok) { + GPR_ASSERT(handler_function_ != nullptr); + GPR_ASSERT(handler_ != nullptr); + handler_function_(std::move(handler_), ok); + } + + // Releases and returns the shared pointer to the handler. + std::shared_ptr<CallHandler> ReleaseHandler() { + return std::move(handler_); + } + + private: + HandlerFunction handler_function_ = nullptr; + std::shared_ptr<CallHandler> handler_; + }; + + // Call handler for Check method. + // Each handler takes care of one call. It contains per-call data and it + // will access the members of the parent class (i.e., + // DefaultHealthCheckService) for per-service health data. 
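      // (Lifecycle sketch, per the .cc above: CreateAndStart() requests one
      // Check() call; OnCallReceived() immediately spawns a replacement
      // handler, decodes the request, and finishes the call; OnFinishDone()
      // then drops the last shared_ptr, destroying this handler.)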
+ class CheckCallHandler : public CallHandler { + public: + // Instantiates a CheckCallHandler and requests the next health check + // call. The handler object will manage its own lifetime, so no action is + // needed from the caller any more regarding that object. + static void CreateAndStart(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service); + + // This ctor is public because we want to use std::make_shared<> in + // CreateAndStart(). This ctor shouldn't be used elsewhere. + CheckCallHandler(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service); + + // Not used for Check. void SendHealth(std::shared_ptr<CallHandler> /*self*/, ServingStatus /*status*/) override {} - - private: - // Called when we receive a call. - // Spawns a new handler so that we can keep servicing future calls. - void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok); - - // Called when Finish() is done. - void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok); - - // The members passed down from HealthCheckServiceImpl. - ServerCompletionQueue* cq_; - DefaultHealthCheckService* database_; - HealthCheckServiceImpl* service_; - - ByteBuffer request_; - GenericServerAsyncResponseWriter writer_; - ServerContext ctx_; - - CallableTag next_; - }; - - // Call handler for Watch method. - // Each handler takes care of one call. It contains per-call data and it - // will access the members of the parent class (i.e., - // DefaultHealthCheckService) for per-service health data. - class WatchCallHandler : public CallHandler { - public: - // Instantiates a WatchCallHandler and requests the next health check - // call. The handler object will manage its own lifetime, so no action is - // needed from the caller any more regarding that object. - static void CreateAndStart(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service); - - // This ctor is public because we want to use std::make_shared<> in - // CreateAndStart(). This ctor shouldn't be used elsewhere. - WatchCallHandler(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service); - - void SendHealth(std::shared_ptr<CallHandler> self, - ServingStatus status) override; - - private: - // Called when we receive a call. - // Spawns a new handler so that we can keep servicing future calls. - void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok); - - // Requires holding send_mu_. - void SendHealthLocked(std::shared_ptr<CallHandler> self, - ServingStatus status); - - // When sending a health result finishes. - void OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok); - - void SendFinish(std::shared_ptr<CallHandler> self, const Status& status); - - // Requires holding service_->cq_shutdown_mu_. - void SendFinishLocked(std::shared_ptr<CallHandler> self, - const Status& status); - - // Called when Finish() is done. - void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok); - - // Called when AsyncNotifyWhenDone() notifies us. - void OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok); - - // The members passed down from HealthCheckServiceImpl. - ServerCompletionQueue* cq_; - DefaultHealthCheckService* database_; - HealthCheckServiceImpl* service_; - - ByteBuffer request_; + + private: + // Called when we receive a call. + // Spawns a new handler so that we can keep servicing future calls. 
+ void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok); + + // Called when Finish() is done. + void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok); + + // The members passed down from HealthCheckServiceImpl. + ServerCompletionQueue* cq_; + DefaultHealthCheckService* database_; + HealthCheckServiceImpl* service_; + + ByteBuffer request_; + GenericServerAsyncResponseWriter writer_; + ServerContext ctx_; + + CallableTag next_; + }; + + // Call handler for Watch method. + // Each handler takes care of one call. It contains per-call data and it + // will access the members of the parent class (i.e., + // DefaultHealthCheckService) for per-service health data. + class WatchCallHandler : public CallHandler { + public: + // Instantiates a WatchCallHandler and requests the next health check + // call. The handler object will manage its own lifetime, so no action is + // needed from the caller any more regarding that object. + static void CreateAndStart(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service); + + // This ctor is public because we want to use std::make_shared<> in + // CreateAndStart(). This ctor shouldn't be used elsewhere. + WatchCallHandler(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service); + + void SendHealth(std::shared_ptr<CallHandler> self, + ServingStatus status) override; + + private: + // Called when we receive a call. + // Spawns a new handler so that we can keep servicing future calls. + void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok); + + // Requires holding send_mu_. + void SendHealthLocked(std::shared_ptr<CallHandler> self, + ServingStatus status); + + // When sending a health result finishes. + void OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok); + + void SendFinish(std::shared_ptr<CallHandler> self, const Status& status); + + // Requires holding service_->cq_shutdown_mu_. + void SendFinishLocked(std::shared_ptr<CallHandler> self, + const Status& status); + + // Called when Finish() is done. + void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok); + + // Called when AsyncNotifyWhenDone() notifies us. + void OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok); + + // The members passed down from HealthCheckServiceImpl. + ServerCompletionQueue* cq_; + DefaultHealthCheckService* database_; + HealthCheckServiceImpl* service_; + + ByteBuffer request_; TString service_name_; - GenericServerAsyncWriter stream_; - ServerContext ctx_; - + GenericServerAsyncWriter stream_; + ServerContext ctx_; + grpc_core::Mutex send_mu_; - bool send_in_flight_ = false; // Guarded by mu_. - ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_. - - bool finish_called_ = false; - CallableTag next_; - CallableTag on_done_notified_; - CallableTag on_finish_done_; - }; - - // Handles the incoming requests and drives the completion queue in a loop. - static void Serve(void* arg); - - // Returns true on success. - static bool DecodeRequest(const ByteBuffer& request, + bool send_in_flight_ = false; // Guarded by mu_. + ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_. + + bool finish_called_ = false; + CallableTag next_; + CallableTag on_done_notified_; + CallableTag on_finish_done_; + }; + + // Handles the incoming requests and drives the completion queue in a loop. + static void Serve(void* arg); + + // Returns true on success. 
+ static bool DecodeRequest(const ByteBuffer& request, TString* service_name); - static bool EncodeResponse(ServingStatus status, ByteBuffer* response); - - // Needed to appease Windows compilers, which don't seem to allow - // nested classes to access protected members in the parent's - // superclass. - using Service::RequestAsyncServerStreaming; - using Service::RequestAsyncUnary; - - DefaultHealthCheckService* database_; - std::unique_ptr<ServerCompletionQueue> cq_; - - // To synchronize the operations related to shutdown state of cq_, so that - // we don't enqueue new tags into cq_ after it is already shut down. + static bool EncodeResponse(ServingStatus status, ByteBuffer* response); + + // Needed to appease Windows compilers, which don't seem to allow + // nested classes to access protected members in the parent's + // superclass. + using Service::RequestAsyncServerStreaming; + using Service::RequestAsyncUnary; + + DefaultHealthCheckService* database_; + std::unique_ptr<ServerCompletionQueue> cq_; + + // To synchronize the operations related to shutdown state of cq_, so that + // we don't enqueue new tags into cq_ after it is already shut down. grpc_core::Mutex cq_shutdown_mu_; - std::atomic_bool shutdown_{false}; - std::unique_ptr<::grpc_core::Thread> thread_; + std::atomic_bool shutdown_{false}; + std::unique_ptr<::grpc_core::Thread> thread_; }; DefaultHealthCheckService(); - + void SetServingStatus(const TString& service_name, bool serving) override; void SetServingStatus(bool serving) override; - - void Shutdown() override; - + + void Shutdown() override; + ServingStatus GetServingStatus(const TString& service_name) const; - HealthCheckServiceImpl* GetHealthCheckService( - std::unique_ptr<ServerCompletionQueue> cq); - + HealthCheckServiceImpl* GetHealthCheckService( + std::unique_ptr<ServerCompletionQueue> cq); + private: - // Stores the current serving status of a service and any call - // handlers registered for updates when the service's status changes. - class ServiceData { - public: - void SetServingStatus(ServingStatus status); - ServingStatus GetServingStatus() const { return status_; } - void AddCallHandler( - std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); - void RemoveCallHandler( - const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler); - bool Unused() const { - return call_handlers_.empty() && status_ == NOT_FOUND; - } - - private: - ServingStatus status_ = NOT_FOUND; - std::set<std::shared_ptr<HealthCheckServiceImpl::CallHandler>> - call_handlers_; - }; - - void RegisterCallHandler( + // Stores the current serving status of a service and any call + // handlers registered for updates when the service's status changes. 
+ class ServiceData { + public: + void SetServingStatus(ServingStatus status); + ServingStatus GetServingStatus() const { return status_; } + void AddCallHandler( + std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); + void RemoveCallHandler( + const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler); + bool Unused() const { + return call_handlers_.empty() && status_ == NOT_FOUND; + } + + private: + ServingStatus status_ = NOT_FOUND; + std::set<std::shared_ptr<HealthCheckServiceImpl::CallHandler>> + call_handlers_; + }; + + void RegisterCallHandler( const TString& service_name, - std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); - - void UnregisterCallHandler( + std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); + + void UnregisterCallHandler( const TString& service_name, - const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler); - + const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler); + mutable grpc_core::Mutex mu_; bool shutdown_ = false; // Guarded by mu_. std::map<TString, ServiceData> services_map_; // Guarded by mu_. diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/constants.h b/contrib/libs/grpc/src/cpp/server/load_reporter/constants.h index eabed9711ec..00ad794a049 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/constants.h +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/constants.h @@ -1,81 +1,81 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H -#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H - -#include <grpc/impl/codegen/port_platform.h> - -namespace grpc { -namespace load_reporter { - -// TODO(juanlishen): Update the version number with the PR number every time -// there is any change to the server load reporter. -constexpr uint32_t kVersion = 15853; - -// TODO(juanlishen): This window size is from the internal spec for the load -// reporter. Need to ask the gRPC LB team whether we should make this and the -// fetching interval configurable. -constexpr uint32_t kFeedbackSampleWindowSeconds = 10; -constexpr uint32_t kFetchAndSampleIntervalSeconds = 1; - -constexpr size_t kLbIdLength = 8; -constexpr size_t kIpv4AddressLength = 8; -constexpr size_t kIpv6AddressLength = 32; - -constexpr char kInvalidLbId[] = "<INVALID_LBID_238dsb234890rb>"; - -// Call statuses. - -constexpr char kCallStatusOk[] = "OK"; -constexpr char kCallStatusServerError[] = "5XX"; -constexpr char kCallStatusClientError[] = "4XX"; - -// Tag keys. - -constexpr char kTagKeyToken[] = "token"; -constexpr char kTagKeyHost[] = "host"; -constexpr char kTagKeyUserId[] = "user_id"; -constexpr char kTagKeyStatus[] = "status"; -constexpr char kTagKeyMetricName[] = "metric_name"; - -// Measure names. 
- -constexpr char kMeasureStartCount[] = "grpc.io/lb/start_count"; -constexpr char kMeasureEndCount[] = "grpc.io/lb/end_count"; -constexpr char kMeasureEndBytesSent[] = "grpc.io/lb/bytes_sent"; -constexpr char kMeasureEndBytesReceived[] = "grpc.io/lb/bytes_received"; -constexpr char kMeasureEndLatencyMs[] = "grpc.io/lb/latency_ms"; -constexpr char kMeasureOtherCallMetric[] = "grpc.io/lb/other_call_metric"; - -// View names. - -constexpr char kViewStartCount[] = "grpc.io/lb_view/start_count"; -constexpr char kViewEndCount[] = "grpc.io/lb_view/end_count"; -constexpr char kViewEndBytesSent[] = "grpc.io/lb_view/bytes_sent"; -constexpr char kViewEndBytesReceived[] = "grpc.io/lb_view/bytes_received"; -constexpr char kViewEndLatencyMs[] = "grpc.io/lb_view/latency_ms"; -constexpr char kViewOtherCallMetricCount[] = - "grpc.io/lb_view/other_call_metric_count"; -constexpr char kViewOtherCallMetricValue[] = - "grpc.io/lb_view/other_call_metric_value"; - -} // namespace load_reporter -} // namespace grpc - -#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H +#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H + +#include <grpc/impl/codegen/port_platform.h> + +namespace grpc { +namespace load_reporter { + +// TODO(juanlishen): Update the version number with the PR number every time +// there is any change to the server load reporter. +constexpr uint32_t kVersion = 15853; + +// TODO(juanlishen): This window size is from the internal spec for the load +// reporter. Need to ask the gRPC LB team whether we should make this and the +// fetching interval configurable. +constexpr uint32_t kFeedbackSampleWindowSeconds = 10; +constexpr uint32_t kFetchAndSampleIntervalSeconds = 1; + +constexpr size_t kLbIdLength = 8; +constexpr size_t kIpv4AddressLength = 8; +constexpr size_t kIpv6AddressLength = 32; + +constexpr char kInvalidLbId[] = "<INVALID_LBID_238dsb234890rb>"; + +// Call statuses. + +constexpr char kCallStatusOk[] = "OK"; +constexpr char kCallStatusServerError[] = "5XX"; +constexpr char kCallStatusClientError[] = "4XX"; + +// Tag keys. + +constexpr char kTagKeyToken[] = "token"; +constexpr char kTagKeyHost[] = "host"; +constexpr char kTagKeyUserId[] = "user_id"; +constexpr char kTagKeyStatus[] = "status"; +constexpr char kTagKeyMetricName[] = "metric_name"; + +// Measure names. + +constexpr char kMeasureStartCount[] = "grpc.io/lb/start_count"; +constexpr char kMeasureEndCount[] = "grpc.io/lb/end_count"; +constexpr char kMeasureEndBytesSent[] = "grpc.io/lb/bytes_sent"; +constexpr char kMeasureEndBytesReceived[] = "grpc.io/lb/bytes_received"; +constexpr char kMeasureEndLatencyMs[] = "grpc.io/lb/latency_ms"; +constexpr char kMeasureOtherCallMetric[] = "grpc.io/lb/other_call_metric"; + +// View names. 
+ +constexpr char kViewStartCount[] = "grpc.io/lb_view/start_count"; +constexpr char kViewEndCount[] = "grpc.io/lb_view/end_count"; +constexpr char kViewEndBytesSent[] = "grpc.io/lb_view/bytes_sent"; +constexpr char kViewEndBytesReceived[] = "grpc.io/lb_view/bytes_received"; +constexpr char kViewEndLatencyMs[] = "grpc.io/lb_view/latency_ms"; +constexpr char kViewOtherCallMetricCount[] = + "grpc.io/lb_view/other_call_metric_count"; +constexpr char kViewOtherCallMetricValue[] = + "grpc.io/lb_view/other_call_metric_value"; + +} // namespace load_reporter +} // namespace grpc + +#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats.h b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats.h index 8544b054177..f514b0752f7 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats.h +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats.h @@ -1,36 +1,36 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H -#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H - -#include <grpc/impl/codegen/port_platform.h> - -#include <utility> - -namespace grpc { -namespace load_reporter { - -// Reads the CPU stats (in a pair of busy and total numbers) from the system. -// The units of the stats should be the same. -std::pair<uint64_t, uint64_t> GetCpuStatsImpl(); - -} // namespace load_reporter -} // namespace grpc - -#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H +#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H + +#include <grpc/impl/codegen/port_platform.h> + +#include <utility> + +namespace grpc { +namespace load_reporter { + +// Reads the CPU stats (in a pair of busy and total numbers) from the system. +// The units of the stats should be the same. 
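Per the comment above, `GetCpuStatsImpl()` returns cumulative (busy, total) tick counters in matching units, so a utilization figure comes from the delta between two samples. A sketch under that assumption, using the include path of this tree:

```cpp
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <thread>

#include "src/cpp/server/load_reporter/get_cpu_stats.h"

int main() {
  auto before = grpc::load_reporter::GetCpuStatsImpl();
  std::this_thread::sleep_for(std::chrono::seconds(1));
  auto after = grpc::load_reporter::GetCpuStatsImpl();
  // The counters are cumulative, so only the deltas between two samples
  // carry meaning.
  const uint64_t busy = after.first - before.first;
  const uint64_t total = after.second - before.second;
  if (total > 0) {
    std::printf("cpu utilization: %.1f%%\n", 100.0 * busy / total);
  }
  return 0;
}
```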
+std::pair<uint64_t, uint64_t> GetCpuStatsImpl(); + +} // namespace load_reporter +} // namespace grpc + +#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc index 8565d384a81..561d4f50482 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc @@ -1,48 +1,48 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <grpc/support/port_platform.h> - -#ifdef GPR_LINUX - -#include <cstdio> - -#include "src/cpp/server/load_reporter/get_cpu_stats.h" - -namespace grpc { -namespace load_reporter { - -std::pair<uint64_t, uint64_t> GetCpuStatsImpl() { - uint64_t busy = 0, total = 0; - FILE* fp; - fp = fopen("/proc/stat", "r"); - uint64_t user, nice, system, idle; +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#ifdef GPR_LINUX + +#include <cstdio> + +#include "src/cpp/server/load_reporter/get_cpu_stats.h" + +namespace grpc { +namespace load_reporter { + +std::pair<uint64_t, uint64_t> GetCpuStatsImpl() { + uint64_t busy = 0, total = 0; + FILE* fp; + fp = fopen("/proc/stat", "r"); + uint64_t user, nice, system, idle; if (fscanf(fp, "cpu %lu %lu %lu %lu", &user, &nice, &system, &idle) != 4) { // Something bad happened with the information, so assume it's all invalid user = nice = system = idle = 0; } - fclose(fp); - busy = user + nice + system; - total = busy + idle; - return std::make_pair(busy, total); -} - -} // namespace load_reporter -} // namespace grpc - -#endif // GPR_LINUX + fclose(fp); + busy = user + nice + system; + total = busy + idle; + return std::make_pair(busy, total); +} + +} // namespace load_reporter +} // namespace grpc + +#endif // GPR_LINUX diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_macos.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_macos.cc index 125631a3d1e..dbdde304c20 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_macos.cc +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_macos.cc @@ -1,45 +1,45 @@ -/* - * - * Copyright 2018 gRPC authors. 
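Two caveats in the Linux implementation just shown: `fopen` is never checked for failure before `fscanf`/`fclose`, and `%lu` only matches `uint64_t` on platforms where `unsigned long` is 64 bits wide. A more defensive sketch of the same `/proc/stat` parse — not the library's code:

```cpp
#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <utility>

// Sketch only: the same "cpu user nice system idle" parse, with an fopen
// check and SCNu64 so the format specifier always matches uint64_t.
std::pair<uint64_t, uint64_t> GetCpuStatsChecked() {
  std::FILE* fp = std::fopen("/proc/stat", "r");
  if (fp == nullptr) return {0, 0};
  uint64_t user = 0, nice = 0, system = 0, idle = 0;
  if (std::fscanf(fp, "cpu %" SCNu64 " %" SCNu64 " %" SCNu64 " %" SCNu64,
                  &user, &nice, &system, &idle) != 4) {
    user = nice = system = idle = 0;  // treat a partial line as invalid
  }
  std::fclose(fp);
  const uint64_t busy = user + nice + system;
  return {busy, busy + idle};
}
```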
- * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <grpc/support/port_platform.h> - -#ifdef GPR_APPLE - -#include <mach/mach.h> - -#include "src/cpp/server/load_reporter/get_cpu_stats.h" - -namespace grpc { -namespace load_reporter { - -std::pair<uint64_t, uint64_t> GetCpuStatsImpl() { - uint64_t busy = 0, total = 0; - host_cpu_load_info_data_t cpuinfo; - mach_msg_type_number_t count = HOST_CPU_LOAD_INFO_COUNT; - if (host_statistics(mach_host_self(), HOST_CPU_LOAD_INFO, - (host_info_t)&cpuinfo, &count) == KERN_SUCCESS) { - for (int i = 0; i < CPU_STATE_MAX; i++) total += cpuinfo.cpu_ticks[i]; - busy = total - cpuinfo.cpu_ticks[CPU_STATE_IDLE]; - } - return std::make_pair(busy, total); -} - -} // namespace load_reporter -} // namespace grpc - -#endif // GPR_APPLE +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#ifdef GPR_APPLE + +#include <mach/mach.h> + +#include "src/cpp/server/load_reporter/get_cpu_stats.h" + +namespace grpc { +namespace load_reporter { + +std::pair<uint64_t, uint64_t> GetCpuStatsImpl() { + uint64_t busy = 0, total = 0; + host_cpu_load_info_data_t cpuinfo; + mach_msg_type_number_t count = HOST_CPU_LOAD_INFO_COUNT; + if (host_statistics(mach_host_self(), HOST_CPU_LOAD_INFO, + (host_info_t)&cpuinfo, &count) == KERN_SUCCESS) { + for (int i = 0; i < CPU_STATE_MAX; i++) total += cpuinfo.cpu_ticks[i]; + busy = total - cpuinfo.cpu_ticks[CPU_STATE_IDLE]; + } + return std::make_pair(busy, total); +} + +} // namespace load_reporter +} // namespace grpc + +#endif // GPR_APPLE diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_unsupported.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_unsupported.cc index e2d61859c8e..80fb8b6da1f 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_unsupported.cc +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_unsupported.cc @@ -1,40 +1,40 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <grpc/support/port_platform.h> - -#if !defined(GPR_LINUX) && !defined(GPR_WINDOWS) && !defined(GPR_APPLE) - -#include <grpc/support/log.h> - -#include "src/cpp/server/load_reporter/get_cpu_stats.h" - -namespace grpc { -namespace load_reporter { - -std::pair<uint64_t, uint64_t> GetCpuStatsImpl() { - uint64_t busy = 0, total = 0; - gpr_log(GPR_ERROR, - "Platforms other than Linux, Windows, and MacOS are not supported."); - return std::make_pair(busy, total); -} - -} // namespace load_reporter -} // namespace grpc - -#endif // !defined(GPR_LINUX) && !defined(GPR_WINDOWS) && !defined(GPR_APPLE) +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#if !defined(GPR_LINUX) && !defined(GPR_WINDOWS) && !defined(GPR_APPLE) + +#include <grpc/support/log.h> + +#include "src/cpp/server/load_reporter/get_cpu_stats.h" + +namespace grpc { +namespace load_reporter { + +std::pair<uint64_t, uint64_t> GetCpuStatsImpl() { + uint64_t busy = 0, total = 0; + gpr_log(GPR_ERROR, + "Platforms other than Linux, Windows, and MacOS are not supported."); + return std::make_pair(busy, total); +} + +} // namespace load_reporter +} // namespace grpc + +#endif // !defined(GPR_LINUX) && !defined(GPR_WINDOWS) && !defined(GPR_APPLE) diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc index bc5718a0568..0a98e848a2c 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc @@ -1,55 +1,55 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -#include <grpc/support/port_platform.h> - -#ifdef GPR_WINDOWS - -#include <windows.h> -#include <cstdint> - -#include "src/cpp/server/load_reporter/get_cpu_stats.h" - -namespace grpc { -namespace load_reporter { - -namespace { - -uint64_t FiletimeToInt(const FILETIME& ft) { - ULARGE_INTEGER i; - i.LowPart = ft.dwLowDateTime; - i.HighPart = ft.dwHighDateTime; - return i.QuadPart; -} - -} // namespace - -std::pair<uint64_t, uint64_t> GetCpuStatsImpl() { - uint64_t busy = 0, total = 0; - FILETIME idle, kernel, user; - if (GetSystemTimes(&idle, &kernel, &user) != 0) { - total = FiletimeToInt(kernel) + FiletimeToInt(user); - busy = total - FiletimeToInt(idle); - } - return std::make_pair(busy, total); -} - -} // namespace load_reporter -} // namespace grpc - -#endif // GPR_WINDOWS +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#ifdef GPR_WINDOWS + +#include <windows.h> +#include <cstdint> + +#include "src/cpp/server/load_reporter/get_cpu_stats.h" + +namespace grpc { +namespace load_reporter { + +namespace { + +uint64_t FiletimeToInt(const FILETIME& ft) { + ULARGE_INTEGER i; + i.LowPart = ft.dwLowDateTime; + i.HighPart = ft.dwHighDateTime; + return i.QuadPart; +} + +} // namespace + +std::pair<uint64_t, uint64_t> GetCpuStatsImpl() { + uint64_t busy = 0, total = 0; + FILETIME idle, kernel, user; + if (GetSystemTimes(&idle, &kernel, &user) != 0) { + total = FiletimeToInt(kernel) + FiletimeToInt(user); + busy = total - FiletimeToInt(idle); + } + return std::make_pair(busy, total); +} + +} // namespace load_reporter +} // namespace grpc + +#endif // GPR_WINDOWS diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc index 12e8203fe1b..f07fa812a7d 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc @@ -16,15 +16,15 @@ * */ -#include <grpc/impl/codegen/port_platform.h> - -#include <stdio.h> +#include <grpc/impl/codegen/port_platform.h> + +#include <stdio.h> #include <cstdlib> #include <set> #include <unordered_map> #include <vector> -#include "src/core/lib/iomgr/socket_utils.h" +#include "src/core/lib/iomgr/socket_utils.h" #include "src/cpp/server/load_reporter/load_data_store.h" namespace grpc { @@ -79,65 +79,65 @@ const typename C::value_type* RandomElement(const C& container) { LoadRecordKey::LoadRecordKey(const TString& client_ip_and_token, TString user_id) - : user_id_(std::move(user_id)) { - GPR_ASSERT(client_ip_and_token.size() >= 2); - int ip_hex_size; - GPR_ASSERT(sscanf(client_ip_and_token.substr(0, 2).c_str(), "%d", - &ip_hex_size) == 1); - GPR_ASSERT(ip_hex_size == 0 || ip_hex_size == kIpv4AddressLength || - ip_hex_size == kIpv6AddressLength); - size_t cur_pos = 2; - client_ip_hex_ = client_ip_and_token.substr(cur_pos, ip_hex_size); - cur_pos += ip_hex_size; - if 
(client_ip_and_token.size() - cur_pos < kLbIdLength) { - lb_id_ = kInvalidLbId; - lb_tag_ = ""; - } else { - lb_id_ = client_ip_and_token.substr(cur_pos, kLbIdLength); - lb_tag_ = client_ip_and_token.substr(cur_pos + kLbIdLength); - } -} - + : user_id_(std::move(user_id)) { + GPR_ASSERT(client_ip_and_token.size() >= 2); + int ip_hex_size; + GPR_ASSERT(sscanf(client_ip_and_token.substr(0, 2).c_str(), "%d", + &ip_hex_size) == 1); + GPR_ASSERT(ip_hex_size == 0 || ip_hex_size == kIpv4AddressLength || + ip_hex_size == kIpv6AddressLength); + size_t cur_pos = 2; + client_ip_hex_ = client_ip_and_token.substr(cur_pos, ip_hex_size); + cur_pos += ip_hex_size; + if (client_ip_and_token.size() - cur_pos < kLbIdLength) { + lb_id_ = kInvalidLbId; + lb_tag_ = ""; + } else { + lb_id_ = client_ip_and_token.substr(cur_pos, kLbIdLength); + lb_tag_ = client_ip_and_token.substr(cur_pos + kLbIdLength); + } +} + TString LoadRecordKey::GetClientIpBytes() const { - if (client_ip_hex_.empty()) { - return ""; - } else if (client_ip_hex_.size() == kIpv4AddressLength) { - uint32_t ip_bytes; - if (sscanf(client_ip_hex_.c_str(), "%x", &ip_bytes) != 1) { - gpr_log(GPR_ERROR, - "Can't parse client IP (%s) from a hex string to an integer.", - client_ip_hex_.c_str()); - return ""; - } - ip_bytes = grpc_htonl(ip_bytes); + if (client_ip_hex_.empty()) { + return ""; + } else if (client_ip_hex_.size() == kIpv4AddressLength) { + uint32_t ip_bytes; + if (sscanf(client_ip_hex_.c_str(), "%x", &ip_bytes) != 1) { + gpr_log(GPR_ERROR, + "Can't parse client IP (%s) from a hex string to an integer.", + client_ip_hex_.c_str()); + return ""; + } + ip_bytes = grpc_htonl(ip_bytes); return TString(reinterpret_cast<const char*>(&ip_bytes), sizeof(ip_bytes)); - } else if (client_ip_hex_.size() == kIpv6AddressLength) { - uint32_t ip_bytes[4]; - for (size_t i = 0; i < 4; ++i) { - if (sscanf(client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str(), "%x", - ip_bytes + i) != 1) { - gpr_log( - GPR_ERROR, - "Can't parse client IP part (%s) from a hex string to an integer.", - client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str()); - return ""; - } - ip_bytes[i] = grpc_htonl(ip_bytes[i]); - } + } else if (client_ip_hex_.size() == kIpv6AddressLength) { + uint32_t ip_bytes[4]; + for (size_t i = 0; i < 4; ++i) { + if (sscanf(client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str(), "%x", + ip_bytes + i) != 1) { + gpr_log( + GPR_ERROR, + "Can't parse client IP part (%s) from a hex string to an integer.", + client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str()); + return ""; + } + ip_bytes[i] = grpc_htonl(ip_bytes[i]); + } return TString(reinterpret_cast<const char*>(ip_bytes), sizeof(ip_bytes)); - } else { - GPR_UNREACHABLE_CODE(return ""); - } -} - + } else { + GPR_UNREACHABLE_CODE(return ""); + } +} + LoadRecordValue::LoadRecordValue(TString metric_name, uint64_t num_calls, - double total_metric_value) { - call_metrics_.emplace(std::move(metric_name), - CallMetricValue(num_calls, total_metric_value)); -} - + double total_metric_value) { + call_metrics_.emplace(std::move(metric_name), + CallMetricValue(num_calls, total_metric_value)); +} + void PerBalancerStore::MergeRow(const LoadRecordKey& key, const LoadRecordValue& value) { // During suspension, the load data received will be dropped. 
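As parsed above, `client_ip_and_token` is laid out as two decimal digits giving the hex-encoded IP length (0, 8, or 32 characters), the IP hex itself, an 8-character LB ID, and an optional trailing LB tag. A sketch that assembles one such token and checks what the parser extracts — the values are made up, and the getter names are assumed to match the header:

```cpp
#include <cassert>

#include "src/cpp/server/load_reporter/load_data_store.h"

int main() {
  using grpc::load_reporter::LoadRecordKey;
  // "08" = hex length of an IPv4 address, then 8 hex chars (127.0.0.1),
  // then an 8-character LB ID; whatever is left becomes the LB tag.
  const TString token = TString("08") + "7f000001" + "LBID0001" + "tag-a";
  LoadRecordKey key(token, /*user_id=*/"alice");
  assert(key.lb_id() == "LBID0001");
  assert(key.lb_tag() == "tag-a");
  assert(key.client_ip_hex() == "7f000001");
  // Network-order bytes of 127.0.0.1, per GetClientIpBytes() above.
  assert(key.GetClientIpBytes().size() == 4);
  return 0;
}
```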
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.h b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.h index 7047488c0e8..61ba618331a 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.h +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.h @@ -28,8 +28,8 @@ #include <grpc/support/log.h> #include <grpcpp/impl/codegen/config.h> -#include "src/cpp/server/load_reporter/constants.h" - +#include "src/cpp/server/load_reporter/constants.h" + #include <util/string/cast.h> namespace grpc { @@ -76,9 +76,9 @@ class LoadRecordKey { user_id_(std::move(user_id)), client_ip_hex_(std::move(client_ip_hex)) {} - // Parses the input client_ip_and_token to set client IP, LB ID, and LB tag. + // Parses the input client_ip_and_token to set client IP, LB ID, and LB tag. LoadRecordKey(const TString& client_ip_and_token, TString user_id); - + TString ToString() const { return "[lb_id_=" + lb_id_ + ", lb_tag_=" + lb_tag_ + ", user_id_=" + user_id_ + ", client_ip_hex_=" + client_ip_hex_ + @@ -90,9 +90,9 @@ class LoadRecordKey { user_id_ == other.user_id_ && client_ip_hex_ == other.client_ip_hex_; } - // Gets the client IP bytes in network order (i.e., big-endian). + // Gets the client IP bytes in network order (i.e., big-endian). TString GetClientIpBytes() const; - + // Getters. const TString& lb_id() const { return lb_id_; } const TString& lb_tag() const { return lb_tag_; } @@ -126,8 +126,8 @@ class LoadRecordKey { class LoadRecordValue { public: explicit LoadRecordValue(uint64_t start_count = 0, uint64_t ok_count = 0, - uint64_t error_count = 0, uint64_t bytes_sent = 0, - uint64_t bytes_recv = 0, uint64_t latency_ms = 0) + uint64_t error_count = 0, uint64_t bytes_sent = 0, + uint64_t bytes_recv = 0, uint64_t latency_ms = 0) : start_count_(start_count), ok_count_(ok_count), error_count_(error_count), @@ -136,8 +136,8 @@ class LoadRecordValue { latency_ms_(latency_ms) {} LoadRecordValue(TString metric_name, uint64_t num_calls, - double total_metric_value); - + double total_metric_value); + void MergeFrom(const LoadRecordValue& other) { start_count_ += other.start_count_; ok_count_ += other.ok_count_; @@ -175,9 +175,9 @@ class LoadRecordValue { uint64_t start_count() const { return start_count_; } uint64_t ok_count() const { return ok_count_; } uint64_t error_count() const { return error_count_; } - uint64_t bytes_sent() const { return bytes_sent_; } - uint64_t bytes_recv() const { return bytes_recv_; } - uint64_t latency_ms() const { return latency_ms_; } + uint64_t bytes_sent() const { return bytes_sent_; } + uint64_t bytes_recv() const { return bytes_recv_; } + uint64_t latency_ms() const { return latency_ms_; } const std::unordered_map<TString, CallMetricValue>& call_metrics() const { return call_metrics_; } @@ -186,9 +186,9 @@ class LoadRecordValue { uint64_t start_count_ = 0; uint64_t ok_count_ = 0; uint64_t error_count_ = 0; - uint64_t bytes_sent_ = 0; - uint64_t bytes_recv_ = 0; - uint64_t latency_ms_ = 0; + uint64_t bytes_sent_ = 0; + uint64_t bytes_recv_ = 0; + uint64_t latency_ms_ = 0; std::unordered_map<TString, CallMetricValue> call_metrics_; }; diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc index a57ddc47150..24ad9f3f248 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc @@ -1,47 +1,47 @@ -/* - * - * Copyright 2018 gRPC authors. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <grpc/impl/codegen/port_platform.h> - -#include <grpcpp/ext/server_load_reporting.h> - -#include <cmath> - -#include <grpc/support/log.h> - +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/impl/codegen/port_platform.h> + +#include <grpcpp/ext/server_load_reporting.h> + +#include <cmath> + +#include <grpc/support/log.h> + namespace grpc { -namespace load_reporter { -namespace experimental { - -void AddLoadReportingCost(grpc::ServerContext* ctx, +namespace load_reporter { +namespace experimental { + +void AddLoadReportingCost(grpc::ServerContext* ctx, const TString& cost_name, double cost_value) { - if (std::isnormal(cost_value)) { + if (std::isnormal(cost_value)) { TString buf; - buf.resize(sizeof(cost_value) + cost_name.size()); - memcpy(&(*buf.begin()), &cost_value, sizeof(cost_value)); - memcpy(&(*buf.begin()) + sizeof(cost_value), cost_name.data(), - cost_name.size()); - ctx->AddTrailingMetadata(GRPC_LB_COST_MD_KEY, buf); - } else { - gpr_log(GPR_ERROR, "Call metric value is not normal."); - } -} - -} // namespace experimental -} // namespace load_reporter + buf.resize(sizeof(cost_value) + cost_name.size()); + memcpy(&(*buf.begin()), &cost_value, sizeof(cost_value)); + memcpy(&(*buf.begin()) + sizeof(cost_value), cost_name.data(), + cost_name.size()); + ctx->AddTrailingMetadata(GRPC_LB_COST_MD_KEY, buf); + } else { + gpr_log(GPR_ERROR, "Call metric value is not normal."); + } +} + +} // namespace experimental +} // namespace load_reporter } // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc index 3b734535157..732602bcb70 100644 --- a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc +++ b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc @@ -44,10 +44,10 @@ void AuthMetadataProcessorAyncWrapper::Process( return; } if (w->processor_->IsBlocking()) { - w->thread_pool_->Add([w, context, md, num_md, cb, user_data] { - w->AuthMetadataProcessorAyncWrapper::InvokeProcessor(context, md, num_md, - cb, user_data); - }); + w->thread_pool_->Add([w, context, md, num_md, cb, user_data] { + w->AuthMetadataProcessorAyncWrapper::InvokeProcessor(context, md, num_md, + cb, user_data); + }); } else { // invoke directly. 
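`AddLoadReportingCost` above packs the raw 8 bytes of the double followed by the cost name into a single trailing-metadata value under `GRPC_LB_COST_MD_KEY`. A hypothetical decoder for that layout, not part of the library:

```cpp
#include <cstring>
#include <string>
#include <utility>

// Hypothetical inverse of the encoding above: the first sizeof(double)
// bytes are the cost value, the remainder is the cost name.
std::pair<std::string, double> DecodeLoadReportingCost(
    const std::string& buf) {
  double value = 0;
  if (buf.size() < sizeof(value)) return {"", 0.0};  // malformed entry
  std::memcpy(&value, buf.data(), sizeof(value));
  return {buf.substr(sizeof(value)), value};
}
```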
w->InvokeProcessor(context, md, num_md, cb, user_data); @@ -62,7 +62,7 @@ void AuthMetadataProcessorAyncWrapper::InvokeProcessor( metadata.insert(std::make_pair(StringRefFromSlice(&md[i].key), StringRefFromSlice(&md[i].value))); } - SecureAuthContext context(ctx); + SecureAuthContext context(ctx); AuthMetadataProcessor::OutputMetadata consumed_metadata; AuthMetadataProcessor::OutputMetadata response_metadata; @@ -138,12 +138,12 @@ std::shared_ptr<ServerCredentials> AltsServerCredentials( new SecureServerCredentials(c_creds)); } -std::shared_ptr<ServerCredentials> LocalServerCredentials( - grpc_local_connect_type type) { - return std::shared_ptr<ServerCredentials>( - new SecureServerCredentials(grpc_local_server_credentials_create(type))); -} - +std::shared_ptr<ServerCredentials> LocalServerCredentials( + grpc_local_connect_type type) { + return std::shared_ptr<ServerCredentials>( + new SecureServerCredentials(grpc_local_server_credentials_create(type))); +} + std::shared_ptr<ServerCredentials> TlsServerCredentials( const grpc::experimental::TlsCredentialsOptions& options) { grpc::GrpcLibraryCodegen init; diff --git a/contrib/libs/grpc/src/cpp/server/server_builder.cc b/contrib/libs/grpc/src/cpp/server/server_builder.cc index ec5d4eec8cd..0cc00b365ff 100644 --- a/contrib/libs/grpc/src/cpp/server/server_builder.cc +++ b/contrib/libs/grpc/src/cpp/server/server_builder.cc @@ -24,8 +24,8 @@ #include <grpcpp/resource_quota.h> #include <grpcpp/server.h> -#include <utility> - +#include <utility> + #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/useful.h" @@ -44,8 +44,8 @@ static void do_plugin_list_init(void) { } ServerBuilder::ServerBuilder() - : max_receive_message_size_(INT_MIN), - max_send_message_size_(INT_MIN), + : max_receive_message_size_(INT_MIN), + max_send_message_size_(INT_MIN), sync_server_settings_(SyncServerSettings()), resource_quota_(nullptr) { gpr_once_init(&once_init_plugin_list, do_plugin_list_init); @@ -71,9 +71,9 @@ ServerBuilder::~ServerBuilder() { std::unique_ptr<grpc::ServerCompletionQueue> ServerBuilder::AddCompletionQueue( bool is_frequently_polled) { grpc::ServerCompletionQueue* cq = new grpc::ServerCompletionQueue( - GRPC_CQ_NEXT, - is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING, - nullptr); + GRPC_CQ_NEXT, + is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING, + nullptr); cqs_.push_back(cq); return std::unique_ptr<grpc::ServerCompletionQueue>(cq); } @@ -180,7 +180,7 @@ ServerBuilder& ServerBuilder::SetCompressionAlgorithmSupportStatus( ServerBuilder& ServerBuilder::SetDefaultCompressionLevel( grpc_compression_level level) { - maybe_default_compression_level_.is_set = true; + maybe_default_compression_level_.is_set = true; maybe_default_compression_level_.level = level; return *this; } @@ -212,14 +212,14 @@ ServerBuilder& ServerBuilder::AddListeningPort( while (addr_uri[pos] == '/') ++pos; // Skip slashes. 
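The builder paths above (`AddCompletionQueue`, `AddListeningPort`) are typically exercised together when bootstrapping an async server. A sketch — the address is illustrative and `service` is assumed to be a generated `AsyncService` instance owned by the caller:

```cpp
#include <memory>

#include <grpcpp/grpcpp.h>

// Usual async-server bootstrap through the builder paths above.
std::unique_ptr<grpc::Server> BuildAsyncServer(
    grpc::Service* service,
    std::unique_ptr<grpc::ServerCompletionQueue>* cq_out) {
  grpc::ServerBuilder builder;
  builder.AddListeningPort("0.0.0.0:50051",
                           grpc::InsecureServerCredentials());
  builder.RegisterService(service);
  // true -> GRPC_CQ_DEFAULT_POLLING; the queue must then be drained with
  // Next()/AsyncNext() on an application thread.
  *cq_out = builder.AddCompletionQueue(/*is_frequently_polled=*/true);
  return builder.BuildAndStart();
}
```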
addr = addr_uri.substr(pos); } - Port port = {addr, std::move(creds), selected_port}; + Port port = {addr, std::move(creds), selected_port}; ports_.push_back(port); return *this; } std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { grpc::ChannelArguments args; - if (max_receive_message_size_ >= -1) { + if (max_receive_message_size_ >= -1) { args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_); } if (max_send_message_size_ >= -1) { @@ -306,14 +306,14 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { // Create completion queues to listen to incoming rpc requests for (int i = 0; i < sync_server_settings_.num_cqs; i++) { - sync_server_cqs->emplace_back( + sync_server_cqs->emplace_back( new grpc::ServerCompletionQueue(GRPC_CQ_NEXT, polling_type, nullptr)); } } - // TODO(vjpai): Add a section here for plugins once they can support callback - // methods - + // TODO(vjpai): Add a section here for plugins once they can support callback + // methods + if (has_sync_methods) { // This is a Sync server gpr_log(GPR_INFO, @@ -324,16 +324,16 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { sync_server_settings_.cq_timeout_msec); } - if (has_callback_methods) { - gpr_log(GPR_INFO, "Callback server."); - } - + if (has_callback_methods) { + gpr_log(GPR_INFO, "Callback server."); + } + std::unique_ptr<grpc::Server> server(new grpc::Server( &args, sync_server_cqs, sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, sync_server_settings_.cq_timeout_msec, std::move(acceptors_), resource_quota_, std::move(interceptor_creators_))); - + ServerInitializer* initializer = server->initializer(); // Register all the completion queues with the server. i.e @@ -347,10 +347,10 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { } if (has_callback_methods || callback_generic_service_ != nullptr) { - auto* cq = server->CallbackCQ(); - grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr); - } - + auto* cq = server->CallbackCQ(); + grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr); + } + // cqs_ contains the completion queue added by calling the ServerBuilder's // AddCompletionQueue() API. 
Some of them may not be frequently polled (i.e by // calling Next() or AsyncNext()) and hence are not safe to be used for diff --git a/contrib/libs/grpc/src/cpp/server/server_cc.cc b/contrib/libs/grpc/src/cpp/server/server_cc.cc index 56bf75d730b..c2a911c7f7c 100644 --- a/contrib/libs/grpc/src/cpp/server/server_cc.cc +++ b/contrib/libs/grpc/src/cpp/server/server_cc.cc @@ -30,10 +30,10 @@ #include <grpcpp/generic/async_generic_service.h> #include <grpcpp/impl/codegen/async_unary_call.h> #include <grpcpp/impl/codegen/byte_buffer.h> -#include <grpcpp/impl/codegen/call.h> +#include <grpcpp/impl/codegen/call.h> #include <grpcpp/impl/codegen/completion_queue_tag.h> #include <grpcpp/impl/codegen/method_handler.h> -#include <grpcpp/impl/codegen/server_interceptor.h> +#include <grpcpp/impl/codegen/server_interceptor.h> #include <grpcpp/impl/grpc_library.h> #include <grpcpp/impl/rpc_service_method.h> #include <grpcpp/impl/server_initializer.h> @@ -43,10 +43,10 @@ #include <grpcpp/support/time.h> #include "src/core/ext/transport/inproc/inproc_transport.h" -#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/surface/call.h" -#include "src/core/lib/surface/completion_queue.h" +#include "src/core/lib/surface/completion_queue.h" #include "src/core/lib/surface/server.h" #include "src/cpp/client/create_channel_internal.h" #include "src/cpp/server/external_connection_acceptor_impl.h" @@ -58,13 +58,13 @@ namespace grpc { namespace { -// The default value for maximum number of threads that can be created in the -// sync server. This value of INT_MAX is chosen to match the default behavior if -// no ResourceQuota is set. To modify the max number of threads in a sync -// server, pass a custom ResourceQuota object (with the desired number of -// max-threads set) to the server builder. -#define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX - +// The default value for maximum number of threads that can be created in the +// sync server. This value of INT_MAX is chosen to match the default behavior if +// no ResourceQuota is set. To modify the max number of threads in a sync +// server, pass a custom ResourceQuota object (with the desired number of +// max-threads set) to the server builder. +#define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX + class DefaultGlobalCallbacks final : public Server::GlobalCallbacks { public: ~DefaultGlobalCallbacks() override {} @@ -319,10 +319,10 @@ class Server::UnimplementedAsyncResponse final grpc::internal::CallOpSendInitialMetadata, grpc::internal::CallOpServerSendStatus>::FinalizeResult(tag, status)) { - delete this; - } else { - // The tag was swallowed due to interception. We will see it again. - } + delete this; + } else { + // The tag was swallowed due to interception. We will see it again. 
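As the `DEFAULT_MAX_SYNC_SERVER_THREADS` comment above notes, the sync-thread cap is lowered by attaching a `ResourceQuota` to the builder. A sketch — the quota name and the limit of 64 are arbitrary:

```cpp
#include <grpcpp/grpcpp.h>
#include <grpcpp/resource_quota.h>

// Lowering the sync-thread cap from the INT_MAX default described above.
void CapSyncServerThreads(grpc::ServerBuilder& builder) {
  grpc::ResourceQuota quota("sync-server-quota");
  quota.SetMaxThreads(64);
  builder.SetResourceQuota(quota);
}
```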
+ } return false; } @@ -334,7 +334,7 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { public: SyncRequest(grpc::internal::RpcServiceMethod* method, void* method_tag) : method_(method), - method_tag_(method_tag), + method_tag_(method_tag), in_flight_(false), has_request_payload_(method->method_type() == grpc::internal::RpcMethod::NORMAL_RPC || @@ -362,11 +362,11 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { void Request(grpc_server* server, grpc_completion_queue* notify_cq) { GPR_ASSERT(cq_ && !in_flight_); in_flight_ = true; - if (method_tag_) { - if (grpc_server_request_registered_call( - server, method_tag_, &call_, &deadline_, &request_metadata_, + if (method_tag_) { + if (grpc_server_request_registered_call( + server, method_tag_, &call_, &deadline_, &request_metadata_, has_request_payload_ ? &request_payload_ : nullptr, cq_, - notify_cq, this) != GRPC_CALL_OK) { + notify_cq, this) != GRPC_CALL_OK) { TeardownRequest(); return; } @@ -384,21 +384,21 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { } } - void PostShutdownCleanup() { - if (call_) { - grpc_call_unref(call_); - call_ = nullptr; - } - if (cq_) { - grpc_completion_queue_destroy(cq_); - cq_ = nullptr; - } - } - + void PostShutdownCleanup() { + if (call_) { + grpc_call_unref(call_); + call_ = nullptr; + } + if (cq_) { + grpc_completion_queue_destroy(cq_); + cq_ = nullptr; + } + } + bool FinalizeResult(void** /*tag*/, bool* status) override { if (!*status) { grpc_completion_queue_destroy(cq_); - cq_ = nullptr; + cq_ = nullptr; } if (call_details_) { deadline_ = call_details_->deadline; @@ -408,26 +408,26 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { return true; } - // The CallData class represents a call that is "active" as opposed - // to just being requested. It wraps and takes ownership of the cq from - // the call request + // The CallData class represents a call that is "active" as opposed + // to just being requested. It wraps and takes ownership of the cq from + // the call request class CallData final { public: explicit CallData(Server* server, SyncRequest* mrd) : cq_(mrd->cq_), ctx_(mrd->deadline_, &mrd->request_metadata_), has_request_payload_(mrd->has_request_payload_), - request_payload_(has_request_payload_ ? mrd->request_payload_ - : nullptr), - request_(nullptr), - method_(mrd->method_), - call_( - mrd->call_, server, &cq_, server->max_receive_message_size(), - ctx_.set_server_rpc_info(method_->name(), method_->method_type(), - server->interceptor_creators_)), - server_(server), - global_callbacks_(nullptr), - resources_(false) { + request_payload_(has_request_payload_ ? 
mrd->request_payload_ + : nullptr), + request_(nullptr), + method_(mrd->method_), + call_( + mrd->call_, server, &cq_, server->max_receive_message_size(), + ctx_.set_server_rpc_info(method_->name(), method_->method_type(), + server->interceptor_creators_)), + server_(server), + global_callbacks_(nullptr), + resources_(false) { ctx_.set_call(mrd->call_); ctx_.cq_ = &cq_; GPR_ASSERT(mrd->in_flight_); @@ -441,82 +441,82 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { } } - void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks, - bool resources) { - global_callbacks_ = global_callbacks; - resources_ = resources; + void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks, + bool resources) { + global_callbacks_ = global_callbacks; + resources_ = resources; - interceptor_methods_.SetCall(&call_); - interceptor_methods_.SetReverse(); - // Set interception point for RECV INITIAL METADATA - interceptor_methods_.AddInterceptionHookPoint( + interceptor_methods_.SetCall(&call_); + interceptor_methods_.SetReverse(); + // Set interception point for RECV INITIAL METADATA + interceptor_methods_.AddInterceptionHookPoint( grpc::experimental::InterceptionHookPoints:: POST_RECV_INITIAL_METADATA); - interceptor_methods_.SetRecvInitialMetadata(&ctx_.client_metadata_); + interceptor_methods_.SetRecvInitialMetadata(&ctx_.client_metadata_); - if (has_request_payload_) { - // Set interception point for RECV MESSAGE - auto* handler = resources_ ? method_->handler() - : server_->resource_exhausted_handler_.get(); - request_ = handler->Deserialize(call_.call(), request_payload_, + if (has_request_payload_) { + // Set interception point for RECV MESSAGE + auto* handler = resources_ ? method_->handler() + : server_->resource_exhausted_handler_.get(); + request_ = handler->Deserialize(call_.call(), request_payload_, &request_status_, nullptr); - request_payload_ = nullptr; - interceptor_methods_.AddInterceptionHookPoint( + request_payload_ = nullptr; + interceptor_methods_.AddInterceptionHookPoint( grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); - interceptor_methods_.SetRecvMessage(request_, nullptr); - } - - if (interceptor_methods_.RunInterceptors( - [this]() { ContinueRunAfterInterception(); })) { - ContinueRunAfterInterception(); - } else { - // There were interceptors to be run, so ContinueRunAfterInterception - // will be run when interceptors are done. - } + interceptor_methods_.SetRecvMessage(request_, nullptr); + } + + if (interceptor_methods_.RunInterceptors( + [this]() { ContinueRunAfterInterception(); })) { + ContinueRunAfterInterception(); + } else { + // There were interceptors to be run, so ContinueRunAfterInterception + // will be run when interceptors are done. + } } - void ContinueRunAfterInterception() { - { - ctx_.BeginCompletionOp(&call_, nullptr, nullptr); - global_callbacks_->PreSynchronousRequest(&ctx_); - auto* handler = resources_ ? method_->handler() - : server_->resource_exhausted_handler_.get(); + void ContinueRunAfterInterception() { + { + ctx_.BeginCompletionOp(&call_, nullptr, nullptr); + global_callbacks_->PreSynchronousRequest(&ctx_); + auto* handler = resources_ ? 
method_->handler() + : server_->resource_exhausted_handler_.get(); handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( &call_, &ctx_, request_, request_status_, nullptr, nullptr)); - request_ = nullptr; - global_callbacks_->PostSynchronousRequest(&ctx_); - - cq_.Shutdown(); - + request_ = nullptr; + global_callbacks_->PostSynchronousRequest(&ctx_); + + cq_.Shutdown(); + grpc::internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); - cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME)); - - /* Ensure the cq_ is shutdown */ + cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME)); + + /* Ensure the cq_ is shutdown */ grpc::DummyTag ignored_tag; - GPR_ASSERT(cq_.Pluck(&ignored_tag) == false); - } - delete this; - } - + GPR_ASSERT(cq_.Pluck(&ignored_tag) == false); + } + delete this; + } + private: grpc::CompletionQueue cq_; grpc::ServerContext ctx_; const bool has_request_payload_; grpc_byte_buffer* request_payload_; - void* request_; + void* request_; grpc::Status request_status_; grpc::internal::RpcServiceMethod* const method_; grpc::internal::Call call_; - Server* server_; - std::shared_ptr<GlobalCallbacks> global_callbacks_; - bool resources_; + Server* server_; + std::shared_ptr<GlobalCallbacks> global_callbacks_; + bool resources_; grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; }; private: grpc::internal::RpcServiceMethod* const method_; - void* const method_tag_; + void* const method_tag_; bool in_flight_; const bool has_request_payload_; grpc_call* call_; @@ -541,19 +541,19 @@ class Server::CallbackRequest final CallbackRequest(Server* server, grpc::internal::RpcServiceMethod* method, grpc::CompletionQueue* cq, grpc_core::Server::RegisteredCallAllocation* data) - : server_(server), - method_(method), + : server_(server), + method_(method), has_request_payload_(method->method_type() == grpc::internal::RpcMethod::NORMAL_RPC || method->method_type() == grpc::internal::RpcMethod::SERVER_STREAMING), cq_(cq), - tag_(this) { + tag_(this) { CommonSetup(server, data); data->deadline = &deadline_; data->optional_payload = has_request_payload_ ? &request_payload_ : nullptr; - } - + } + // For generic services, method is nullptr since these services don't have // pre-defined methods. CallbackRequest(Server* server, grpc::CompletionQueue* cq, @@ -567,8 +567,8 @@ class Server::CallbackRequest final CommonSetup(server, data); grpc_call_details_init(call_details_); data->details = call_details_; - } - + } + ~CallbackRequest() { delete call_details_; grpc_metadata_array_destroy(&request_metadata_); @@ -576,21 +576,21 @@ class Server::CallbackRequest final grpc_byte_buffer_destroy(request_payload_); } server_->UnrefWithPossibleNotify(); - } - + } + // Needs specialization to account for different processing of metadata // in generic API bool FinalizeResult(void** tag, bool* status) override; - - private: + + private: // method_name needs to be specialized between named method and generic const char* method_name() const; - class CallbackCallTag : public grpc_experimental_completion_queue_functor { - public: + class CallbackCallTag : public grpc_experimental_completion_queue_functor { + public: CallbackCallTag(Server::CallbackRequest<ServerContextType>* req) : req_(req) { - functor_run = &CallbackCallTag::StaticRun; + functor_run = &CallbackCallTag::StaticRun; // Set inlineable to true since this callback is internally-controlled // without taking any locks, and thus does not need to be run from the // executor (which triggers a thread hop). 
This should only be used by @@ -598,42 +598,42 @@ class Server::CallbackRequest final // here is actually non-trivial, but there is no chance of having user // locks conflict with each other so it's ok to run inlined. inlineable = true; - } - - // force_run can not be performed on a tag if operations using this tag - // have been sent to PerformOpsOnCall. It is intended for error conditions - // that are detected before the operations are internally processed. - void force_run(bool ok) { Run(ok); } - - private: + } + + // force_run can not be performed on a tag if operations using this tag + // have been sent to PerformOpsOnCall. It is intended for error conditions + // that are detected before the operations are internally processed. + void force_run(bool ok) { Run(ok); } + + private: Server::CallbackRequest<ServerContextType>* req_; grpc::internal::Call* call_; - - static void StaticRun(grpc_experimental_completion_queue_functor* cb, - int ok) { - static_cast<CallbackCallTag*>(cb)->Run(static_cast<bool>(ok)); - } - void Run(bool ok) { - void* ignored = req_; - bool new_ok = ok; - GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok)); - GPR_ASSERT(ignored == req_); - - if (!ok) { - // The call has been shutdown. - // Delete its contents to free up the request. - delete req_; - return; - } - - // Bind the call, deadline, and metadata from what we got - req_->ctx_.set_call(req_->call_); - req_->ctx_.cq_ = req_->cq_; - req_->ctx_.BindDeadlineAndMetadata(req_->deadline_, - &req_->request_metadata_); - req_->request_metadata_.count = 0; - - // Create a C++ Call to control the underlying core call + + static void StaticRun(grpc_experimental_completion_queue_functor* cb, + int ok) { + static_cast<CallbackCallTag*>(cb)->Run(static_cast<bool>(ok)); + } + void Run(bool ok) { + void* ignored = req_; + bool new_ok = ok; + GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok)); + GPR_ASSERT(ignored == req_); + + if (!ok) { + // The call has been shutdown. + // Delete its contents to free up the request. + delete req_; + return; + } + + // Bind the call, deadline, and metadata from what we got + req_->ctx_.set_call(req_->call_); + req_->ctx_.cq_ = req_->cq_; + req_->ctx_.BindDeadlineAndMetadata(req_->deadline_, + &req_->request_metadata_); + req_->request_metadata_.count = 0; + + // Create a C++ Call to control the underlying core call call_ = new (grpc_call_arena_alloc(req_->call_, sizeof(grpc::internal::Call))) grpc::internal::Call( @@ -645,71 +645,71 @@ class Server::CallbackRequest final ? 
req_->method_->method_type() : grpc::internal::RpcMethod::BIDI_STREAMING, req_->server_->interceptor_creators_)); - - req_->interceptor_methods_.SetCall(call_); - req_->interceptor_methods_.SetReverse(); - // Set interception point for RECV INITIAL METADATA - req_->interceptor_methods_.AddInterceptionHookPoint( + + req_->interceptor_methods_.SetCall(call_); + req_->interceptor_methods_.SetReverse(); + // Set interception point for RECV INITIAL METADATA + req_->interceptor_methods_.AddInterceptionHookPoint( grpc::experimental::InterceptionHookPoints:: POST_RECV_INITIAL_METADATA); - req_->interceptor_methods_.SetRecvInitialMetadata( - &req_->ctx_.client_metadata_); - - if (req_->has_request_payload_) { - // Set interception point for RECV MESSAGE - req_->request_ = req_->method_->handler()->Deserialize( + req_->interceptor_methods_.SetRecvInitialMetadata( + &req_->ctx_.client_metadata_); + + if (req_->has_request_payload_) { + // Set interception point for RECV MESSAGE + req_->request_ = req_->method_->handler()->Deserialize( req_->call_, req_->request_payload_, &req_->request_status_, &req_->handler_data_); - req_->request_payload_ = nullptr; - req_->interceptor_methods_.AddInterceptionHookPoint( + req_->request_payload_ = nullptr; + req_->interceptor_methods_.AddInterceptionHookPoint( grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); - req_->interceptor_methods_.SetRecvMessage(req_->request_, nullptr); - } - - if (req_->interceptor_methods_.RunInterceptors( - [this] { ContinueRunAfterInterception(); })) { - ContinueRunAfterInterception(); - } else { - // There were interceptors to be run, so ContinueRunAfterInterception - // will be run when interceptors are done. - } - } - void ContinueRunAfterInterception() { + req_->interceptor_methods_.SetRecvMessage(req_->request_, nullptr); + } + + if (req_->interceptor_methods_.RunInterceptors( + [this] { ContinueRunAfterInterception(); })) { + ContinueRunAfterInterception(); + } else { + // There were interceptors to be run, so ContinueRunAfterInterception + // will be run when interceptors are done. + } + } + void ContinueRunAfterInterception() { auto* handler = (req_->method_ != nullptr) ? 
req_->method_->handler() : req_->server_->generic_handler_.get(); handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( call_, &req_->ctx_, req_->request_, req_->request_status_, req_->handler_data_, [this] { delete req_; })); - } - }; - + } + }; + template <class CallAllocation> void CommonSetup(Server* server, CallAllocation* data) { server->Ref(); - grpc_metadata_array_init(&request_metadata_); + grpc_metadata_array_init(&request_metadata_); data->tag = &tag_; data->call = &call_; data->initial_metadata = &request_metadata_; - } - - Server* const server_; + } + + Server* const server_; grpc::internal::RpcServiceMethod* const method_; - const bool has_request_payload_; + const bool has_request_payload_; grpc_byte_buffer* request_payload_ = nullptr; void* request_ = nullptr; void* handler_data_ = nullptr; grpc::Status request_status_; grpc_call_details* const call_details_ = nullptr; - grpc_call* call_; - gpr_timespec deadline_; - grpc_metadata_array request_metadata_; + grpc_call* call_; + gpr_timespec deadline_; + grpc_metadata_array request_metadata_; grpc::CompletionQueue* const cq_; - CallbackCallTag tag_; + CallbackCallTag tag_; ServerContextType ctx_; grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; -}; - +}; + template <> bool Server::CallbackRequest<grpc::CallbackServerContext>::FinalizeResult( void** /*tag*/, bool* /*status*/) { @@ -750,13 +750,13 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { public: SyncRequestThreadManager(Server* server, grpc::CompletionQueue* server_cq, std::shared_ptr<GlobalCallbacks> global_callbacks, - grpc_resource_quota* rq, int min_pollers, - int max_pollers, int cq_timeout_msec) - : ThreadManager("SyncServer", rq, min_pollers, max_pollers), + grpc_resource_quota* rq, int min_pollers, + int max_pollers, int cq_timeout_msec) + : ThreadManager("SyncServer", rq, min_pollers, max_pollers), server_(server), server_cq_(server_cq), cq_timeout_msec_(cq_timeout_msec), - global_callbacks_(std::move(global_callbacks)) {} + global_callbacks_(std::move(global_callbacks)) {} WorkStatus PollForWork(void** tag, bool* ok) override { *tag = nullptr; @@ -778,7 +778,7 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { GPR_UNREACHABLE_CODE(return TIMEOUT); } - void DoWork(void* tag, bool ok, bool resources) override { + void DoWork(void* tag, bool ok, bool resources) override { SyncRequest* sync_req = static_cast<SyncRequest*>(tag); if (!sync_req) { @@ -789,9 +789,9 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { } if (ok) { - // Calldata takes ownership of the completion queue and interceptors - // inside sync_req - auto* cd = new SyncRequest::CallData(server_, sync_req); + // Calldata takes ownership of the completion queue and interceptors + // inside sync_req + auto* cd = new SyncRequest::CallData(server_, sync_req); // Prepare for the next request if (!IsShutdown()) { sync_req->SetupRequest(); // Create new completion queue for sync_req @@ -799,7 +799,7 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { } GPR_TIMER_SCOPE("cd.Run()", 0); - cd->Run(global_callbacks_, resources); + cd->Run(global_callbacks_, resources); } // TODO (sreek) If ok is false here (which it isn't in case of // grpc_request_registered_call), we should still re-queue the request @@ -831,17 +831,17 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { void* tag; bool ok; while (server_cq_->Next(&tag, &ok)) { - if (ok) { - // If a request was pulled off 
the queue, it means that the thread - // handling the request added it to the completion queue after shutdown - // was called - because the thread had already started and checked the - // shutdown flag before shutdown was called. In this case, we simply - // clean it up here, *after* calling wait on all the worker threads, at - // which point we are certain no in-flight requests will add more to the - // queue. This fixes an intermittent memory leak on shutdown. - SyncRequest* sync_req = static_cast<SyncRequest*>(tag); - sync_req->PostShutdownCleanup(); - } + if (ok) { + // If a request was pulled off the queue, it means that the thread + // handling the request added it to the completion queue after shutdown + // was called - because the thread had already started and checked the + // shutdown flag before shutdown was called. In this case, we simply + // clean it up here, *after* calling wait on all the worker threads, at + // which point we are certain no in-flight requests will add more to the + // queue. This fixes an intermittent memory leak on shutdown. + SyncRequest* sync_req = static_cast<SyncRequest*>(tag); + sync_req->PostShutdownCleanup(); + } } } @@ -870,17 +870,17 @@ Server::Server( grpc::ChannelArguments* args, std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>> sync_server_cqs, - int min_pollers, int max_pollers, int sync_cq_timeout_msec, + int min_pollers, int max_pollers, int sync_cq_timeout_msec, std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>> acceptors, - grpc_resource_quota* server_rq, - std::vector< + grpc_resource_quota* server_rq, + std::vector< std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>> - interceptor_creators) + interceptor_creators) : acceptors_(std::move(acceptors)), interceptor_creators_(std::move(interceptor_creators)), max_receive_message_size_(INT_MIN), - sync_server_cqs_(std::move(sync_server_cqs)), + sync_server_cqs_(std::move(sync_server_cqs)), started_(false), shutdown_(false), shutdown_notified_(false), @@ -893,23 +893,23 @@ Server::Server( global_callbacks_->UpdateArguments(args); if (sync_server_cqs_ != nullptr) { - bool default_rq_created = false; - if (server_rq == nullptr) { - server_rq = grpc_resource_quota_create("SyncServer-default-rq"); - grpc_resource_quota_set_max_threads(server_rq, - DEFAULT_MAX_SYNC_SERVER_THREADS); - default_rq_created = true; - } - + bool default_rq_created = false; + if (server_rq == nullptr) { + server_rq = grpc_resource_quota_create("SyncServer-default-rq"); + grpc_resource_quota_set_max_threads(server_rq, + DEFAULT_MAX_SYNC_SERVER_THREADS); + default_rq_created = true; + } + for (const auto& it : *sync_server_cqs_) { sync_req_mgrs_.emplace_back(new SyncRequestThreadManager( - this, it.get(), global_callbacks_, server_rq, min_pollers, - max_pollers, sync_cq_timeout_msec)); + this, it.get(), global_callbacks_, server_rq, min_pollers, + max_pollers, sync_cq_timeout_msec)); + } + + if (default_rq_created) { + grpc_resource_quota_unref(server_rq); } - - if (default_rq_created) { - grpc_resource_quota_unref(server_rq); - } } for (auto& acceptor : acceptors_) { @@ -974,24 +974,24 @@ std::shared_ptr<grpc::Channel> Server::InProcessChannel( const grpc::ChannelArguments& args) { grpc_channel_args channel_args = args.c_channel_args(); return grpc::CreateChannelInternal( - "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr), + "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr), std::vector<std::unique_ptr< 
grpc::experimental::ClientInterceptorFactoryInterface>>()); } std::shared_ptr<grpc::Channel> -Server::experimental_type::InProcessChannelWithInterceptors( +Server::experimental_type::InProcessChannelWithInterceptors( const grpc::ChannelArguments& args, - std::vector< + std::vector< std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> - interceptor_creators) { - grpc_channel_args channel_args = args.c_channel_args(); + interceptor_creators) { + grpc_channel_args channel_args = args.c_channel_args(); return grpc::CreateChannelInternal( - "inproc", - grpc_inproc_channel_create(server_->server_, &channel_args, nullptr), - std::move(interceptor_creators)); -} - + "inproc", + grpc_inproc_channel_create(server_->server_, &channel_args, nullptr), + std::move(interceptor_creators)); +} + static grpc_server_register_method_payload_handling PayloadHandlingForMethod( grpc::internal::RpcServiceMethod* method) { switch (method->method_type()) { @@ -1014,29 +1014,29 @@ bool Server::RegisterService(const TString* host, grpc::Service* service) { } const char* method_name = nullptr; - + for (const auto& method : service->methods_) { if (method.get() == nullptr) { // Handled by generic service if any. continue; } - void* method_registration_tag = grpc_server_register_method( + void* method_registration_tag = grpc_server_register_method( server_, method->name(), host ? host->c_str() : nullptr, PayloadHandlingForMethod(method.get()), 0); - if (method_registration_tag == nullptr) { + if (method_registration_tag == nullptr) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name()); return false; } - if (method->handler() == nullptr) { // Async method without handler - method->set_server_tag(method_registration_tag); - } else if (method->api_type() == + if (method->handler() == nullptr) { // Async method without handler + method->set_server_tag(method_registration_tag); + } else if (method->api_type() == grpc::internal::RpcServiceMethod::ApiType::SYNC) { for (const auto& value : sync_req_mgrs_) { value->AddSyncMethod(method.get(), method_registration_tag); } - } else { + } else { has_callback_methods_ = true; grpc::internal::RpcServiceMethod* method_value = method.get(); grpc::CompletionQueue* cq = CallbackCQ(); @@ -1130,23 +1130,23 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { // explicit one. grpc::ServerCompletionQueue* health_check_cq = nullptr; grpc::DefaultHealthCheckService::HealthCheckServiceImpl* - default_health_check_service_impl = nullptr; + default_health_check_service_impl = nullptr; if (health_check_service_ == nullptr && !health_check_service_disabled_ && grpc::DefaultHealthCheckServiceEnabled()) { auto* default_hc_service = new grpc::DefaultHealthCheckService; - health_check_service_.reset(default_hc_service); - // We create a non-polling CQ to avoid impacting application - // performance. This ensures that we don't introduce thread hops - // for application requests that wind up on this CQ, which is polled - // in its own thread. + health_check_service_.reset(default_hc_service); + // We create a non-polling CQ to avoid impacting application + // performance. This ensures that we don't introduce thread hops + // for application requests that wind up on this CQ, which is polled + // in its own thread. 
health_check_cq = new grpc::ServerCompletionQueue( GRPC_CQ_NEXT, GRPC_CQ_NON_POLLING, nullptr); - grpc_server_register_completion_queue(server_, health_check_cq->cq(), - nullptr); - default_health_check_service_impl = - default_hc_service->GetHealthCheckService( + grpc_server_register_completion_queue(server_, health_check_cq->cq(), + nullptr); + default_health_check_service_impl = + default_hc_service->GetHealthCheckService( std::unique_ptr<grpc::ServerCompletionQueue>(health_check_cq)); - RegisterService(nullptr, default_health_check_service_impl); + RegisterService(nullptr, default_health_check_service_impl); } for (auto& acceptor : acceptors_) { @@ -1179,26 +1179,26 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { new UnimplementedAsyncRequest(this, cqs[i]); } } - if (health_check_cq != nullptr) { - new UnimplementedAsyncRequest(this, health_check_cq); - } + if (health_check_cq != nullptr) { + new UnimplementedAsyncRequest(this, health_check_cq); + } } - // If this server has any support for synchronous methods (has any sync - // server CQs), make sure that we have a ResourceExhausted handler - // to deal with the case of thread exhaustion - if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) { + // If this server has any support for synchronous methods (has any sync + // server CQs), make sure that we have a ResourceExhausted handler + // to deal with the case of thread exhaustion + if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) { resource_exhausted_handler_.reset( new grpc::internal::ResourceExhaustedHandler); - } - + } + for (const auto& value : sync_req_mgrs_) { value->Start(); } - - if (default_health_check_service_impl != nullptr) { - default_health_check_service_impl->StartServingThread(); - } + + if (default_health_check_service_impl != nullptr) { + default_health_check_service_impl->StartServingThread(); + } for (auto& acceptor : acceptors_) { acceptor->Start(); @@ -1207,50 +1207,50 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { void Server::ShutdownInternal(gpr_timespec deadline) { grpc::internal::MutexLock lock(&mu_); - if (shutdown_) { - return; - } + if (shutdown_) { + return; + } - shutdown_ = true; + shutdown_ = true; for (auto& acceptor : acceptors_) { acceptor->Shutdown(); } - /// The completion queue to use for server shutdown completion notification + /// The completion queue to use for server shutdown completion notification grpc::CompletionQueue shutdown_cq; grpc::ShutdownTag shutdown_tag; // Dummy shutdown tag - grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag); + grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag); - shutdown_cq.Shutdown(); + shutdown_cq.Shutdown(); - void* tag; - bool ok; + void* tag; + bool ok; grpc::CompletionQueue::NextStatus status = - shutdown_cq.AsyncNext(&tag, &ok, deadline); + shutdown_cq.AsyncNext(&tag, &ok, deadline); - // If this timed out, it means we are done with the grace period for a clean - // shutdown. We should force a shutdown now by cancelling all inflight calls + // If this timed out, it means we are done with the grace period for a clean + // shutdown. 
We should force a shutdown now by cancelling all inflight calls if (status == grpc::CompletionQueue::NextStatus::TIMEOUT) { - grpc_server_cancel_all_calls(server_); - } - // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has - // successfully shutdown + grpc_server_cancel_all_calls(server_); + } + // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has + // successfully shutdown - // Shutdown all ThreadManagers. This will try to gracefully stop all the - // threads in the ThreadManagers (once they process any inflight requests) + // Shutdown all ThreadManagers. This will try to gracefully stop all the + // threads in the ThreadManagers (once they process any inflight requests) for (const auto& value : sync_req_mgrs_) { value->Shutdown(); // ThreadManager's Shutdown() - } + } - // Wait for threads in all ThreadManagers to terminate + // Wait for threads in all ThreadManagers to terminate for (const auto& value : sync_req_mgrs_) { value->Wait(); - } + } // Drop the shutdown ref and wait for all other refs to drop as well. UnrefAndWaitLocked(); - + // Shutdown the callback CQ. The CQ is owned by its own shutdown tag, so it // will delete itself at true shutdown. if (callback_cq_ != nullptr) { @@ -1258,13 +1258,13 @@ void Server::ShutdownInternal(gpr_timespec deadline) { callback_cq_ = nullptr; } - // Drain the shutdown queue (if the previous call to AsyncNext() timed out - // and we didn't remove the tag from the queue yet) - while (shutdown_cq.Next(&tag, &ok)) { - // Nothing to be done here. Just ignore ok and tag values - } - - shutdown_notified_ = true; + // Drain the shutdown queue (if the previous call to AsyncNext() timed out + // and we didn't remove the tag from the queue yet) + while (shutdown_cq.Next(&tag, &ok)) { + // Nothing to be done here. Just ignore ok and tag values + } + + shutdown_notified_ = true; shutdown_cv_.Broadcast(); #ifndef NDEBUG @@ -1286,23 +1286,23 @@ void Server::Wait() { void Server::PerformOpsOnCall(grpc::internal::CallOpSetInterface* ops, grpc::internal::Call* call) { - ops->FillOps(call); + ops->FillOps(call); } bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag, bool* status) { - if (GenericAsyncRequest::FinalizeResult(tag, status)) { - // We either had no interceptors run or we are done intercepting - if (*status) { + if (GenericAsyncRequest::FinalizeResult(tag, status)) { + // We either had no interceptors run or we are done intercepting + if (*status) { // Create a new request/response pair using the server and CQ values // stored in this object's base class. new UnimplementedAsyncRequest(server_, notification_cq_); - new UnimplementedAsyncResponse(this); - } else { - delete this; - } + new UnimplementedAsyncResponse(this); + } else { + delete this; + } } else { - // The tag was swallowed due to interception. We will see it again. + // The tag was swallowed due to interception. We will see it again. 
} return false; } @@ -1320,8 +1320,8 @@ grpc::ServerInitializer* Server::initializer() { } grpc::CompletionQueue* Server::CallbackCQ() { - // TODO(vjpai): Consider using a single global CQ for the default CQ - // if there is no explicit per-server CQ registered + // TODO(vjpai): Consider using a single global CQ for the default CQ + // if there is no explicit per-server CQ registered grpc::internal::MutexLock l(&mu_); if (callback_cq_ != nullptr) { return callback_cq_; @@ -1330,11 +1330,11 @@ grpc::CompletionQueue* Server::CallbackCQ() { callback_cq_ = new grpc::CompletionQueue(grpc_completion_queue_attributes{ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, shutdown_callback}); - + // Transfer ownership of the new cq to its own shutdown callback shutdown_callback->TakeCQ(callback_cq_); - return callback_cq_; + return callback_cq_; } - + } // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/server/server_context.cc b/contrib/libs/grpc/src/cpp/server/server_context.cc index fea258e6e75..458ac20d87c 100644 --- a/contrib/libs/grpc/src/cpp/server/server_context.cc +++ b/contrib/libs/grpc/src/cpp/server/server_context.cc @@ -43,50 +43,50 @@ class ServerContextBase::CompletionOp final : public internal::CallOpSetInterface { public: // initial refs: one in the server context, one in the cq - // must ref the call before calling constructor and after deleting this + // must ref the call before calling constructor and after deleting this CompletionOp(internal::Call* call, ::grpc::internal::ServerCallbackCall* callback_controller) - : call_(*call), + : call_(*call), callback_controller_(callback_controller), - has_tag_(false), + has_tag_(false), tag_(nullptr), - core_cq_tag_(this), + core_cq_tag_(this), refs_(2), finalized_(false), - cancelled_(0), - done_intercepting_(false) {} - - // CompletionOp isn't copyable or movable - CompletionOp(const CompletionOp&) = delete; - CompletionOp& operator=(const CompletionOp&) = delete; - CompletionOp(CompletionOp&&) = delete; - CompletionOp& operator=(CompletionOp&&) = delete; - - ~CompletionOp() { - if (call_.server_rpc_info()) { - call_.server_rpc_info()->Unref(); - } - } - + cancelled_(0), + done_intercepting_(false) {} + + // CompletionOp isn't copyable or movable + CompletionOp(const CompletionOp&) = delete; + CompletionOp& operator=(const CompletionOp&) = delete; + CompletionOp(CompletionOp&&) = delete; + CompletionOp& operator=(CompletionOp&&) = delete; + + ~CompletionOp() { + if (call_.server_rpc_info()) { + call_.server_rpc_info()->Unref(); + } + } + void FillOps(internal::Call* call) override; - - // This should always be arena allocated in the call, so override delete. - // But this class is not trivially destructible, so must actually call delete - // before allowing the arena to be freed + + // This should always be arena allocated in the call, so override delete. + // But this class is not trivially destructible, so must actually call delete + // before allowing the arena to be freed static void operator delete(void* /*ptr*/, std::size_t size) { // Use size to avoid unused-parameter warning since assert seems to be // compiled out and treated as unused in some gcc optimized versions. (void)size; - assert(size == sizeof(CompletionOp)); - } - - // This operator should never be called as the memory should be freed as part - // of the arena destruction. 
It only exists to provide a matching operator - // delete to the operator new so that some compilers will not complain (see - // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this - // there are no tests catching the compiler warning. - static void operator delete(void*, void*) { assert(0); } - + assert(size == sizeof(CompletionOp)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { assert(0); } + bool FinalizeResult(void** tag, bool* status) override; bool CheckCancelled(CompletionQueue* cq) { @@ -100,36 +100,36 @@ class ServerContextBase::CompletionOp final tag_ = tag; } - void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; } - - void* core_cq_tag() override { return core_cq_tag_; } - + void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; } + + void* core_cq_tag() override { return core_cq_tag_; } + void Unref(); - // This will be called while interceptors are run if the RPC is a hijacked - // RPC. This should set hijacking state for each of the ops. - void SetHijackingState() override { - /* Servers don't allow hijacking */ + // This will be called while interceptors are run if the RPC is a hijacked + // RPC. This should set hijacking state for each of the ops. + void SetHijackingState() override { + /* Servers don't allow hijacking */ GPR_ASSERT(false); - } - - /* Should be called after interceptors are done running */ - void ContinueFillOpsAfterInterception() override {} - - /* Should be called after interceptors are done running on the finalize result - * path */ - void ContinueFinalizeResultAfterInterception() override { - done_intercepting_ = true; - if (!has_tag_) { - /* We don't have a tag to return. */ + } + + /* Should be called after interceptors are done running */ + void ContinueFillOpsAfterInterception() override {} + + /* Should be called after interceptors are done running on the finalize result + * path */ + void ContinueFinalizeResultAfterInterception() override { + done_intercepting_ = true; + if (!has_tag_) { + /* We don't have a tag to return. 
*/ Unref(); - return; - } - /* Start a dummy op so that we can return the tag */ + return; + } + /* Start a dummy op so that we can return the tag */ GPR_ASSERT(grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_, nullptr) == GRPC_CALL_OK); - } - + } + private: bool CheckCancelledNoPluck() { grpc_core::MutexLock lock(&mu_); @@ -140,37 +140,37 @@ class ServerContextBase::CompletionOp final ::grpc::internal::ServerCallbackCall* const callback_controller_; bool has_tag_; void* tag_; - void* core_cq_tag_; + void* core_cq_tag_; grpc_core::RefCount refs_; grpc_core::Mutex mu_; bool finalized_; - int cancelled_; // This is an int (not bool) because it is passed to core - bool done_intercepting_; + int cancelled_; // This is an int (not bool) because it is passed to core + bool done_intercepting_; internal::InterceptorBatchMethodsImpl interceptor_methods_; }; void ServerContextBase::CompletionOp::Unref() { if (refs_.Unref()) { - grpc_call* call = call_.call(); + grpc_call* call = call_.call(); delete this; - grpc_call_unref(call); + grpc_call_unref(call); } } void ServerContextBase::CompletionOp::FillOps(internal::Call* call) { - grpc_op ops; - ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER; - ops.data.recv_close_on_server.cancelled = &cancelled_; - ops.flags = 0; - ops.reserved = nullptr; - interceptor_methods_.SetCall(&call_); - interceptor_methods_.SetReverse(); - interceptor_methods_.SetCallOpSetInterface(this); + grpc_op ops; + ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER; + ops.data.recv_close_on_server.cancelled = &cancelled_; + ops.flags = 0; + ops.reserved = nullptr; + interceptor_methods_.SetCall(&call_); + interceptor_methods_.SetReverse(); + interceptor_methods_.SetCallOpSetInterface(this); // The following call_start_batch is internally-generated so no need for an // explanatory log on failure. GPR_ASSERT(grpc_call_start_batch(call->call(), &ops, 1, core_cq_tag_, nullptr) == GRPC_CALL_OK); - /* No interceptors to run here */ + /* No interceptors to run here */ } bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) { @@ -187,9 +187,9 @@ bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) { } Unref(); return has_tag; - } + } finalized_ = true; - + // If for some reason the incoming status is false, mark that as a // cancellation. // TODO(vjpai): does this ever happen? @@ -200,24 +200,24 @@ bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) { call_cancel = (cancelled_ != 0); // Release the lock since we may call a callback and interceptors. } - + if (call_cancel && callback_controller_ != nullptr) { callback_controller_->MaybeCallOnCancel(); } - /* Add interception point and run through interceptors */ - interceptor_methods_.AddInterceptionHookPoint( + /* Add interception point and run through interceptors */ + interceptor_methods_.AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_CLOSE); - if (interceptor_methods_.RunInterceptors()) { + if (interceptor_methods_.RunInterceptors()) { // No interceptors were run bool has_tag = has_tag_; if (has_tag) { - *tag = tag_; - } + *tag = tag_; + } Unref(); return has_tag; - } + } // There are interceptors to be run. Return false for now. 
- return false; + return false; } // ServerContextBase body @@ -233,17 +233,17 @@ ServerContextBase::ServerContextBase(gpr_timespec deadline, void ServerContextBase::BindDeadlineAndMetadata(gpr_timespec deadline, grpc_metadata_array* arr) { - deadline_ = deadline; - std::swap(*client_metadata_.arr(), *arr); -} - + deadline_ = deadline; + std::swap(*client_metadata_.arr(), *arr); +} + ServerContextBase::~ServerContextBase() { if (completion_op_) { completion_op_->Unref(); } - if (rpc_info_) { - rpc_info_->Unref(); - } + if (rpc_info_) { + rpc_info_->Unref(); + } if (default_reactor_used_.load(std::memory_order_relaxed)) { reinterpret_cast<Reactor*>(&default_reactor_)->~Reactor(); } @@ -261,19 +261,19 @@ void ServerContextBase::BeginCompletionOp( internal::Call* call, std::function<void(bool)> callback, ::grpc::internal::ServerCallbackCall* callback_controller) { GPR_ASSERT(!completion_op_); - if (rpc_info_) { - rpc_info_->Ref(); - } - grpc_call_ref(call->call()); - completion_op_ = - new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp))) + if (rpc_info_) { + rpc_info_->Ref(); + } + grpc_call_ref(call->call()); + completion_op_ = + new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp))) CompletionOp(call, callback_controller); if (callback_controller != nullptr) { completion_tag_.Set(call->call(), std::move(callback), completion_op_, true); - completion_op_->set_core_cq_tag(&completion_tag_); - completion_op_->set_tag(completion_op_); - } else if (has_notify_when_done_tag_) { + completion_op_->set_core_cq_tag(&completion_tag_); + completion_op_->set_tag(completion_op_); + } else if (has_notify_when_done_tag_) { completion_op_->set_tag(async_notify_when_done_tag_); } call->PerformOps(completion_op_); @@ -295,11 +295,11 @@ void ServerContextBase::AddTrailingMetadata(const TString& key, void ServerContextBase::TryCancel() const { internal::CancelInterceptorBatchMethods cancel_methods; - if (rpc_info_) { - for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) { - rpc_info_->RunInterceptor(&cancel_methods, i); - } - } + if (rpc_info_) { + for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) { + rpc_info_->RunInterceptor(&cancel_methods, i); + } + } grpc_call_error err = grpc_call_cancel_with_status(call_.call, GRPC_STATUS_CANCELLED, "Cancelled on the server side", nullptr); @@ -309,15 +309,15 @@ void ServerContextBase::TryCancel() const { } bool ServerContextBase::IsCancelled() const { - if (completion_tag_) { - // When using callback API, this result is always valid. - return completion_op_->CheckCancelledAsync(); - } else if (has_notify_when_done_tag_) { - // When using async API, the result is only valid + if (completion_tag_) { + // When using callback API, this result is always valid. 
+ return completion_op_->CheckCancelledAsync(); + } else if (has_notify_when_done_tag_) { + // When using async API, the result is only valid // if the tag has already been delivered at the completion queue return completion_op_ && completion_op_->CheckCancelledAsync(); } else { - // when using sync API, the result is always valid + // when using sync API, the result is always valid return completion_op_ && completion_op_->CheckCancelled(cq_); } } diff --git a/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.cc b/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.cc index c93129dc5f8..c8560aa81dd 100644 --- a/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.cc +++ b/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.cc @@ -22,7 +22,7 @@ #include <grpc/support/log.h> #include "src/core/lib/gprpp/thd.h" -#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/exec_ctx.h" namespace grpc { @@ -49,17 +49,17 @@ ThreadManager::WorkerThread::~WorkerThread() { thd_.Join(); } -ThreadManager::ThreadManager(const char* name, - grpc_resource_quota* resource_quota, - int min_pollers, int max_pollers) +ThreadManager::ThreadManager(const char* name, + grpc_resource_quota* resource_quota, + int min_pollers, int max_pollers) : shutdown_(false), num_pollers_(0), min_pollers_(min_pollers), max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers), - num_threads_(0), - max_active_threads_sofar_(0) { - resource_user_ = grpc_resource_user_create(resource_quota, name); -} + num_threads_(0), + max_active_threads_sofar_(0) { + resource_user_ = grpc_resource_user_create(resource_quota, name); +} ThreadManager::~ThreadManager() { { @@ -67,8 +67,8 @@ ThreadManager::~ThreadManager() { GPR_ASSERT(num_threads_ == 0); } - grpc_core::ExecCtx exec_ctx; // grpc_resource_user_unref needs an exec_ctx - grpc_resource_user_unref(resource_user_); + grpc_core::ExecCtx exec_ctx; // grpc_resource_user_unref needs an exec_ctx + grpc_resource_user_unref(resource_user_); CleanupCompletedThreads(); } @@ -89,27 +89,27 @@ bool ThreadManager::IsShutdown() { return shutdown_; } -int ThreadManager::GetMaxActiveThreadsSoFar() { +int ThreadManager::GetMaxActiveThreadsSoFar() { grpc_core::MutexLock list_lock(&list_mu_); - return max_active_threads_sofar_; -} - + return max_active_threads_sofar_; +} + void ThreadManager::MarkAsCompleted(WorkerThread* thd) { { grpc_core::MutexLock list_lock(&list_mu_); completed_threads_.push_back(thd); } - { + { grpc_core::MutexLock lock(&mu_); - num_threads_--; - if (num_threads_ == 0) { + num_threads_--; + if (num_threads_ == 0) { shutdown_cv_.Signal(); - } + } } - - // Give a thread back to the resource quota - grpc_resource_user_free_threads(resource_user_, 1); + + // Give a thread back to the resource quota + grpc_resource_user_free_threads(resource_user_, 1); } void ThreadManager::CleanupCompletedThreads() { @@ -124,19 +124,19 @@ void ThreadManager::CleanupCompletedThreads() { } void ThreadManager::Initialize() { - if (!grpc_resource_user_allocate_threads(resource_user_, min_pollers_)) { - gpr_log(GPR_ERROR, - "No thread quota available to even create the minimum required " - "polling threads (i.e %d). Unable to start the thread manager", - min_pollers_); - abort(); - } - + if (!grpc_resource_user_allocate_threads(resource_user_, min_pollers_)) { + gpr_log(GPR_ERROR, + "No thread quota available to even create the minimum required " + "polling threads (i.e %d). 
Unable to start the thread manager", + min_pollers_); + abort(); + } + { grpc_core::MutexLock lock(&mu_); num_pollers_ = min_pollers_; num_threads_ = min_pollers_; - max_active_threads_sofar_ = min_pollers_; + max_active_threads_sofar_ = min_pollers_; } for (int i = 0; i < min_pollers_; i++) { @@ -167,18 +167,18 @@ void ThreadManager::MainWorkLoop() { done = true; break; case WORK_FOUND: - // If we got work and there are now insufficient pollers and there is - // quota available to create a new thread, start a new poller thread - bool resource_exhausted = false; + // If we got work and there are now insufficient pollers and there is + // quota available to create a new thread, start a new poller thread + bool resource_exhausted = false; if (!shutdown_ && num_pollers_ < min_pollers_) { - if (grpc_resource_user_allocate_threads(resource_user_, 1)) { - // We can allocate a new poller thread - num_pollers_++; - num_threads_++; - if (num_threads_ > max_active_threads_sofar_) { - max_active_threads_sofar_ = num_threads_; - } - // Drop lock before spawning thread to avoid contention + if (grpc_resource_user_allocate_threads(resource_user_, 1)) { + // We can allocate a new poller thread + num_pollers_++; + num_threads_++; + if (num_threads_ > max_active_threads_sofar_) { + max_active_threads_sofar_ = num_threads_; + } + // Drop lock before spawning thread to avoid contention lock.Unlock(); WorkerThread* worker = new WorkerThread(this); if (worker->created()) { @@ -191,26 +191,26 @@ void ThreadManager::MainWorkLoop() { resource_exhausted = true; delete worker; } - } else if (num_pollers_ > 0) { - // There is still at least some thread polling, so we can go on - // even though we are below the number of pollers that we would - // like to have (min_pollers_) + } else if (num_pollers_ > 0) { + // There is still at least some thread polling, so we can go on + // even though we are below the number of pollers that we would + // like to have (min_pollers_) lock.Unlock(); - } else { - // There are no pollers to spare and we couldn't allocate - // a new thread, so resources are exhausted! + } else { + // There are no pollers to spare and we couldn't allocate + // a new thread, so resources are exhausted! lock.Unlock(); - resource_exhausted = true; - } + resource_exhausted = true; + } } else { - // There are a sufficient number of pollers available so we can do - // the work and continue polling with our existing poller threads + // There are a sufficient number of pollers available so we can do + // the work and continue polling with our existing poller threads lock.Unlock(); } // Lock is always released at this point - do the application work - // or return resource exhausted if there is new work but we couldn't - // get a thread in which to do it. - DoWork(tag, ok, !resource_exhausted); + // or return resource exhausted if there is new work but we couldn't + // get a thread in which to do it. + DoWork(tag, ok, !resource_exhausted); // Take the lock again to check post conditions lock.Lock(); // If we're shutdown, we should finish at this point. @@ -254,8 +254,8 @@ void ThreadManager::MainWorkLoop() { } }; - // This thread is exiting. Do some cleanup work i.e delete already completed - // worker threads + // This thread is exiting. 
Do some cleanup work i.e delete already completed + // worker threads CleanupCompletedThreads(); // If we are here, either ThreadManager is shutting down or it already has diff --git a/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.h b/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.h index 902b237cfdf..43f1fd5585f 100644 --- a/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.h +++ b/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.h @@ -26,14 +26,14 @@ #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/thd.h" -#include "src/core/lib/iomgr/resource_quota.h" +#include "src/core/lib/iomgr/resource_quota.h" namespace grpc { class ThreadManager { public: - explicit ThreadManager(const char* name, grpc_resource_quota* resource_quota, - int min_pollers, int max_pollers); + explicit ThreadManager(const char* name, grpc_resource_quota* resource_quota, + int min_pollers, int max_pollers); virtual ~ThreadManager(); // Initializes and Starts the Rpc Manager threads @@ -66,14 +66,14 @@ class ThreadManager { // The implementation of DoWork() is supposed to perform the work found by // PollForWork(). The tag and ok parameters are the same as returned by - // PollForWork(). The resources parameter indicates that the call actually - // has the resources available for performing the RPC's work. If it doesn't, - // the implementation should fail it appropriately. + // PollForWork(). The resources parameter indicates that the call actually + // has the resources available for performing the RPC's work. If it doesn't, + // the implementation should fail it appropriately. // // The implementation of DoWork() should also do any setup needed to ensure // that the next call to PollForWork() (not necessarily by the current thread) // actually finds some work - virtual void DoWork(void* tag, bool ok, bool resources) = 0; + virtual void DoWork(void* tag, bool ok, bool resources) = 0; // Mark the ThreadManager as shutdown and begin draining the work. This is a // non-blocking call and the caller should call Wait(), a blocking call which @@ -87,11 +87,11 @@ class ThreadManager { // all the threads have drained all the outstanding work virtual void Wait(); - // Max number of concurrent threads that were ever active in this thread - // manager so far. This is useful for debugging purposes (and in unit tests) - // to check if resource_quota is properly being enforced. - int GetMaxActiveThreadsSoFar(); - + // Max number of concurrent threads that were ever active in this thread + // manager so far. This is useful for debugging purposes (and in unit tests) + // to check if resource_quota is properly being enforced. + int GetMaxActiveThreadsSoFar(); + private: // Helper wrapper class around grpc_core::Thread. Takes a ThreadManager object // and starts a new grpc_core::Thread to calls the Run() function. @@ -99,24 +99,24 @@ class ThreadManager { // The Run() function calls ThreadManager::MainWorkLoop() function and once // that completes, it marks the WorkerThread completed by calling // ThreadManager::MarkAsCompleted() - // - // WHY IS THIS NEEDED?: - // When a thread terminates, some other thread *must* call Join() on that - // thread so that the resources are released. Having a WorkerThread wrapper - // will make this easier. 
Once Run() completes, each thread calls the - // following two functions: - // ThreadManager::CleanupCompletedThreads() - // ThreadManager::MarkAsCompleted() - // - // - MarkAsCompleted() puts the WorkerThread object in the ThreadManger's - // completed_threads_ list - // - CleanupCompletedThreads() calls "Join()" on the threads that are already - // in the completed_threads_ list (since a thread cannot call Join() on - // itself, it calls CleanupCompletedThreads() *before* calling - // MarkAsCompleted()) - // - // TODO(sreek): Consider creating the threads 'detached' so that Join() need - // not be called (and the need for this WorkerThread class is eliminated) + // + // WHY IS THIS NEEDED?: + // When a thread terminates, some other thread *must* call Join() on that + // thread so that the resources are released. Having a WorkerThread wrapper + // will make this easier. Once Run() completes, each thread calls the + // following two functions: + // ThreadManager::CleanupCompletedThreads() + // ThreadManager::MarkAsCompleted() + // + // - MarkAsCompleted() puts the WorkerThread object in the ThreadManger's + // completed_threads_ list + // - CleanupCompletedThreads() calls "Join()" on the threads that are already + // in the completed_threads_ list (since a thread cannot call Join() on + // itself, it calls CleanupCompletedThreads() *before* calling + // MarkAsCompleted()) + // + // TODO(sreek): Consider creating the threads 'detached' so that Join() need + // not be called (and the need for this WorkerThread class is eliminated) class WorkerThread { public: WorkerThread(ThreadManager* thd_mgr); @@ -141,21 +141,21 @@ class ThreadManager { void MarkAsCompleted(WorkerThread* thd); void CleanupCompletedThreads(); - // Protects shutdown_, num_pollers_, num_threads_ and - // max_active_threads_sofar_ + // Protects shutdown_, num_pollers_, num_threads_ and + // max_active_threads_sofar_ grpc_core::Mutex mu_; bool shutdown_; grpc_core::CondVar shutdown_cv_; - // The resource user object to use when requesting quota to create threads - // - // Note: The user of this ThreadManager object must create grpc_resource_quota - // object (that contains the actual max thread quota) and a grpc_resource_user + // The resource user object to use when requesting quota to create threads + // + // Note: The user of this ThreadManager object must create grpc_resource_quota + // object (that contains the actual max thread quota) and a grpc_resource_user // object through which quota is requested whenever new threads need to be - // created - grpc_resource_user* resource_user_; - + // created + grpc_resource_user* resource_user_; + // Number of threads doing polling int num_pollers_; @@ -163,15 +163,15 @@ class ThreadManager { int min_pollers_; int max_pollers_; - // The total number of threads currently active (includes threads includes the - // threads that are currently polling i.e num_pollers_) + // The total number of threads currently active (includes threads includes the + // threads that are currently polling i.e num_pollers_) int num_threads_; - // See GetMaxActiveThreadsSoFar()'s description. - // To be more specific, this variable tracks the max value num_threads_ was - // ever set so far - int max_active_threads_sofar_; - + // See GetMaxActiveThreadsSoFar()'s description. + // To be more specific, this variable tracks the max value num_threads_ was + // ever set so far + int max_active_threads_sofar_; + grpc_core::Mutex list_mu_; std::list<WorkerThread*> completed_threads_; }; |
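
The shutdown path restored in this commit follows a grace-period contract: `Server::ShutdownInternal()` waits on a dedicated completion queue until the deadline, and only on `TIMEOUT` does it force-cancel in-flight calls via `grpc_server_cancel_all_calls()`. That is exactly what the public `Shutdown(deadline)`/`Wait()` pair exposes. A minimal usage sketch (the surrounding server setup is assumed and not shown):

```
#include <chrono>
#include <memory>

#include <grpcpp/server.h>

// Stop a running server gracefully: give in-flight RPCs up to five
// seconds to drain; after the deadline, the remaining calls are
// cancelled (internally via grpc_server_cancel_all_calls, as in
// Server::ShutdownInternal above).
void StopServer(std::unique_ptr<grpc::Server>& server) {
  server->Shutdown(std::chrono::system_clock::now() + std::chrono::seconds(5));
  server->Wait();  // returns once shutdown has fully completed
}
```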
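Likewise, `ThreadManager::MainWorkLoop()` keeps at least `min_pollers_` threads polling: when a poller finds work it stops polling to run the handler, and a replacement poller is spawned only if the resource quota grants another thread. The stand-alone sketch below reproduces that replacement logic with a plain mutex/condition-variable work queue. Everything in it (`MiniThreadManager`, the fixed `max_threads` budget standing in for `grpc_resource_user_allocate_threads()`) is an illustrative stand-in, not a gRPC API, and it omits the real class's retirement of excess pollers above `max_pollers_`:

```
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class MiniThreadManager {
 public:
  MiniThreadManager(int min_pollers, int max_threads)
      : min_pollers_(min_pollers), max_threads_(max_threads) {}

  void Start() {
    std::lock_guard<std::mutex> lock(mu_);
    for (int i = 0; i < min_pollers_; ++i) Spawn();
  }

  void Submit(std::function<void()> work) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      work_.push(std::move(work));
    }
    cv_.notify_one();
  }

  // Sets the shutdown flag and joins all workers. Work still queued at
  // this point is dropped -- acceptable for a sketch.
  void Shutdown() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      shutdown_ = true;  // no Spawn() can happen after this point
    }
    cv_.notify_all();
    for (auto& t : threads_) t.join();
  }

 private:
  // Requires mu_ to be held.
  void Spawn() {
    ++num_pollers_;
    ++num_threads_;
    threads_.emplace_back([this] { WorkLoop(); });
  }

  void WorkLoop() {
    std::unique_lock<std::mutex> lock(mu_);
    for (;;) {
      cv_.wait(lock, [this] { return shutdown_ || !work_.empty(); });
      if (shutdown_) return;
      std::function<void()> work = std::move(work_.front());
      work_.pop();
      --num_pollers_;  // this thread now runs the handler instead of polling
      // Spawn a replacement poller if we fell below the floor and the
      // thread budget (the stand-in for the gRPC resource quota) allows.
      if (num_pollers_ < min_pollers_ && num_threads_ < max_threads_) Spawn();
      lock.unlock();  // never run user work while holding the lock
      work();
      lock.lock();
      ++num_pollers_;  // done with the handler, back to polling
    }
  }

  const int min_pollers_;
  const int max_threads_;
  int num_pollers_ = 0;
  int num_threads_ = 0;
  bool shutdown_ = false;
  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<std::function<void()>> work_;
  std::vector<std::thread> threads_;
};

int main() {
  MiniThreadManager mgr(/*min_pollers=*/2, /*max_threads=*/4);
  mgr.Start();
  for (int i = 0; i < 8; ++i) {
    mgr.Submit([i] { std::cout << "handled rpc " << i << "\n"; });
  }
  mgr.Shutdown();
}
```

Unlike the sketch, the real `ThreadManager` never lets a finished worker resume polling when `num_pollers_` already meets `max_pollers_`; the thread exits instead and returns its slot to the quota via `grpc_resource_user_free_threads()`, which is why the `WorkerThread` wrapper and `MarkAsCompleted()`/`CleanupCompletedThreads()` bookkeeping described above exist at all.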