author | dvshkurko <dvshkurko@yandex-team.ru> | 2022-02-10 16:45:51 +0300
---|---|---
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:51 +0300
commit | 321ee9bce31ec6e238be26dbcbe539cffa2c3309 (patch) | |
tree | 14407a2757cbf29eb97e266b7f07e851f971000c /contrib/libs/grpc/src/cpp | |
parent | 2f6ca198245aeffd5e2d82b65927c2465b68b4f5 (diff) | |
download | ydb-321ee9bce31ec6e238be26dbcbe539cffa2c3309.tar.gz | |
Restoring authorship annotation for <dvshkurko@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/libs/grpc/src/cpp')
46 files changed, 1816 insertions, 1816 deletions
diff --git a/contrib/libs/grpc/src/cpp/README.md b/contrib/libs/grpc/src/cpp/README.md index 967a0a43b7..e7099c03aa 100755 --- a/contrib/libs/grpc/src/cpp/README.md +++ b/contrib/libs/grpc/src/cpp/README.md @@ -137,26 +137,26 @@ if you later want to remove the grpc and/or protobuf installation or upgrade to ## Packaging systems -We do not officially support any packaging system for C++, but there are some community-maintained packages that are kept up-to-date -and are known to work well. More contributions and support for popular packaging systems are welcome! - -### Install using vcpkg package -gRPC is available using the [vcpkg](https://github.com/Microsoft/vcpkg) dependency manager: - -``` -# install vcpkg package manager on your system using the official instructions -git clone https://github.com/Microsoft/vcpkg.git -cd vcpkg -./bootstrap-vcpkg.sh -./vcpkg integrate install - -# install gRPC using vcpkg package manager -vcpkg install grpc -``` - -The gRPC port in vcpkg is kept up to date by Microsoft team members and community contributors. If the version is out of date, please [create an issue or pull request](https://github.com/Microsoft/vcpkg) on the vcpkg repository. - - +We do not officially support any packaging system for C++, but there are some community-maintained packages that are kept up-to-date +and are known to work well. More contributions and support for popular packaging systems are welcome! + +### Install using vcpkg package +gRPC is available using the [vcpkg](https://github.com/Microsoft/vcpkg) dependency manager: + +``` +# install vcpkg package manager on your system using the official instructions +git clone https://github.com/Microsoft/vcpkg.git +cd vcpkg +./bootstrap-vcpkg.sh +./vcpkg integrate install + +# install gRPC using vcpkg package manager +vcpkg install grpc +``` + +The gRPC port in vcpkg is kept up to date by Microsoft team members and community contributors. If the version is out of date, please [create an issue or pull request](https://github.com/Microsoft/vcpkg) on the vcpkg repository. 
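As a rough sanity check that a vcpkg-installed gRPC is usable, a minimal sketch might look like the following (assumptions: the headers and libraries are visible to your compiler/linker, and `localhost:50051` is just a placeholder target — no server needs to be running for the channel object to be constructed):

```
#include <iostream>
#include <grpcpp/grpcpp.h>

int main() {
  // Placeholder target; creating the channel does not require a live server.
  auto channel = grpc::CreateChannel("localhost:50051",
                                     grpc::InsecureChannelCredentials());
  // Without try_to_connect, a freshly created channel is expected to report
  // GRPC_CHANNEL_IDLE (0).
  std::cout << "channel state: "
            << channel->GetState(/*try_to_connect=*/false) << std::endl;
  return 0;
}
```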
+ + ## Examples & Additional Documentation You can find out how to build and run our simplest gRPC C++ example in our diff --git a/contrib/libs/grpc/src/cpp/client/channel_cc.cc b/contrib/libs/grpc/src/cpp/client/channel_cc.cc index ac95c29efc..0d3fcc1444 100644 --- a/contrib/libs/grpc/src/cpp/client/channel_cc.cc +++ b/contrib/libs/grpc/src/cpp/client/channel_cc.cc @@ -43,11 +43,11 @@ namespace grpc { -static ::grpc::internal::GrpcLibraryInitializer g_gli_initializer; +static ::grpc::internal::GrpcLibraryInitializer g_gli_initializer; Channel::Channel(const TString& host, grpc_channel* channel, - std::vector<std::unique_ptr< - ::grpc::experimental::ClientInterceptorFactoryInterface>> - interceptor_creators) + std::vector<std::unique_ptr< + ::grpc::experimental::ClientInterceptorFactoryInterface>> + interceptor_creators) : host_(host), c_channel_(channel) { interceptor_creators_ = std::move(interceptor_creators); g_gli_initializer.summon(); @@ -101,9 +101,9 @@ void ChannelResetConnectionBackoff(Channel* channel) { } // namespace experimental -::grpc::internal::Call Channel::CreateCallInternal( - const ::grpc::internal::RpcMethod& method, ::grpc::ClientContext* context, - ::grpc::CompletionQueue* cq, size_t interceptor_pos) { +::grpc::internal::Call Channel::CreateCallInternal( + const ::grpc::internal::RpcMethod& method, ::grpc::ClientContext* context, + ::grpc::CompletionQueue* cq, size_t interceptor_pos) { const bool kRegistered = method.channel_tag() && context->authority().empty(); grpc_call* c_call = nullptr; if (kRegistered) { @@ -122,7 +122,7 @@ void ChannelResetConnectionBackoff(Channel* channel) { SliceFromArray(method.name(), strlen(method.name())); grpc_slice host_slice; if (host_str != nullptr) { - host_slice = ::grpc::SliceFromCopiedString(*host_str); + host_slice = ::grpc::SliceFromCopiedString(*host_str); } c_call = grpc_channel_create_call( c_channel_, context->propagate_from_call_, @@ -138,23 +138,23 @@ void ChannelResetConnectionBackoff(Channel* channel) { // ClientRpcInfo should be set before call because set_call also checks // whether the call has been cancelled, and if the call was cancelled, we - // should notify the interceptors too. + // should notify the interceptors too. auto* info = context->set_client_rpc_info(method.name(), method.method_type(), this, interceptor_creators_, interceptor_pos); context->set_call(c_call, shared_from_this()); - return ::grpc::internal::Call(c_call, this, cq, info); + return ::grpc::internal::Call(c_call, this, cq, info); } -::grpc::internal::Call Channel::CreateCall( - const ::grpc::internal::RpcMethod& method, ::grpc::ClientContext* context, - CompletionQueue* cq) { +::grpc::internal::Call Channel::CreateCall( + const ::grpc::internal::RpcMethod& method, ::grpc::ClientContext* context, + CompletionQueue* cq) { return CreateCallInternal(method, context, cq, 0); } -void Channel::PerformOpsOnCall(::grpc::internal::CallOpSetInterface* ops, - ::grpc::internal::Call* call) { +void Channel::PerformOpsOnCall(::grpc::internal::CallOpSetInterface* ops, + ::grpc::internal::Call* call) { ops->FillOps( call); // Make a copy of call. 
It's fine since Call just has pointers } @@ -170,11 +170,11 @@ grpc_connectivity_state Channel::GetState(bool try_to_connect) { namespace { -class TagSaver final : public ::grpc::internal::CompletionQueueTag { +class TagSaver final : public ::grpc::internal::CompletionQueueTag { public: explicit TagSaver(void* tag) : tag_(tag) {} ~TagSaver() override {} - bool FinalizeResult(void** tag, bool* /*status*/) override { + bool FinalizeResult(void** tag, bool* /*status*/) override { *tag = tag_; delete this; return true; @@ -188,7 +188,7 @@ class TagSaver final : public ::grpc::internal::CompletionQueueTag { void Channel::NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline, - ::grpc::CompletionQueue* cq, void* tag) { + ::grpc::CompletionQueue* cq, void* tag) { TagSaver* tag_saver = new TagSaver(tag); grpc_channel_watch_connectivity_state(c_channel_, last_observed, deadline, cq->cq(), tag_saver); @@ -196,7 +196,7 @@ void Channel::NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, bool Channel::WaitForStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline) { - ::grpc::CompletionQueue cq; + ::grpc::CompletionQueue cq; bool ok = false; void* tag = nullptr; NotifyOnStateChangeImpl(last_observed, deadline, &cq, nullptr); @@ -208,17 +208,17 @@ bool Channel::WaitForStateChangeImpl(grpc_connectivity_state last_observed, namespace { class ShutdownCallback : public grpc_experimental_completion_queue_functor { public: - ShutdownCallback() { - functor_run = &ShutdownCallback::Run; - // Set inlineable to true since this callback is trivial and thus does not - // need to be run from the executor (triggering a thread hop). This should - // only be used by internal callbacks like this and not by user application - // code. - inlineable = true; - } + ShutdownCallback() { + functor_run = &ShutdownCallback::Run; + // Set inlineable to true since this callback is trivial and thus does not + // need to be run from the executor (triggering a thread hop). This should + // only be used by internal callbacks like this and not by user application + // code. 
+ inlineable = true; + } // TakeCQ takes ownership of the cq into the shutdown callback // so that the shutdown callback will be responsible for destroying it - void TakeCQ(::grpc::CompletionQueue* cq) { cq_ = cq; } + void TakeCQ(::grpc::CompletionQueue* cq) { cq_ = cq; } // The Run function will get invoked by the completion queue library // when the shutdown is actually complete @@ -229,17 +229,17 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor { } private: - ::grpc::CompletionQueue* cq_ = nullptr; + ::grpc::CompletionQueue* cq_ = nullptr; }; } // namespace -::grpc::CompletionQueue* Channel::CallbackCQ() { +::grpc::CompletionQueue* Channel::CallbackCQ() { // TODO(vjpai): Consider using a single global CQ for the default CQ // if there is no explicit per-channel CQ registered - grpc::internal::MutexLock l(&mu_); + grpc::internal::MutexLock l(&mu_); if (callback_cq_ == nullptr) { auto* shutdown_callback = new ShutdownCallback; - callback_cq_ = new ::grpc::CompletionQueue(grpc_completion_queue_attributes{ + callback_cq_ = new ::grpc::CompletionQueue(grpc_completion_queue_attributes{ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, shutdown_callback}); diff --git a/contrib/libs/grpc/src/cpp/client/client_context.cc b/contrib/libs/grpc/src/cpp/client/client_context.cc index b75343d089..e3cb5cef57 100644 --- a/contrib/libs/grpc/src/cpp/client/client_context.cc +++ b/contrib/libs/grpc/src/cpp/client/client_context.cc @@ -25,7 +25,7 @@ #include <grpc/support/string_util.h> #include <grpcpp/impl/codegen/interceptor_common.h> -#include <grpcpp/impl/codegen/sync.h> +#include <grpcpp/impl/codegen/sync.h> #include <grpcpp/impl/grpc_library.h> #include <grpcpp/security/credentials.h> #include <grpcpp/server_context.h> @@ -33,14 +33,14 @@ namespace grpc { -class Channel; - +class Channel; + class DefaultGlobalClientCallbacks final : public ClientContext::GlobalCallbacks { public: ~DefaultGlobalClientCallbacks() override {} - void DefaultConstructor(ClientContext* /*context*/) override {} - void Destructor(ClientContext* /*context*/) override {} + void DefaultConstructor(ClientContext* /*context*/) override {} + void Destructor(ClientContext* /*context*/) override {} }; static internal::GrpcLibraryInitializer g_gli_initializer; @@ -72,23 +72,23 @@ ClientContext::~ClientContext() { g_client_callbacks->Destructor(this); } -void ClientContext::set_credentials( +void ClientContext::set_credentials( const std::shared_ptr<CallCredentials>& creds) { - creds_ = creds; - // If call_ is set, we have already created the call, and set the call - // credentials. This should only be done before we have started the batch - // for sending initial metadata. - if (creds_ != nullptr && call_ != nullptr) { - if (!creds_->ApplyToCall(call_)) { - SendCancelToInterceptors(); - grpc_call_cancel_with_status(call_, GRPC_STATUS_CANCELLED, - "Failed to set credentials to rpc.", - nullptr); - } - } -} - -std::unique_ptr<ClientContext> ClientContext::FromInternalServerContext( + creds_ = creds; + // If call_ is set, we have already created the call, and set the call + // credentials. This should only be done before we have started the batch + // for sending initial metadata. 
+ if (creds_ != nullptr && call_ != nullptr) { + if (!creds_->ApplyToCall(call_)) { + SendCancelToInterceptors(); + grpc_call_cancel_with_status(call_, GRPC_STATUS_CANCELLED, + "Failed to set credentials to rpc.", + nullptr); + } + } +} + +std::unique_ptr<ClientContext> ClientContext::FromInternalServerContext( const grpc::ServerContextBase& context, PropagationOptions options) { std::unique_ptr<ClientContext> ctx(new ClientContext); ctx->propagate_from_call_ = context.call_.call; @@ -96,17 +96,17 @@ std::unique_ptr<ClientContext> ClientContext::FromInternalServerContext( return ctx; } -std::unique_ptr<ClientContext> ClientContext::FromServerContext( +std::unique_ptr<ClientContext> ClientContext::FromServerContext( const grpc::ServerContext& server_context, PropagationOptions options) { - return FromInternalServerContext(server_context, options); -} - -std::unique_ptr<ClientContext> ClientContext::FromCallbackServerContext( + return FromInternalServerContext(server_context, options); +} + +std::unique_ptr<ClientContext> ClientContext::FromCallbackServerContext( const grpc::CallbackServerContext& server_context, - PropagationOptions options) { - return FromInternalServerContext(server_context, options); -} - + PropagationOptions options) { + return FromInternalServerContext(server_context, options); +} + void ClientContext::AddMetadata(const TString& meta_key, const TString& meta_value) { send_initial_metadata_.insert(std::make_pair(meta_key, meta_value)); diff --git a/contrib/libs/grpc/src/cpp/client/client_interceptor.cc b/contrib/libs/grpc/src/cpp/client/client_interceptor.cc index a91950cae2..9b2f03b49a 100644 --- a/contrib/libs/grpc/src/cpp/client/client_interceptor.cc +++ b/contrib/libs/grpc/src/cpp/client/client_interceptor.cc @@ -28,17 +28,17 @@ experimental::ClientInterceptorFactoryInterface* namespace experimental { void RegisterGlobalClientInterceptorFactory( ClientInterceptorFactoryInterface* factory) { - if (internal::g_global_client_interceptor_factory != nullptr) { - GPR_ASSERT(false && - "It is illegal to call RegisterGlobalClientInterceptorFactory " - "multiple times."); - } + if (internal::g_global_client_interceptor_factory != nullptr) { + GPR_ASSERT(false && + "It is illegal to call RegisterGlobalClientInterceptorFactory " + "multiple times."); + } internal::g_global_client_interceptor_factory = factory; } - -// For testing purposes only. -void TestOnlyResetGlobalClientInterceptorFactory() { - internal::g_global_client_interceptor_factory = nullptr; -} + +// For testing purposes only. 
+void TestOnlyResetGlobalClientInterceptorFactory() { + internal::g_global_client_interceptor_factory = nullptr; +} } // namespace experimental } // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/client/create_channel.cc b/contrib/libs/grpc/src/cpp/client/create_channel.cc index 97327490ed..0a3863e83f 100644 --- a/contrib/libs/grpc/src/cpp/client/create_channel.cc +++ b/contrib/libs/grpc/src/cpp/client/create_channel.cc @@ -21,7 +21,7 @@ #include <grpcpp/channel.h> #include <grpcpp/create_channel.h> #include <grpcpp/impl/grpc_library.h> -#include <grpcpp/security/credentials.h> +#include <grpcpp/security/credentials.h> #include <grpcpp/support/channel_arguments.h> #include "src/cpp/client/create_channel_internal.h" @@ -29,25 +29,25 @@ namespace grpc { std::shared_ptr<grpc::Channel> CreateChannel( const grpc::string& target, - const std::shared_ptr<grpc::ChannelCredentials>& creds) { + const std::shared_ptr<grpc::ChannelCredentials>& creds) { return CreateCustomChannel(target, creds, grpc::ChannelArguments()); } std::shared_ptr<grpc::Channel> CreateCustomChannel( const grpc::string& target, - const std::shared_ptr<grpc::ChannelCredentials>& creds, - const grpc::ChannelArguments& args) { - grpc::GrpcLibraryCodegen - init_lib; // We need to call init in case of bad creds. - return creds ? creds->CreateChannelImpl(target, args) - : grpc::CreateChannelInternal( + const std::shared_ptr<grpc::ChannelCredentials>& creds, + const grpc::ChannelArguments& args) { + grpc::GrpcLibraryCodegen + init_lib; // We need to call init in case of bad creds. + return creds ? creds->CreateChannelImpl(target, args) + : grpc::CreateChannelInternal( "", grpc_lame_client_channel_create( nullptr, GRPC_STATUS_INVALID_ARGUMENT, "Invalid credentials."), std::vector<std::unique_ptr< - grpc::experimental:: - ClientInterceptorFactoryInterface>>()); + grpc::experimental:: + ClientInterceptorFactoryInterface>>()); } namespace experimental { @@ -62,23 +62,23 @@ namespace experimental { /// hold an object or is invalid, a lame channel (one on which all operations /// fail) is returned. /// \param args Options for channel creation. -std::shared_ptr<grpc::Channel> CreateCustomChannelWithInterceptors( +std::shared_ptr<grpc::Channel> CreateCustomChannelWithInterceptors( const TString& target, - const std::shared_ptr<grpc::ChannelCredentials>& creds, - const grpc::ChannelArguments& args, + const std::shared_ptr<grpc::ChannelCredentials>& creds, + const grpc::ChannelArguments& args, std::vector< - std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> + std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_creators) { - grpc::GrpcLibraryCodegen - init_lib; // We need to call init in case of bad creds. + grpc::GrpcLibraryCodegen + init_lib; // We need to call init in case of bad creds. return creds ? 
creds->CreateChannelWithInterceptors( target, args, std::move(interceptor_creators)) - : grpc::CreateChannelInternal( + : grpc::CreateChannelInternal( "", grpc_lame_client_channel_create( nullptr, GRPC_STATUS_INVALID_ARGUMENT, "Invalid credentials."), - std::move(interceptor_creators)); + std::move(interceptor_creators)); } } // namespace experimental diff --git a/contrib/libs/grpc/src/cpp/client/create_channel_internal.cc b/contrib/libs/grpc/src/cpp/client/create_channel_internal.cc index da2a878a22..a1ce29c8c0 100644 --- a/contrib/libs/grpc/src/cpp/client/create_channel_internal.cc +++ b/contrib/libs/grpc/src/cpp/client/create_channel_internal.cc @@ -26,11 +26,11 @@ namespace grpc { std::shared_ptr<Channel> CreateChannelInternal( const TString& host, grpc_channel* c_channel, - std::vector<std::unique_ptr< - ::grpc::experimental::ClientInterceptorFactoryInterface>> + std::vector<std::unique_ptr< + ::grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_creators) { return std::shared_ptr<Channel>( new Channel(host, c_channel, std::move(interceptor_creators))); } - + } // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/client/create_channel_internal.h b/contrib/libs/grpc/src/cpp/client/create_channel_internal.h index 09d4e56b02..8932d12a4f 100644 --- a/contrib/libs/grpc/src/cpp/client/create_channel_internal.h +++ b/contrib/libs/grpc/src/cpp/client/create_channel_internal.h @@ -21,7 +21,7 @@ #include <memory> -#include <grpcpp/channel.h> +#include <grpcpp/channel.h> #include <grpcpp/impl/codegen/client_interceptor.h> #include <grpcpp/support/config.h> @@ -31,8 +31,8 @@ namespace grpc { std::shared_ptr<Channel> CreateChannelInternal( const TString& host, grpc_channel* c_channel, - std::vector<std::unique_ptr< - ::grpc::experimental::ClientInterceptorFactoryInterface>> + std::vector<std::unique_ptr< + ::grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_creators); } // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/client/create_channel_posix.cc b/contrib/libs/grpc/src/cpp/client/create_channel_posix.cc index db09eda8a6..57b0cc7da6 100644 --- a/contrib/libs/grpc/src/cpp/client/create_channel_posix.cc +++ b/contrib/libs/grpc/src/cpp/client/create_channel_posix.cc @@ -20,19 +20,19 @@ #include <grpc/grpc_posix.h> #include <grpcpp/channel.h> #include <grpcpp/impl/grpc_library.h> -#include <grpcpp/support/channel_arguments.h> +#include <grpcpp/support/channel_arguments.h> #include "src/cpp/client/create_channel_internal.h" namespace grpc { -class ChannelArguments; - +class ChannelArguments; + #ifdef GPR_SUPPORT_CHANNELS_FROM_FD std::shared_ptr<Channel> CreateInsecureChannelFromFd(const TString& target, int fd) { - grpc::internal::GrpcLibrary init_lib; + grpc::internal::GrpcLibrary init_lib; init_lib.init(); return CreateChannelInternal( "", grpc_insecure_channel_create_from_fd(target.c_str(), fd, nullptr), @@ -58,9 +58,9 @@ namespace experimental { std::shared_ptr<Channel> CreateCustomInsecureChannelWithInterceptorsFromFd( const TString& target, int fd, const ChannelArguments& args, std::vector< - std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> + std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_creators) { - grpc::internal::GrpcLibrary init_lib; + grpc::internal::GrpcLibrary init_lib; init_lib.init(); grpc_channel_args channel_args; args.SetChannelArgs(&channel_args); diff --git a/contrib/libs/grpc/src/cpp/client/credentials_cc.cc b/contrib/libs/grpc/src/cpp/client/credentials_cc.cc index 
9dfb2f491c..99cf583f72 100644 --- a/contrib/libs/grpc/src/cpp/client/credentials_cc.cc +++ b/contrib/libs/grpc/src/cpp/client/credentials_cc.cc @@ -21,7 +21,7 @@ namespace grpc { -static grpc::internal::GrpcLibraryInitializer g_gli_initializer; +static grpc::internal::GrpcLibraryInitializer g_gli_initializer; ChannelCredentials::ChannelCredentials() { g_gli_initializer.summon(); } ChannelCredentials::~ChannelCredentials() {} diff --git a/contrib/libs/grpc/src/cpp/client/insecure_credentials.cc b/contrib/libs/grpc/src/cpp/client/insecure_credentials.cc index e5bafff70a..8820efb1ee 100644 --- a/contrib/libs/grpc/src/cpp/client/insecure_credentials.cc +++ b/contrib/libs/grpc/src/cpp/client/insecure_credentials.cc @@ -29,22 +29,22 @@ namespace grpc { namespace { class InsecureChannelCredentialsImpl final : public ChannelCredentials { public: - std::shared_ptr<Channel> CreateChannelImpl( + std::shared_ptr<Channel> CreateChannelImpl( const TString& target, const ChannelArguments& args) override { return CreateChannelWithInterceptors( target, args, std::vector<std::unique_ptr< - grpc::experimental::ClientInterceptorFactoryInterface>>()); + grpc::experimental::ClientInterceptorFactoryInterface>>()); } - std::shared_ptr<Channel> CreateChannelWithInterceptors( + std::shared_ptr<Channel> CreateChannelWithInterceptors( const TString& target, const ChannelArguments& args, - std::vector<std::unique_ptr< - grpc::experimental::ClientInterceptorFactoryInterface>> + std::vector<std::unique_ptr< + grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_creators) override { grpc_channel_args channel_args; args.SetChannelArgs(&channel_args); - return ::grpc::CreateChannelInternal( + return ::grpc::CreateChannelInternal( "", grpc_insecure_channel_create(target.c_str(), &channel_args, nullptr), std::move(interceptor_creators)); diff --git a/contrib/libs/grpc/src/cpp/client/secure_credentials.cc b/contrib/libs/grpc/src/cpp/client/secure_credentials.cc index 0f6db3caa5..c31d8fef8e 100644 --- a/contrib/libs/grpc/src/cpp/client/secure_credentials.cc +++ b/contrib/libs/grpc/src/cpp/client/secure_credentials.cc @@ -17,53 +17,53 @@ */ #include "src/cpp/client/secure_credentials.h" - -#include <grpc/impl/codegen/slice.h> -#include <grpc/slice.h> + +#include <grpc/impl/codegen/slice.h> +#include <grpc/slice.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> #include <grpcpp/channel.h> -#include <grpcpp/impl/codegen/status.h> +#include <grpcpp/impl/codegen/status.h> #include <grpcpp/impl/grpc_library.h> #include <grpcpp/support/channel_arguments.h> - -#include "src/core/lib/gpr/env.h" -#include "src/core/lib/iomgr/error.h" -#include "src/core/lib/iomgr/executor.h" -#include "src/core/lib/iomgr/load_file.h" -#include "src/core/lib/json/json.h" -#include "src/core/lib/security/transport/auth_filters.h" -#include "src/core/lib/security/util/json_util.h" + +#include "src/core/lib/gpr/env.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/executor.h" +#include "src/core/lib/iomgr/load_file.h" +#include "src/core/lib/json/json.h" +#include "src/core/lib/security/transport/auth_filters.h" +#include "src/core/lib/security/util/json_util.h" #include "src/cpp/client/create_channel_internal.h" #include "src/cpp/common/secure_auth_context.h" namespace grpc { -static grpc::internal::GrpcLibraryInitializer g_gli_initializer; +static grpc::internal::GrpcLibraryInitializer g_gli_initializer; SecureChannelCredentials::SecureChannelCredentials( 
grpc_channel_credentials* c_creds) : c_creds_(c_creds) { g_gli_initializer.summon(); } -std::shared_ptr<Channel> SecureChannelCredentials::CreateChannelImpl( +std::shared_ptr<Channel> SecureChannelCredentials::CreateChannelImpl( const TString& target, const ChannelArguments& args) { return CreateChannelWithInterceptors( target, args, - std::vector<std::unique_ptr< - grpc::experimental::ClientInterceptorFactoryInterface>>()); + std::vector<std::unique_ptr< + grpc::experimental::ClientInterceptorFactoryInterface>>()); } -std::shared_ptr<Channel> +std::shared_ptr<Channel> SecureChannelCredentials::CreateChannelWithInterceptors( const TString& target, const ChannelArguments& args, std::vector< - std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> + std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_creators) { grpc_channel_args channel_args; args.SetChannelArgs(&channel_args); - return ::grpc::CreateChannelInternal( + return ::grpc::CreateChannelInternal( args.GetSslTargetNameOverride(), grpc_secure_channel_create(c_creds_, target.c_str(), &channel_args, nullptr), @@ -96,7 +96,7 @@ std::shared_ptr<CallCredentials> WrapCallCredentials( } // namespace std::shared_ptr<ChannelCredentials> GoogleDefaultCredentials() { - grpc::GrpcLibraryCodegen init; // To call grpc_init(). + grpc::GrpcLibraryCodegen init; // To call grpc_init(). return WrapChannelCredentials( grpc_google_default_credentials_create(nullptr)); } @@ -104,7 +104,7 @@ std::shared_ptr<ChannelCredentials> GoogleDefaultCredentials() { // Builds SSL Credentials given SSL specific options std::shared_ptr<ChannelCredentials> SslCredentials( const SslCredentialsOptions& options) { - grpc::GrpcLibraryCodegen init; // To call grpc_init(). + grpc::GrpcLibraryCodegen init; // To call grpc_init(). grpc_ssl_pem_key_cert_pair pem_key_cert_pair = { options.pem_private_key.c_str(), options.pem_cert_chain.c_str()}; @@ -117,139 +117,139 @@ std::shared_ptr<ChannelCredentials> SslCredentials( namespace experimental { -namespace { - -void ClearStsCredentialsOptions(StsCredentialsOptions* options) { - if (options == nullptr) return; - options->token_exchange_service_uri.clear(); - options->resource.clear(); - options->audience.clear(); - options->scope.clear(); - options->requested_token_type.clear(); - options->subject_token_path.clear(); - options->subject_token_type.clear(); - options->actor_token_path.clear(); - options->actor_token_type.clear(); -} - -} // namespace - -// Builds STS credentials options from JSON. +namespace { + +void ClearStsCredentialsOptions(StsCredentialsOptions* options) { + if (options == nullptr) return; + options->token_exchange_service_uri.clear(); + options->resource.clear(); + options->audience.clear(); + options->scope.clear(); + options->requested_token_type.clear(); + options->subject_token_path.clear(); + options->subject_token_type.clear(); + options->actor_token_path.clear(); + options->actor_token_type.clear(); +} + +} // namespace + +// Builds STS credentials options from JSON. 
grpc::Status StsCredentialsOptionsFromJson(const TString& json_string, - StsCredentialsOptions* options) { - if (options == nullptr) { - return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "options cannot be nullptr."); - } - ClearStsCredentialsOptions(options); + StsCredentialsOptions* options) { + if (options == nullptr) { + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "options cannot be nullptr."); + } + ClearStsCredentialsOptions(options); grpc_error* error = GRPC_ERROR_NONE; grpc_core::Json json = grpc_core::Json::Parse(json_string.c_str(), &error); if (error != GRPC_ERROR_NONE || json.type() != grpc_core::Json::Type::OBJECT) { GRPC_ERROR_UNREF(error); - return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Invalid json."); - } - - // Required fields. - const char* value = grpc_json_get_string_property( + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Invalid json."); + } + + // Required fields. + const char* value = grpc_json_get_string_property( json, "token_exchange_service_uri", nullptr); - if (value == nullptr) { - ClearStsCredentialsOptions(options); - return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "token_exchange_service_uri must be specified."); - } - options->token_exchange_service_uri.assign(value); + if (value == nullptr) { + ClearStsCredentialsOptions(options); + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "token_exchange_service_uri must be specified."); + } + options->token_exchange_service_uri.assign(value); value = grpc_json_get_string_property(json, "subject_token_path", nullptr); - if (value == nullptr) { - ClearStsCredentialsOptions(options); - return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "subject_token_path must be specified."); - } - options->subject_token_path.assign(value); + if (value == nullptr) { + ClearStsCredentialsOptions(options); + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "subject_token_path must be specified."); + } + options->subject_token_path.assign(value); value = grpc_json_get_string_property(json, "subject_token_type", nullptr); - if (value == nullptr) { - ClearStsCredentialsOptions(options); - return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "subject_token_type must be specified."); - } - options->subject_token_type.assign(value); - - // Optional fields. + if (value == nullptr) { + ClearStsCredentialsOptions(options); + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "subject_token_type must be specified."); + } + options->subject_token_type.assign(value); + + // Optional fields. 
value = grpc_json_get_string_property(json, "resource", nullptr); - if (value != nullptr) options->resource.assign(value); + if (value != nullptr) options->resource.assign(value); value = grpc_json_get_string_property(json, "audience", nullptr); - if (value != nullptr) options->audience.assign(value); + if (value != nullptr) options->audience.assign(value); value = grpc_json_get_string_property(json, "scope", nullptr); - if (value != nullptr) options->scope.assign(value); + if (value != nullptr) options->scope.assign(value); value = grpc_json_get_string_property(json, "requested_token_type", nullptr); - if (value != nullptr) options->requested_token_type.assign(value); + if (value != nullptr) options->requested_token_type.assign(value); value = grpc_json_get_string_property(json, "actor_token_path", nullptr); - if (value != nullptr) options->actor_token_path.assign(value); + if (value != nullptr) options->actor_token_path.assign(value); value = grpc_json_get_string_property(json, "actor_token_type", nullptr); - if (value != nullptr) options->actor_token_type.assign(value); - - return grpc::Status(); -} - -// Builds STS credentials Options from the $STS_CREDENTIALS env var. -grpc::Status StsCredentialsOptionsFromEnv(StsCredentialsOptions* options) { - if (options == nullptr) { - return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - "options cannot be nullptr."); - } - ClearStsCredentialsOptions(options); - grpc_slice json_string = grpc_empty_slice(); - char* sts_creds_path = gpr_getenv("STS_CREDENTIALS"); - grpc_error* error = GRPC_ERROR_NONE; - grpc::Status status; - auto cleanup = [&json_string, &sts_creds_path, &error, &status]() { - grpc_slice_unref_internal(json_string); - gpr_free(sts_creds_path); - GRPC_ERROR_UNREF(error); - return status; - }; - - if (sts_creds_path == nullptr) { - status = grpc::Status(grpc::StatusCode::NOT_FOUND, - "STS_CREDENTIALS environment variable not set."); - return cleanup(); - } - error = grpc_load_file(sts_creds_path, 1, &json_string); - if (error != GRPC_ERROR_NONE) { - status = - grpc::Status(grpc::StatusCode::NOT_FOUND, grpc_error_string(error)); - return cleanup(); - } - status = StsCredentialsOptionsFromJson( - reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(json_string)), - options); - return cleanup(); -} - -// C++ to Core STS Credentials options. -grpc_sts_credentials_options StsCredentialsCppToCoreOptions( - const StsCredentialsOptions& options) { - grpc_sts_credentials_options opts; - memset(&opts, 0, sizeof(opts)); - opts.token_exchange_service_uri = options.token_exchange_service_uri.c_str(); - opts.resource = options.resource.c_str(); - opts.audience = options.audience.c_str(); - opts.scope = options.scope.c_str(); - opts.requested_token_type = options.requested_token_type.c_str(); - opts.subject_token_path = options.subject_token_path.c_str(); - opts.subject_token_type = options.subject_token_type.c_str(); - opts.actor_token_path = options.actor_token_path.c_str(); - opts.actor_token_type = options.actor_token_type.c_str(); - return opts; -} - -// Builds STS credentials. -std::shared_ptr<CallCredentials> StsCredentials( - const StsCredentialsOptions& options) { - auto opts = StsCredentialsCppToCoreOptions(options); - return WrapCallCredentials(grpc_sts_credentials_create(&opts, nullptr)); -} - + if (value != nullptr) options->actor_token_type.assign(value); + + return grpc::Status(); +} + +// Builds STS credentials Options from the $STS_CREDENTIALS env var. 
+grpc::Status StsCredentialsOptionsFromEnv(StsCredentialsOptions* options) { + if (options == nullptr) { + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "options cannot be nullptr."); + } + ClearStsCredentialsOptions(options); + grpc_slice json_string = grpc_empty_slice(); + char* sts_creds_path = gpr_getenv("STS_CREDENTIALS"); + grpc_error* error = GRPC_ERROR_NONE; + grpc::Status status; + auto cleanup = [&json_string, &sts_creds_path, &error, &status]() { + grpc_slice_unref_internal(json_string); + gpr_free(sts_creds_path); + GRPC_ERROR_UNREF(error); + return status; + }; + + if (sts_creds_path == nullptr) { + status = grpc::Status(grpc::StatusCode::NOT_FOUND, + "STS_CREDENTIALS environment variable not set."); + return cleanup(); + } + error = grpc_load_file(sts_creds_path, 1, &json_string); + if (error != GRPC_ERROR_NONE) { + status = + grpc::Status(grpc::StatusCode::NOT_FOUND, grpc_error_string(error)); + return cleanup(); + } + status = StsCredentialsOptionsFromJson( + reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(json_string)), + options); + return cleanup(); +} + +// C++ to Core STS Credentials options. +grpc_sts_credentials_options StsCredentialsCppToCoreOptions( + const StsCredentialsOptions& options) { + grpc_sts_credentials_options opts; + memset(&opts, 0, sizeof(opts)); + opts.token_exchange_service_uri = options.token_exchange_service_uri.c_str(); + opts.resource = options.resource.c_str(); + opts.audience = options.audience.c_str(); + opts.scope = options.scope.c_str(); + opts.requested_token_type = options.requested_token_type.c_str(); + opts.subject_token_path = options.subject_token_path.c_str(); + opts.subject_token_type = options.subject_token_type.c_str(); + opts.actor_token_path = options.actor_token_path.c_str(); + opts.actor_token_type = options.actor_token_type.c_str(); + return opts; +} + +// Builds STS credentials. +std::shared_ptr<CallCredentials> StsCredentials( + const StsCredentialsOptions& options) { + auto opts = StsCredentialsCppToCoreOptions(options); + return WrapCallCredentials(grpc_sts_credentials_create(&opts, nullptr)); +} + std::shared_ptr<CallCredentials> MetadataCredentialsFromPlugin( std::unique_ptr<MetadataCredentialsPlugin> plugin, grpc_security_level min_security_level) { @@ -268,12 +268,12 @@ std::shared_ptr<CallCredentials> MetadataCredentialsFromPlugin( // Builds ALTS Credentials given ALTS specific options std::shared_ptr<ChannelCredentials> AltsCredentials( const AltsCredentialsOptions& options) { - grpc::GrpcLibraryCodegen init; // To call grpc_init(). + grpc::GrpcLibraryCodegen init; // To call grpc_init(). grpc_alts_credentials_options* c_options = grpc_alts_credentials_client_options_create(); - for (const auto& service_account : options.target_service_accounts) { + for (const auto& service_account : options.target_service_accounts) { grpc_alts_credentials_client_options_add_target_service_account( - c_options, service_account.c_str()); + c_options, service_account.c_str()); } grpc_channel_credentials* c_creds = grpc_alts_credentials_create(c_options); grpc_alts_credentials_options_destroy(c_options); @@ -283,22 +283,22 @@ std::shared_ptr<ChannelCredentials> AltsCredentials( // Builds Local Credentials std::shared_ptr<ChannelCredentials> LocalCredentials( grpc_local_connect_type type) { - grpc::GrpcLibraryCodegen init; // To call grpc_init(). + grpc::GrpcLibraryCodegen init; // To call grpc_init(). return WrapChannelCredentials(grpc_local_credentials_create(type)); } -// Builds TLS Credentials given TLS options. 
-std::shared_ptr<ChannelCredentials> TlsCredentials( - const TlsCredentialsOptions& options) { - return WrapChannelCredentials( +// Builds TLS Credentials given TLS options. +std::shared_ptr<ChannelCredentials> TlsCredentials( + const TlsCredentialsOptions& options) { + return WrapChannelCredentials( grpc_tls_credentials_create(options.c_credentials_options())); -} - +} + } // namespace experimental // Builds credentials for use when running in GCE std::shared_ptr<CallCredentials> GoogleComputeEngineCredentials() { - grpc::GrpcLibraryCodegen init; // To call grpc_init(). + grpc::GrpcLibraryCodegen init; // To call grpc_init(). return WrapCallCredentials( grpc_google_compute_engine_credentials_create(nullptr)); } @@ -306,7 +306,7 @@ std::shared_ptr<CallCredentials> GoogleComputeEngineCredentials() { // Builds JWT credentials. std::shared_ptr<CallCredentials> ServiceAccountJWTAccessCredentials( const TString& json_key, long token_lifetime_seconds) { - grpc::GrpcLibraryCodegen init; // To call grpc_init(). + grpc::GrpcLibraryCodegen init; // To call grpc_init(). if (token_lifetime_seconds <= 0) { gpr_log(GPR_ERROR, "Trying to create JWTCredentials with non-positive lifetime"); @@ -321,7 +321,7 @@ std::shared_ptr<CallCredentials> ServiceAccountJWTAccessCredentials( // Builds refresh token credentials. std::shared_ptr<CallCredentials> GoogleRefreshTokenCredentials( const TString& json_refresh_token) { - grpc::GrpcLibraryCodegen init; // To call grpc_init(). + grpc::GrpcLibraryCodegen init; // To call grpc_init(). return WrapCallCredentials(grpc_google_refresh_token_credentials_create( json_refresh_token.c_str(), nullptr)); } @@ -329,7 +329,7 @@ std::shared_ptr<CallCredentials> GoogleRefreshTokenCredentials( // Builds access token credentials. std::shared_ptr<CallCredentials> AccessTokenCredentials( const TString& access_token) { - grpc::GrpcLibraryCodegen init; // To call grpc_init(). + grpc::GrpcLibraryCodegen init; // To call grpc_init(). return WrapCallCredentials( grpc_access_token_credentials_create(access_token.c_str(), nullptr)); } @@ -338,7 +338,7 @@ std::shared_ptr<CallCredentials> AccessTokenCredentials( std::shared_ptr<CallCredentials> GoogleIAMCredentials( const TString& authorization_token, const TString& authority_selector) { - grpc::GrpcLibraryCodegen init; // To call grpc_init(). + grpc::GrpcLibraryCodegen init; // To call grpc_init(). return WrapCallCredentials(grpc_google_iam_credentials_create( authorization_token.c_str(), authority_selector.c_str(), nullptr)); } @@ -374,27 +374,27 @@ std::shared_ptr<CallCredentials> CompositeCallCredentials( return nullptr; } -std::shared_ptr<CallCredentials> MetadataCredentialsFromPlugin( - std::unique_ptr<MetadataCredentialsPlugin> plugin) { - grpc::GrpcLibraryCodegen init; // To call grpc_init(). - const char* type = plugin->GetType(); - grpc::MetadataCredentialsPluginWrapper* wrapper = - new grpc::MetadataCredentialsPluginWrapper(std::move(plugin)); - grpc_metadata_credentials_plugin c_plugin = { - grpc::MetadataCredentialsPluginWrapper::GetMetadata, +std::shared_ptr<CallCredentials> MetadataCredentialsFromPlugin( + std::unique_ptr<MetadataCredentialsPlugin> plugin) { + grpc::GrpcLibraryCodegen init; // To call grpc_init(). 
+ const char* type = plugin->GetType(); + grpc::MetadataCredentialsPluginWrapper* wrapper = + new grpc::MetadataCredentialsPluginWrapper(std::move(plugin)); + grpc_metadata_credentials_plugin c_plugin = { + grpc::MetadataCredentialsPluginWrapper::GetMetadata, grpc::MetadataCredentialsPluginWrapper::DebugString, - grpc::MetadataCredentialsPluginWrapper::Destroy, wrapper, type}; + grpc::MetadataCredentialsPluginWrapper::Destroy, wrapper, type}; return WrapCallCredentials(grpc_metadata_credentials_create_from_plugin( c_plugin, GRPC_PRIVACY_AND_INTEGRITY, nullptr)); -} - -namespace { -void DeleteWrapper(void* wrapper, grpc_error* /*ignored*/) { +} + +namespace { +void DeleteWrapper(void* wrapper, grpc_error* /*ignored*/) { MetadataCredentialsPluginWrapper* w = static_cast<MetadataCredentialsPluginWrapper*>(wrapper); delete w; } -} // namespace +} // namespace char* MetadataCredentialsPluginWrapper::DebugString(void* wrapper) { GPR_ASSERT(wrapper); @@ -403,14 +403,14 @@ char* MetadataCredentialsPluginWrapper::DebugString(void* wrapper) { return gpr_strdup(w->plugin_->DebugString().c_str()); } -void MetadataCredentialsPluginWrapper::Destroy(void* wrapper) { - if (wrapper == nullptr) return; - grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; - grpc_core::ExecCtx exec_ctx; - grpc_core::Executor::Run(GRPC_CLOSURE_CREATE(DeleteWrapper, wrapper, nullptr), - GRPC_ERROR_NONE); -} - +void MetadataCredentialsPluginWrapper::Destroy(void* wrapper) { + if (wrapper == nullptr) return; + grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; + grpc_core::ExecCtx exec_ctx; + grpc_core::Executor::Run(GRPC_CLOSURE_CREATE(DeleteWrapper, wrapper, nullptr), + GRPC_ERROR_NONE); +} + int MetadataCredentialsPluginWrapper::GetMetadata( void* wrapper, grpc_auth_metadata_context context, grpc_credentials_plugin_metadata_cb cb, void* user_data, @@ -424,18 +424,18 @@ int MetadataCredentialsPluginWrapper::GetMetadata( *num_creds_md = 0; *status = GRPC_STATUS_OK; *error_details = nullptr; - return 1; + return 1; } if (w->plugin_->IsBlocking()) { - // The internals of context may be destroyed if GetMetadata is cancelled. - // Make a copy for InvokePlugin. - grpc_auth_metadata_context context_copy = grpc_auth_metadata_context(); - grpc_auth_metadata_context_copy(&context, &context_copy); + // The internals of context may be destroyed if GetMetadata is cancelled. + // Make a copy for InvokePlugin. + grpc_auth_metadata_context context_copy = grpc_auth_metadata_context(); + grpc_auth_metadata_context_copy(&context, &context_copy); // Asynchronous return. 
- w->thread_pool_->Add([w, context_copy, cb, user_data]() mutable { + w->thread_pool_->Add([w, context_copy, cb, user_data]() mutable { w->MetadataCredentialsPluginWrapper::InvokePlugin( - context_copy, cb, user_data, nullptr, nullptr, nullptr, nullptr); - grpc_auth_metadata_context_reset(&context_copy); + context_copy, cb, user_data, nullptr, nullptr, nullptr, nullptr); + grpc_auth_metadata_context_reset(&context_copy); }); return 0; } else { @@ -449,9 +449,9 @@ int MetadataCredentialsPluginWrapper::GetMetadata( namespace { void UnrefMetadata(const std::vector<grpc_metadata>& md) { - for (const auto& metadatum : md) { - grpc_slice_unref(metadatum.key); - grpc_slice_unref(metadatum.value); + for (const auto& metadatum : md) { + grpc_slice_unref(metadatum.key); + grpc_slice_unref(metadatum.value); } } @@ -471,10 +471,10 @@ void MetadataCredentialsPluginWrapper::InvokePlugin( Status status = plugin_->GetMetadata(context.service_url, context.method_name, cpp_channel_auth_context, &metadata); std::vector<grpc_metadata> md; - for (auto& metadatum : metadata) { + for (auto& metadatum : metadata) { grpc_metadata md_entry; - md_entry.key = SliceFromCopiedString(metadatum.first); - md_entry.value = SliceFromCopiedString(metadatum.second); + md_entry.key = SliceFromCopiedString(metadatum.first); + md_entry.value = SliceFromCopiedString(metadatum.second); md_entry.flags = 0; md.push_back(md_entry); } diff --git a/contrib/libs/grpc/src/cpp/client/secure_credentials.h b/contrib/libs/grpc/src/cpp/client/secure_credentials.h index 4fc79346bf..16f6b9c805 100644 --- a/contrib/libs/grpc/src/cpp/client/secure_credentials.h +++ b/contrib/libs/grpc/src/cpp/client/secure_credentials.h @@ -22,7 +22,7 @@ #include <grpc/grpc_security.h> #include <grpcpp/security/credentials.h> -#include <grpcpp/security/tls_credentials_options.h> +#include <grpcpp/security/tls_credentials_options.h> #include <grpcpp/support/config.h> #include "y_absl/strings/str_cat.h" @@ -31,8 +31,8 @@ namespace grpc { -class Channel; - +class Channel; + class SecureChannelCredentials final : public ChannelCredentials { public: explicit SecureChannelCredentials(grpc_channel_credentials* c_creds); @@ -41,16 +41,16 @@ class SecureChannelCredentials final : public ChannelCredentials { } grpc_channel_credentials* GetRawCreds() { return c_creds_; } - std::shared_ptr<Channel> CreateChannelImpl( + std::shared_ptr<Channel> CreateChannelImpl( const TString& target, const ChannelArguments& args) override; SecureChannelCredentials* AsSecureCredentials() override { return this; } private: - std::shared_ptr<Channel> CreateChannelWithInterceptors( + std::shared_ptr<Channel> CreateChannelWithInterceptors( const TString& target, const ChannelArguments& args, - std::vector<std::unique_ptr< - ::grpc::experimental::ClientInterceptorFactoryInterface>> + std::vector<std::unique_ptr< + ::grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_creators) override; grpc_channel_credentials* const c_creds_; }; @@ -74,16 +74,16 @@ class SecureCallCredentials final : public CallCredentials { grpc_call_credentials* const c_creds_; }; -namespace experimental { - -// Transforms C++ STS Credentials options to core options. The pointers of the -// resulting core options point to the memory held by the C++ options so C++ -// options need to be kept alive until after the core credentials creation. 
-grpc_sts_credentials_options StsCredentialsCppToCoreOptions( - const StsCredentialsOptions& options); - -} // namespace experimental - +namespace experimental { + +// Transforms C++ STS Credentials options to core options. The pointers of the +// resulting core options point to the memory held by the C++ options so C++ +// options need to be kept alive until after the core credentials creation. +grpc_sts_credentials_options StsCredentialsCppToCoreOptions( + const StsCredentialsOptions& options); + +} // namespace experimental + class MetadataCredentialsPluginWrapper final : private GrpcLibraryCodegen { public: static void Destroy(void* wrapper); diff --git a/contrib/libs/grpc/src/cpp/codegen/codegen_init.cc b/contrib/libs/grpc/src/cpp/codegen/codegen_init.cc index e1e47cbb17..a79c211ba0 100644 --- a/contrib/libs/grpc/src/cpp/codegen/codegen_init.cc +++ b/contrib/libs/grpc/src/cpp/codegen/codegen_init.cc @@ -20,7 +20,7 @@ #include <grpcpp/impl/codegen/grpc_library.h> /// Null-initializes the global gRPC variables for the codegen library. These -/// stay null in the absence of grpc++ library. In this case, no gRPC +/// stay null in the absence of grpc++ library. In this case, no gRPC /// features such as the ability to perform calls will be available. Trying to /// perform them would result in a segmentation fault when trying to deference /// the following nulled globals. These should be associated with actual diff --git a/contrib/libs/grpc/src/cpp/common/alarm.cc b/contrib/libs/grpc/src/cpp/common/alarm.cc index a2612874b2..bfbf26ed4d 100644 --- a/contrib/libs/grpc/src/cpp/common/alarm.cc +++ b/contrib/libs/grpc/src/cpp/common/alarm.cc @@ -42,7 +42,7 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag { grpc_timer_init_unset(&timer_); } ~AlarmImpl() {} - bool FinalizeResult(void** tag, bool* /*status*/) override { + bool FinalizeResult(void** tag, bool* /*status*/) override { *tag = tag_; Unref(); return true; @@ -54,23 +54,23 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag { cq_ = cq->cq(); tag_ = tag; GPR_ASSERT(grpc_cq_begin_op(cq_, this)); - GRPC_CLOSURE_INIT( - &on_alarm_, - [](void* arg, grpc_error* error) { - // queue the op on the completion queue - AlarmImpl* alarm = static_cast<AlarmImpl*>(arg); - alarm->Ref(); - // Preserve the cq and reset the cq_ so that the alarm - // can be reset when the alarm tag is delivered. - grpc_completion_queue* cq = alarm->cq_; - alarm->cq_ = nullptr; - grpc_cq_end_op( - cq, alarm, error, - [](void* /*arg*/, grpc_cq_completion* /*completion*/) {}, arg, - &alarm->completion_); - GRPC_CQ_INTERNAL_UNREF(cq, "alarm"); - }, - this, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT( + &on_alarm_, + [](void* arg, grpc_error* error) { + // queue the op on the completion queue + AlarmImpl* alarm = static_cast<AlarmImpl*>(arg); + alarm->Ref(); + // Preserve the cq and reset the cq_ so that the alarm + // can be reset when the alarm tag is delivered. 
+ grpc_completion_queue* cq = alarm->cq_; + alarm->cq_ = nullptr; + grpc_cq_end_op( + cq, alarm, error, + [](void* /*arg*/, grpc_cq_completion* /*completion*/) {}, arg, + &alarm->completion_); + GRPC_CQ_INTERNAL_UNREF(cq, "alarm"); + }, + this, grpc_schedule_on_exec_ctx); grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline), &on_alarm_); } diff --git a/contrib/libs/grpc/src/cpp/common/channel_arguments.cc b/contrib/libs/grpc/src/cpp/common/channel_arguments.cc index 5a5dd91b5e..63232a5129 100644 --- a/contrib/libs/grpc/src/cpp/common/channel_arguments.cc +++ b/contrib/libs/grpc/src/cpp/common/channel_arguments.cc @@ -31,7 +31,7 @@ namespace grpc { ChannelArguments::ChannelArguments() { // This will be ignored if used on the server side. - SetString(GRPC_ARG_PRIMARY_USER_AGENT_STRING, "grpc-c++/" + grpc::Version()); + SetString(GRPC_ARG_PRIMARY_USER_AGENT_STRING, "grpc-c++/" + grpc::Version()); } ChannelArguments::ChannelArguments(const ChannelArguments& other) @@ -39,26 +39,26 @@ ChannelArguments::ChannelArguments(const ChannelArguments& other) args_.reserve(other.args_.size()); auto list_it_dst = strings_.begin(); auto list_it_src = other.strings_.begin(); - for (const auto& a : other.args_) { + for (const auto& a : other.args_) { grpc_arg ap; - ap.type = a.type; - GPR_ASSERT(list_it_src->c_str() == a.key); + ap.type = a.type; + GPR_ASSERT(list_it_src->c_str() == a.key); ap.key = const_cast<char*>(list_it_dst->c_str()); ++list_it_src; ++list_it_dst; - switch (a.type) { + switch (a.type) { case GRPC_ARG_INTEGER: - ap.value.integer = a.value.integer; + ap.value.integer = a.value.integer; break; case GRPC_ARG_STRING: - GPR_ASSERT(list_it_src->c_str() == a.value.string); + GPR_ASSERT(list_it_src->c_str() == a.value.string); ap.value.string = const_cast<char*>(list_it_dst->c_str()); ++list_it_src; ++list_it_dst; break; case GRPC_ARG_POINTER: - ap.value.pointer = a.value.pointer; - ap.value.pointer.p = a.value.pointer.vtable->copy(ap.value.pointer.p); + ap.value.pointer = a.value.pointer; + ap.value.pointer.p = a.value.pointer.vtable->copy(ap.value.pointer.p); break; } args_.push_back(ap); @@ -67,9 +67,9 @@ ChannelArguments::ChannelArguments(const ChannelArguments& other) ChannelArguments::~ChannelArguments() { grpc_core::ExecCtx exec_ctx; - for (auto& arg : args_) { - if (arg.type == GRPC_ARG_POINTER) { - arg.value.pointer.vtable->destroy(arg.value.pointer.p); + for (auto& arg : args_) { + if (arg.type == GRPC_ARG_POINTER) { + arg.value.pointer.vtable->destroy(arg.value.pointer.p); } } } @@ -95,12 +95,12 @@ void ChannelArguments::SetSocketMutator(grpc_socket_mutator* mutator) { grpc_arg mutator_arg = grpc_socket_mutator_to_arg(mutator); bool replaced = false; grpc_core::ExecCtx exec_ctx; - for (auto& arg : args_) { - if (arg.type == mutator_arg.type && + for (auto& arg : args_) { + if (arg.type == mutator_arg.type && TString(arg.key) == TString(mutator_arg.key)) { GPR_ASSERT(!replaced); - arg.value.pointer.vtable->destroy(arg.value.pointer.p); - arg.value.pointer = mutator_arg.value.pointer; + arg.value.pointer.vtable->destroy(arg.value.pointer.p); + arg.value.pointer = mutator_arg.value.pointer; replaced = true; } } @@ -123,13 +123,13 @@ void ChannelArguments::SetUserAgentPrefix( } bool replaced = false; auto strings_it = strings_.begin(); - for (auto& arg : args_) { + for (auto& arg : args_) { ++strings_it; if (arg.type == GRPC_ARG_STRING) { if (TString(arg.key) == GRPC_ARG_PRIMARY_USER_AGENT_STRING) { GPR_ASSERT(arg.value.string == strings_it->c_str()); *(strings_it) = 
user_agent_prefix + " " + arg.value.string; - arg.value.string = const_cast<char*>(strings_it->c_str()); + arg.value.string = const_cast<char*>(strings_it->c_str()); replaced = true; break; } diff --git a/contrib/libs/grpc/src/cpp/common/channel_filter.h b/contrib/libs/grpc/src/cpp/common/channel_filter.h index 5ce720b307..f5d5dfbb6a 100644 --- a/contrib/libs/grpc/src/cpp/common/channel_filter.h +++ b/contrib/libs/grpc/src/cpp/common/channel_filter.h @@ -236,13 +236,13 @@ class ChannelData { // TODO(roth): Come up with a more C++-like API for the channel element. /// Initializes the channel data. - virtual grpc_error* Init(grpc_channel_element* /*elem*/, - grpc_channel_element_args* /*args*/) { + virtual grpc_error* Init(grpc_channel_element* /*elem*/, + grpc_channel_element_args* /*args*/) { return GRPC_ERROR_NONE; } // Called before destruction. - virtual void Destroy(grpc_channel_element* /*elem*/) {} + virtual void Destroy(grpc_channel_element* /*elem*/) {} virtual void StartTransportOp(grpc_channel_element* elem, TransportOp* op); @@ -259,15 +259,15 @@ class CallData { // TODO(roth): Come up with a more C++-like API for the call element. /// Initializes the call data. - virtual grpc_error* Init(grpc_call_element* /*elem*/, - const grpc_call_element_args* /*args*/) { + virtual grpc_error* Init(grpc_call_element* /*elem*/, + const grpc_call_element_args* /*args*/) { return GRPC_ERROR_NONE; } // Called before destruction. - virtual void Destroy(grpc_call_element* /*elem*/, - const grpc_call_final_info* /*final_info*/, - grpc_closure* /*then_call_closure*/) {} + virtual void Destroy(grpc_call_element* /*elem*/, + const grpc_call_final_info* /*final_info*/, + grpc_closure* /*then_call_closure*/) {} /// Starts a new stream operation. virtual void StartTransportStreamOpBatch(grpc_call_element* elem, diff --git a/contrib/libs/grpc/src/cpp/common/completion_queue_cc.cc b/contrib/libs/grpc/src/cpp/common/completion_queue_cc.cc index 96a7105eaf..6cab616d53 100644 --- a/contrib/libs/grpc/src/cpp/common/completion_queue_cc.cc +++ b/contrib/libs/grpc/src/cpp/common/completion_queue_cc.cc @@ -58,8 +58,8 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( case GRPC_QUEUE_SHUTDOWN: return SHUTDOWN; case GRPC_OP_COMPLETE: - auto core_cq_tag = - static_cast<::grpc::internal::CompletionQueueTag*>(ev.tag); + auto core_cq_tag = + static_cast<::grpc::internal::CompletionQueueTag*>(ev.tag); *ok = ev.success != 0; *tag = core_cq_tag; if (core_cq_tag->FinalizeResult(tag, ok)) { @@ -86,8 +86,8 @@ bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) { flushed_ = true; if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag, &res)) { - auto core_cq_tag = - static_cast<::grpc::internal::CompletionQueueTag*>(res_tag); + auto core_cq_tag = + static_cast<::grpc::internal::CompletionQueueTag*>(res_tag); *ok = res == 1; if (core_cq_tag->FinalizeResult(tag, ok)) { return true; diff --git a/contrib/libs/grpc/src/cpp/common/core_codegen.cc b/contrib/libs/grpc/src/cpp/common/core_codegen.cc index 75383ed511..0966358d6f 100644 --- a/contrib/libs/grpc/src/cpp/common/core_codegen.cc +++ b/contrib/libs/grpc/src/cpp/common/core_codegen.cc @@ -81,7 +81,7 @@ void CoreCodegen::gpr_free(void* p) { return ::gpr_free(p); } void CoreCodegen::grpc_init() { ::grpc_init(); } void CoreCodegen::grpc_shutdown() { ::grpc_shutdown(); } -void CoreCodegen::gpr_mu_init(gpr_mu* mu) { ::gpr_mu_init(mu); } +void CoreCodegen::gpr_mu_init(gpr_mu* mu) { ::gpr_mu_init(mu); } void 
CoreCodegen::gpr_mu_destroy(gpr_mu* mu) { ::gpr_mu_destroy(mu); } void CoreCodegen::gpr_mu_lock(gpr_mu* mu) { ::gpr_mu_lock(mu); } void CoreCodegen::gpr_mu_unlock(gpr_mu* mu) { ::gpr_mu_unlock(mu); } @@ -123,9 +123,9 @@ void CoreCodegen::grpc_call_unref(grpc_call* call) { ::grpc_call_unref(call); } void* CoreCodegen::grpc_call_arena_alloc(grpc_call* call, size_t length) { return ::grpc_call_arena_alloc(call, length); } -const char* CoreCodegen::grpc_call_error_to_string(grpc_call_error error) { - return ::grpc_call_error_to_string(error); -} +const char* CoreCodegen::grpc_call_error_to_string(grpc_call_error error) { + return ::grpc_call_error_to_string(error); +} int CoreCodegen::grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader, grpc_byte_buffer* buffer) { @@ -142,11 +142,11 @@ int CoreCodegen::grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader, return ::grpc_byte_buffer_reader_next(reader, slice); } -int CoreCodegen::grpc_byte_buffer_reader_peek(grpc_byte_buffer_reader* reader, - grpc_slice** slice) { - return ::grpc_byte_buffer_reader_peek(reader, slice); -} - +int CoreCodegen::grpc_byte_buffer_reader_peek(grpc_byte_buffer_reader* reader, + grpc_slice** slice) { + return ::grpc_byte_buffer_reader_peek(reader, slice); +} + grpc_byte_buffer* CoreCodegen::grpc_raw_byte_buffer_create(grpc_slice* slice, size_t nslices) { return ::grpc_raw_byte_buffer_create(slice, nslices); diff --git a/contrib/libs/grpc/src/cpp/common/tls_credentials_options.cc b/contrib/libs/grpc/src/cpp/common/tls_credentials_options.cc index 7e435ac1de..cf5eec58a4 100644 --- a/contrib/libs/grpc/src/cpp/common/tls_credentials_options.cc +++ b/contrib/libs/grpc/src/cpp/common/tls_credentials_options.cc @@ -1,98 +1,98 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + #include <grpc/support/alloc.h> -#include <grpcpp/security/tls_credentials_options.h> +#include <grpcpp/security/tls_credentials_options.h> #include "y_absl/container/inlined_vector.h" -#include "src/core/lib/security/credentials/tls/grpc_tls_credentials_options.h" -#include "src/cpp/common/tls_credentials_options_util.h" - +#include "src/core/lib/security/credentials/tls/grpc_tls_credentials_options.h" +#include "src/cpp/common/tls_credentials_options_util.h" + namespace grpc { -namespace experimental { - -/** TLS key materials config API implementation **/ +namespace experimental { + +/** TLS key materials config API implementation **/ void TlsKeyMaterialsConfig::set_pem_root_certs( const TString& pem_root_certs) { pem_root_certs_ = pem_root_certs; -} - -void TlsKeyMaterialsConfig::add_pem_key_cert_pair( - const PemKeyCertPair& pem_key_cert_pair) { - pem_key_cert_pair_list_.push_back(pem_key_cert_pair); -} - -void TlsKeyMaterialsConfig::set_key_materials( +} + +void TlsKeyMaterialsConfig::add_pem_key_cert_pair( + const PemKeyCertPair& pem_key_cert_pair) { + pem_key_cert_pair_list_.push_back(pem_key_cert_pair); +} + +void TlsKeyMaterialsConfig::set_key_materials( const TString& pem_root_certs, const std::vector<PemKeyCertPair>& pem_key_cert_pair_list) { pem_key_cert_pair_list_ = pem_key_cert_pair_list; pem_root_certs_ = pem_root_certs; -} - -/** TLS credential reload arg API implementation **/ -TlsCredentialReloadArg::TlsCredentialReloadArg( - grpc_tls_credential_reload_arg* arg) - : c_arg_(arg) { - if (c_arg_ != nullptr && c_arg_->context != nullptr) { - gpr_log(GPR_ERROR, "c_arg context has already been set"); - } - c_arg_->context = static_cast<void*>(this); - c_arg_->destroy_context = &TlsCredentialReloadArgDestroyContext; -} - -TlsCredentialReloadArg::~TlsCredentialReloadArg() {} - -void* TlsCredentialReloadArg::cb_user_data() const { - return c_arg_->cb_user_data; -} -bool TlsCredentialReloadArg::is_pem_key_cert_pair_list_empty() const { - return c_arg_->key_materials_config->pem_key_cert_pair_list().empty(); -} - -grpc_ssl_certificate_config_reload_status TlsCredentialReloadArg::status() - const { - return c_arg_->status; -} - +} + +/** TLS credential reload arg API implementation **/ +TlsCredentialReloadArg::TlsCredentialReloadArg( + grpc_tls_credential_reload_arg* arg) + : c_arg_(arg) { + if (c_arg_ != nullptr && c_arg_->context != nullptr) { + gpr_log(GPR_ERROR, "c_arg context has already been set"); + } + c_arg_->context = static_cast<void*>(this); + c_arg_->destroy_context = &TlsCredentialReloadArgDestroyContext; +} + +TlsCredentialReloadArg::~TlsCredentialReloadArg() {} + +void* TlsCredentialReloadArg::cb_user_data() const { + return c_arg_->cb_user_data; +} +bool TlsCredentialReloadArg::is_pem_key_cert_pair_list_empty() const { + return c_arg_->key_materials_config->pem_key_cert_pair_list().empty(); +} + +grpc_ssl_certificate_config_reload_status TlsCredentialReloadArg::status() + const { + return c_arg_->status; +} + TString TlsCredentialReloadArg::error_details() const { return c_arg_->error_details->error_details(); -} - -void TlsCredentialReloadArg::set_cb_user_data(void* cb_user_data) { - c_arg_->cb_user_data = cb_user_data; -} - -void TlsCredentialReloadArg::set_pem_root_certs( +} + +void TlsCredentialReloadArg::set_cb_user_data(void* cb_user_data) { + c_arg_->cb_user_data = cb_user_data; +} + +void TlsCredentialReloadArg::set_pem_root_certs( const TString& pem_root_certs) { - ::grpc_core::UniquePtr<char> c_pem_root_certs( - 
gpr_strdup(pem_root_certs.c_str())); - c_arg_->key_materials_config->set_pem_root_certs(std::move(c_pem_root_certs)); -} - + ::grpc_core::UniquePtr<char> c_pem_root_certs( + gpr_strdup(pem_root_certs.c_str())); + c_arg_->key_materials_config->set_pem_root_certs(std::move(c_pem_root_certs)); +} + namespace { ::grpc_core::PemKeyCertPair ConvertToCorePemKeyCertPair( const TlsKeyMaterialsConfig::PemKeyCertPair& pem_key_cert_pair) { - grpc_ssl_pem_key_cert_pair* ssl_pair = - (grpc_ssl_pem_key_cert_pair*)gpr_malloc( - sizeof(grpc_ssl_pem_key_cert_pair)); - ssl_pair->private_key = gpr_strdup(pem_key_cert_pair.private_key.c_str()); - ssl_pair->cert_chain = gpr_strdup(pem_key_cert_pair.cert_chain.c_str()); + grpc_ssl_pem_key_cert_pair* ssl_pair = + (grpc_ssl_pem_key_cert_pair*)gpr_malloc( + sizeof(grpc_ssl_pem_key_cert_pair)); + ssl_pair->private_key = gpr_strdup(pem_key_cert_pair.private_key.c_str()); + ssl_pair->cert_chain = gpr_strdup(pem_key_cert_pair.cert_chain.c_str()); return ::grpc_core::PemKeyCertPair(ssl_pair); } @@ -100,10 +100,10 @@ namespace { void TlsCredentialReloadArg::add_pem_key_cert_pair( const TlsKeyMaterialsConfig::PemKeyCertPair& pem_key_cert_pair) { - c_arg_->key_materials_config->add_pem_key_cert_pair( + c_arg_->key_materials_config->add_pem_key_cert_pair( ConvertToCorePemKeyCertPair(pem_key_cert_pair)); -} - +} + void TlsCredentialReloadArg::set_key_materials( const TString& pem_root_certs, std::vector<TlsKeyMaterialsConfig::PemKeyCertPair> pem_key_cert_pair_list) { @@ -124,163 +124,163 @@ void TlsCredentialReloadArg::set_key_materials( c_pem_key_cert_pair_list); } -void TlsCredentialReloadArg::set_key_materials_config( - const std::shared_ptr<TlsKeyMaterialsConfig>& key_materials_config) { - if (key_materials_config == nullptr) { - c_arg_->key_materials_config = nullptr; - return; - } +void TlsCredentialReloadArg::set_key_materials_config( + const std::shared_ptr<TlsKeyMaterialsConfig>& key_materials_config) { + if (key_materials_config == nullptr) { + c_arg_->key_materials_config = nullptr; + return; + } ::y_absl::InlinedVector<::grpc_core::PemKeyCertPair, 1> - c_pem_key_cert_pair_list; - for (const auto& key_cert_pair : - key_materials_config->pem_key_cert_pair_list()) { - grpc_ssl_pem_key_cert_pair* ssl_pair = - (grpc_ssl_pem_key_cert_pair*)gpr_malloc( - sizeof(grpc_ssl_pem_key_cert_pair)); - ssl_pair->private_key = gpr_strdup(key_cert_pair.private_key.c_str()); - ssl_pair->cert_chain = gpr_strdup(key_cert_pair.cert_chain.c_str()); - ::grpc_core::PemKeyCertPair c_pem_key_cert_pair = - ::grpc_core::PemKeyCertPair(ssl_pair); - c_pem_key_cert_pair_list.emplace_back(std::move(c_pem_key_cert_pair)); - } - ::grpc_core::UniquePtr<char> c_pem_root_certs( - gpr_strdup(key_materials_config->pem_root_certs().c_str())); - if (c_arg_->key_materials_config == nullptr) { - c_arg_->key_materials_config = grpc_tls_key_materials_config_create(); - } - c_arg_->key_materials_config->set_key_materials( + c_pem_key_cert_pair_list; + for (const auto& key_cert_pair : + key_materials_config->pem_key_cert_pair_list()) { + grpc_ssl_pem_key_cert_pair* ssl_pair = + (grpc_ssl_pem_key_cert_pair*)gpr_malloc( + sizeof(grpc_ssl_pem_key_cert_pair)); + ssl_pair->private_key = gpr_strdup(key_cert_pair.private_key.c_str()); + ssl_pair->cert_chain = gpr_strdup(key_cert_pair.cert_chain.c_str()); + ::grpc_core::PemKeyCertPair c_pem_key_cert_pair = + ::grpc_core::PemKeyCertPair(ssl_pair); + c_pem_key_cert_pair_list.emplace_back(std::move(c_pem_key_cert_pair)); + } + ::grpc_core::UniquePtr<char> 
c_pem_root_certs( + gpr_strdup(key_materials_config->pem_root_certs().c_str())); + if (c_arg_->key_materials_config == nullptr) { + c_arg_->key_materials_config = grpc_tls_key_materials_config_create(); + } + c_arg_->key_materials_config->set_key_materials( key_materials_config->pem_root_certs().c_str(), c_pem_key_cert_pair_list); - c_arg_->key_materials_config->set_version(key_materials_config->version()); -} - -void TlsCredentialReloadArg::set_status( - grpc_ssl_certificate_config_reload_status status) { - c_arg_->status = status; -} - -void TlsCredentialReloadArg::set_error_details( + c_arg_->key_materials_config->set_version(key_materials_config->version()); +} + +void TlsCredentialReloadArg::set_status( + grpc_ssl_certificate_config_reload_status status) { + c_arg_->status = status; +} + +void TlsCredentialReloadArg::set_error_details( const TString& error_details) { c_arg_->error_details->set_error_details(error_details.c_str()); -} - -void TlsCredentialReloadArg::OnCredentialReloadDoneCallback() { - if (c_arg_->cb == nullptr) { - gpr_log(GPR_ERROR, "credential reload arg callback API is nullptr"); - return; - } - c_arg_->cb(c_arg_); -} - -/** gRPC TLS credential reload config API implementation **/ -TlsCredentialReloadConfig::TlsCredentialReloadConfig( - std::shared_ptr<TlsCredentialReloadInterface> credential_reload_interface) - : credential_reload_interface_(std::move(credential_reload_interface)) { - c_config_ = grpc_tls_credential_reload_config_create( - nullptr, &TlsCredentialReloadConfigCSchedule, - &TlsCredentialReloadConfigCCancel, nullptr); - c_config_->set_context(static_cast<void*>(this)); -} - -TlsCredentialReloadConfig::~TlsCredentialReloadConfig() {} - -/** gRPC TLS server authorization check arg API implementation **/ -TlsServerAuthorizationCheckArg::TlsServerAuthorizationCheckArg( - grpc_tls_server_authorization_check_arg* arg) - : c_arg_(arg) { - if (c_arg_ != nullptr && c_arg_->context != nullptr) { - gpr_log(GPR_ERROR, "c_arg context has already been set"); - } - c_arg_->context = static_cast<void*>(this); - c_arg_->destroy_context = &TlsServerAuthorizationCheckArgDestroyContext; -} - -TlsServerAuthorizationCheckArg::~TlsServerAuthorizationCheckArg() {} - -void* TlsServerAuthorizationCheckArg::cb_user_data() const { - return c_arg_->cb_user_data; -} - -int TlsServerAuthorizationCheckArg::success() const { return c_arg_->success; } - +} + +void TlsCredentialReloadArg::OnCredentialReloadDoneCallback() { + if (c_arg_->cb == nullptr) { + gpr_log(GPR_ERROR, "credential reload arg callback API is nullptr"); + return; + } + c_arg_->cb(c_arg_); +} + +/** gRPC TLS credential reload config API implementation **/ +TlsCredentialReloadConfig::TlsCredentialReloadConfig( + std::shared_ptr<TlsCredentialReloadInterface> credential_reload_interface) + : credential_reload_interface_(std::move(credential_reload_interface)) { + c_config_ = grpc_tls_credential_reload_config_create( + nullptr, &TlsCredentialReloadConfigCSchedule, + &TlsCredentialReloadConfigCCancel, nullptr); + c_config_->set_context(static_cast<void*>(this)); +} + +TlsCredentialReloadConfig::~TlsCredentialReloadConfig() {} + +/** gRPC TLS server authorization check arg API implementation **/ +TlsServerAuthorizationCheckArg::TlsServerAuthorizationCheckArg( + grpc_tls_server_authorization_check_arg* arg) + : c_arg_(arg) { + if (c_arg_ != nullptr && c_arg_->context != nullptr) { + gpr_log(GPR_ERROR, "c_arg context has already been set"); + } + c_arg_->context = static_cast<void*>(this); + c_arg_->destroy_context = 
&TlsServerAuthorizationCheckArgDestroyContext; +} + +TlsServerAuthorizationCheckArg::~TlsServerAuthorizationCheckArg() {} + +void* TlsServerAuthorizationCheckArg::cb_user_data() const { + return c_arg_->cb_user_data; +} + +int TlsServerAuthorizationCheckArg::success() const { return c_arg_->success; } + TString TlsServerAuthorizationCheckArg::target_name() const { TString cpp_target_name(c_arg_->target_name); - return cpp_target_name; -} - + return cpp_target_name; +} + TString TlsServerAuthorizationCheckArg::peer_cert() const { TString cpp_peer_cert(c_arg_->peer_cert); - return cpp_peer_cert; -} - + return cpp_peer_cert; +} + TString TlsServerAuthorizationCheckArg::peer_cert_full_chain() const { TString cpp_peer_cert_full_chain(c_arg_->peer_cert_full_chain); return cpp_peer_cert_full_chain; } -grpc_status_code TlsServerAuthorizationCheckArg::status() const { - return c_arg_->status; -} - +grpc_status_code TlsServerAuthorizationCheckArg::status() const { + return c_arg_->status; +} + TString TlsServerAuthorizationCheckArg::error_details() const { return c_arg_->error_details->error_details(); -} - -void TlsServerAuthorizationCheckArg::set_cb_user_data(void* cb_user_data) { - c_arg_->cb_user_data = cb_user_data; -} - -void TlsServerAuthorizationCheckArg::set_success(int success) { - c_arg_->success = success; -} - -void TlsServerAuthorizationCheckArg::set_target_name( +} + +void TlsServerAuthorizationCheckArg::set_cb_user_data(void* cb_user_data) { + c_arg_->cb_user_data = cb_user_data; +} + +void TlsServerAuthorizationCheckArg::set_success(int success) { + c_arg_->success = success; +} + +void TlsServerAuthorizationCheckArg::set_target_name( const TString& target_name) { - c_arg_->target_name = gpr_strdup(target_name.c_str()); -} - -void TlsServerAuthorizationCheckArg::set_peer_cert( + c_arg_->target_name = gpr_strdup(target_name.c_str()); +} + +void TlsServerAuthorizationCheckArg::set_peer_cert( const TString& peer_cert) { - c_arg_->peer_cert = gpr_strdup(peer_cert.c_str()); -} - + c_arg_->peer_cert = gpr_strdup(peer_cert.c_str()); +} + void TlsServerAuthorizationCheckArg::set_peer_cert_full_chain( const TString& peer_cert_full_chain) { c_arg_->peer_cert_full_chain = gpr_strdup(peer_cert_full_chain.c_str()); } -void TlsServerAuthorizationCheckArg::set_status(grpc_status_code status) { - c_arg_->status = status; -} - -void TlsServerAuthorizationCheckArg::set_error_details( +void TlsServerAuthorizationCheckArg::set_status(grpc_status_code status) { + c_arg_->status = status; +} + +void TlsServerAuthorizationCheckArg::set_error_details( const TString& error_details) { c_arg_->error_details->set_error_details(error_details.c_str()); -} - -void TlsServerAuthorizationCheckArg::OnServerAuthorizationCheckDoneCallback() { - if (c_arg_->cb == nullptr) { - gpr_log(GPR_ERROR, "server authorizaton check arg callback API is nullptr"); - return; - } - c_arg_->cb(c_arg_); -} - -/** gRPC TLS server authorization check config API implementation. 
**/ -TlsServerAuthorizationCheckConfig::TlsServerAuthorizationCheckConfig( - std::shared_ptr<TlsServerAuthorizationCheckInterface> - server_authorization_check_interface) - : server_authorization_check_interface_( - std::move(server_authorization_check_interface)) { - c_config_ = grpc_tls_server_authorization_check_config_create( - nullptr, &TlsServerAuthorizationCheckConfigCSchedule, - &TlsServerAuthorizationCheckConfigCCancel, nullptr); - c_config_->set_context(static_cast<void*>(this)); -} - -TlsServerAuthorizationCheckConfig::~TlsServerAuthorizationCheckConfig() {} - -/** gRPC TLS credential options API implementation **/ -TlsCredentialsOptions::TlsCredentialsOptions( +} + +void TlsServerAuthorizationCheckArg::OnServerAuthorizationCheckDoneCallback() { + if (c_arg_->cb == nullptr) { + gpr_log(GPR_ERROR, "server authorizaton check arg callback API is nullptr"); + return; + } + c_arg_->cb(c_arg_); +} + +/** gRPC TLS server authorization check config API implementation. **/ +TlsServerAuthorizationCheckConfig::TlsServerAuthorizationCheckConfig( + std::shared_ptr<TlsServerAuthorizationCheckInterface> + server_authorization_check_interface) + : server_authorization_check_interface_( + std::move(server_authorization_check_interface)) { + c_config_ = grpc_tls_server_authorization_check_config_create( + nullptr, &TlsServerAuthorizationCheckConfigCSchedule, + &TlsServerAuthorizationCheckConfigCCancel, nullptr); + c_config_->set_context(static_cast<void*>(this)); +} + +TlsServerAuthorizationCheckConfig::~TlsServerAuthorizationCheckConfig() {} + +/** gRPC TLS credential options API implementation **/ +TlsCredentialsOptions::TlsCredentialsOptions( grpc_tls_server_verification_option server_verification_option, std::shared_ptr<TlsKeyMaterialsConfig> key_materials_config, std::shared_ptr<TlsCredentialReloadConfig> credential_reload_config, @@ -292,7 +292,7 @@ TlsCredentialsOptions::TlsCredentialsOptions( std::move(server_authorization_check_config)) {} TlsCredentialsOptions::TlsCredentialsOptions( - grpc_ssl_client_certificate_request_type cert_request_type, + grpc_ssl_client_certificate_request_type cert_request_type, std::shared_ptr<TlsKeyMaterialsConfig> key_materials_config, std::shared_ptr<TlsCredentialReloadConfig> credential_reload_config) : TlsCredentialsOptions(cert_request_type, GRPC_TLS_SERVER_VERIFICATION, @@ -302,42 +302,42 @@ TlsCredentialsOptions::TlsCredentialsOptions( TlsCredentialsOptions::TlsCredentialsOptions( grpc_ssl_client_certificate_request_type cert_request_type, grpc_tls_server_verification_option server_verification_option, - std::shared_ptr<TlsKeyMaterialsConfig> key_materials_config, - std::shared_ptr<TlsCredentialReloadConfig> credential_reload_config, - std::shared_ptr<TlsServerAuthorizationCheckConfig> - server_authorization_check_config) - : cert_request_type_(cert_request_type), + std::shared_ptr<TlsKeyMaterialsConfig> key_materials_config, + std::shared_ptr<TlsCredentialReloadConfig> credential_reload_config, + std::shared_ptr<TlsServerAuthorizationCheckConfig> + server_authorization_check_config) + : cert_request_type_(cert_request_type), server_verification_option_(server_verification_option), - key_materials_config_(std::move(key_materials_config)), - credential_reload_config_(std::move(credential_reload_config)), - server_authorization_check_config_( - std::move(server_authorization_check_config)) { - c_credentials_options_ = grpc_tls_credentials_options_create(); - grpc_tls_credentials_options_set_cert_request_type(c_credentials_options_, - 
cert_request_type_); - if (key_materials_config_ != nullptr) { - grpc_tls_credentials_options_set_key_materials_config( - c_credentials_options_, - ConvertToCKeyMaterialsConfig(key_materials_config_)); - } - if (credential_reload_config_ != nullptr) { - grpc_tls_credentials_options_set_credential_reload_config( - c_credentials_options_, credential_reload_config_->c_config()); - } - if (server_authorization_check_config_ != nullptr) { - grpc_tls_credentials_options_set_server_authorization_check_config( - c_credentials_options_, server_authorization_check_config_->c_config()); - } + key_materials_config_(std::move(key_materials_config)), + credential_reload_config_(std::move(credential_reload_config)), + server_authorization_check_config_( + std::move(server_authorization_check_config)) { + c_credentials_options_ = grpc_tls_credentials_options_create(); + grpc_tls_credentials_options_set_cert_request_type(c_credentials_options_, + cert_request_type_); + if (key_materials_config_ != nullptr) { + grpc_tls_credentials_options_set_key_materials_config( + c_credentials_options_, + ConvertToCKeyMaterialsConfig(key_materials_config_)); + } + if (credential_reload_config_ != nullptr) { + grpc_tls_credentials_options_set_credential_reload_config( + c_credentials_options_, credential_reload_config_->c_config()); + } + if (server_authorization_check_config_ != nullptr) { + grpc_tls_credentials_options_set_server_authorization_check_config( + c_credentials_options_, server_authorization_check_config_->c_config()); + } grpc_tls_credentials_options_set_server_verification_option( c_credentials_options_, server_verification_option); -} - +} + /** Whenever a TlsCredentialsOptions instance is created, the caller takes * ownership of the c_credentials_options_ pointer (see e.g. the implementation * of the TlsCredentials API in secure_credentials.cc). For this reason, the * TlsCredentialsOptions destructor is not responsible for freeing * c_credentials_options_. **/ -TlsCredentialsOptions::~TlsCredentialsOptions() {} - -} // namespace experimental +TlsCredentialsOptions::~TlsCredentialsOptions() {} + +} // namespace experimental } // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/common/tls_credentials_options_util.cc b/contrib/libs/grpc/src/cpp/common/tls_credentials_options_util.cc index ed84003212..99d6eaf26c 100644 --- a/contrib/libs/grpc/src/cpp/common/tls_credentials_options_util.cc +++ b/contrib/libs/grpc/src/cpp/common/tls_credentials_options_util.cc @@ -1,149 +1,149 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + #include "y_absl/container/inlined_vector.h" #include <grpcpp/security/tls_credentials_options.h> -#include "src/cpp/common/tls_credentials_options_util.h" - +#include "src/cpp/common/tls_credentials_options_util.h" + namespace grpc { -namespace experimental { - -/** Converts the Cpp key materials to C key materials; this allocates memory for - * the C key materials. Note that the user must free - * the underlying pointer to private key and cert chain duplicates; they are not - * freed when the grpc_core::UniquePtr<char> member variables of PemKeyCertPair - * are unused. Similarly, the user must free the underlying pointer to - * c_pem_root_certs. **/ -grpc_tls_key_materials_config* ConvertToCKeyMaterialsConfig( - const std::shared_ptr<TlsKeyMaterialsConfig>& config) { - if (config == nullptr) { - return nullptr; - } - grpc_tls_key_materials_config* c_config = - grpc_tls_key_materials_config_create(); +namespace experimental { + +/** Converts the Cpp key materials to C key materials; this allocates memory for + * the C key materials. Note that the user must free + * the underlying pointer to private key and cert chain duplicates; they are not + * freed when the grpc_core::UniquePtr<char> member variables of PemKeyCertPair + * are unused. Similarly, the user must free the underlying pointer to + * c_pem_root_certs. **/ +grpc_tls_key_materials_config* ConvertToCKeyMaterialsConfig( + const std::shared_ptr<TlsKeyMaterialsConfig>& config) { + if (config == nullptr) { + return nullptr; + } + grpc_tls_key_materials_config* c_config = + grpc_tls_key_materials_config_create(); ::y_absl::InlinedVector<::grpc_core::PemKeyCertPair, 1> - c_pem_key_cert_pair_list; - for (const auto& key_cert_pair : config->pem_key_cert_pair_list()) { - grpc_ssl_pem_key_cert_pair* ssl_pair = - (grpc_ssl_pem_key_cert_pair*)gpr_malloc( - sizeof(grpc_ssl_pem_key_cert_pair)); - ssl_pair->private_key = gpr_strdup(key_cert_pair.private_key.c_str()); - ssl_pair->cert_chain = gpr_strdup(key_cert_pair.cert_chain.c_str()); - ::grpc_core::PemKeyCertPair c_pem_key_cert_pair = - ::grpc_core::PemKeyCertPair(ssl_pair); - c_pem_key_cert_pair_list.push_back(::std::move(c_pem_key_cert_pair)); - } + c_pem_key_cert_pair_list; + for (const auto& key_cert_pair : config->pem_key_cert_pair_list()) { + grpc_ssl_pem_key_cert_pair* ssl_pair = + (grpc_ssl_pem_key_cert_pair*)gpr_malloc( + sizeof(grpc_ssl_pem_key_cert_pair)); + ssl_pair->private_key = gpr_strdup(key_cert_pair.private_key.c_str()); + ssl_pair->cert_chain = gpr_strdup(key_cert_pair.cert_chain.c_str()); + ::grpc_core::PemKeyCertPair c_pem_key_cert_pair = + ::grpc_core::PemKeyCertPair(ssl_pair); + c_pem_key_cert_pair_list.push_back(::std::move(c_pem_key_cert_pair)); + } c_config->set_key_materials(config->pem_root_certs().c_str(), c_pem_key_cert_pair_list); - c_config->set_version(config->version()); - return c_config; -} - -/** The C schedule and cancel functions for the credential reload config. - * They populate a C credential reload arg with the result of a C++ credential - * reload schedule/cancel API. 
**/ -int TlsCredentialReloadConfigCSchedule(void* /*config_user_data*/, - grpc_tls_credential_reload_arg* arg) { - if (arg == nullptr || arg->config == nullptr || - arg->config->context() == nullptr) { - gpr_log(GPR_ERROR, "credential reload arg was not properly initialized"); - return 1; - } - TlsCredentialReloadConfig* cpp_config = - static_cast<TlsCredentialReloadConfig*>(arg->config->context()); - TlsCredentialReloadArg* cpp_arg = new TlsCredentialReloadArg(arg); - int schedule_result = cpp_config->Schedule(cpp_arg); - return schedule_result; -} - -void TlsCredentialReloadConfigCCancel(void* /*config_user_data*/, - grpc_tls_credential_reload_arg* arg) { - if (arg == nullptr || arg->config == nullptr || - arg->config->context() == nullptr) { - gpr_log(GPR_ERROR, "credential reload arg was not properly initialized"); - return; - } - if (arg->context == nullptr) { - gpr_log(GPR_ERROR, "credential reload arg schedule has already completed"); - return; - } - TlsCredentialReloadConfig* cpp_config = - static_cast<TlsCredentialReloadConfig*>(arg->config->context()); - TlsCredentialReloadArg* cpp_arg = - static_cast<TlsCredentialReloadArg*>(arg->context); - cpp_config->Cancel(cpp_arg); -} - -void TlsCredentialReloadArgDestroyContext(void* context) { - if (context != nullptr) { - TlsCredentialReloadArg* cpp_arg = - static_cast<TlsCredentialReloadArg*>(context); - delete cpp_arg; - } -} - -/** The C schedule and cancel functions for the server authorization check - * config. They populate a C server authorization check arg with the result - * of a C++ server authorization check schedule/cancel API. **/ -int TlsServerAuthorizationCheckConfigCSchedule( - void* /*config_user_data*/, grpc_tls_server_authorization_check_arg* arg) { - if (arg == nullptr || arg->config == nullptr || - arg->config->context() == nullptr) { - gpr_log(GPR_ERROR, - "server authorization check arg was not properly initialized"); - return 1; - } - TlsServerAuthorizationCheckConfig* cpp_config = - static_cast<TlsServerAuthorizationCheckConfig*>(arg->config->context()); - TlsServerAuthorizationCheckArg* cpp_arg = - new TlsServerAuthorizationCheckArg(arg); - int schedule_result = cpp_config->Schedule(cpp_arg); - return schedule_result; -} - -void TlsServerAuthorizationCheckConfigCCancel( - void* /*config_user_data*/, grpc_tls_server_authorization_check_arg* arg) { - if (arg == nullptr || arg->config == nullptr || - arg->config->context() == nullptr) { - gpr_log(GPR_ERROR, - "server authorization check arg was not properly initialized"); - return; - } - if (arg->context == nullptr) { - gpr_log(GPR_ERROR, - "server authorization check arg schedule has already completed"); - return; - } - TlsServerAuthorizationCheckConfig* cpp_config = - static_cast<TlsServerAuthorizationCheckConfig*>(arg->config->context()); - TlsServerAuthorizationCheckArg* cpp_arg = - static_cast<TlsServerAuthorizationCheckArg*>(arg->context); - cpp_config->Cancel(cpp_arg); -} - -void TlsServerAuthorizationCheckArgDestroyContext(void* context) { - if (context != nullptr) { - TlsServerAuthorizationCheckArg* cpp_arg = - static_cast<TlsServerAuthorizationCheckArg*>(context); - delete cpp_arg; - } -} - -} // namespace experimental + c_config->set_version(config->version()); + return c_config; +} + +/** The C schedule and cancel functions for the credential reload config. + * They populate a C credential reload arg with the result of a C++ credential + * reload schedule/cancel API. 
**/ +int TlsCredentialReloadConfigCSchedule(void* /*config_user_data*/, + grpc_tls_credential_reload_arg* arg) { + if (arg == nullptr || arg->config == nullptr || + arg->config->context() == nullptr) { + gpr_log(GPR_ERROR, "credential reload arg was not properly initialized"); + return 1; + } + TlsCredentialReloadConfig* cpp_config = + static_cast<TlsCredentialReloadConfig*>(arg->config->context()); + TlsCredentialReloadArg* cpp_arg = new TlsCredentialReloadArg(arg); + int schedule_result = cpp_config->Schedule(cpp_arg); + return schedule_result; +} + +void TlsCredentialReloadConfigCCancel(void* /*config_user_data*/, + grpc_tls_credential_reload_arg* arg) { + if (arg == nullptr || arg->config == nullptr || + arg->config->context() == nullptr) { + gpr_log(GPR_ERROR, "credential reload arg was not properly initialized"); + return; + } + if (arg->context == nullptr) { + gpr_log(GPR_ERROR, "credential reload arg schedule has already completed"); + return; + } + TlsCredentialReloadConfig* cpp_config = + static_cast<TlsCredentialReloadConfig*>(arg->config->context()); + TlsCredentialReloadArg* cpp_arg = + static_cast<TlsCredentialReloadArg*>(arg->context); + cpp_config->Cancel(cpp_arg); +} + +void TlsCredentialReloadArgDestroyContext(void* context) { + if (context != nullptr) { + TlsCredentialReloadArg* cpp_arg = + static_cast<TlsCredentialReloadArg*>(context); + delete cpp_arg; + } +} + +/** The C schedule and cancel functions for the server authorization check + * config. They populate a C server authorization check arg with the result + * of a C++ server authorization check schedule/cancel API. **/ +int TlsServerAuthorizationCheckConfigCSchedule( + void* /*config_user_data*/, grpc_tls_server_authorization_check_arg* arg) { + if (arg == nullptr || arg->config == nullptr || + arg->config->context() == nullptr) { + gpr_log(GPR_ERROR, + "server authorization check arg was not properly initialized"); + return 1; + } + TlsServerAuthorizationCheckConfig* cpp_config = + static_cast<TlsServerAuthorizationCheckConfig*>(arg->config->context()); + TlsServerAuthorizationCheckArg* cpp_arg = + new TlsServerAuthorizationCheckArg(arg); + int schedule_result = cpp_config->Schedule(cpp_arg); + return schedule_result; +} + +void TlsServerAuthorizationCheckConfigCCancel( + void* /*config_user_data*/, grpc_tls_server_authorization_check_arg* arg) { + if (arg == nullptr || arg->config == nullptr || + arg->config->context() == nullptr) { + gpr_log(GPR_ERROR, + "server authorization check arg was not properly initialized"); + return; + } + if (arg->context == nullptr) { + gpr_log(GPR_ERROR, + "server authorization check arg schedule has already completed"); + return; + } + TlsServerAuthorizationCheckConfig* cpp_config = + static_cast<TlsServerAuthorizationCheckConfig*>(arg->config->context()); + TlsServerAuthorizationCheckArg* cpp_arg = + static_cast<TlsServerAuthorizationCheckArg*>(arg->context); + cpp_config->Cancel(cpp_arg); +} + +void TlsServerAuthorizationCheckArgDestroyContext(void* context) { + if (context != nullptr) { + TlsServerAuthorizationCheckArg* cpp_arg = + static_cast<TlsServerAuthorizationCheckArg*>(context); + delete cpp_arg; + } +} + +} // namespace experimental } // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/common/tls_credentials_options_util.h b/contrib/libs/grpc/src/cpp/common/tls_credentials_options_util.h index 4ee04d15d7..889fa3b55c 100644 --- a/contrib/libs/grpc/src/cpp/common/tls_credentials_options_util.h +++ b/contrib/libs/grpc/src/cpp/common/tls_credentials_options_util.h @@ 
-1,58 +1,58 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef GRPC_INTERNAL_CPP_COMMON_TLS_CREDENTIALS_OPTIONS_UTIL_H -#define GRPC_INTERNAL_CPP_COMMON_TLS_CREDENTIALS_OPTIONS_UTIL_H - -#include <grpc/grpc_security.h> -#include <grpcpp/security/tls_credentials_options.h> - -#include "src/core/lib/security/credentials/tls/grpc_tls_credentials_options.h" - +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_INTERNAL_CPP_COMMON_TLS_CREDENTIALS_OPTIONS_UTIL_H +#define GRPC_INTERNAL_CPP_COMMON_TLS_CREDENTIALS_OPTIONS_UTIL_H + +#include <grpc/grpc_security.h> +#include <grpcpp/security/tls_credentials_options.h> + +#include "src/core/lib/security/credentials/tls/grpc_tls_credentials_options.h" + namespace grpc { -namespace experimental { - -/** The following function is exposed for testing purposes. **/ -grpc_tls_key_materials_config* ConvertToCKeyMaterialsConfig( - const std::shared_ptr<TlsKeyMaterialsConfig>& config); - -/** The following 4 functions convert the user-provided schedule or cancel - * functions into C style schedule or cancel functions. These are internal - * functions, not meant to be accessed by the user. **/ -int TlsCredentialReloadConfigCSchedule(void* config_user_data, - grpc_tls_credential_reload_arg* arg); - -void TlsCredentialReloadConfigCCancel(void* config_user_data, - grpc_tls_credential_reload_arg* arg); - -int TlsServerAuthorizationCheckConfigCSchedule( - void* config_user_data, grpc_tls_server_authorization_check_arg* arg); - -void TlsServerAuthorizationCheckConfigCCancel( - void* config_user_data, grpc_tls_server_authorization_check_arg* arg); - -/** The following 2 functions cleanup data created in the above C schedule - * functions. **/ -void TlsCredentialReloadArgDestroyContext(void* context); - -void TlsServerAuthorizationCheckArgDestroyContext(void* context); - -} // namespace experimental +namespace experimental { + +/** The following function is exposed for testing purposes. **/ +grpc_tls_key_materials_config* ConvertToCKeyMaterialsConfig( + const std::shared_ptr<TlsKeyMaterialsConfig>& config); + +/** The following 4 functions convert the user-provided schedule or cancel + * functions into C style schedule or cancel functions. These are internal + * functions, not meant to be accessed by the user. 
**/ +int TlsCredentialReloadConfigCSchedule(void* config_user_data, + grpc_tls_credential_reload_arg* arg); + +void TlsCredentialReloadConfigCCancel(void* config_user_data, + grpc_tls_credential_reload_arg* arg); + +int TlsServerAuthorizationCheckConfigCSchedule( + void* config_user_data, grpc_tls_server_authorization_check_arg* arg); + +void TlsServerAuthorizationCheckConfigCCancel( + void* config_user_data, grpc_tls_server_authorization_check_arg* arg); + +/** The following 2 functions cleanup data created in the above C schedule + * functions. **/ +void TlsCredentialReloadArgDestroyContext(void* context); + +void TlsServerAuthorizationCheckArgDestroyContext(void* context); + +} // namespace experimental } // namespace grpc - -#endif // GRPC_INTERNAL_CPP_COMMON_TLS_CREDENTIALS_OPTIONS_UTIL_H + +#endif // GRPC_INTERNAL_CPP_COMMON_TLS_CREDENTIALS_OPTIONS_UTIL_H diff --git a/contrib/libs/grpc/src/cpp/common/validate_service_config.cc b/contrib/libs/grpc/src/cpp/common/validate_service_config.cc index f63cfbc68c..f263c95f03 100644 --- a/contrib/libs/grpc/src/cpp/common/validate_service_config.cc +++ b/contrib/libs/grpc/src/cpp/common/validate_service_config.cc @@ -1,40 +1,40 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <grpc/grpc.h> -#include <grpcpp/support/validate_service_config.h> - -#include "src/core/ext/filters/client_channel/service_config.h" - -namespace grpc { -namespace experimental { +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +#include <grpc/grpc.h> +#include <grpcpp/support/validate_service_config.h> + +#include "src/core/ext/filters/client_channel/service_config.h" + +namespace grpc { +namespace experimental { TString ValidateServiceConfigJSON(const TString& service_config_json) { - grpc_init(); - grpc_error* error = GRPC_ERROR_NONE; + grpc_init(); + grpc_error* error = GRPC_ERROR_NONE; grpc_core::ServiceConfig::Create(/*args=*/nullptr, service_config_json.c_str(), &error); TString return_value; - if (error != GRPC_ERROR_NONE) { - return_value = grpc_error_string(error); - GRPC_ERROR_UNREF(error); - } - grpc_shutdown(); - return return_value; -} -} // namespace experimental -} // namespace grpc + if (error != GRPC_ERROR_NONE) { + return_value = grpc_error_string(error); + GRPC_ERROR_UNREF(error); + } + grpc_shutdown(); + return return_value; +} +} // namespace experimental +} // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/ext/proto_server_reflection.cc b/contrib/libs/grpc/src/cpp/ext/proto_server_reflection.cc index 1b388210c0..070db6fd9d 100644 --- a/contrib/libs/grpc/src/cpp/ext/proto_server_reflection.cc +++ b/contrib/libs/grpc/src/cpp/ext/proto_server_reflection.cc @@ -94,17 +94,17 @@ Status ProtoServerReflection::ServerReflectionInfo( void ProtoServerReflection::FillErrorResponse(const Status& status, ErrorResponse* error_response) { error_response->set_error_code(status.error_code()); - error_response->set_error_message(TProtoStringType(status.error_message())); + error_response->set_error_message(TProtoStringType(status.error_message())); } -Status ProtoServerReflection::ListService(ServerContext* /*context*/, +Status ProtoServerReflection::ListService(ServerContext* /*context*/, ListServiceResponse* response) { if (services_ == nullptr) { return Status(StatusCode::NOT_FOUND, "Services not found."); } - for (const auto& value : *services_) { + for (const auto& value : *services_) { ServiceResponse* service_response = response->add_service(); - service_response->set_name(TProtoStringType(value)); + service_response->set_name(TProtoStringType(value)); } return Status::OK; } @@ -117,7 +117,7 @@ Status ProtoServerReflection::GetFileByName( } const protobuf::FileDescriptor* file_desc = - descriptor_pool_->FindFileByName(TProtoStringType(filename)); + descriptor_pool_->FindFileByName(TProtoStringType(filename)); if (file_desc == nullptr) { return Status(StatusCode::NOT_FOUND, "File not found."); } @@ -134,7 +134,7 @@ Status ProtoServerReflection::GetFileContainingSymbol( } const protobuf::FileDescriptor* file_desc = - descriptor_pool_->FindFileContainingSymbol(TProtoStringType(symbol)); + descriptor_pool_->FindFileContainingSymbol(TProtoStringType(symbol)); if (file_desc == nullptr) { return Status(StatusCode::NOT_FOUND, "Symbol not found."); } @@ -144,7 +144,7 @@ Status ProtoServerReflection::GetFileContainingSymbol( } Status ProtoServerReflection::GetFileContainingExtension( - ServerContext* /*context*/, const ExtensionRequest* request, + ServerContext* /*context*/, const ExtensionRequest* request, ServerReflectionResponse* response) { if (descriptor_pool_ == nullptr) { return Status::CANCELLED; @@ -175,17 +175,17 @@ Status ProtoServerReflection::GetAllExtensionNumbers( } const protobuf::Descriptor* desc = - descriptor_pool_->FindMessageTypeByName(TProtoStringType(type)); + descriptor_pool_->FindMessageTypeByName(TProtoStringType(type)); if (desc == nullptr) { return Status(StatusCode::NOT_FOUND, "Type not found."); } std::vector<const protobuf::FieldDescriptor*> extensions; 
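For orientation, the `ValidateServiceConfigJSON` helper in the validate_service_config.cc hunk above returns an empty string when the JSON parses as a valid service config and an error description otherwise. A minimal caller sketch, assuming the upstream `std::string`-based signature rather than the vendored `TString` one; the example config string is illustrative:

```
#include <iostream>
#include <string>

#include <grpcpp/support/validate_service_config.h>

int main() {
  // A service config requesting round_robin load balancing (illustrative).
  const std::string service_config =
      R"({"loadBalancingConfig": [{"round_robin": {}}]})";

  // Empty result means the config is valid; otherwise the string
  // describes the parse error, mirroring the implementation above.
  const std::string error =
      grpc::experimental::ValidateServiceConfigJSON(service_config);
  if (error.empty()) {
    std::cout << "service config is valid\n";
  } else {
    std::cout << "invalid service config: " << error << "\n";
  }
  return 0;
}
```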
descriptor_pool_->FindAllExtensions(desc, &extensions); - for (const auto& value : extensions) { - response->add_extension_number(value->number()); + for (const auto& value : extensions) { + response->add_extension_number(value->number()); } - response->set_base_type_name(TProtoStringType(type)); + response->set_base_type_name(TProtoStringType(type)); return Status::OK; } @@ -199,7 +199,7 @@ void ProtoServerReflection::FillFileDescriptorResponse( seen_files->insert(file_desc->name()); protobuf::FileDescriptorProto file_desc_proto; - TProtoStringType data; + TProtoStringType data; file_desc->CopyTo(&file_desc_proto); file_desc_proto.SerializeToString(&data); response->mutable_file_descriptor_response()->add_file_descriptor_proto(data); diff --git a/contrib/libs/grpc/src/cpp/ext/proto_server_reflection_plugin.cc b/contrib/libs/grpc/src/cpp/ext/proto_server_reflection_plugin.cc index 007193d7f7..b580ce1502 100644 --- a/contrib/libs/grpc/src/cpp/ext/proto_server_reflection_plugin.cc +++ b/contrib/libs/grpc/src/cpp/ext/proto_server_reflection_plugin.cc @@ -42,7 +42,7 @@ void ProtoServerReflectionPlugin::Finish(grpc::ServerInitializer* si) { } void ProtoServerReflectionPlugin::ChangeArguments(const TString& /*name*/, - void* /*value*/) {} + void* /*value*/) {} bool ProtoServerReflectionPlugin::has_sync_methods() const { if (reflection_service_) { diff --git a/contrib/libs/grpc/src/cpp/server/channel_argument_option.cc b/contrib/libs/grpc/src/cpp/server/channel_argument_option.cc index 9aad932429..00bbe73d88 100644 --- a/contrib/libs/grpc/src/cpp/server/channel_argument_option.cc +++ b/contrib/libs/grpc/src/cpp/server/channel_argument_option.cc @@ -31,8 +31,8 @@ std::unique_ptr<ServerBuilderOption> MakeChannelArgumentOption( args->SetString(name_, value_); } virtual void UpdatePlugins( - std::vector<std::unique_ptr<ServerBuilderPlugin>>* /*plugins*/) - override {} + std::vector<std::unique_ptr<ServerBuilderPlugin>>* /*plugins*/) + override {} private: const TString name_; @@ -52,8 +52,8 @@ std::unique_ptr<ServerBuilderOption> MakeChannelArgumentOption( args->SetInt(name_, value_); } virtual void UpdatePlugins( - std::vector<std::unique_ptr<ServerBuilderPlugin>>* /*plugins*/) - override {} + std::vector<std::unique_ptr<ServerBuilderPlugin>>* /*plugins*/) + override {} private: const TString name_; diff --git a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc index 6dcf84bf40..e061c62ef4 100644 --- a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc +++ b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc @@ -27,25 +27,25 @@ namespace grpc { namespace { -grpc::protobuf::util::Status ParseJson(const char* json_str, - grpc::protobuf::Message* message) { - grpc::protobuf::json::JsonParseOptions options; +grpc::protobuf::util::Status ParseJson(const char* json_str, + grpc::protobuf::Message* message) { + grpc::protobuf::json::JsonParseOptions options; options.case_insensitive_enum_parsing = true; - return grpc::protobuf::json::JsonStringToMessage(json_str, message, options); -} + return grpc::protobuf::json::JsonStringToMessage(json_str, message, options); +} } // namespace Status ChannelzService::GetTopChannels( - ServerContext* /*unused*/, - const channelz::v1::GetTopChannelsRequest* request, + ServerContext* /*unused*/, + const channelz::v1::GetTopChannelsRequest* request, channelz::v1::GetTopChannelsResponse* response) { char* json_str = grpc_channelz_get_top_channels(request->start_channel_id()); if 
(json_str == nullptr) { return Status(StatusCode::INTERNAL, "grpc_channelz_get_top_channels returned null"); } - grpc::protobuf::util::Status s = ParseJson(json_str, response); + grpc::protobuf::util::Status s = ParseJson(json_str, response); gpr_free(json_str); if (!s.ok()) { return Status(StatusCode::INTERNAL, s.ToString()); @@ -54,14 +54,14 @@ Status ChannelzService::GetTopChannels( } Status ChannelzService::GetServers( - ServerContext* /*unused*/, const channelz::v1::GetServersRequest* request, + ServerContext* /*unused*/, const channelz::v1::GetServersRequest* request, channelz::v1::GetServersResponse* response) { char* json_str = grpc_channelz_get_servers(request->start_server_id()); if (json_str == nullptr) { return Status(StatusCode::INTERNAL, "grpc_channelz_get_servers returned null"); } - grpc::protobuf::util::Status s = ParseJson(json_str, response); + grpc::protobuf::util::Status s = ParseJson(json_str, response); gpr_free(json_str); if (!s.ok()) { return Status(StatusCode::INTERNAL, s.ToString()); @@ -69,7 +69,7 @@ Status ChannelzService::GetServers( return Status::OK; } -Status ChannelzService::GetServer(ServerContext* /*unused*/, +Status ChannelzService::GetServer(ServerContext* /*unused*/, const channelz::v1::GetServerRequest* request, channelz::v1::GetServerResponse* response) { char* json_str = grpc_channelz_get_server(request->server_id()); @@ -77,7 +77,7 @@ Status ChannelzService::GetServer(ServerContext* /*unused*/, return Status(StatusCode::INTERNAL, "grpc_channelz_get_server returned null"); } - grpc::protobuf::util::Status s = ParseJson(json_str, response); + grpc::protobuf::util::Status s = ParseJson(json_str, response); gpr_free(json_str); if (!s.ok()) { return Status(StatusCode::INTERNAL, s.ToString()); @@ -86,8 +86,8 @@ Status ChannelzService::GetServer(ServerContext* /*unused*/, } Status ChannelzService::GetServerSockets( - ServerContext* /*unused*/, - const channelz::v1::GetServerSocketsRequest* request, + ServerContext* /*unused*/, + const channelz::v1::GetServerSocketsRequest* request, channelz::v1::GetServerSocketsResponse* response) { char* json_str = grpc_channelz_get_server_sockets( request->server_id(), request->start_socket_id(), request->max_results()); @@ -95,7 +95,7 @@ Status ChannelzService::GetServerSockets( return Status(StatusCode::INTERNAL, "grpc_channelz_get_server_sockets returned null"); } - grpc::protobuf::util::Status s = ParseJson(json_str, response); + grpc::protobuf::util::Status s = ParseJson(json_str, response); gpr_free(json_str); if (!s.ok()) { return Status(StatusCode::INTERNAL, s.ToString()); @@ -104,13 +104,13 @@ Status ChannelzService::GetServerSockets( } Status ChannelzService::GetChannel( - ServerContext* /*unused*/, const channelz::v1::GetChannelRequest* request, + ServerContext* /*unused*/, const channelz::v1::GetChannelRequest* request, channelz::v1::GetChannelResponse* response) { char* json_str = grpc_channelz_get_channel(request->channel_id()); if (json_str == nullptr) { return Status(StatusCode::NOT_FOUND, "No object found for that ChannelId"); } - grpc::protobuf::util::Status s = ParseJson(json_str, response); + grpc::protobuf::util::Status s = ParseJson(json_str, response); gpr_free(json_str); if (!s.ok()) { return Status(StatusCode::INTERNAL, s.ToString()); @@ -119,15 +119,15 @@ Status ChannelzService::GetChannel( } Status ChannelzService::GetSubchannel( - ServerContext* /*unused*/, - const channelz::v1::GetSubchannelRequest* request, + ServerContext* /*unused*/, + const channelz::v1::GetSubchannelRequest* request, 
channelz::v1::GetSubchannelResponse* response) { char* json_str = grpc_channelz_get_subchannel(request->subchannel_id()); if (json_str == nullptr) { return Status(StatusCode::NOT_FOUND, "No object found for that SubchannelId"); } - grpc::protobuf::util::Status s = ParseJson(json_str, response); + grpc::protobuf::util::Status s = ParseJson(json_str, response); gpr_free(json_str); if (!s.ok()) { return Status(StatusCode::INTERNAL, s.ToString()); @@ -135,14 +135,14 @@ Status ChannelzService::GetSubchannel( return Status::OK; } -Status ChannelzService::GetSocket(ServerContext* /*unused*/, +Status ChannelzService::GetSocket(ServerContext* /*unused*/, const channelz::v1::GetSocketRequest* request, channelz::v1::GetSocketResponse* response) { char* json_str = grpc_channelz_get_socket(request->socket_id()); if (json_str == nullptr) { return Status(StatusCode::NOT_FOUND, "No object found for that SocketId"); } - grpc::protobuf::util::Status s = ParseJson(json_str, response); + grpc::protobuf::util::Status s = ParseJson(json_str, response); gpr_free(json_str); if (!s.ok()) { return Status(StatusCode::INTERNAL, s.ToString()); diff --git a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc index ae26a447ab..fd1ce81424 100644 --- a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc +++ b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc @@ -39,7 +39,7 @@ class ChannelzServicePlugin : public ::grpc::ServerBuilderPlugin { si->RegisterService(channelz_service_); } - void Finish(grpc::ServerInitializer* /*si*/) override {} + void Finish(grpc::ServerInitializer* /*si*/) override {} void ChangeArguments(const TString& /*name*/, void* /*value*/) override {} @@ -67,13 +67,13 @@ CreateChannelzServicePlugin() { new ChannelzServicePlugin()); } -} // namespace experimental -} // namespace channelz -} // namespace grpc -namespace grpc_impl { -namespace channelz { -namespace experimental { - +} // namespace experimental +} // namespace channelz +} // namespace grpc +namespace grpc_impl { +namespace channelz { +namespace experimental { + void InitChannelzService() { static struct Initializer { Initializer() { @@ -85,4 +85,4 @@ void InitChannelzService() { } // namespace experimental } // namespace channelz -} // namespace grpc_impl +} // namespace grpc_impl diff --git a/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.cc b/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.cc index 77c5d6a263..004154730b 100644 --- a/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.cc +++ b/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.cc @@ -19,7 +19,7 @@ #include "src/cpp/server/dynamic_thread_pool.h" #include <grpc/support/log.h> -#include <grpcpp/impl/codegen/sync.h> +#include <grpcpp/impl/codegen/sync.h> #include "src/core/lib/gprpp/thd.h" @@ -39,27 +39,27 @@ DynamicThreadPool::DynamicThread::~DynamicThread() { thd_.Join(); } void DynamicThreadPool::DynamicThread::ThreadFunc() { pool_->ThreadFunc(); // Now that we have killed ourselves, we should reduce the thread count - grpc_core::MutexLock lock(&pool_->mu_); + grpc_core::MutexLock lock(&pool_->mu_); pool_->nthreads_--; // Move ourselves to dead list pool_->dead_threads_.push_back(this); if ((pool_->shutdown_) && (pool_->nthreads_ == 0)) { - pool_->shutdown_cv_.Signal(); + pool_->shutdown_cv_.Signal(); } } void DynamicThreadPool::ThreadFunc() { for (;;) { // Wait until work is available or we are shutting down. 
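The `DynamicThreadPool` worker loop in this hunk follows a standard condition-variable dispatch pattern: wait while the queue is empty, pop a callback under the lock, run it outside the lock, and drain remaining callbacks before honoring shutdown. A self-contained sketch of that pattern using `std::mutex`/`std::condition_variable` instead of gRPC's internal `grpc_core` sync types; `TinyWorkQueue` is an illustrative name, not part of gRPC:

```
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>

class TinyWorkQueue {
 public:
  void Add(std::function<void()> cb) {
    std::lock_guard<std::mutex> lock(mu_);
    callbacks_.push(std::move(cb));
    cv_.notify_one();  // Wake one waiting worker, like cv_.Signal() above.
  }

  void Shutdown() {
    std::lock_guard<std::mutex> lock(mu_);
    shutdown_ = true;
    cv_.notify_all();  // Like cv_.Broadcast() in the pool destructor.
  }

  // Body of one worker thread.
  void ThreadFunc() {
    for (;;) {
      std::unique_lock<std::mutex> lock(mu_);
      // Wait until work is available or we are shutting down.
      cv_.wait(lock, [this] { return shutdown_ || !callbacks_.empty(); });
      // Drain callbacks before considering shutdown so queued work runs.
      if (!callbacks_.empty()) {
        auto cb = std::move(callbacks_.front());
        callbacks_.pop();
        lock.unlock();  // Run the callback outside the lock.
        cb();
      } else if (shutdown_) {
        return;
      }
    }
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  bool shutdown_ = false;
  std::queue<std::function<void()>> callbacks_;
};
```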
- grpc_core::ReleasableMutexLock lock(&mu_); + grpc_core::ReleasableMutexLock lock(&mu_); if (!shutdown_ && callbacks_.empty()) { // If there are too many threads waiting, then quit this thread if (threads_waiting_ >= reserve_threads_) { break; } threads_waiting_++; - cv_.Wait(&mu_); + cv_.Wait(&mu_); threads_waiting_--; } // Drain callbacks before considering shutdown to ensure all work @@ -67,7 +67,7 @@ void DynamicThreadPool::ThreadFunc() { if (!callbacks_.empty()) { auto cb = callbacks_.front(); callbacks_.pop(); - lock.Unlock(); + lock.Unlock(); cb(); } else if (shutdown_) { break; @@ -81,7 +81,7 @@ DynamicThreadPool::DynamicThreadPool(int reserve_threads) nthreads_(0), threads_waiting_(0) { for (int i = 0; i < reserve_threads_; i++) { - grpc_core::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&mu_); nthreads_++; new DynamicThread(this); } @@ -94,17 +94,17 @@ void DynamicThreadPool::ReapThreads(std::list<DynamicThread*>* tlist) { } DynamicThreadPool::~DynamicThreadPool() { - grpc_core::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&mu_); shutdown_ = true; - cv_.Broadcast(); + cv_.Broadcast(); while (nthreads_ != 0) { - shutdown_cv_.Wait(&mu_); + shutdown_cv_.Wait(&mu_); } ReapThreads(&dead_threads_); } void DynamicThreadPool::Add(const std::function<void()>& callback) { - grpc_core::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&mu_); // Add works to the callbacks list callbacks_.push(callback); // Increase pool size or notify as needed @@ -113,7 +113,7 @@ void DynamicThreadPool::Add(const std::function<void()>& callback) { nthreads_++; new DynamicThread(this); } else { - cv_.Signal(); + cv_.Signal(); } // Also use this chance to harvest dead threads if (!dead_threads_.empty()) { diff --git a/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.h b/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.h index 6f9f943bc3..ea60265165 100644 --- a/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.h +++ b/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.h @@ -25,7 +25,7 @@ #include <grpcpp/support/config.h> -#include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/thd.h" #include "src/cpp/server/thread_pool_interface.h" @@ -49,9 +49,9 @@ class DynamicThreadPool final : public ThreadPoolInterface { grpc_core::Thread thd_; void ThreadFunc(); }; - grpc_core::Mutex mu_; - grpc_core::CondVar cv_; - grpc_core::CondVar shutdown_cv_; + grpc_core::Mutex mu_; + grpc_core::CondVar cv_; + grpc_core::CondVar shutdown_cv_; bool shutdown_; std::queue<std::function<void()>> callbacks_; int reserve_threads_; diff --git a/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.cc b/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.cc index 09d2a9d3b5..68bc20638e 100644 --- a/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.cc +++ b/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.cc @@ -1,96 +1,96 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "src/cpp/server/external_connection_acceptor_impl.h" - -#include <memory> - +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/cpp/server/external_connection_acceptor_impl.h" + +#include <memory> + #include <grpcpp/server_builder.h> -#include <grpcpp/support/channel_arguments.h> - -namespace grpc { -namespace internal { -namespace { -// The actual type to return to user. It co-owns the internal impl object with -// the server. -class AcceptorWrapper : public experimental::ExternalConnectionAcceptor { - public: - explicit AcceptorWrapper(std::shared_ptr<ExternalConnectionAcceptorImpl> impl) - : impl_(std::move(impl)) {} - void HandleNewConnection(NewConnectionParameters* p) override { - impl_->HandleNewConnection(p); - } - - private: - std::shared_ptr<ExternalConnectionAcceptorImpl> impl_; -}; -} // namespace - -ExternalConnectionAcceptorImpl::ExternalConnectionAcceptorImpl( +#include <grpcpp/support/channel_arguments.h> + +namespace grpc { +namespace internal { +namespace { +// The actual type to return to user. It co-owns the internal impl object with +// the server. +class AcceptorWrapper : public experimental::ExternalConnectionAcceptor { + public: + explicit AcceptorWrapper(std::shared_ptr<ExternalConnectionAcceptorImpl> impl) + : impl_(std::move(impl)) {} + void HandleNewConnection(NewConnectionParameters* p) override { + impl_->HandleNewConnection(p); + } + + private: + std::shared_ptr<ExternalConnectionAcceptorImpl> impl_; +}; +} // namespace + +ExternalConnectionAcceptorImpl::ExternalConnectionAcceptorImpl( const TString& name, - ServerBuilder::experimental_type::ExternalConnectionType type, - std::shared_ptr<ServerCredentials> creds) - : name_(name), creds_(std::move(creds)) { - GPR_ASSERT(type == - ServerBuilder::experimental_type::ExternalConnectionType::FROM_FD); -} - -std::unique_ptr<experimental::ExternalConnectionAcceptor> -ExternalConnectionAcceptorImpl::GetAcceptor() { - grpc_core::MutexLock lock(&mu_); - GPR_ASSERT(!has_acceptor_); - has_acceptor_ = true; - return std::unique_ptr<experimental::ExternalConnectionAcceptor>( - new AcceptorWrapper(shared_from_this())); -} - -void ExternalConnectionAcceptorImpl::HandleNewConnection( - experimental::ExternalConnectionAcceptor::NewConnectionParameters* p) { - grpc_core::MutexLock lock(&mu_); - if (shutdown_ || !started_) { - // TODO(yangg) clean up. 
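`ExternalConnectionAcceptorImpl` in this file backs the `grpc::experimental::ExternalConnectionAcceptor` handle through which an application can inject connections it accepted itself (the `FROM_FD` type asserted above). A rough usage sketch, assuming the `ServerBuilder::experimental_type::AddExternalConnectionAcceptor` entry point from the public headers; the function name, the placeholder fd, and the commented-out service registration are illustrative only:

```
#include <memory>

#include <grpcpp/grpcpp.h>
#include <grpcpp/server_builder.h>

void ServeWithExternalFd(int already_accepted_fd) {
  grpc::ServerBuilder builder;
  // builder.RegisterService(&my_service);  // application-defined service

  // Ask the builder for an acceptor that takes connections FROM_FD,
  // matching the GPR_ASSERT on ExternalConnectionType in the code above.
  std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor> acceptor =
      builder.experimental().AddExternalConnectionAcceptor(
          grpc::ServerBuilder::experimental_type::ExternalConnectionType::
              FROM_FD,
          grpc::InsecureServerCredentials());

  std::unique_ptr<grpc::Server> server = builder.BuildAndStart();

  // Hand a connection accepted outside gRPC to the running server;
  // HandleNewConnection forwards listener_fd, fd and read_buffer to the
  // registered handler, as shown in the implementation above.
  grpc::experimental::ExternalConnectionAcceptor::NewConnectionParameters p;
  p.fd = already_accepted_fd;  // socket accepted by the application
  p.listener_fd = -1;          // listening socket fd, if known
  acceptor->HandleNewConnection(&p);

  server->Wait();
}
```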
- gpr_log( - GPR_ERROR, - "NOT handling external connection with fd %d, started %d, shutdown %d", - p->fd, started_, shutdown_); - return; - } - if (handler_) { - handler_->Handle(p->listener_fd, p->fd, p->read_buffer.c_buffer()); - } -} - -void ExternalConnectionAcceptorImpl::Shutdown() { - grpc_core::MutexLock lock(&mu_); - shutdown_ = true; -} - -void ExternalConnectionAcceptorImpl::Start() { - grpc_core::MutexLock lock(&mu_); - GPR_ASSERT(!started_); - GPR_ASSERT(has_acceptor_); - GPR_ASSERT(!shutdown_); - started_ = true; -} - -void ExternalConnectionAcceptorImpl::SetToChannelArgs(ChannelArguments* args) { - args->SetPointer(name_.c_str(), &handler_); -} - -} // namespace internal -} // namespace grpc + ServerBuilder::experimental_type::ExternalConnectionType type, + std::shared_ptr<ServerCredentials> creds) + : name_(name), creds_(std::move(creds)) { + GPR_ASSERT(type == + ServerBuilder::experimental_type::ExternalConnectionType::FROM_FD); +} + +std::unique_ptr<experimental::ExternalConnectionAcceptor> +ExternalConnectionAcceptorImpl::GetAcceptor() { + grpc_core::MutexLock lock(&mu_); + GPR_ASSERT(!has_acceptor_); + has_acceptor_ = true; + return std::unique_ptr<experimental::ExternalConnectionAcceptor>( + new AcceptorWrapper(shared_from_this())); +} + +void ExternalConnectionAcceptorImpl::HandleNewConnection( + experimental::ExternalConnectionAcceptor::NewConnectionParameters* p) { + grpc_core::MutexLock lock(&mu_); + if (shutdown_ || !started_) { + // TODO(yangg) clean up. + gpr_log( + GPR_ERROR, + "NOT handling external connection with fd %d, started %d, shutdown %d", + p->fd, started_, shutdown_); + return; + } + if (handler_) { + handler_->Handle(p->listener_fd, p->fd, p->read_buffer.c_buffer()); + } +} + +void ExternalConnectionAcceptorImpl::Shutdown() { + grpc_core::MutexLock lock(&mu_); + shutdown_ = true; +} + +void ExternalConnectionAcceptorImpl::Start() { + grpc_core::MutexLock lock(&mu_); + GPR_ASSERT(!started_); + GPR_ASSERT(has_acceptor_); + GPR_ASSERT(!shutdown_); + started_ = true; +} + +void ExternalConnectionAcceptorImpl::SetToChannelArgs(ChannelArguments* args) { + args->SetPointer(name_.c_str(), &handler_); +} + +} // namespace internal +} // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.h b/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.h index 430c72862e..2340c6e248 100644 --- a/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.h +++ b/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.h @@ -1,71 +1,71 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -#ifndef SRC_CPP_SERVER_EXTERNAL_CONNECTION_ACCEPTOR_IMPL_H_ -#define SRC_CPP_SERVER_EXTERNAL_CONNECTION_ACCEPTOR_IMPL_H_ - -#include <memory> - -#include <grpc/impl/codegen/grpc_types.h> -#include <grpcpp/security/server_credentials.h> -#include <grpcpp/server_builder.h> -#include <grpcpp/support/channel_arguments.h> - -#include "src/core/lib/gprpp/sync.h" -#include "src/core/lib/iomgr/tcp_server.h" - -namespace grpc { -namespace internal { - -class ExternalConnectionAcceptorImpl - : public std::enable_shared_from_this<ExternalConnectionAcceptorImpl> { - public: - ExternalConnectionAcceptorImpl( +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef SRC_CPP_SERVER_EXTERNAL_CONNECTION_ACCEPTOR_IMPL_H_ +#define SRC_CPP_SERVER_EXTERNAL_CONNECTION_ACCEPTOR_IMPL_H_ + +#include <memory> + +#include <grpc/impl/codegen/grpc_types.h> +#include <grpcpp/security/server_credentials.h> +#include <grpcpp/server_builder.h> +#include <grpcpp/support/channel_arguments.h> + +#include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/iomgr/tcp_server.h" + +namespace grpc { +namespace internal { + +class ExternalConnectionAcceptorImpl + : public std::enable_shared_from_this<ExternalConnectionAcceptorImpl> { + public: + ExternalConnectionAcceptorImpl( const TString& name, - ServerBuilder::experimental_type::ExternalConnectionType type, - std::shared_ptr<ServerCredentials> creds); - // Should only be called once. - std::unique_ptr<experimental::ExternalConnectionAcceptor> GetAcceptor(); - - void HandleNewConnection( - experimental::ExternalConnectionAcceptor::NewConnectionParameters* p); - - void Shutdown(); - - void Start(); - - const char* name() { return name_.c_str(); } - - ServerCredentials* GetCredentials() { return creds_.get(); } - - void SetToChannelArgs(::grpc::ChannelArguments* args); - - private: + ServerBuilder::experimental_type::ExternalConnectionType type, + std::shared_ptr<ServerCredentials> creds); + // Should only be called once. 
+ std::unique_ptr<experimental::ExternalConnectionAcceptor> GetAcceptor(); + + void HandleNewConnection( + experimental::ExternalConnectionAcceptor::NewConnectionParameters* p); + + void Shutdown(); + + void Start(); + + const char* name() { return name_.c_str(); } + + ServerCredentials* GetCredentials() { return creds_.get(); } + + void SetToChannelArgs(::grpc::ChannelArguments* args); + + private: const TString name_; - std::shared_ptr<ServerCredentials> creds_; - grpc_core::TcpServerFdHandler* handler_ = nullptr; // not owned - grpc_core::Mutex mu_; - bool has_acceptor_ = false; - bool started_ = false; - bool shutdown_ = false; -}; - -} // namespace internal -} // namespace grpc - -#endif // SRC_CPP_SERVER_EXTERNAL_CONNECTION_ACCEPTOR_IMPL_H_ + std::shared_ptr<ServerCredentials> creds_; + grpc_core::TcpServerFdHandler* handler_ = nullptr; // not owned + grpc_core::Mutex mu_; + bool has_acceptor_ = false; + bool started_ = false; + bool shutdown_ = false; +}; + +} // namespace internal +} // namespace grpc + +#endif // SRC_CPP_SERVER_EXTERNAL_CONNECTION_ACCEPTOR_IMPL_H_ diff --git a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc index 3cc508d0cb..133a13e028 100644 --- a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc +++ b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc @@ -26,11 +26,11 @@ #include <grpcpp/impl/codegen/method_handler.h> #include "src/cpp/server/health/default_health_check_service.h" -#include "src/proto/grpc/health/v1/health.upb.h" +#include "src/proto/grpc/health/v1/health.upb.h" #include "upb/upb.hpp" -#define MAX_SERVICE_NAME_LENGTH 200 - +#define MAX_SERVICE_NAME_LENGTH 200 + namespace grpc { // @@ -43,7 +43,7 @@ DefaultHealthCheckService::DefaultHealthCheckService() { void DefaultHealthCheckService::SetServingStatus( const TString& service_name, bool serving) { - grpc_core::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&mu_); if (shutdown_) { // Set to NOT_SERVING in case service_name is not in the map. serving = false; @@ -53,7 +53,7 @@ void DefaultHealthCheckService::SetServingStatus( void DefaultHealthCheckService::SetServingStatus(bool serving) { const ServingStatus status = serving ? 
SERVING : NOT_SERVING; - grpc_core::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&mu_); if (shutdown_) { return; } @@ -64,7 +64,7 @@ void DefaultHealthCheckService::SetServingStatus(bool serving) { } void DefaultHealthCheckService::Shutdown() { - grpc_core::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&mu_); if (shutdown_) { return; } @@ -78,7 +78,7 @@ void DefaultHealthCheckService::Shutdown() { DefaultHealthCheckService::ServingStatus DefaultHealthCheckService::GetServingStatus( const TString& service_name) const { - grpc_core::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&mu_); auto it = services_map_.find(service_name); if (it == services_map_.end()) { return NOT_FOUND; @@ -90,7 +90,7 @@ DefaultHealthCheckService::GetServingStatus( void DefaultHealthCheckService::RegisterCallHandler( const TString& service_name, std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) { - grpc_core::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&mu_); ServiceData& service_data = services_map_[service_name]; service_data.AddCallHandler(handler /* copies ref */); HealthCheckServiceImpl::CallHandler* h = handler.get(); @@ -100,7 +100,7 @@ void DefaultHealthCheckService::RegisterCallHandler( void DefaultHealthCheckService::UnregisterCallHandler( const TString& service_name, const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) { - grpc_core::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&mu_); auto it = services_map_.find(service_name); if (it == services_map_.end()) return; ServiceData& service_data = it->second; @@ -168,7 +168,7 @@ DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() { // We will reach here after the server starts shutting down. shutdown_ = true; { - grpc_core::MutexLock lock(&cq_shutdown_mu_); + grpc_core::MutexLock lock(&cq_shutdown_mu_); cq_->Shutdown(); } thread_->Join(); @@ -216,43 +216,43 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest( copy_to += slices[i].size(); } } - upb::Arena arena; - grpc_health_v1_HealthCheckRequest* request_struct = - grpc_health_v1_HealthCheckRequest_parse( - reinterpret_cast<char*>(request_bytes), request_size, arena.ptr()); + upb::Arena arena; + grpc_health_v1_HealthCheckRequest* request_struct = + grpc_health_v1_HealthCheckRequest_parse( + reinterpret_cast<char*>(request_bytes), request_size, arena.ptr()); if (slices.size() > 1) { gpr_free(request_bytes); } - if (request_struct == nullptr) { - return false; - } - upb_strview service = - grpc_health_v1_HealthCheckRequest_service(request_struct); - if (service.size > MAX_SERVICE_NAME_LENGTH) { - return false; - } - service_name->assign(service.data, service.size); + if (request_struct == nullptr) { + return false; + } + upb_strview service = + grpc_health_v1_HealthCheckRequest_service(request_struct); + if (service.size > MAX_SERVICE_NAME_LENGTH) { + return false; + } + service_name->assign(service.data, service.size); return true; } bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse( ServingStatus status, ByteBuffer* response) { - upb::Arena arena; - grpc_health_v1_HealthCheckResponse* response_struct = - grpc_health_v1_HealthCheckResponse_new(arena.ptr()); - grpc_health_v1_HealthCheckResponse_set_status( - response_struct, + upb::Arena arena; + grpc_health_v1_HealthCheckResponse* response_struct = + grpc_health_v1_HealthCheckResponse_new(arena.ptr()); + grpc_health_v1_HealthCheckResponse_set_status( + response_struct, status == NOT_FOUND - ? 
grpc_health_v1_HealthCheckResponse_SERVICE_UNKNOWN - : status == SERVING ? grpc_health_v1_HealthCheckResponse_SERVING - : grpc_health_v1_HealthCheckResponse_NOT_SERVING); - size_t buf_length; - char* buf = grpc_health_v1_HealthCheckResponse_serialize( - response_struct, arena.ptr(), &buf_length); - if (buf == nullptr) { - return false; - } - grpc_slice response_slice = grpc_slice_from_copied_buffer(buf, buf_length); + ? grpc_health_v1_HealthCheckResponse_SERVICE_UNKNOWN + : status == SERVING ? grpc_health_v1_HealthCheckResponse_SERVING + : grpc_health_v1_HealthCheckResponse_NOT_SERVING); + size_t buf_length; + char* buf = grpc_health_v1_HealthCheckResponse_serialize( + response_struct, arena.ptr(), &buf_length); + if (buf == nullptr) { + return false; + } + grpc_slice response_slice = grpc_slice_from_copied_buffer(buf, buf_length); Slice encoded_response(response_slice, Slice::STEAL_REF); ByteBuffer response_buffer(&encoded_response, 1); response->Swap(&response_buffer); @@ -271,7 +271,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: std::make_shared<CheckCallHandler>(cq, database, service); CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get()); { - grpc_core::MutexLock lock(&service->cq_shutdown_mu_); + grpc_core::MutexLock lock(&service->cq_shutdown_mu_); if (service->shutdown_) return; // Request a Check() call. handler->next_ = @@ -316,7 +316,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: } // Send response. { - grpc_core::MutexLock lock(&service_->cq_shutdown_mu_); + grpc_core::MutexLock lock(&service_->cq_shutdown_mu_); if (!service_->shutdown_) { next_ = CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this, @@ -352,7 +352,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: std::make_shared<WatchCallHandler>(cq, database, service); WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get()); { - grpc_core::MutexLock lock(&service->cq_shutdown_mu_); + grpc_core::MutexLock lock(&service->cq_shutdown_mu_); if (service->shutdown_) return; // Request AsyncNotifyWhenDone(). handler->on_done_notified_ = @@ -407,7 +407,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) { - grpc_core::MutexLock lock(&send_mu_); + grpc_core::MutexLock lock(&send_mu_); // If there's already a send in flight, cache the new status, and // we'll start a new send for it when the one in flight completes. if (send_in_flight_) { @@ -425,7 +425,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: ByteBuffer response; bool success = service_->EncodeResponse(status, &response); // Grab shutdown lock and send response. - grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_); + grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_); if (service_->shutdown_) { SendFinishLocked(std::move(self), Status::CANCELLED); return; @@ -447,7 +447,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: SendFinish(std::move(self), Status::CANCELLED); return; } - grpc_core::MutexLock lock(&send_mu_); + grpc_core::MutexLock lock(&send_mu_); send_in_flight_ = false; // If we got a new status since we started the last send, start a // new send for it. 
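The `WatchCallHandler` hunks above depend on a small invariant: at most one response write is in flight per watch stream. `send_mu_` guards `send_in_flight_`, and a status change that arrives while a write is still outstanding is parked in `pending_status_` so the completion path can issue the next write. The snippet below is only a minimal standalone sketch of that pattern, not the gRPC implementation; it uses `std::mutex` and `std::optional` instead of the internal `grpc_core::` types, and `StartWrite()` is a hypothetical stand-in for the real asynchronous stream write.

```
#include <iostream>
#include <mutex>
#include <optional>

// Sketch of the "at most one send in flight" pattern visible in the diff:
// a mutex, an in-flight flag, and a parked pending value.
class StatusStreamer {
 public:
  // Called whenever the serving status changes.
  void SendStatus(int status) {
    std::lock_guard<std::mutex> lock(mu_);
    if (send_in_flight_) {
      // A write is already outstanding: remember only the latest status
      // and let the completion callback start the next write.
      pending_ = status;
      return;
    }
    send_in_flight_ = true;
    StartWrite(status);  // hypothetical async write
  }

  // Called when the outstanding write completes.
  void OnWriteDone() {
    std::lock_guard<std::mutex> lock(mu_);
    send_in_flight_ = false;
    if (pending_.has_value()) {
      int next = *pending_;
      pending_.reset();
      send_in_flight_ = true;
      StartWrite(next);
    }
  }

 private:
  void StartWrite(int status) {
    // The real code encodes a HealthCheckResponse and starts an async Write().
    std::cout << "writing status " << status << "\n";
  }

  std::mutex mu_;
  bool send_in_flight_ = false;
  std::optional<int> pending_;
};

int main() {
  StatusStreamer s;
  s.SendStatus(1);  // starts a write immediately
  s.SendStatus(2);  // parked: a write is already in flight
  s.OnWriteDone();  // first write done; parked status 2 is written next
  s.OnWriteDone();
}
```

The shape is the same as `SendHealth` plus the completion handler in the diff: intermediate statuses are coalesced, and only the most recent one is ever written once the stream frees up.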
@@ -461,7 +461,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: SendFinish(std::shared_ptr<CallHandler> self, const Status& status) { if (finish_called_) return; - grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_); + grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_); if (service_->shutdown_) return; SendFinishLocked(std::move(self), status); } diff --git a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h index 9da1dfc15f..ad6a583294 100644 --- a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h +++ b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h @@ -27,11 +27,11 @@ #include <grpcpp/health_check_service_interface.h> #include <grpcpp/impl/codegen/async_generic_service.h> #include <grpcpp/impl/codegen/async_unary_call.h> -#include <grpcpp/impl/codegen/completion_queue.h> +#include <grpcpp/impl/codegen/completion_queue.h> #include <grpcpp/impl/codegen/service_type.h> #include <grpcpp/support/byte_buffer.h> -#include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/thd.h" namespace grpc { @@ -119,8 +119,8 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { HealthCheckServiceImpl* service); // Not used for Check. - void SendHealth(std::shared_ptr<CallHandler> /*self*/, - ServingStatus /*status*/) override {} + void SendHealth(std::shared_ptr<CallHandler> /*self*/, + ServingStatus /*status*/) override {} private: // Called when we receive a call. @@ -198,7 +198,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { GenericServerAsyncWriter stream_; ServerContext ctx_; - grpc_core::Mutex send_mu_; + grpc_core::Mutex send_mu_; bool send_in_flight_ = false; // Guarded by mu_. ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_. @@ -227,7 +227,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { // To synchronize the operations related to shutdown state of cq_, so that // we don't enqueue new tags into cq_ after it is already shut down. - grpc_core::Mutex cq_shutdown_mu_; + grpc_core::Mutex cq_shutdown_mu_; std::atomic_bool shutdown_{false}; std::unique_ptr<::grpc_core::Thread> thread_; }; @@ -273,7 +273,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { const TString& service_name, const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler); - mutable grpc_core::Mutex mu_; + mutable grpc_core::Mutex mu_; bool shutdown_ = false; // Guarded by mu_. std::map<TString, ServiceData> services_map_; // Guarded by mu_. 
std::unique_ptr<HealthCheckServiceImpl> impl_; diff --git a/contrib/libs/grpc/src/cpp/server/health/health_check_service_server_builder_option.cc b/contrib/libs/grpc/src/cpp/server/health/health_check_service_server_builder_option.cc index 3fa384ace9..1d47d67a32 100644 --- a/contrib/libs/grpc/src/cpp/server/health/health_check_service_server_builder_option.cc +++ b/contrib/libs/grpc/src/cpp/server/health/health_check_service_server_builder_option.cc @@ -30,6 +30,6 @@ void HealthCheckServiceServerBuilderOption::UpdateArguments( } void HealthCheckServiceServerBuilderOption::UpdatePlugins( - std::vector<std::unique_ptr<ServerBuilderPlugin>>* /*plugins*/) {} + std::vector<std::unique_ptr<ServerBuilderPlugin>>* /*plugins*/) {} } // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc b/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc index 3f33f4e045..1564bf6f4f 100644 --- a/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc +++ b/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc @@ -29,7 +29,7 @@ class InsecureServerCredentialsImpl final : public ServerCredentials { return grpc_server_add_insecure_http2_port(server, addr.c_str()); } void SetAuthMetadataProcessor( - const std::shared_ptr<grpc::AuthMetadataProcessor>& processor) override { + const std::shared_ptr<grpc::AuthMetadataProcessor>& processor) override { (void)processor; GPR_ASSERT(0); // Should not be called on InsecureServerCredentials. } diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc index 561d4f5048..d5f7f6973b 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc @@ -32,10 +32,10 @@ std::pair<uint64_t, uint64_t> GetCpuStatsImpl() { FILE* fp; fp = fopen("/proc/stat", "r"); uint64_t user, nice, system, idle; - if (fscanf(fp, "cpu %lu %lu %lu %lu", &user, &nice, &system, &idle) != 4) { - // Something bad happened with the information, so assume it's all invalid - user = nice = system = idle = 0; - } + if (fscanf(fp, "cpu %lu %lu %lu %lu", &user, &nice, &system, &idle) != 4) { + // Something bad happened with the information, so assume it's all invalid + user = nice = system = idle = 0; + } fclose(fp); busy = user + nice + system; total = busy + idle; diff --git a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc index 732602bcb7..580e00b99b 100644 --- a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc +++ b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc @@ -70,18 +70,18 @@ void AuthMetadataProcessorAyncWrapper::InvokeProcessor( &response_metadata); std::vector<grpc_metadata> consumed_md; - for (const auto& consumed : consumed_metadata) { + for (const auto& consumed : consumed_metadata) { grpc_metadata md_entry; - md_entry.key = SliceReferencingString(consumed.first); - md_entry.value = SliceReferencingString(consumed.second); + md_entry.key = SliceReferencingString(consumed.first); + md_entry.value = SliceReferencingString(consumed.second); md_entry.flags = 0; consumed_md.push_back(md_entry); } std::vector<grpc_metadata> response_md; - for (const auto& response : response_metadata) { + for (const auto& response : response_metadata) { grpc_metadata md_entry; - md_entry.key = SliceReferencingString(response.first); - md_entry.value = 
SliceReferencingString(response.second); + md_entry.key = SliceReferencingString(response.first); + md_entry.value = SliceReferencingString(response.second); md_entry.flags = 0; response_md.push_back(md_entry); } @@ -98,19 +98,19 @@ int SecureServerCredentials::AddPortToServer(const TString& addr, } void SecureServerCredentials::SetAuthMetadataProcessor( - const std::shared_ptr<grpc::AuthMetadataProcessor>& processor) { - auto* wrapper = new grpc::AuthMetadataProcessorAyncWrapper(processor); + const std::shared_ptr<grpc::AuthMetadataProcessor>& processor) { + auto* wrapper = new grpc::AuthMetadataProcessorAyncWrapper(processor); grpc_server_credentials_set_auth_metadata_processor( - creds_, {grpc::AuthMetadataProcessorAyncWrapper::Process, - grpc::AuthMetadataProcessorAyncWrapper::Destroy, wrapper}); + creds_, {grpc::AuthMetadataProcessorAyncWrapper::Process, + grpc::AuthMetadataProcessorAyncWrapper::Destroy, wrapper}); } std::shared_ptr<ServerCredentials> SslServerCredentials( - const grpc::SslServerCredentialsOptions& options) { + const grpc::SslServerCredentialsOptions& options) { std::vector<grpc_ssl_pem_key_cert_pair> pem_key_cert_pairs; - for (const auto& key_cert_pair : options.pem_key_cert_pairs) { - grpc_ssl_pem_key_cert_pair p = {key_cert_pair.private_key.c_str(), - key_cert_pair.cert_chain.c_str()}; + for (const auto& key_cert_pair : options.pem_key_cert_pairs) { + grpc_ssl_pem_key_cert_pair p = {key_cert_pair.private_key.c_str(), + key_cert_pair.cert_chain.c_str()}; pem_key_cert_pairs.push_back(p); } grpc_server_credentials* c_creds = grpc_ssl_server_credentials_create_ex( @@ -128,7 +128,7 @@ std::shared_ptr<ServerCredentials> SslServerCredentials( namespace experimental { std::shared_ptr<ServerCredentials> AltsServerCredentials( - const AltsServerCredentialsOptions& /* options */) { + const AltsServerCredentialsOptions& /* options */) { grpc_alts_credentials_options* c_options = grpc_alts_credentials_server_options_create(); grpc_server_credentials* c_creds = @@ -144,12 +144,12 @@ std::shared_ptr<ServerCredentials> LocalServerCredentials( new SecureServerCredentials(grpc_local_server_credentials_create(type))); } -std::shared_ptr<ServerCredentials> TlsServerCredentials( +std::shared_ptr<ServerCredentials> TlsServerCredentials( const grpc::experimental::TlsCredentialsOptions& options) { grpc::GrpcLibraryCodegen init; return std::shared_ptr<ServerCredentials>(new SecureServerCredentials( grpc_tls_server_credentials_create(options.c_credentials_options()))); -} - +} + } // namespace experimental } // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h index 9e3fb3f9eb..945bb445cc 100644 --- a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h +++ b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h @@ -22,16 +22,16 @@ #include <memory> #include <grpcpp/security/server_credentials.h> -#include <grpcpp/security/tls_credentials_options.h> +#include <grpcpp/security/tls_credentials_options.h> #include <grpc/grpc_security.h> #include "src/cpp/server/thread_pool_interface.h" namespace grpc { - -class SecureServerCredentials; - + +class SecureServerCredentials; + class AuthMetadataProcessorAyncWrapper final { public: static void Destroy(void* wrapper); @@ -42,11 +42,11 @@ class AuthMetadataProcessorAyncWrapper final { AuthMetadataProcessorAyncWrapper( const std::shared_ptr<AuthMetadataProcessor>& processor) - : processor_(processor) { - if (processor && 
processor->IsBlocking()) { - thread_pool_.reset(CreateDefaultThreadPool()); - } - } + : processor_(processor) { + if (processor && processor->IsBlocking()) { + thread_pool_.reset(CreateDefaultThreadPool()); + } + } private: void InvokeProcessor(grpc_auth_context* context, const grpc_metadata* md, @@ -67,11 +67,11 @@ class SecureServerCredentials final : public ServerCredentials { int AddPortToServer(const TString& addr, grpc_server* server) override; void SetAuthMetadataProcessor( - const std::shared_ptr<grpc::AuthMetadataProcessor>& processor) override; + const std::shared_ptr<grpc::AuthMetadataProcessor>& processor) override; private: grpc_server_credentials* creds_; - std::unique_ptr<grpc::AuthMetadataProcessorAyncWrapper> processor_; + std::unique_ptr<grpc::AuthMetadataProcessorAyncWrapper> processor_; }; } // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/server/server_builder.cc b/contrib/libs/grpc/src/cpp/server/server_builder.cc index 0cc00b365f..92324cb5e6 100644 --- a/contrib/libs/grpc/src/cpp/server/server_builder.cc +++ b/contrib/libs/grpc/src/cpp/server/server_builder.cc @@ -27,9 +27,9 @@ #include <utility> #include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/gpr/string.h" +#include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/useful.h" -#include "src/cpp/server/external_connection_acceptor_impl.h" +#include "src/cpp/server/external_connection_acceptor_impl.h" #include "src/cpp/server/thread_pool_interface.h" namespace grpc { @@ -47,10 +47,10 @@ ServerBuilder::ServerBuilder() : max_receive_message_size_(INT_MIN), max_send_message_size_(INT_MIN), sync_server_settings_(SyncServerSettings()), - resource_quota_(nullptr) { + resource_quota_(nullptr) { gpr_once_init(&once_init_plugin_list, do_plugin_list_init); - for (const auto& value : *g_plugin_factory_list) { - plugins_.emplace_back(value()); + for (const auto& value : *g_plugin_factory_list) { + plugins_.emplace_back(value()); } // all compression algorithms enabled by default. @@ -91,9 +91,9 @@ ServerBuilder& ServerBuilder::RegisterService(const TString& addr, ServerBuilder& ServerBuilder::RegisterAsyncGenericService( AsyncGenericService* service) { - if (generic_service_ || callback_generic_service_) { + if (generic_service_ || callback_generic_service_) { gpr_log(GPR_ERROR, - "Adding multiple generic services is unsupported for now. " + "Adding multiple generic services is unsupported for now. " "Dropping the service %p", (void*)service); } else { @@ -116,33 +116,33 @@ ServerBuilder& ServerBuilder::RegisterCallbackGenericService( return *this; } #else -ServerBuilder& ServerBuilder::experimental_type::RegisterCallbackGenericService( +ServerBuilder& ServerBuilder::experimental_type::RegisterCallbackGenericService( experimental::CallbackGenericService* service) { - if (builder_->generic_service_ || builder_->callback_generic_service_) { - gpr_log(GPR_ERROR, - "Adding multiple generic services is unsupported for now. " - "Dropping the service %p", - (void*)service); - } else { - builder_->callback_generic_service_ = service; - } - return *builder_; -} + if (builder_->generic_service_ || builder_->callback_generic_service_) { + gpr_log(GPR_ERROR, + "Adding multiple generic services is unsupported for now. 
" + "Dropping the service %p", + (void*)service); + } else { + builder_->callback_generic_service_ = service; + } + return *builder_; +} #endif - -std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor> -ServerBuilder::experimental_type::AddExternalConnectionAcceptor( - experimental_type::ExternalConnectionType type, - std::shared_ptr<ServerCredentials> creds) { + +std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor> +ServerBuilder::experimental_type::AddExternalConnectionAcceptor( + experimental_type::ExternalConnectionType type, + std::shared_ptr<ServerCredentials> creds) { TString name_prefix("external:"); - char count_str[GPR_LTOA_MIN_BUFSIZE]; - gpr_ltoa(static_cast<long>(builder_->acceptors_.size()), count_str); - builder_->acceptors_.emplace_back( - std::make_shared<grpc::internal::ExternalConnectionAcceptorImpl>( - name_prefix.append(count_str), type, creds)); - return builder_->acceptors_.back()->GetAcceptor(); -} - + char count_str[GPR_LTOA_MIN_BUFSIZE]; + gpr_ltoa(static_cast<long>(builder_->acceptors_.size()), count_str); + builder_->acceptors_.emplace_back( + std::make_shared<grpc::internal::ExternalConnectionAcceptorImpl>( + name_prefix.append(count_str), type, creds)); + return builder_->acceptors_.back()->GetAcceptor(); +} + ServerBuilder& ServerBuilder::SetOption( std::unique_ptr<ServerBuilderOption> option) { options_.push_back(std::move(option)); @@ -217,8 +217,8 @@ ServerBuilder& ServerBuilder::AddListeningPort( return *this; } -std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { - grpc::ChannelArguments args; +std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { + grpc::ChannelArguments args; if (max_receive_message_size_ >= -1) { args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_); } @@ -252,16 +252,16 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { // == Determine if the server has any syncrhonous methods == bool has_sync_methods = false; - for (const auto& value : services_) { - if (value->service->has_synchronous_methods()) { + for (const auto& value : services_) { + if (value->service->has_synchronous_methods()) { has_sync_methods = true; break; } } if (!has_sync_methods) { - for (const auto& value : plugins_) { - if (value->has_sync_methods()) { + for (const auto& value : plugins_) { + if (value->has_sync_methods()) { has_sync_methods = true; break; } @@ -280,26 +280,26 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { std::make_shared< std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>>()); - bool has_frequently_polled_cqs = false; - for (const auto& cq : cqs_) { - if (cq->IsFrequentlyPolled()) { - has_frequently_polled_cqs = true; - break; - } - } - - // == Determine if the server has any callback methods == - bool has_callback_methods = false; - for (const auto& service : services_) { - if (service->service->has_callback_methods()) { - has_callback_methods = true; - has_frequently_polled_cqs = true; - break; + bool has_frequently_polled_cqs = false; + for (const auto& cq : cqs_) { + if (cq->IsFrequentlyPolled()) { + has_frequently_polled_cqs = true; + break; } } - const bool is_hybrid_server = has_sync_methods && has_frequently_polled_cqs; + // == Determine if the server has any callback methods == + bool has_callback_methods = false; + for (const auto& service : services_) { + if (service->service->has_callback_methods()) { + has_callback_methods = true; + has_frequently_polled_cqs = true; + break; + } + } + const bool is_hybrid_server = has_sync_methods 
&& has_frequently_polled_cqs; + if (has_sync_methods) { grpc_cq_polling_type polling_type = is_hybrid_server ? GRPC_CQ_NON_POLLING : GRPC_CQ_DEFAULT_POLLING; @@ -328,7 +328,7 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { gpr_log(GPR_INFO, "Callback server."); } - std::unique_ptr<grpc::Server> server(new grpc::Server( + std::unique_ptr<grpc::Server> server(new grpc::Server( &args, sync_server_cqs, sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, sync_server_settings_.cq_timeout_msec, std::move(acceptors_), resource_quota_, @@ -343,10 +343,10 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { for (const auto& cq : *sync_server_cqs) { grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr); - has_frequently_polled_cqs = true; + has_frequently_polled_cqs = true; } - if (has_callback_methods || callback_generic_service_ != nullptr) { + if (has_callback_methods || callback_generic_service_ != nullptr) { auto* cq = server->CallbackCQ(); grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr); } @@ -363,29 +363,29 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { cq->RegisterServer(server.get()); } - if (!has_frequently_polled_cqs) { + if (!has_frequently_polled_cqs) { gpr_log(GPR_ERROR, "At least one of the completion queues must be frequently polled"); return nullptr; } - for (const auto& value : services_) { - if (!server->RegisterService(value->host.get(), value->service)) { + for (const auto& value : services_) { + if (!server->RegisterService(value->host.get(), value->service)) { return nullptr; } } - for (const auto& value : plugins_) { - value->InitServer(initializer); + for (const auto& value : plugins_) { + value->InitServer(initializer); } if (generic_service_) { server->RegisterAsyncGenericService(generic_service_); - } else if (callback_generic_service_) { - server->RegisterCallbackGenericService(callback_generic_service_); + } else if (callback_generic_service_) { + server->RegisterCallbackGenericService(callback_generic_service_); } else { - for (const auto& value : services_) { - if (value->service->has_generic_methods()) { + for (const auto& value : services_) { + if (value->service->has_generic_methods()) { gpr_log(GPR_ERROR, "Some methods were marked generic but there is no " "generic service registered."); @@ -394,22 +394,22 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { } } - for (auto& port : ports_) { - int r = server->AddListeningPort(port.addr, port.creds.get()); + for (auto& port : ports_) { + int r = server->AddListeningPort(port.addr, port.creds.get()); if (!r) { server->Shutdown(); return nullptr; } - if (port.selected_port != nullptr) { - *port.selected_port = r; + if (port.selected_port != nullptr) { + *port.selected_port = r; } } auto cqs_data = cqs_.empty() ? nullptr : &cqs_[0]; server->Start(cqs_data, cqs_.size()); - for (const auto& value : plugins_) { - value->Finish(initializer); + for (const auto& value : plugins_) { + value->Finish(initializer); } return server; diff --git a/contrib/libs/grpc/src/cpp/server/server_callback.cc b/contrib/libs/grpc/src/cpp/server/server_callback.cc index 40aef8e735..04a776fac1 100644 --- a/contrib/libs/grpc/src/cpp/server/server_callback.cc +++ b/contrib/libs/grpc/src/cpp/server/server_callback.cc @@ -1,29 +1,29 @@ -/* - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + #include <grpcpp/impl/codegen/server_callback.h> - -#include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/iomgr/executor.h" - + +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/executor.h" + namespace grpc { -namespace internal { - +namespace internal { + void ServerCallbackCall::ScheduleOnDone(bool inline_ondone) { if (inline_ondone) { CallOnDone(); @@ -50,18 +50,18 @@ void ServerCallbackCall::ScheduleOnDone(bool inline_ondone) { } } -void ServerCallbackCall::CallOnCancel(ServerReactor* reactor) { - if (reactor->InternalInlineable()) { - reactor->OnCancel(); - } else { +void ServerCallbackCall::CallOnCancel(ServerReactor* reactor) { + if (reactor->InternalInlineable()) { + reactor->OnCancel(); + } else { // Ref to make sure that the closure executes before the whole call gets // destructed, and Unref within the closure. 
- Ref(); - grpc_core::ExecCtx exec_ctx; + Ref(); + grpc_core::ExecCtx exec_ctx; struct ClosureWithArg { grpc_closure closure; - ServerCallbackCall* call; - ServerReactor* reactor; + ServerCallbackCall* call; + ServerReactor* reactor; ClosureWithArg(ServerCallbackCall* call_arg, ServerReactor* reactor_arg) : call(call_arg), reactor(reactor_arg) { GRPC_CLOSURE_INIT(&closure, @@ -74,11 +74,11 @@ void ServerCallbackCall::CallOnCancel(ServerReactor* reactor) { }, this, grpc_schedule_on_exec_ctx); } - }; + }; ClosureWithArg* arg = new ClosureWithArg(this, reactor); grpc_core::Executor::Run(&arg->closure, GRPC_ERROR_NONE); - } -} - -} // namespace internal + } +} + +} // namespace internal } // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/server/server_cc.cc b/contrib/libs/grpc/src/cpp/server/server_cc.cc index c2a911c7f7..5cf50d9266 100644 --- a/contrib/libs/grpc/src/cpp/server/server_cc.cc +++ b/contrib/libs/grpc/src/cpp/server/server_cc.cc @@ -19,7 +19,7 @@ #include <cstdlib> #include <sstream> -#include <type_traits> +#include <type_traits> #include <utility> #include <grpc/grpc.h> @@ -29,10 +29,10 @@ #include <grpcpp/completion_queue.h> #include <grpcpp/generic/async_generic_service.h> #include <grpcpp/impl/codegen/async_unary_call.h> -#include <grpcpp/impl/codegen/byte_buffer.h> +#include <grpcpp/impl/codegen/byte_buffer.h> #include <grpcpp/impl/codegen/call.h> #include <grpcpp/impl/codegen/completion_queue_tag.h> -#include <grpcpp/impl/codegen/method_handler.h> +#include <grpcpp/impl/codegen/method_handler.h> #include <grpcpp/impl/codegen/server_interceptor.h> #include <grpcpp/impl/grpc_library.h> #include <grpcpp/impl/rpc_service_method.h> @@ -49,7 +49,7 @@ #include "src/core/lib/surface/completion_queue.h" #include "src/core/lib/surface/server.h" #include "src/cpp/client/create_channel_internal.h" -#include "src/cpp/server/external_connection_acceptor_impl.h" +#include "src/cpp/server/external_connection_acceptor_impl.h" #include "src/cpp/server/health/default_health_check_service.h" #include "src/cpp/thread_manager/thread_manager.h" @@ -68,8 +68,8 @@ namespace { class DefaultGlobalCallbacks final : public Server::GlobalCallbacks { public: ~DefaultGlobalCallbacks() override {} - void PreSynchronousRequest(ServerContext* /*context*/) override {} - void PostSynchronousRequest(ServerContext* /*context*/) override {} + void PreSynchronousRequest(ServerContext* /*context*/) override {} + void PostSynchronousRequest(ServerContext* /*context*/) override {} }; std::shared_ptr<Server::GlobalCallbacks> g_callbacks = nullptr; @@ -83,12 +83,12 @@ void InitGlobalCallbacks() { class ShutdownTag : public internal::CompletionQueueTag { public: - bool FinalizeResult(void** /*tag*/, bool* /*status*/) { return false; } + bool FinalizeResult(void** /*tag*/, bool* /*status*/) { return false; } }; class DummyTag : public internal::CompletionQueueTag { public: - bool FinalizeResult(void** /*tag*/, bool* /*status*/) { return true; } + bool FinalizeResult(void** /*tag*/, bool* /*status*/) { return true; } }; class UnimplementedAsyncRequestContext { @@ -110,187 +110,187 @@ using ::grpc::experimental::GenericCallbackServerContext; } // namespace -ServerInterface::BaseAsyncRequest::BaseAsyncRequest( - ServerInterface* server, ServerContext* context, - internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) - : server_(server), - context_(context), - stream_(stream), - call_cq_(call_cq), - 
notification_cq_(notification_cq), - tag_(tag), - delete_on_finalize_(delete_on_finalize), - call_(nullptr), - done_intercepting_(false) { - /* Set up interception state partially for the receive ops. call_wrapper_ is - * not filled at this point, but it will be filled before the interceptors are - * run. */ - interceptor_methods_.SetCall(&call_wrapper_); - interceptor_methods_.SetReverse(); - call_cq_->RegisterAvalanching(); // This op will trigger more ops -} - -ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() { - call_cq_->CompleteAvalanching(); -} - -bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, - bool* status) { - if (done_intercepting_) { - *tag = tag_; - if (delete_on_finalize_) { - delete this; - } - return true; - } - context_->set_call(call_); - context_->cq_ = call_cq_; - if (call_wrapper_.call() == nullptr) { - // Fill it since it is empty. - call_wrapper_ = internal::Call( - call_, server_, call_cq_, server_->max_receive_message_size(), nullptr); - } - - // just the pointers inside call are copied here - stream_->BindCall(&call_wrapper_); - - if (*status && call_ && call_wrapper_.server_rpc_info()) { - done_intercepting_ = true; - // Set interception point for RECV INITIAL METADATA - interceptor_methods_.AddInterceptionHookPoint( - experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); - interceptor_methods_.SetRecvInitialMetadata(&context_->client_metadata_); - if (interceptor_methods_.RunInterceptors( - [this]() { ContinueFinalizeResultAfterInterception(); })) { - // There are no interceptors to run. Continue - } else { - // There were interceptors to be run, so - // ContinueFinalizeResultAfterInterception will be run when interceptors - // are done. - return false; - } - } - if (*status && call_) { - context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr); - } - *tag = tag_; - if (delete_on_finalize_) { - delete this; - } - return true; -} - -void ServerInterface::BaseAsyncRequest:: - ContinueFinalizeResultAfterInterception() { - context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr); - // Queue a tag which will be returned immediately - grpc_core::ExecCtx exec_ctx; - grpc_cq_begin_op(notification_cq_->cq(), this); - grpc_cq_end_op( - notification_cq_->cq(), this, GRPC_ERROR_NONE, - [](void* /*arg*/, grpc_cq_completion* completion) { delete completion; }, - nullptr, new grpc_cq_completion()); -} - -ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( - ServerInterface* server, ServerContext* context, - internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag, const char* name, - internal::RpcMethod::RpcType type) - : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag, - true), - name_(name), - type_(type) {} - -void ServerInterface::RegisteredAsyncRequest::IssueRequest( - void* registered_method, grpc_byte_buffer** payload, - ServerCompletionQueue* notification_cq) { - // The following call_start_batch is internally-generated so no need for an - // explanatory log on failure. 
- GPR_ASSERT(grpc_server_request_registered_call( - server_->server(), registered_method, &call_, - &context_->deadline_, context_->client_metadata_.arr(), - payload, call_cq_->cq(), notification_cq->cq(), - this) == GRPC_CALL_OK); -} - -ServerInterface::GenericAsyncRequest::GenericAsyncRequest( - ServerInterface* server, GenericServerContext* context, - internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) - : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag, - delete_on_finalize) { - grpc_call_details_init(&call_details_); - GPR_ASSERT(notification_cq); - GPR_ASSERT(call_cq); - // The following call_start_batch is internally-generated so no need for an - // explanatory log on failure. - GPR_ASSERT(grpc_server_request_call(server->server(), &call_, &call_details_, - context->client_metadata_.arr(), - call_cq->cq(), notification_cq->cq(), - this) == GRPC_CALL_OK); -} - -bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag, - bool* status) { - // If we are done intercepting, there is nothing more for us to do - if (done_intercepting_) { - return BaseAsyncRequest::FinalizeResult(tag, status); - } - // TODO(yangg) remove the copy here. - if (*status) { - static_cast<GenericServerContext*>(context_)->method_ = - StringFromCopiedSlice(call_details_.method); - static_cast<GenericServerContext*>(context_)->host_ = - StringFromCopiedSlice(call_details_.host); - context_->deadline_ = call_details_.deadline; - } - grpc_slice_unref(call_details_.method); - grpc_slice_unref(call_details_.host); - call_wrapper_ = internal::Call( - call_, server_, call_cq_, server_->max_receive_message_size(), - context_->set_server_rpc_info( - static_cast<GenericServerContext*>(context_)->method_.c_str(), - internal::RpcMethod::BIDI_STREAMING, - *server_->interceptor_creators())); - return BaseAsyncRequest::FinalizeResult(tag, status); -} - -namespace { -class ShutdownCallback : public grpc_experimental_completion_queue_functor { - public: - ShutdownCallback() { - functor_run = &ShutdownCallback::Run; - // Set inlineable to true since this callback is trivial and thus does not - // need to be run from the executor (triggering a thread hop). This should - // only be used by internal callbacks like this and not by user application - // code. - inlineable = true; - } - // TakeCQ takes ownership of the cq into the shutdown callback - // so that the shutdown callback will be responsible for destroying it - void TakeCQ(CompletionQueue* cq) { cq_ = cq; } - - // The Run function will get invoked by the completion queue library - // when the shutdown is actually complete - static void Run(grpc_experimental_completion_queue_functor* cb, int) { - auto* callback = static_cast<ShutdownCallback*>(cb); - delete callback->cq_; - delete callback; - } - - private: - CompletionQueue* cq_ = nullptr; -}; -} // namespace - +ServerInterface::BaseAsyncRequest::BaseAsyncRequest( + ServerInterface* server, ServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) + : server_(server), + context_(context), + stream_(stream), + call_cq_(call_cq), + notification_cq_(notification_cq), + tag_(tag), + delete_on_finalize_(delete_on_finalize), + call_(nullptr), + done_intercepting_(false) { + /* Set up interception state partially for the receive ops. 
call_wrapper_ is + * not filled at this point, but it will be filled before the interceptors are + * run. */ + interceptor_methods_.SetCall(&call_wrapper_); + interceptor_methods_.SetReverse(); + call_cq_->RegisterAvalanching(); // This op will trigger more ops +} + +ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() { + call_cq_->CompleteAvalanching(); +} + +bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, + bool* status) { + if (done_intercepting_) { + *tag = tag_; + if (delete_on_finalize_) { + delete this; + } + return true; + } + context_->set_call(call_); + context_->cq_ = call_cq_; + if (call_wrapper_.call() == nullptr) { + // Fill it since it is empty. + call_wrapper_ = internal::Call( + call_, server_, call_cq_, server_->max_receive_message_size(), nullptr); + } + + // just the pointers inside call are copied here + stream_->BindCall(&call_wrapper_); + + if (*status && call_ && call_wrapper_.server_rpc_info()) { + done_intercepting_ = true; + // Set interception point for RECV INITIAL METADATA + interceptor_methods_.AddInterceptionHookPoint( + experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); + interceptor_methods_.SetRecvInitialMetadata(&context_->client_metadata_); + if (interceptor_methods_.RunInterceptors( + [this]() { ContinueFinalizeResultAfterInterception(); })) { + // There are no interceptors to run. Continue + } else { + // There were interceptors to be run, so + // ContinueFinalizeResultAfterInterception will be run when interceptors + // are done. + return false; + } + } + if (*status && call_) { + context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr); + } + *tag = tag_; + if (delete_on_finalize_) { + delete this; + } + return true; +} + +void ServerInterface::BaseAsyncRequest:: + ContinueFinalizeResultAfterInterception() { + context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr); + // Queue a tag which will be returned immediately + grpc_core::ExecCtx exec_ctx; + grpc_cq_begin_op(notification_cq_->cq(), this); + grpc_cq_end_op( + notification_cq_->cq(), this, GRPC_ERROR_NONE, + [](void* /*arg*/, grpc_cq_completion* completion) { delete completion; }, + nullptr, new grpc_cq_completion()); +} + +ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( + ServerInterface* server, ServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag, const char* name, + internal::RpcMethod::RpcType type) + : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag, + true), + name_(name), + type_(type) {} + +void ServerInterface::RegisteredAsyncRequest::IssueRequest( + void* registered_method, grpc_byte_buffer** payload, + ServerCompletionQueue* notification_cq) { + // The following call_start_batch is internally-generated so no need for an + // explanatory log on failure. 
+ GPR_ASSERT(grpc_server_request_registered_call( + server_->server(), registered_method, &call_, + &context_->deadline_, context_->client_metadata_.arr(), + payload, call_cq_->cq(), notification_cq->cq(), + this) == GRPC_CALL_OK); +} + +ServerInterface::GenericAsyncRequest::GenericAsyncRequest( + ServerInterface* server, GenericServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) + : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag, + delete_on_finalize) { + grpc_call_details_init(&call_details_); + GPR_ASSERT(notification_cq); + GPR_ASSERT(call_cq); + // The following call_start_batch is internally-generated so no need for an + // explanatory log on failure. + GPR_ASSERT(grpc_server_request_call(server->server(), &call_, &call_details_, + context->client_metadata_.arr(), + call_cq->cq(), notification_cq->cq(), + this) == GRPC_CALL_OK); +} + +bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag, + bool* status) { + // If we are done intercepting, there is nothing more for us to do + if (done_intercepting_) { + return BaseAsyncRequest::FinalizeResult(tag, status); + } + // TODO(yangg) remove the copy here. + if (*status) { + static_cast<GenericServerContext*>(context_)->method_ = + StringFromCopiedSlice(call_details_.method); + static_cast<GenericServerContext*>(context_)->host_ = + StringFromCopiedSlice(call_details_.host); + context_->deadline_ = call_details_.deadline; + } + grpc_slice_unref(call_details_.method); + grpc_slice_unref(call_details_.host); + call_wrapper_ = internal::Call( + call_, server_, call_cq_, server_->max_receive_message_size(), + context_->set_server_rpc_info( + static_cast<GenericServerContext*>(context_)->method_.c_str(), + internal::RpcMethod::BIDI_STREAMING, + *server_->interceptor_creators())); + return BaseAsyncRequest::FinalizeResult(tag, status); +} + +namespace { +class ShutdownCallback : public grpc_experimental_completion_queue_functor { + public: + ShutdownCallback() { + functor_run = &ShutdownCallback::Run; + // Set inlineable to true since this callback is trivial and thus does not + // need to be run from the executor (triggering a thread hop). This should + // only be used by internal callbacks like this and not by user application + // code. + inlineable = true; + } + // TakeCQ takes ownership of the cq into the shutdown callback + // so that the shutdown callback will be responsible for destroying it + void TakeCQ(CompletionQueue* cq) { cq_ = cq; } + + // The Run function will get invoked by the completion queue library + // when the shutdown is actually complete + static void Run(grpc_experimental_completion_queue_functor* cb, int) { + auto* callback = static_cast<ShutdownCallback*>(cb); + delete callback->cq_; + delete callback; + } + + private: + CompletionQueue* cq_ = nullptr; +}; +} // namespace + /// Use private inheritance rather than composition only to establish order /// of construction, since the public base class should be constructed after the /// elements belonging to the private base class are constructed. This is not /// possible using true composition. 
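The comment block just above explains a construction-ordering idiom used by the class declared next: C++ constructs base-class subobjects before data members, and bases in the order they are listed, so state that a later (public) base needs during its own construction can only be guaranteed ready if it lives in an earlier private base rather than in a member. The following is a minimal illustration of that idiom under hypothetical names; it is not the actual `UnimplementedAsyncRequestContext`/`GenericAsyncRequest` pair.

```
#include <iostream>

// Holds state that the public base needs while it is being constructed.
struct ContextHolder {
  ContextHolder() { std::cout << "ContextHolder constructed\n"; }
  int context_value = 42;
};

// Public interface base; its constructor consumes the already-built state.
struct AsyncRequestBase {
  explicit AsyncRequestBase(int ctx) {
    std::cout << "AsyncRequestBase sees context " << ctx << "\n";
  }
};

// Listing ContextHolder first as a *private base* guarantees it is constructed
// before AsyncRequestBase, which a data member could not guarantee.
class UnimplementedRequest : private ContextHolder, public AsyncRequestBase {
 public:
  UnimplementedRequest() : AsyncRequestBase(context_value) {}
};

int main() { UnimplementedRequest req; }
```

Had `ContextHolder` been a data member instead of a private base, `context_value` would still be uninitialized when `AsyncRequestBase` runs, which is exactly the limitation of composition that the comment describes.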
class Server::UnimplementedAsyncRequest final - : private grpc::UnimplementedAsyncRequestContext, + : private grpc::UnimplementedAsyncRequestContext, public GenericAsyncRequest { public: UnimplementedAsyncRequest(ServerInterface* server, @@ -300,25 +300,25 @@ class Server::UnimplementedAsyncRequest final bool FinalizeResult(void** tag, bool* status) override; - grpc::ServerContext* context() { return &server_context_; } - grpc::GenericServerAsyncReaderWriter* stream() { return &generic_stream_; } + grpc::ServerContext* context() { return &server_context_; } + grpc::GenericServerAsyncReaderWriter* stream() { return &generic_stream_; } }; /// UnimplementedAsyncResponse should not post user-visible completions to the /// C++ completion queue, but is generated as a CQ event by the core class Server::UnimplementedAsyncResponse final - : public grpc::internal::CallOpSet< - grpc::internal::CallOpSendInitialMetadata, - grpc::internal::CallOpServerSendStatus> { + : public grpc::internal::CallOpSet< + grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpServerSendStatus> { public: UnimplementedAsyncResponse(UnimplementedAsyncRequest* request); ~UnimplementedAsyncResponse() { delete request_; } bool FinalizeResult(void** tag, bool* status) override { - if (grpc::internal::CallOpSet< - grpc::internal::CallOpSendInitialMetadata, - grpc::internal::CallOpServerSendStatus>::FinalizeResult(tag, - status)) { + if (grpc::internal::CallOpSet< + grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpServerSendStatus>::FinalizeResult(tag, + status)) { delete this; } else { // The tag was swallowed due to interception. We will see it again. @@ -330,16 +330,16 @@ class Server::UnimplementedAsyncResponse final UnimplementedAsyncRequest* const request_; }; -class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { +class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { public: - SyncRequest(grpc::internal::RpcServiceMethod* method, void* method_tag) + SyncRequest(grpc::internal::RpcServiceMethod* method, void* method_tag) : method_(method), method_tag_(method_tag), in_flight_(false), - has_request_payload_(method->method_type() == - grpc::internal::RpcMethod::NORMAL_RPC || - method->method_type() == - grpc::internal::RpcMethod::SERVER_STREAMING), + has_request_payload_(method->method_type() == + grpc::internal::RpcMethod::NORMAL_RPC || + method->method_type() == + grpc::internal::RpcMethod::SERVER_STREAMING), call_details_(nullptr), cq_(nullptr) { grpc_metadata_array_init(&request_metadata_); @@ -395,7 +395,7 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { } } - bool FinalizeResult(void** /*tag*/, bool* status) override { + bool FinalizeResult(void** /*tag*/, bool* status) override { if (!*status) { grpc_completion_queue_destroy(cq_); cq_ = nullptr; @@ -450,8 +450,8 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { interceptor_methods_.SetReverse(); // Set interception point for RECV INITIAL METADATA interceptor_methods_.AddInterceptionHookPoint( - grpc::experimental::InterceptionHookPoints:: - POST_RECV_INITIAL_METADATA); + grpc::experimental::InterceptionHookPoints:: + POST_RECV_INITIAL_METADATA); interceptor_methods_.SetRecvInitialMetadata(&ctx_.client_metadata_); if (has_request_payload_) { @@ -459,11 +459,11 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { auto* handler = resources_ ? 
method_->handler() : server_->resource_exhausted_handler_.get(); request_ = handler->Deserialize(call_.call(), request_payload_, - &request_status_, nullptr); + &request_status_, nullptr); request_payload_ = nullptr; interceptor_methods_.AddInterceptionHookPoint( - grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); + grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); interceptor_methods_.SetRecvMessage(request_, nullptr); } @@ -482,40 +482,40 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { global_callbacks_->PreSynchronousRequest(&ctx_); auto* handler = resources_ ? method_->handler() : server_->resource_exhausted_handler_.get(); - handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( - &call_, &ctx_, request_, request_status_, nullptr, nullptr)); + handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( + &call_, &ctx_, request_, request_status_, nullptr, nullptr)); request_ = nullptr; global_callbacks_->PostSynchronousRequest(&ctx_); cq_.Shutdown(); - grpc::internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); + grpc::internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME)); /* Ensure the cq_ is shutdown */ - grpc::DummyTag ignored_tag; + grpc::DummyTag ignored_tag; GPR_ASSERT(cq_.Pluck(&ignored_tag) == false); } delete this; } private: - grpc::CompletionQueue cq_; - grpc::ServerContext ctx_; + grpc::CompletionQueue cq_; + grpc::ServerContext ctx_; const bool has_request_payload_; grpc_byte_buffer* request_payload_; void* request_; - grpc::Status request_status_; - grpc::internal::RpcServiceMethod* const method_; - grpc::internal::Call call_; + grpc::Status request_status_; + grpc::internal::RpcServiceMethod* const method_; + grpc::internal::Call call_; Server* server_; std::shared_ptr<GlobalCallbacks> global_callbacks_; bool resources_; - grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; + grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; }; private: - grpc::internal::RpcServiceMethod* const method_; + grpc::internal::RpcServiceMethod* const method_; void* const method_tag_; bool in_flight_; const bool has_request_payload_; @@ -527,14 +527,14 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { grpc_completion_queue* cq_; }; -template <class ServerContextType> +template <class ServerContextType> class Server::CallbackRequest final : public grpc::internal::CompletionQueueTag { - public: + public: static_assert( std::is_base_of<grpc::CallbackServerContext, ServerContextType>::value, "ServerContextType must be derived from CallbackServerContext"); - + // For codegen services, the value of method represents the defined // characteristics of the method being requested. For generic services, method // is nullptr since these services don't have pre-defined methods. 
@@ -578,26 +578,26 @@ class Server::CallbackRequest final server_->UnrefWithPossibleNotify(); } - // Needs specialization to account for different processing of metadata - // in generic API - bool FinalizeResult(void** tag, bool* status) override; + // Needs specialization to account for different processing of metadata + // in generic API + bool FinalizeResult(void** tag, bool* status) override; private: - // method_name needs to be specialized between named method and generic - const char* method_name() const; - + // method_name needs to be specialized between named method and generic + const char* method_name() const; + class CallbackCallTag : public grpc_experimental_completion_queue_functor { public: - CallbackCallTag(Server::CallbackRequest<ServerContextType>* req) - : req_(req) { + CallbackCallTag(Server::CallbackRequest<ServerContextType>* req) + : req_(req) { functor_run = &CallbackCallTag::StaticRun; - // Set inlineable to true since this callback is internally-controlled - // without taking any locks, and thus does not need to be run from the - // executor (which triggers a thread hop). This should only be used by - // internal callbacks like this and not by user application code. The work - // here is actually non-trivial, but there is no chance of having user - // locks conflict with each other so it's ok to run inlined. - inlineable = true; + // Set inlineable to true since this callback is internally-controlled + // without taking any locks, and thus does not need to be run from the + // executor (which triggers a thread hop). This should only be used by + // internal callbacks like this and not by user application code. The work + // here is actually non-trivial, but there is no chance of having user + // locks conflict with each other so it's ok to run inlined. + inlineable = true; } // force_run can not be performed on a tag if operations using this tag @@ -606,8 +606,8 @@ class Server::CallbackRequest final void force_run(bool ok) { Run(ok); } private: - Server::CallbackRequest<ServerContextType>* req_; - grpc::internal::Call* call_; + Server::CallbackRequest<ServerContextType>* req_; + grpc::internal::Call* call_; static void StaticRun(grpc_experimental_completion_queue_functor* cb, int ok) { @@ -634,35 +634,35 @@ class Server::CallbackRequest final req_->request_metadata_.count = 0; // Create a C++ Call to control the underlying core call - call_ = - new (grpc_call_arena_alloc(req_->call_, sizeof(grpc::internal::Call))) - grpc::internal::Call( - req_->call_, req_->server_, req_->cq_, - req_->server_->max_receive_message_size(), - req_->ctx_.set_server_rpc_info( - req_->method_name(), - (req_->method_ != nullptr) - ? req_->method_->method_type() - : grpc::internal::RpcMethod::BIDI_STREAMING, - req_->server_->interceptor_creators_)); + call_ = + new (grpc_call_arena_alloc(req_->call_, sizeof(grpc::internal::Call))) + grpc::internal::Call( + req_->call_, req_->server_, req_->cq_, + req_->server_->max_receive_message_size(), + req_->ctx_.set_server_rpc_info( + req_->method_name(), + (req_->method_ != nullptr) + ? 
req_->method_->method_type() + : grpc::internal::RpcMethod::BIDI_STREAMING, + req_->server_->interceptor_creators_)); req_->interceptor_methods_.SetCall(call_); req_->interceptor_methods_.SetReverse(); // Set interception point for RECV INITIAL METADATA req_->interceptor_methods_.AddInterceptionHookPoint( - grpc::experimental::InterceptionHookPoints:: - POST_RECV_INITIAL_METADATA); + grpc::experimental::InterceptionHookPoints:: + POST_RECV_INITIAL_METADATA); req_->interceptor_methods_.SetRecvInitialMetadata( &req_->ctx_.client_metadata_); if (req_->has_request_payload_) { // Set interception point for RECV MESSAGE req_->request_ = req_->method_->handler()->Deserialize( - req_->call_, req_->request_payload_, &req_->request_status_, - &req_->handler_data_); + req_->call_, req_->request_payload_, &req_->request_status_, + &req_->handler_data_); req_->request_payload_ = nullptr; req_->interceptor_methods_.AddInterceptionHookPoint( - grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); + grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); req_->interceptor_methods_.SetRecvMessage(req_->request_, nullptr); } @@ -675,11 +675,11 @@ class Server::CallbackRequest final } } void ContinueRunAfterInterception() { - auto* handler = (req_->method_ != nullptr) - ? req_->method_->handler() - : req_->server_->generic_handler_.get(); - handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( - call_, &req_->ctx_, req_->request_, req_->request_status_, + auto* handler = (req_->method_ != nullptr) + ? req_->method_->handler() + : req_->server_->generic_handler_.get(); + handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( + call_, &req_->ctx_, req_->request_, req_->request_status_, req_->handler_data_, [this] { delete req_; })); } }; @@ -694,61 +694,61 @@ class Server::CallbackRequest final } Server* const server_; - grpc::internal::RpcServiceMethod* const method_; + grpc::internal::RpcServiceMethod* const method_; const bool has_request_payload_; grpc_byte_buffer* request_payload_ = nullptr; void* request_ = nullptr; void* handler_data_ = nullptr; - grpc::Status request_status_; + grpc::Status request_status_; grpc_call_details* const call_details_ = nullptr; grpc_call* call_; gpr_timespec deadline_; grpc_metadata_array request_metadata_; grpc::CompletionQueue* const cq_; CallbackCallTag tag_; - ServerContextType ctx_; - grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; + ServerContextType ctx_; + grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; }; -template <> +template <> bool Server::CallbackRequest<grpc::CallbackServerContext>::FinalizeResult( void** /*tag*/, bool* /*status*/) { - return false; -} - -template <> + return false; +} + +template <> bool Server::CallbackRequest< grpc::GenericCallbackServerContext>::FinalizeResult(void** /*tag*/, bool* status) { - if (*status) { + if (*status) { deadline_ = call_details_->deadline; - // TODO(yangg) remove the copy here - ctx_.method_ = grpc::StringFromCopiedSlice(call_details_->method); - ctx_.host_ = grpc::StringFromCopiedSlice(call_details_->host); - } - grpc_slice_unref(call_details_->method); - grpc_slice_unref(call_details_->host); - return false; -} - -template <> + // TODO(yangg) remove the copy here + ctx_.method_ = grpc::StringFromCopiedSlice(call_details_->method); + ctx_.host_ = grpc::StringFromCopiedSlice(call_details_->host); + } + grpc_slice_unref(call_details_->method); + grpc_slice_unref(call_details_->host); + return false; +} + +template <> const char* 
Server::CallbackRequest<grpc::CallbackServerContext>::method_name() const { - return method_->name(); -} - -template <> -const char* Server::CallbackRequest< + return method_->name(); +} + +template <> +const char* Server::CallbackRequest< grpc::GenericCallbackServerContext>::method_name() const { - return ctx_.method().c_str(); -} - + return ctx_.method().c_str(); +} + // Implementation of ThreadManager. Each instance of SyncRequestThreadManager // manages a pool of threads that poll for incoming Sync RPCs and call the // appropriate RPC handlers -class Server::SyncRequestThreadManager : public grpc::ThreadManager { +class Server::SyncRequestThreadManager : public grpc::ThreadManager { public: - SyncRequestThreadManager(Server* server, grpc::CompletionQueue* server_cq, + SyncRequestThreadManager(Server* server, grpc::CompletionQueue* server_cq, std::shared_ptr<GlobalCallbacks> global_callbacks, grpc_resource_quota* rq, int min_pollers, int max_pollers, int cq_timeout_msec) @@ -767,11 +767,11 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN)); switch (server_cq_->AsyncNext(tag, ok, deadline)) { - case grpc::CompletionQueue::TIMEOUT: + case grpc::CompletionQueue::TIMEOUT: return TIMEOUT; - case grpc::CompletionQueue::SHUTDOWN: + case grpc::CompletionQueue::SHUTDOWN: return SHUTDOWN; - case grpc::CompletionQueue::GOT_EVENT: + case grpc::CompletionQueue::GOT_EVENT: return WORK_FOUND; } @@ -806,15 +806,15 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { // object } - void AddSyncMethod(grpc::internal::RpcServiceMethod* method, void* tag) { + void AddSyncMethod(grpc::internal::RpcServiceMethod* method, void* tag) { sync_requests_.emplace_back(new SyncRequest(method, tag)); } void AddUnknownSyncMethod() { if (!sync_requests_.empty()) { - unknown_method_.reset(new grpc::internal::RpcServiceMethod( - "unknown", grpc::internal::RpcMethod::BIDI_STREAMING, - new grpc::internal::UnknownMethodHandler)); + unknown_method_.reset(new grpc::internal::RpcServiceMethod( + "unknown", grpc::internal::RpcMethod::BIDI_STREAMING, + new grpc::internal::UnknownMethodHandler)); sync_requests_.emplace_back( new SyncRequest(unknown_method_.get(), nullptr)); } @@ -847,9 +847,9 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { void Start() { if (!sync_requests_.empty()) { - for (const auto& value : sync_requests_) { - value->SetupRequest(); - value->Request(server_->c_server(), server_cq_->cq()); + for (const auto& value : sync_requests_) { + value->SetupRequest(); + value->Request(server_->c_server(), server_cq_->cq()); } Initialize(); // ThreadManager's Initialize() @@ -858,27 +858,27 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { private: Server* server_; - grpc::CompletionQueue* server_cq_; + grpc::CompletionQueue* server_cq_; int cq_timeout_msec_; std::vector<std::unique_ptr<SyncRequest>> sync_requests_; - std::unique_ptr<grpc::internal::RpcServiceMethod> unknown_method_; + std::unique_ptr<grpc::internal::RpcServiceMethod> unknown_method_; std::shared_ptr<Server::GlobalCallbacks> global_callbacks_; }; -static grpc::internal::GrpcLibraryInitializer g_gli_initializer; +static grpc::internal::GrpcLibraryInitializer g_gli_initializer; Server::Server( grpc::ChannelArguments* args, - std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>> + std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>> sync_server_cqs, int min_pollers, int 
max_pollers, int sync_cq_timeout_msec, - std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>> - acceptors, + std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>> + acceptors, grpc_resource_quota* server_rq, std::vector< - std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>> + std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>> interceptor_creators) - : acceptors_(std::move(acceptors)), - interceptor_creators_(std::move(interceptor_creators)), + : acceptors_(std::move(acceptors)), + interceptor_creators_(std::move(interceptor_creators)), max_receive_message_size_(INT_MIN), sync_server_cqs_(std::move(sync_server_cqs)), started_(false), @@ -888,8 +888,8 @@ Server::Server( server_initializer_(new ServerInitializer(this)), health_check_service_disabled_(false) { g_gli_initializer.summon(); - gpr_once_init(&grpc::g_once_init_callbacks, grpc::InitGlobalCallbacks); - global_callbacks_ = grpc::g_callbacks; + gpr_once_init(&grpc::g_once_init_callbacks, grpc::InitGlobalCallbacks); + global_callbacks_ = grpc::g_callbacks; global_callbacks_->UpdateArguments(args); if (sync_server_cqs_ != nullptr) { @@ -912,22 +912,22 @@ Server::Server( } } - for (auto& acceptor : acceptors_) { - acceptor->SetToChannelArgs(args); - } - + for (auto& acceptor : acceptors_) { + acceptor->SetToChannelArgs(args); + } + grpc_channel_args channel_args; args->SetChannelArgs(&channel_args); for (size_t i = 0; i < channel_args.num_args; i++) { - if (0 == strcmp(channel_args.args[i].key, - grpc::kHealthCheckServiceInterfaceArg)) { + if (0 == strcmp(channel_args.args[i].key, + grpc::kHealthCheckServiceInterfaceArg)) { if (channel_args.args[i].value.pointer.p == nullptr) { health_check_service_disabled_ = true; } else { - health_check_service_.reset( - static_cast<grpc::HealthCheckServiceInterface*>( - channel_args.args[i].value.pointer.p)); + health_check_service_.reset( + static_cast<grpc::HealthCheckServiceInterface*>( + channel_args.args[i].value.pointer.p)); } } if (0 == @@ -940,19 +940,19 @@ Server::Server( Server::~Server() { { - grpc::internal::ReleasableMutexLock lock(&mu_); + grpc::internal::ReleasableMutexLock lock(&mu_); if (started_ && !shutdown_) { - lock.Unlock(); + lock.Unlock(); Shutdown(); } else if (!started_) { // Shutdown the completion queues - for (const auto& value : sync_req_mgrs_) { - value->Shutdown(); - } - if (callback_cq_ != nullptr) { - callback_cq_->Shutdown(); - callback_cq_ = nullptr; + for (const auto& value : sync_req_mgrs_) { + value->Shutdown(); } + if (callback_cq_ != nullptr) { + callback_cq_->Shutdown(); + callback_cq_ = nullptr; + } } } // Destroy health check service before we destroy the C server so that @@ -963,43 +963,43 @@ Server::~Server() { } void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) { - GPR_ASSERT(!grpc::g_callbacks); + GPR_ASSERT(!grpc::g_callbacks); GPR_ASSERT(callbacks); - grpc::g_callbacks.reset(callbacks); + grpc::g_callbacks.reset(callbacks); } grpc_server* Server::c_server() { return server_; } -std::shared_ptr<grpc::Channel> Server::InProcessChannel( - const grpc::ChannelArguments& args) { +std::shared_ptr<grpc::Channel> Server::InProcessChannel( + const grpc::ChannelArguments& args) { grpc_channel_args channel_args = args.c_channel_args(); - return grpc::CreateChannelInternal( + return grpc::CreateChannelInternal( "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr), - std::vector<std::unique_ptr< - 
grpc::experimental::ClientInterceptorFactoryInterface>>()); + std::vector<std::unique_ptr< + grpc::experimental::ClientInterceptorFactoryInterface>>()); } -std::shared_ptr<grpc::Channel> +std::shared_ptr<grpc::Channel> Server::experimental_type::InProcessChannelWithInterceptors( - const grpc::ChannelArguments& args, + const grpc::ChannelArguments& args, std::vector< - std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> + std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_creators) { grpc_channel_args channel_args = args.c_channel_args(); - return grpc::CreateChannelInternal( + return grpc::CreateChannelInternal( "inproc", grpc_inproc_channel_create(server_->server_, &channel_args, nullptr), std::move(interceptor_creators)); } static grpc_server_register_method_payload_handling PayloadHandlingForMethod( - grpc::internal::RpcServiceMethod* method) { + grpc::internal::RpcServiceMethod* method) { switch (method->method_type()) { - case grpc::internal::RpcMethod::NORMAL_RPC: - case grpc::internal::RpcMethod::SERVER_STREAMING: + case grpc::internal::RpcMethod::NORMAL_RPC: + case grpc::internal::RpcMethod::SERVER_STREAMING: return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER; - case grpc::internal::RpcMethod::CLIENT_STREAMING: - case grpc::internal::RpcMethod::BIDI_STREAMING: + case grpc::internal::RpcMethod::CLIENT_STREAMING: + case grpc::internal::RpcMethod::BIDI_STREAMING: return GRPC_SRM_PAYLOAD_NONE; } GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;); @@ -1015,14 +1015,14 @@ bool Server::RegisterService(const TString* host, grpc::Service* service) { const char* method_name = nullptr; - for (const auto& method : service->methods_) { - if (method.get() == nullptr) { // Handled by generic service if any. + for (const auto& method : service->methods_) { + if (method.get() == nullptr) { // Handled by generic service if any. continue; } void* method_registration_tag = grpc_server_register_method( server_, method->name(), host ? 
host->c_str() : nullptr, - PayloadHandlingForMethod(method.get()), 0); + PayloadHandlingForMethod(method.get()), 0); if (method_registration_tag == nullptr) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name()); @@ -1032,9 +1032,9 @@ bool Server::RegisterService(const TString* host, grpc::Service* service) { if (method->handler() == nullptr) { // Async method without handler method->set_server_tag(method_registration_tag); } else if (method->api_type() == - grpc::internal::RpcServiceMethod::ApiType::SYNC) { - for (const auto& value : sync_req_mgrs_) { - value->AddSyncMethod(method.get(), method_registration_tag); + grpc::internal::RpcServiceMethod::ApiType::SYNC) { + for (const auto& value : sync_req_mgrs_) { + value->AddSyncMethod(method.get(), method_registration_tag); } } else { has_callback_methods_ = true; @@ -1064,32 +1064,32 @@ bool Server::RegisterService(const TString* host, grpc::Service* service) { return true; } -void Server::RegisterAsyncGenericService(grpc::AsyncGenericService* service) { +void Server::RegisterAsyncGenericService(grpc::AsyncGenericService* service) { GPR_ASSERT(service->server_ == nullptr && "Can only register an async generic service against one server."); service->server_ = this; - has_async_generic_service_ = true; + has_async_generic_service_ = true; } -void Server::RegisterCallbackGenericService( +void Server::RegisterCallbackGenericService( grpc::CallbackGenericService* service) { - GPR_ASSERT( - service->server_ == nullptr && - "Can only register a callback generic service against one server."); - service->server_ = this; - has_callback_generic_service_ = true; - generic_handler_.reset(service->Handler()); - + GPR_ASSERT( + service->server_ == nullptr && + "Can only register a callback generic service against one server."); + service->server_ = this; + has_callback_generic_service_ = true; + generic_handler_.reset(service->Handler()); + grpc::CompletionQueue* cq = CallbackCQ(); server_->core_server->SetBatchMethodAllocator(cq->cq(), [this, cq] { grpc_core::Server::BatchCallAllocation result; new CallbackRequest<grpc::GenericCallbackServerContext>(this, cq, &result); return result; }); -} - +} + int Server::AddListeningPort(const TString& addr, - grpc::ServerCredentials* creds) { + grpc::ServerCredentials* creds) { GPR_ASSERT(!started_); int port = creds->AddPortToServer(addr, server_); global_callbacks_->AddPort(this, addr, creds, port); @@ -1121,46 +1121,46 @@ void Server::UnrefAndWaitLocked() { shutdown_done_cv_.WaitUntil(&mu_, [this] { return shutdown_done_; }); } -void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { +void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { GPR_ASSERT(!started_); global_callbacks_->PreServerStart(this); started_ = true; // Only create default health check service when user did not provide an // explicit one. 
- grpc::ServerCompletionQueue* health_check_cq = nullptr; - grpc::DefaultHealthCheckService::HealthCheckServiceImpl* + grpc::ServerCompletionQueue* health_check_cq = nullptr; + grpc::DefaultHealthCheckService::HealthCheckServiceImpl* default_health_check_service_impl = nullptr; if (health_check_service_ == nullptr && !health_check_service_disabled_ && - grpc::DefaultHealthCheckServiceEnabled()) { - auto* default_hc_service = new grpc::DefaultHealthCheckService; + grpc::DefaultHealthCheckServiceEnabled()) { + auto* default_hc_service = new grpc::DefaultHealthCheckService; health_check_service_.reset(default_hc_service); // We create a non-polling CQ to avoid impacting application // performance. This ensures that we don't introduce thread hops // for application requests that wind up on this CQ, which is polled // in its own thread. - health_check_cq = new grpc::ServerCompletionQueue( - GRPC_CQ_NEXT, GRPC_CQ_NON_POLLING, nullptr); + health_check_cq = new grpc::ServerCompletionQueue( + GRPC_CQ_NEXT, GRPC_CQ_NON_POLLING, nullptr); grpc_server_register_completion_queue(server_, health_check_cq->cq(), nullptr); default_health_check_service_impl = default_hc_service->GetHealthCheckService( - std::unique_ptr<grpc::ServerCompletionQueue>(health_check_cq)); + std::unique_ptr<grpc::ServerCompletionQueue>(health_check_cq)); RegisterService(nullptr, default_health_check_service_impl); } - for (auto& acceptor : acceptors_) { - acceptor->GetCredentials()->AddPortToServer(acceptor->name(), server_); - } - - // If this server uses callback methods, then create a callback generic - // service to handle any unimplemented methods using the default reactor - // creator + for (auto& acceptor : acceptors_) { + acceptor->GetCredentials()->AddPortToServer(acceptor->name(), server_); + } + + // If this server uses callback methods, then create a callback generic + // service to handle any unimplemented methods using the default reactor + // creator if (has_callback_methods_ && !has_callback_generic_service_) { unimplemented_service_.reset(new grpc::CallbackGenericService); - RegisterCallbackGenericService(unimplemented_service_.get()); - } - + RegisterCallbackGenericService(unimplemented_service_.get()); + } + #ifndef NDEBUG for (size_t i = 0; i < num_cqs; i++) { cq_list_.push_back(cqs[i]); @@ -1169,9 +1169,9 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { grpc_server_start(server_); - if (!has_async_generic_service_ && !has_callback_generic_service_) { - for (const auto& value : sync_req_mgrs_) { - value->AddUnknownSyncMethod(); + if (!has_async_generic_service_ && !has_callback_generic_service_) { + for (const auto& value : sync_req_mgrs_) { + value->AddUnknownSyncMethod(); } for (size_t i = 0; i < num_cqs; i++) { @@ -1188,50 +1188,50 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { // server CQs), make sure that we have a ResourceExhausted handler // to deal with the case of thread exhaustion if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) { - resource_exhausted_handler_.reset( - new grpc::internal::ResourceExhaustedHandler); + resource_exhausted_handler_.reset( + new grpc::internal::ResourceExhaustedHandler); } - for (const auto& value : sync_req_mgrs_) { - value->Start(); + for (const auto& value : sync_req_mgrs_) { + value->Start(); } if (default_health_check_service_impl != nullptr) { default_health_check_service_impl->StartServingThread(); } - - for (auto& acceptor : acceptors_) { - acceptor->Start(); - } + + for (auto& acceptor : 
acceptors_) { + acceptor->Start(); + } } void Server::ShutdownInternal(gpr_timespec deadline) { - grpc::internal::MutexLock lock(&mu_); + grpc::internal::MutexLock lock(&mu_); if (shutdown_) { return; } shutdown_ = true; - for (auto& acceptor : acceptors_) { - acceptor->Shutdown(); - } - + for (auto& acceptor : acceptors_) { + acceptor->Shutdown(); + } + /// The completion queue to use for server shutdown completion notification - grpc::CompletionQueue shutdown_cq; - grpc::ShutdownTag shutdown_tag; // Dummy shutdown tag + grpc::CompletionQueue shutdown_cq; + grpc::ShutdownTag shutdown_tag; // Dummy shutdown tag grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag); shutdown_cq.Shutdown(); void* tag; bool ok; - grpc::CompletionQueue::NextStatus status = + grpc::CompletionQueue::NextStatus status = shutdown_cq.AsyncNext(&tag, &ok, deadline); // If this timed out, it means we are done with the grace period for a clean // shutdown. We should force a shutdown now by cancelling all inflight calls - if (status == grpc::CompletionQueue::NextStatus::TIMEOUT) { + if (status == grpc::CompletionQueue::NextStatus::TIMEOUT) { grpc_server_cancel_all_calls(server_); } // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has @@ -1239,25 +1239,25 @@ void Server::ShutdownInternal(gpr_timespec deadline) { // Shutdown all ThreadManagers. This will try to gracefully stop all the // threads in the ThreadManagers (once they process any inflight requests) - for (const auto& value : sync_req_mgrs_) { - value->Shutdown(); // ThreadManager's Shutdown() + for (const auto& value : sync_req_mgrs_) { + value->Shutdown(); // ThreadManager's Shutdown() } // Wait for threads in all ThreadManagers to terminate - for (const auto& value : sync_req_mgrs_) { - value->Wait(); + for (const auto& value : sync_req_mgrs_) { + value->Wait(); } // Drop the shutdown ref and wait for all other refs to drop as well. UnrefAndWaitLocked(); - // Shutdown the callback CQ. The CQ is owned by its own shutdown tag, so it - // will delete itself at true shutdown. - if (callback_cq_ != nullptr) { - callback_cq_->Shutdown(); - callback_cq_ = nullptr; - } - + // Shutdown the callback CQ. The CQ is owned by its own shutdown tag, so it + // will delete itself at true shutdown. 
+ if (callback_cq_ != nullptr) { + callback_cq_->Shutdown(); + callback_cq_ = nullptr; + } + // Drain the shutdown queue (if the previous call to AsyncNext() timed out // and we didn't remove the tag from the queue yet) while (shutdown_cq.Next(&tag, &ok)) { @@ -1265,7 +1265,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) { } shutdown_notified_ = true; - shutdown_cv_.Broadcast(); + shutdown_cv_.Broadcast(); #ifndef NDEBUG // Unregister this server with the CQs passed into it by the user so that @@ -1278,14 +1278,14 @@ void Server::ShutdownInternal(gpr_timespec deadline) { } void Server::Wait() { - grpc::internal::MutexLock lock(&mu_); + grpc::internal::MutexLock lock(&mu_); while (started_ && !shutdown_notified_) { - shutdown_cv_.Wait(&mu_); + shutdown_cv_.Wait(&mu_); } } -void Server::PerformOpsOnCall(grpc::internal::CallOpSetInterface* ops, - grpc::internal::Call* call) { +void Server::PerformOpsOnCall(grpc::internal::CallOpSetInterface* ops, + grpc::internal::Call* call) { ops->FillOps(call); } @@ -1310,19 +1310,19 @@ bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag, Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( UnimplementedAsyncRequest* request) : request_(request) { - grpc::Status status(grpc::StatusCode::UNIMPLEMENTED, ""); - grpc::internal::UnknownMethodHandler::FillOps(request_->context(), this); + grpc::Status status(grpc::StatusCode::UNIMPLEMENTED, ""); + grpc::internal::UnknownMethodHandler::FillOps(request_->context(), this); request_->stream()->call_.PerformOps(this); } -grpc::ServerInitializer* Server::initializer() { - return server_initializer_.get(); -} +grpc::ServerInitializer* Server::initializer() { + return server_initializer_.get(); +} -grpc::CompletionQueue* Server::CallbackCQ() { +grpc::CompletionQueue* Server::CallbackCQ() { // TODO(vjpai): Consider using a single global CQ for the default CQ // if there is no explicit per-server CQ registered - grpc::internal::MutexLock l(&mu_); + grpc::internal::MutexLock l(&mu_); if (callback_cq_ != nullptr) { return callback_cq_; } @@ -1335,6 +1335,6 @@ grpc::CompletionQueue* Server::CallbackCQ() { shutdown_callback->TakeCQ(callback_cq_); return callback_cq_; -} +} } // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/server/server_context.cc b/contrib/libs/grpc/src/cpp/server/server_context.cc index 458ac20d87..4a4181d39f 100644 --- a/contrib/libs/grpc/src/cpp/server/server_context.cc +++ b/contrib/libs/grpc/src/cpp/server/server_context.cc @@ -28,18 +28,18 @@ #include <grpc/support/log.h> #include <grpcpp/impl/call.h> #include <grpcpp/impl/codegen/completion_queue.h> -#include <grpcpp/support/server_callback.h> +#include <grpcpp/support/server_callback.h> #include <grpcpp/support/time.h> -#include "src/core/lib/gprpp/ref_counted.h" -#include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/gprpp/ref_counted.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/surface/call.h" namespace grpc { // CompletionOp -class ServerContextBase::CompletionOp final +class ServerContextBase::CompletionOp final : public internal::CallOpSetInterface { public: // initial refs: one in the server context, one in the cq @@ -47,7 +47,7 @@ class ServerContextBase::CompletionOp final CompletionOp(internal::Call* call, ::grpc::internal::ServerCallbackCall* callback_controller) : call_(*call), - callback_controller_(callback_controller), + callback_controller_(callback_controller), has_tag_(false), tag_(nullptr), core_cq_tag_(this), @@ -73,10 +73,10 @@ class 
ServerContextBase::CompletionOp final // This should always be arena allocated in the call, so override delete. // But this class is not trivially destructible, so must actually call delete // before allowing the arena to be freed - static void operator delete(void* /*ptr*/, std::size_t size) { - // Use size to avoid unused-parameter warning since assert seems to be - // compiled out and treated as unused in some gcc optimized versions. - (void)size; + static void operator delete(void* /*ptr*/, std::size_t size) { + // Use size to avoid unused-parameter warning since assert seems to be + // compiled out and treated as unused in some gcc optimized versions. + (void)size; assert(size == sizeof(CompletionOp)); } @@ -110,7 +110,7 @@ class ServerContextBase::CompletionOp final // RPC. This should set hijacking state for each of the ops. void SetHijackingState() override { /* Servers don't allow hijacking */ - GPR_ASSERT(false); + GPR_ASSERT(false); } /* Should be called after interceptors are done running */ @@ -122,17 +122,17 @@ class ServerContextBase::CompletionOp final done_intercepting_ = true; if (!has_tag_) { /* We don't have a tag to return. */ - Unref(); + Unref(); return; } /* Start a dummy op so that we can return the tag */ - GPR_ASSERT(grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_, - nullptr) == GRPC_CALL_OK); + GPR_ASSERT(grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_, + nullptr) == GRPC_CALL_OK); } private: bool CheckCancelledNoPluck() { - grpc_core::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&mu_); return finalized_ ? (cancelled_ != 0) : false; } @@ -141,16 +141,16 @@ class ServerContextBase::CompletionOp final bool has_tag_; void* tag_; void* core_cq_tag_; - grpc_core::RefCount refs_; - grpc_core::Mutex mu_; + grpc_core::RefCount refs_; + grpc_core::Mutex mu_; bool finalized_; int cancelled_; // This is an int (not bool) because it is passed to core bool done_intercepting_; internal::InterceptorBatchMethodsImpl interceptor_methods_; }; -void ServerContextBase::CompletionOp::Unref() { - if (refs_.Unref()) { +void ServerContextBase::CompletionOp::Unref() { + if (refs_.Unref()) { grpc_call* call = call_.call(); delete this; grpc_call_unref(call); @@ -166,14 +166,14 @@ void ServerContextBase::CompletionOp::FillOps(internal::Call* call) { interceptor_methods_.SetCall(&call_); interceptor_methods_.SetReverse(); interceptor_methods_.SetCallOpSetInterface(this); - // The following call_start_batch is internally-generated so no need for an - // explanatory log on failure. - GPR_ASSERT(grpc_call_start_batch(call->call(), &ops, 1, core_cq_tag_, - nullptr) == GRPC_CALL_OK); + // The following call_start_batch is internally-generated so no need for an + // explanatory log on failure. + GPR_ASSERT(grpc_call_start_batch(call->call(), &ops, 1, core_cq_tag_, + nullptr) == GRPC_CALL_OK); /* No interceptors to run here */ } -bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) { +bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) { // Decide whether to call the cancel callback within the lock bool call_cancel; @@ -201,8 +201,8 @@ bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) { // Release the lock since we may call a callback and interceptors. 
} - if (call_cancel && callback_controller_ != nullptr) { - callback_controller_->MaybeCallOnCancel(); + if (call_cancel && callback_controller_ != nullptr) { + callback_controller_->MaybeCallOnCancel(); } /* Add interception point and run through interceptors */ interceptor_methods_.AddInterceptionHookPoint( @@ -213,26 +213,26 @@ bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) { if (has_tag) { *tag = tag_; } - Unref(); + Unref(); return has_tag; } // There are interceptors to be run. Return false for now. return false; } -// ServerContextBase body +// ServerContextBase body ServerContextBase::ServerContextBase() : deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)) {} -ServerContextBase::ServerContextBase(gpr_timespec deadline, +ServerContextBase::ServerContextBase(gpr_timespec deadline, grpc_metadata_array* arr) : deadline_(deadline) { std::swap(*client_metadata_.arr(), *arr); } -void ServerContextBase::BindDeadlineAndMetadata(gpr_timespec deadline, - grpc_metadata_array* arr) { +void ServerContextBase::BindDeadlineAndMetadata(gpr_timespec deadline, + grpc_metadata_array* arr) { deadline_ = deadline; std::swap(*client_metadata_.arr(), *arr); } @@ -244,9 +244,9 @@ ServerContextBase::~ServerContextBase() { if (rpc_info_) { rpc_info_->Unref(); } - if (default_reactor_used_.load(std::memory_order_relaxed)) { + if (default_reactor_used_.load(std::memory_order_relaxed)) { reinterpret_cast<Reactor*>(&default_reactor_)->~Reactor(); - } + } } ServerContextBase::CallWrapper::~CallWrapper() { @@ -257,7 +257,7 @@ ServerContextBase::CallWrapper::~CallWrapper() { } } -void ServerContextBase::BeginCompletionOp( +void ServerContextBase::BeginCompletionOp( internal::Call* call, std::function<void(bool)> callback, ::grpc::internal::ServerCallbackCall* callback_controller) { GPR_ASSERT(!completion_op_); @@ -267,10 +267,10 @@ void ServerContextBase::BeginCompletionOp( grpc_call_ref(call->call()); completion_op_ = new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp))) - CompletionOp(call, callback_controller); - if (callback_controller != nullptr) { - completion_tag_.Set(call->call(), std::move(callback), completion_op_, - true); + CompletionOp(call, callback_controller); + if (callback_controller != nullptr) { + completion_tag_.Set(call->call(), std::move(callback), completion_op_, + true); completion_op_->set_core_cq_tag(&completion_tag_); completion_op_->set_tag(completion_op_); } else if (has_notify_when_done_tag_) { @@ -293,7 +293,7 @@ void ServerContextBase::AddTrailingMetadata(const TString& key, trailing_metadata_.insert(std::make_pair(key, value)); } -void ServerContextBase::TryCancel() const { +void ServerContextBase::TryCancel() const { internal::CancelInterceptorBatchMethods cancel_methods; if (rpc_info_) { for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) { @@ -308,7 +308,7 @@ void ServerContextBase::TryCancel() const { } } -bool ServerContextBase::IsCancelled() const { +bool ServerContextBase::IsCancelled() const { if (completion_tag_) { // When using callback API, this result is always valid. 
return completion_op_->CheckCancelledAsync(); @@ -322,7 +322,7 @@ bool ServerContextBase::IsCancelled() const { } } -void ServerContextBase::set_compression_algorithm( +void ServerContextBase::set_compression_algorithm( grpc_compression_algorithm algorithm) { compression_algorithm_ = algorithm; const char* algorithm_name = nullptr; @@ -345,12 +345,12 @@ TString ServerContextBase::peer() const { return peer; } -const struct census_context* ServerContextBase::census_context() const { +const struct census_context* ServerContextBase::census_context() const { return call_.call == nullptr ? nullptr : grpc_census_call_get_context(call_.call); } -void ServerContextBase::SetLoadReportingCosts( +void ServerContextBase::SetLoadReportingCosts( const std::vector<TString>& cost_data) { if (call_.call == nullptr) return; for (const auto& cost_datum : cost_data) { diff --git a/contrib/libs/grpc/src/cpp/server/server_posix.cc b/contrib/libs/grpc/src/cpp/server/server_posix.cc index c3d40d4fa2..85709b0c70 100644 --- a/contrib/libs/grpc/src/cpp/server/server_posix.cc +++ b/contrib/libs/grpc/src/cpp/server/server_posix.cc @@ -24,7 +24,7 @@ namespace grpc { #ifdef GPR_SUPPORT_CHANNELS_FROM_FD -void AddInsecureChannelFromFd(grpc::Server* server, int fd) { +void AddInsecureChannelFromFd(grpc::Server* server, int fd) { grpc_server_add_insecure_channel_from_fd(server->c_server(), nullptr, fd); } diff --git a/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.cc b/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.cc index c8560aa81d..cac4b57eb2 100644 --- a/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.cc +++ b/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.cc @@ -33,10 +33,10 @@ ThreadManager::WorkerThread::WorkerThread(ThreadManager* thd_mgr) thd_ = grpc_core::Thread( "grpcpp_sync_server", [](void* th) { static_cast<ThreadManager::WorkerThread*>(th)->Run(); }, - this, &created_); - if (!created_) { - gpr_log(GPR_ERROR, "Could not create grpc_sync_server worker-thread"); - } + this, &created_); + if (!created_) { + gpr_log(GPR_ERROR, "Could not create grpc_sync_server worker-thread"); + } } void ThreadManager::WorkerThread::Run() { @@ -63,7 +63,7 @@ ThreadManager::ThreadManager(const char* name, ThreadManager::~ThreadManager() { { - grpc_core::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&mu_); GPR_ASSERT(num_threads_ == 0); } @@ -73,38 +73,38 @@ ThreadManager::~ThreadManager() { } void ThreadManager::Wait() { - grpc_core::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&mu_); while (num_threads_ != 0) { - shutdown_cv_.Wait(&mu_); + shutdown_cv_.Wait(&mu_); } } void ThreadManager::Shutdown() { - grpc_core::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&mu_); shutdown_ = true; } bool ThreadManager::IsShutdown() { - grpc_core::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&mu_); return shutdown_; } int ThreadManager::GetMaxActiveThreadsSoFar() { - grpc_core::MutexLock list_lock(&list_mu_); + grpc_core::MutexLock list_lock(&list_mu_); return max_active_threads_sofar_; } void ThreadManager::MarkAsCompleted(WorkerThread* thd) { { - grpc_core::MutexLock list_lock(&list_mu_); + grpc_core::MutexLock list_lock(&list_mu_); completed_threads_.push_back(thd); } { - grpc_core::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&mu_); num_threads_--; if (num_threads_ == 0) { - shutdown_cv_.Signal(); + shutdown_cv_.Signal(); } } @@ -117,7 +117,7 @@ void ThreadManager::CleanupCompletedThreads() { { // swap out the completed threads list: allows other threads to clean up // more quickly 
- grpc_core::MutexLock lock(&list_mu_); + grpc_core::MutexLock lock(&list_mu_); completed_threads.swap(completed_threads_); } for (auto thd : completed_threads) delete thd; @@ -133,16 +133,16 @@ void ThreadManager::Initialize() { } { - grpc_core::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&mu_); num_pollers_ = min_pollers_; num_threads_ = min_pollers_; max_active_threads_sofar_ = min_pollers_; } for (int i = 0; i < min_pollers_; i++) { - WorkerThread* worker = new WorkerThread(this); - GPR_ASSERT(worker->created()); // Must be able to create the minimum - worker->Start(); + WorkerThread* worker = new WorkerThread(this); + GPR_ASSERT(worker->created()); // Must be able to create the minimum + worker->Start(); } } @@ -152,7 +152,7 @@ void ThreadManager::MainWorkLoop() { bool ok; WorkStatus work_status = PollForWork(&tag, &ok); - grpc_core::ReleasableMutexLock lock(&mu_); + grpc_core::ReleasableMutexLock lock(&mu_); // Reduce the number of pollers by 1 and check what happened with the poll num_pollers_--; bool done = false; @@ -179,40 +179,40 @@ void ThreadManager::MainWorkLoop() { max_active_threads_sofar_ = num_threads_; } // Drop lock before spawning thread to avoid contention - lock.Unlock(); - WorkerThread* worker = new WorkerThread(this); - if (worker->created()) { - worker->Start(); - } else { + lock.Unlock(); + WorkerThread* worker = new WorkerThread(this); + if (worker->created()) { + worker->Start(); + } else { // Get lock again to undo changes to poller/thread counters. grpc_core::MutexLock failure_lock(&mu_); - num_pollers_--; - num_threads_--; - resource_exhausted = true; - delete worker; - } + num_pollers_--; + num_threads_--; + resource_exhausted = true; + delete worker; + } } else if (num_pollers_ > 0) { // There is still at least some thread polling, so we can go on // even though we are below the number of pollers that we would // like to have (min_pollers_) - lock.Unlock(); + lock.Unlock(); } else { // There are no pollers to spare and we couldn't allocate // a new thread, so resources are exhausted! - lock.Unlock(); + lock.Unlock(); resource_exhausted = true; } } else { // There are a sufficient number of pollers available so we can do // the work and continue polling with our existing poller threads - lock.Unlock(); + lock.Unlock(); } // Lock is always released at this point - do the application work // or return resource exhausted if there is new work but we couldn't // get a thread in which to do it. DoWork(tag, ok, !resource_exhausted); // Take the lock again to check post conditions - lock.Lock(); + lock.Lock(); // If we're shutdown, we should finish at this point. 
if (shutdown_) done = true; break; diff --git a/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.h b/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.h index 43f1fd5585..bfdaf3bfda 100644 --- a/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.h +++ b/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.h @@ -24,7 +24,7 @@ #include <grpcpp/support/config.h> -#include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/resource_quota.h" @@ -54,7 +54,7 @@ class ThreadManager { // DoWork() // // If the return value is SHUTDOWN:, - // - ThreadManager WILL NOT call DoWork() and terminates the thread + // - ThreadManager WILL NOT call DoWork() and terminates the thread // // If the return value is TIMEOUT:, // - ThreadManager WILL NOT call DoWork() @@ -122,9 +122,9 @@ class ThreadManager { WorkerThread(ThreadManager* thd_mgr); ~WorkerThread(); - bool created() const { return created_; } - void Start() { thd_.Start(); } - + bool created() const { return created_; } + void Start() { thd_.Start(); } + private: // Calls thd_mgr_->MainWorkLoop() and once that completes, calls // thd_mgr_>MarkAsCompleted(this) to mark the thread as completed @@ -132,10 +132,10 @@ class ThreadManager { ThreadManager* const thd_mgr_; grpc_core::Thread thd_; - bool created_; + bool created_; }; - // The main function in ThreadManager + // The main function in ThreadManager void MainWorkLoop(); void MarkAsCompleted(WorkerThread* thd); @@ -143,16 +143,16 @@ class ThreadManager { // Protects shutdown_, num_pollers_, num_threads_ and // max_active_threads_sofar_ - grpc_core::Mutex mu_; + grpc_core::Mutex mu_; bool shutdown_; - grpc_core::CondVar shutdown_cv_; + grpc_core::CondVar shutdown_cv_; // The resource user object to use when requesting quota to create threads // // Note: The user of this ThreadManager object must create grpc_resource_quota // object (that contains the actual max thread quota) and a grpc_resource_user - // object through which quota is requested whenever new threads need to be + // object through which quota is requested whenever new threads need to be // created grpc_resource_user* resource_user_; @@ -172,7 +172,7 @@ class ThreadManager { // ever set so far int max_active_threads_sofar_; - grpc_core::Mutex list_mu_; + grpc_core::Mutex list_mu_; std::list<WorkerThread*> completed_threads_; }; diff --git a/contrib/libs/grpc/src/cpp/util/error_details.cc b/contrib/libs/grpc/src/cpp/util/error_details.cc index dfd3351be1..009dd97483 100644 --- a/contrib/libs/grpc/src/cpp/util/error_details.cc +++ b/contrib/libs/grpc/src/cpp/util/error_details.cc @@ -22,29 +22,29 @@ namespace grpc { -grpc::Status ExtractErrorDetails(const grpc::Status& from, - ::google::rpc::Status* to) { +grpc::Status ExtractErrorDetails(const grpc::Status& from, + ::google::rpc::Status* to) { if (to == nullptr) { - return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, ""); + return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, ""); } - if (!to->ParseFromString(TProtoStringType(from.error_details()))) { - return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, ""); + if (!to->ParseFromString(TProtoStringType(from.error_details()))) { + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, ""); } - return grpc::Status::OK; + return grpc::Status::OK; } -grpc::Status SetErrorDetails(const ::google::rpc::Status& from, - grpc::Status* to) { +grpc::Status SetErrorDetails(const ::google::rpc::Status& from, + grpc::Status* to) { if 
(to == nullptr) {
-    return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, "");
+    return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, "");
   }
-  grpc::StatusCode code = grpc::StatusCode::UNKNOWN;
-  if (from.code() >= grpc::StatusCode::OK &&
-      from.code() <= grpc::StatusCode::UNAUTHENTICATED) {
-    code = static_cast<grpc::StatusCode>(from.code());
+  grpc::StatusCode code = grpc::StatusCode::UNKNOWN;
+  if (from.code() >= grpc::StatusCode::OK &&
+      from.code() <= grpc::StatusCode::UNAUTHENTICATED) {
+    code = static_cast<grpc::StatusCode>(from.code());
   }
-  *to = grpc::Status(code, from.message(), from.SerializeAsString());
-  return grpc::Status::OK;
+  *to = grpc::Status(code, from.message(), from.SerializeAsString());
+  return grpc::Status::OK;
 }
 
 }  // namespace grpc
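For context, the two helpers touched in the error_details.cc hunk above, ExtractErrorDetails and SetErrorDetails, are part of gRPC's public C++ surface in `<grpcpp/support/error_details.h>`; this patch only re-annotates their lines. The following is a minimal usage sketch, not part of the commit: it assumes the `google.rpc.Status` proto has been generated into `google/rpc/status.pb.h`, and the function names (`MakeRichError`, `LogRichError`) and message text are purely illustrative.

```cpp
// Illustrative sketch only -- not part of this patch.
#include <iostream>

#include <grpcpp/support/error_details.h>

#include "google/rpc/status.pb.h"  // assumed generated from google/rpc/status.proto

// Server side: pack a rich google.rpc.Status into the grpc::Status trailer.
grpc::Status MakeRichError() {
  google::rpc::Status rich;
  rich.set_code(grpc::StatusCode::FAILED_PRECONDITION);
  rich.set_message("quota exceeded");
  grpc::Status out;
  grpc::SetErrorDetails(rich, &out);  // serializes `rich` into out.error_details()
  return out;
}

// Client side: recover the payload from a failed RPC's status.
void LogRichError(const grpc::Status& rpc_status) {
  google::rpc::Status rich;
  if (grpc::ExtractErrorDetails(rpc_status, &rich).ok()) {
    std::cout << "rich error: " << rich.message() << std::endl;
  }
}
```

As the diffed implementation shows, SetErrorDetails itself returns a grpc::Status (FAILED_PRECONDITION when `to` is null), which the sketch ignores for brevity.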