aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/libs/grpc/src/cpp
diff options
context:
space:
mode:
authorneksard <neksard@yandex-team.ru>2022-02-10 16:45:33 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:33 +0300
commit1d9c550e7c38e051d7961f576013a482003a70d9 (patch)
treeb2cc84ee7850122e7ccf51d0ea21e4fa7e7a5685 /contrib/libs/grpc/src/cpp
parent8f7cf138264e0caa318144bf8a2c950e0b0a8593 (diff)
downloadydb-1d9c550e7c38e051d7961f576013a482003a70d9.tar.gz
Restoring authorship annotation for <neksard@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/libs/grpc/src/cpp')
-rwxr-xr-xcontrib/libs/grpc/src/cpp/README.md80
-rw-r--r--contrib/libs/grpc/src/cpp/client/channel_cc.cc138
-rw-r--r--contrib/libs/grpc/src/cpp/client/client_context.cc34
-rw-r--r--contrib/libs/grpc/src/cpp/client/client_interceptor.cc68
-rw-r--r--contrib/libs/grpc/src/cpp/client/create_channel.cc56
-rw-r--r--contrib/libs/grpc/src/cpp/client/create_channel_internal.cc8
-rw-r--r--contrib/libs/grpc/src/cpp/client/create_channel_internal.h6
-rw-r--r--contrib/libs/grpc/src/cpp/client/create_channel_posix.cc34
-rw-r--r--contrib/libs/grpc/src/cpp/client/insecure_credentials.cc16
-rw-r--r--contrib/libs/grpc/src/cpp/client/secure_credentials.cc44
-rw-r--r--contrib/libs/grpc/src/cpp/client/secure_credentials.h18
-rw-r--r--contrib/libs/grpc/src/cpp/common/alarm.cc78
-rw-r--r--contrib/libs/grpc/src/cpp/common/channel_arguments.cc2
-rw-r--r--contrib/libs/grpc/src/cpp/common/channel_filter.h34
-rw-r--r--contrib/libs/grpc/src/cpp/common/completion_queue_cc.cc6
-rw-r--r--contrib/libs/grpc/src/cpp/common/core_codegen.cc22
-rw-r--r--contrib/libs/grpc/src/cpp/common/resource_quota_cc.cc8
-rw-r--r--contrib/libs/grpc/src/cpp/common/secure_auth_context.cc32
-rw-r--r--contrib/libs/grpc/src/cpp/common/secure_auth_context.h12
-rw-r--r--contrib/libs/grpc/src/cpp/common/secure_create_auth_context.cc6
-rw-r--r--contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc250
-rw-r--r--contrib/libs/grpc/src/cpp/server/channelz/channelz_service.h128
-rw-r--r--contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc142
-rw-r--r--contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc768
-rw-r--r--contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h442
-rw-r--r--contrib/libs/grpc/src/cpp/server/load_reporter/constants.h162
-rw-r--r--contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats.h72
-rw-r--r--contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc88
-rw-r--r--contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_macos.cc90
-rw-r--r--contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_unsupported.cc80
-rw-r--r--contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc110
-rw-r--r--contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc114
-rw-r--r--contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.h32
-rw-r--r--contrib/libs/grpc/src/cpp/server/load_reporter/util.cc86
-rw-r--r--contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc22
-rw-r--r--contrib/libs/grpc/src/cpp/server/server_builder.cc46
-rw-r--r--contrib/libs/grpc/src/cpp/server/server_cc.cc612
-rw-r--r--contrib/libs/grpc/src/cpp/server/server_context.cc216
-rw-r--r--contrib/libs/grpc/src/cpp/thread_manager/thread_manager.cc114
-rw-r--r--contrib/libs/grpc/src/cpp/thread_manager/thread_manager.h92
40 files changed, 2184 insertions, 2184 deletions
diff --git a/contrib/libs/grpc/src/cpp/README.md b/contrib/libs/grpc/src/cpp/README.md
index 9405d38d927..967a0a43b7f 100755
--- a/contrib/libs/grpc/src/cpp/README.md
+++ b/contrib/libs/grpc/src/cpp/README.md
@@ -2,44 +2,44 @@
This directory contains the C++ implementation of gRPC.
-# To start using gRPC C++
+# To start using gRPC C++
This section describes how to add gRPC as a dependency to your C++ project.
-In the C++ world, there's no universally accepted standard for managing project dependencies.
-Therefore, gRPC supports several major build systems, which should satisfy most users.
+In the C++ world, there's no universally accepted standard for managing project dependencies.
+Therefore, gRPC supports several major build systems, which should satisfy most users.
## Bazel
-
+
Bazel is the primary build system used by the core gRPC development team. Bazel
provides fast builds and it easily handles dependencies that support bazel.
-
-To add gRPC as a dependency in bazel:
-1. determine commit SHA for the grpc release you want to use
+
+To add gRPC as a dependency in bazel:
+1. determine commit SHA for the grpc release you want to use
2. Use the [http_archive](https://docs.bazel.build/versions/master/repo/http.html#http_archive) bazel rule to include gRPC source
- ```
- http_archive(
- name = "com_github_grpc_grpc",
- urls = [
- "https://github.com/grpc/grpc/archive/YOUR_GRPC_COMMIT_SHA.tar.gz",
- ],
- strip_prefix = "grpc-YOUR_GRPC_COMMIT_SHA",
- )
-
- load("@com_github_grpc_grpc//bazel:grpc_deps.bzl", "grpc_deps")
-
- grpc_deps()
- ```
-
+ ```
+ http_archive(
+ name = "com_github_grpc_grpc",
+ urls = [
+ "https://github.com/grpc/grpc/archive/YOUR_GRPC_COMMIT_SHA.tar.gz",
+ ],
+ strip_prefix = "grpc-YOUR_GRPC_COMMIT_SHA",
+ )
+
+ load("@com_github_grpc_grpc//bazel:grpc_deps.bzl", "grpc_deps")
+
+ grpc_deps()
+ ```
+
## CMake
-
+
`cmake` is your best option if you cannot use bazel. It supports building on Linux,
MacOS and Windows (official support) but also has a good chance of working on
other platforms (no promises!). `cmake` has good support for crosscompiling and
can be used for targeting the Android platform.
-
+
To build gRPC C++ from source, follow the [BUILDING guide](../../BUILDING.md).
-
+
### find_package
The canonical way to discover dependencies in CMake is the
@@ -127,22 +127,22 @@ first install gRPC C++ using CMake, and have your non-CMake project rely on the
The default choice for building on UNIX based systems used to be `make`, but we are no longer recommending it.
You should use `bazel` or `cmake` instead.
-To install gRPC for C++ on your system using `make`, follow the [Building gRPC C++](../../BUILDING.md)
-instructions to build from source and then install locally using `make install`.
-This also installs the protocol buffer compiler `protoc` (if you don't have it already),
-and the C++ gRPC plugin for `protoc`.
-
-WARNING: After installing with `make install` there is no easy way to uninstall, which can cause issues
-if you later want to remove the grpc and/or protobuf installation or upgrade to a newer version.
-
-## Packaging systems
-
+To install gRPC for C++ on your system using `make`, follow the [Building gRPC C++](../../BUILDING.md)
+instructions to build from source and then install locally using `make install`.
+This also installs the protocol buffer compiler `protoc` (if you don't have it already),
+and the C++ gRPC plugin for `protoc`.
+
+WARNING: After installing with `make install` there is no easy way to uninstall, which can cause issues
+if you later want to remove the grpc and/or protobuf installation or upgrade to a newer version.
+
+## Packaging systems
+
We do not officially support any packaging system for C++, but there are some community-maintained packages that are kept up-to-date
and are known to work well. More contributions and support for popular packaging systems are welcome!
-
+
### Install using vcpkg package
gRPC is available using the [vcpkg](https://github.com/Microsoft/vcpkg) dependency manager:
-
+
```
# install vcpkg package manager on your system using the official instructions
git clone https://github.com/Microsoft/vcpkg.git
@@ -157,8 +157,8 @@ vcpkg install grpc
The gRPC port in vcpkg is kept up to date by Microsoft team members and community contributors. If the version is out of date, please [create an issue or pull request](https://github.com/Microsoft/vcpkg) on the vcpkg repository.
-## Examples & Additional Documentation
-
+## Examples & Additional Documentation
+
You can find out how to build and run our simplest gRPC C++ example in our
[C++ quick start](../../examples/cpp).
@@ -175,6 +175,6 @@ documentation site at [grpc.io](https://grpc.io), specifically:
APIs.
-# To start developing gRPC C++
+# To start developing gRPC C++
-For instructions on how to build gRPC C++ from source, follow the [Building gRPC C++](../../BUILDING.md) instructions.
+For instructions on how to build gRPC C++ from source, follow the [Building gRPC C++](../../BUILDING.md) instructions.
diff --git a/contrib/libs/grpc/src/cpp/client/channel_cc.cc b/contrib/libs/grpc/src/cpp/client/channel_cc.cc
index fab05f2ac47..ac95c29efcd 100644
--- a/contrib/libs/grpc/src/cpp/client/channel_cc.cc
+++ b/contrib/libs/grpc/src/cpp/client/channel_cc.cc
@@ -18,7 +18,7 @@
#include <grpcpp/channel.h>
-#include <cstring>
+#include <cstring>
#include <memory>
#include <grpc/grpc.h>
@@ -30,7 +30,7 @@
#include <grpcpp/client_context.h>
#include <grpcpp/completion_queue.h>
#include <grpcpp/impl/call.h>
-#include <grpcpp/impl/codegen/call_op_set.h>
+#include <grpcpp/impl/codegen/call_op_set.h>
#include <grpcpp/impl/codegen/completion_queue_tag.h>
#include <grpcpp/impl/grpc_library.h>
#include <grpcpp/impl/rpc_method.h>
@@ -39,7 +39,7 @@
#include <grpcpp/support/config.h>
#include <grpcpp/support/status.h>
#include "src/core/lib/gpr/string.h"
-#include "src/core/lib/surface/completion_queue.h"
+#include "src/core/lib/surface/completion_queue.h"
namespace grpc {
@@ -49,23 +49,23 @@ Channel::Channel(const TString& host, grpc_channel* channel,
::grpc::experimental::ClientInterceptorFactoryInterface>>
interceptor_creators)
: host_(host), c_channel_(channel) {
- interceptor_creators_ = std::move(interceptor_creators);
+ interceptor_creators_ = std::move(interceptor_creators);
g_gli_initializer.summon();
}
-Channel::~Channel() {
- grpc_channel_destroy(c_channel_);
- if (callback_cq_ != nullptr) {
- callback_cq_->Shutdown();
- }
-}
+Channel::~Channel() {
+ grpc_channel_destroy(c_channel_);
+ if (callback_cq_ != nullptr) {
+ callback_cq_->Shutdown();
+ }
+}
namespace {
-inline grpc_slice SliceFromArray(const char* arr, size_t len) {
+inline grpc_slice SliceFromArray(const char* arr, size_t len) {
return g_core_codegen_interface->grpc_slice_from_copied_buffer(arr, len);
-}
-
+}
+
TString GetChannelInfoField(grpc_channel* channel,
grpc_channel_info* channel_info,
char*** channel_info_field) {
@@ -93,14 +93,14 @@ TString Channel::GetServiceConfigJSON() const {
&channel_info.service_config_json);
}
-namespace experimental {
-
-void ChannelResetConnectionBackoff(Channel* channel) {
- grpc_channel_reset_connect_backoff(channel->c_channel_);
-}
-
-} // namespace experimental
-
+namespace experimental {
+
+void ChannelResetConnectionBackoff(Channel* channel) {
+ grpc_channel_reset_connect_backoff(channel->c_channel_);
+}
+
+} // namespace experimental
+
::grpc::internal::Call Channel::CreateCallInternal(
const ::grpc::internal::RpcMethod& method, ::grpc::ClientContext* context,
::grpc::CompletionQueue* cq, size_t interceptor_pos) {
@@ -113,13 +113,13 @@ void ChannelResetConnectionBackoff(Channel* channel) {
method.channel_tag(), context->raw_deadline(), nullptr);
} else {
const ::TString* host_str = nullptr;
- if (!context->authority_.empty()) {
- host_str = &context->authority_;
+ if (!context->authority_.empty()) {
+ host_str = &context->authority_;
} else if (!host_.empty()) {
- host_str = &host_;
+ host_str = &host_;
}
- grpc_slice method_slice =
- SliceFromArray(method.name(), strlen(method.name()));
+ grpc_slice method_slice =
+ SliceFromArray(method.name(), strlen(method.name()));
grpc_slice host_slice;
if (host_str != nullptr) {
host_slice = ::grpc::SliceFromCopiedString(*host_str);
@@ -135,28 +135,28 @@ void ChannelResetConnectionBackoff(Channel* channel) {
}
}
grpc_census_call_set_context(c_call, context->census_context());
-
- // ClientRpcInfo should be set before call because set_call also checks
- // whether the call has been cancelled, and if the call was cancelled, we
+
+ // ClientRpcInfo should be set before call because set_call also checks
+ // whether the call has been cancelled, and if the call was cancelled, we
// should notify the interceptors too.
- auto* info =
- context->set_client_rpc_info(method.name(), method.method_type(), this,
- interceptor_creators_, interceptor_pos);
+ auto* info =
+ context->set_client_rpc_info(method.name(), method.method_type(), this,
+ interceptor_creators_, interceptor_pos);
context->set_call(c_call, shared_from_this());
-
+
return ::grpc::internal::Call(c_call, this, cq, info);
}
::grpc::internal::Call Channel::CreateCall(
const ::grpc::internal::RpcMethod& method, ::grpc::ClientContext* context,
CompletionQueue* cq) {
- return CreateCallInternal(method, context, cq, 0);
-}
-
+ return CreateCallInternal(method, context, cq, 0);
+}
+
void Channel::PerformOpsOnCall(::grpc::internal::CallOpSetInterface* ops,
::grpc::internal::Call* call) {
- ops->FillOps(
- call); // Make a copy of call. It's fine since Call just has pointers
+ ops->FillOps(
+ call); // Make a copy of call. It's fine since Call just has pointers
}
void* Channel::RegisterMethod(const char* method) {
@@ -205,9 +205,9 @@ bool Channel::WaitForStateChangeImpl(grpc_connectivity_state last_observed,
return ok;
}
-namespace {
-class ShutdownCallback : public grpc_experimental_completion_queue_functor {
- public:
+namespace {
+class ShutdownCallback : public grpc_experimental_completion_queue_functor {
+ public:
ShutdownCallback() {
functor_run = &ShutdownCallback::Run;
// Set inlineable to true since this callback is trivial and thus does not
@@ -216,37 +216,37 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor {
// code.
inlineable = true;
}
- // TakeCQ takes ownership of the cq into the shutdown callback
- // so that the shutdown callback will be responsible for destroying it
+ // TakeCQ takes ownership of the cq into the shutdown callback
+ // so that the shutdown callback will be responsible for destroying it
void TakeCQ(::grpc::CompletionQueue* cq) { cq_ = cq; }
-
- // The Run function will get invoked by the completion queue library
- // when the shutdown is actually complete
- static void Run(grpc_experimental_completion_queue_functor* cb, int) {
- auto* callback = static_cast<ShutdownCallback*>(cb);
- delete callback->cq_;
- delete callback;
- }
-
- private:
+
+ // The Run function will get invoked by the completion queue library
+ // when the shutdown is actually complete
+ static void Run(grpc_experimental_completion_queue_functor* cb, int) {
+ auto* callback = static_cast<ShutdownCallback*>(cb);
+ delete callback->cq_;
+ delete callback;
+ }
+
+ private:
::grpc::CompletionQueue* cq_ = nullptr;
-};
-} // namespace
-
+};
+} // namespace
+
::grpc::CompletionQueue* Channel::CallbackCQ() {
- // TODO(vjpai): Consider using a single global CQ for the default CQ
- // if there is no explicit per-channel CQ registered
+ // TODO(vjpai): Consider using a single global CQ for the default CQ
+ // if there is no explicit per-channel CQ registered
grpc::internal::MutexLock l(&mu_);
- if (callback_cq_ == nullptr) {
- auto* shutdown_callback = new ShutdownCallback;
+ if (callback_cq_ == nullptr) {
+ auto* shutdown_callback = new ShutdownCallback;
callback_cq_ = new ::grpc::CompletionQueue(grpc_completion_queue_attributes{
- GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
- shutdown_callback});
-
- // Transfer ownership of the new cq to its own shutdown callback
- shutdown_callback->TakeCQ(callback_cq_);
- }
- return callback_cq_;
-}
-
+ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
+ shutdown_callback});
+
+ // Transfer ownership of the new cq to its own shutdown callback
+ shutdown_callback->TakeCQ(callback_cq_);
+ }
+ return callback_cq_;
+}
+
} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/client/client_context.cc b/contrib/libs/grpc/src/cpp/client/client_context.cc
index b8b123ef379..b75343d0895 100644
--- a/contrib/libs/grpc/src/cpp/client/client_context.cc
+++ b/contrib/libs/grpc/src/cpp/client/client_context.cc
@@ -24,7 +24,7 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
-#include <grpcpp/impl/codegen/interceptor_common.h>
+#include <grpcpp/impl/codegen/interceptor_common.h>
#include <grpcpp/impl/codegen/sync.h>
#include <grpcpp/impl/grpc_library.h>
#include <grpcpp/security/credentials.h>
@@ -44,10 +44,10 @@ class DefaultGlobalClientCallbacks final
};
static internal::GrpcLibraryInitializer g_gli_initializer;
-static DefaultGlobalClientCallbacks* g_default_client_callbacks =
- new DefaultGlobalClientCallbacks();
+static DefaultGlobalClientCallbacks* g_default_client_callbacks =
+ new DefaultGlobalClientCallbacks();
static ClientContext::GlobalCallbacks* g_client_callbacks =
- g_default_client_callbacks;
+ g_default_client_callbacks;
ClientContext::ClientContext()
: initial_metadata_received_(false),
@@ -60,7 +60,7 @@ ClientContext::ClientContext()
deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)),
census_context_(nullptr),
propagate_from_call_(nullptr),
- compression_algorithm_(GRPC_COMPRESS_NONE),
+ compression_algorithm_(GRPC_COMPRESS_NONE),
initial_metadata_corked_(false) {
g_client_callbacks->DefaultConstructor(this);
}
@@ -119,13 +119,13 @@ void ClientContext::set_call(grpc_call* call,
call_ = call;
channel_ = channel;
if (creds_ && !creds_->ApplyToCall(call_)) {
- // TODO(yashykt): should interceptors also see this status?
- SendCancelToInterceptors();
+ // TODO(yashykt): should interceptors also see this status?
+ SendCancelToInterceptors();
grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED,
"Failed to set credentials to rpc.", nullptr);
}
if (call_canceled_) {
- SendCancelToInterceptors();
+ SendCancelToInterceptors();
grpc_call_cancel(call_, nullptr);
}
}
@@ -146,20 +146,20 @@ void ClientContext::set_compression_algorithm(
void ClientContext::TryCancel() {
internal::MutexLock lock(&mu_);
if (call_) {
- SendCancelToInterceptors();
+ SendCancelToInterceptors();
grpc_call_cancel(call_, nullptr);
} else {
call_canceled_ = true;
}
}
-void ClientContext::SendCancelToInterceptors() {
+void ClientContext::SendCancelToInterceptors() {
internal::CancelInterceptorBatchMethods cancel_methods;
- for (size_t i = 0; i < rpc_info_.interceptors_.size(); i++) {
- rpc_info_.RunInterceptor(&cancel_methods, i);
- }
-}
-
+ for (size_t i = 0; i < rpc_info_.interceptors_.size(); i++) {
+ rpc_info_.RunInterceptor(&cancel_methods, i);
+ }
+}
+
TString ClientContext::peer() const {
TString peer;
if (call_) {
@@ -171,9 +171,9 @@ TString ClientContext::peer() const {
}
void ClientContext::SetGlobalCallbacks(GlobalCallbacks* client_callbacks) {
- GPR_ASSERT(g_client_callbacks == g_default_client_callbacks);
+ GPR_ASSERT(g_client_callbacks == g_default_client_callbacks);
GPR_ASSERT(client_callbacks != nullptr);
- GPR_ASSERT(client_callbacks != g_default_client_callbacks);
+ GPR_ASSERT(client_callbacks != g_default_client_callbacks);
g_client_callbacks = client_callbacks;
}
diff --git a/contrib/libs/grpc/src/cpp/client/client_interceptor.cc b/contrib/libs/grpc/src/cpp/client/client_interceptor.cc
index 9d98a60171b..a91950cae2d 100644
--- a/contrib/libs/grpc/src/cpp/client/client_interceptor.cc
+++ b/contrib/libs/grpc/src/cpp/client/client_interceptor.cc
@@ -1,44 +1,44 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpcpp/impl/codegen/client_interceptor.h>
-
-namespace grpc {
-
-namespace internal {
-experimental::ClientInterceptorFactoryInterface*
- g_global_client_interceptor_factory = nullptr;
-}
-
-namespace experimental {
-void RegisterGlobalClientInterceptorFactory(
- ClientInterceptorFactoryInterface* factory) {
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpcpp/impl/codegen/client_interceptor.h>
+
+namespace grpc {
+
+namespace internal {
+experimental::ClientInterceptorFactoryInterface*
+ g_global_client_interceptor_factory = nullptr;
+}
+
+namespace experimental {
+void RegisterGlobalClientInterceptorFactory(
+ ClientInterceptorFactoryInterface* factory) {
if (internal::g_global_client_interceptor_factory != nullptr) {
GPR_ASSERT(false &&
"It is illegal to call RegisterGlobalClientInterceptorFactory "
"multiple times.");
}
- internal::g_global_client_interceptor_factory = factory;
-}
+ internal::g_global_client_interceptor_factory = factory;
+}
// For testing purposes only.
void TestOnlyResetGlobalClientInterceptorFactory() {
internal::g_global_client_interceptor_factory = nullptr;
}
-} // namespace experimental
-} // namespace grpc
+} // namespace experimental
+} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/client/create_channel.cc b/contrib/libs/grpc/src/cpp/client/create_channel.cc
index 941c2e92902..97327490ed2 100644
--- a/contrib/libs/grpc/src/cpp/client/create_channel.cc
+++ b/contrib/libs/grpc/src/cpp/client/create_channel.cc
@@ -41,45 +41,45 @@ std::shared_ptr<grpc::Channel> CreateCustomChannel(
init_lib; // We need to call init in case of bad creds.
return creds ? creds->CreateChannelImpl(target, args)
: grpc::CreateChannelInternal(
- "",
- grpc_lame_client_channel_create(
- nullptr, GRPC_STATUS_INVALID_ARGUMENT,
- "Invalid credentials."),
- std::vector<std::unique_ptr<
+ "",
+ grpc_lame_client_channel_create(
+ nullptr, GRPC_STATUS_INVALID_ARGUMENT,
+ "Invalid credentials."),
+ std::vector<std::unique_ptr<
grpc::experimental::
ClientInterceptorFactoryInterface>>());
}
-namespace experimental {
-/// Create a new \em custom \a Channel pointing to \a target with \a
-/// interceptors being invoked per call.
-///
-/// \warning For advanced use and testing ONLY. Override default channel
-/// arguments only if necessary.
-///
-/// \param target The URI of the endpoint to connect to.
-/// \param creds Credentials to use for the created channel. If it does not
-/// hold an object or is invalid, a lame channel (one on which all operations
-/// fail) is returned.
-/// \param args Options for channel creation.
+namespace experimental {
+/// Create a new \em custom \a Channel pointing to \a target with \a
+/// interceptors being invoked per call.
+///
+/// \warning For advanced use and testing ONLY. Override default channel
+/// arguments only if necessary.
+///
+/// \param target The URI of the endpoint to connect to.
+/// \param creds Credentials to use for the created channel. If it does not
+/// hold an object or is invalid, a lame channel (one on which all operations
+/// fail) is returned.
+/// \param args Options for channel creation.
std::shared_ptr<grpc::Channel> CreateCustomChannelWithInterceptors(
const TString& target,
const std::shared_ptr<grpc::ChannelCredentials>& creds,
const grpc::ChannelArguments& args,
- std::vector<
+ std::vector<
std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
- interceptor_creators) {
+ interceptor_creators) {
grpc::GrpcLibraryCodegen
init_lib; // We need to call init in case of bad creds.
- return creds ? creds->CreateChannelWithInterceptors(
- target, args, std::move(interceptor_creators))
+ return creds ? creds->CreateChannelWithInterceptors(
+ target, args, std::move(interceptor_creators))
: grpc::CreateChannelInternal(
- "",
- grpc_lame_client_channel_create(
- nullptr, GRPC_STATUS_INVALID_ARGUMENT,
- "Invalid credentials."),
+ "",
+ grpc_lame_client_channel_create(
+ nullptr, GRPC_STATUS_INVALID_ARGUMENT,
+ "Invalid credentials."),
std::move(interceptor_creators));
-}
-} // namespace experimental
-
+}
+} // namespace experimental
+
} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/client/create_channel_internal.cc b/contrib/libs/grpc/src/cpp/client/create_channel_internal.cc
index 8e85dbc7abb..da2a878a227 100644
--- a/contrib/libs/grpc/src/cpp/client/create_channel_internal.cc
+++ b/contrib/libs/grpc/src/cpp/client/create_channel_internal.cc
@@ -24,13 +24,13 @@ struct grpc_channel;
namespace grpc {
-std::shared_ptr<Channel> CreateChannelInternal(
+std::shared_ptr<Channel> CreateChannelInternal(
const TString& host, grpc_channel* c_channel,
std::vector<std::unique_ptr<
::grpc::experimental::ClientInterceptorFactoryInterface>>
- interceptor_creators) {
- return std::shared_ptr<Channel>(
- new Channel(host, c_channel, std::move(interceptor_creators)));
+ interceptor_creators) {
+ return std::shared_ptr<Channel>(
+ new Channel(host, c_channel, std::move(interceptor_creators)));
}
} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/client/create_channel_internal.h b/contrib/libs/grpc/src/cpp/client/create_channel_internal.h
index d19a63b3c78..09d4e56b023 100644
--- a/contrib/libs/grpc/src/cpp/client/create_channel_internal.h
+++ b/contrib/libs/grpc/src/cpp/client/create_channel_internal.h
@@ -22,18 +22,18 @@
#include <memory>
#include <grpcpp/channel.h>
-#include <grpcpp/impl/codegen/client_interceptor.h>
+#include <grpcpp/impl/codegen/client_interceptor.h>
#include <grpcpp/support/config.h>
struct grpc_channel;
namespace grpc {
-std::shared_ptr<Channel> CreateChannelInternal(
+std::shared_ptr<Channel> CreateChannelInternal(
const TString& host, grpc_channel* c_channel,
std::vector<std::unique_ptr<
::grpc::experimental::ClientInterceptorFactoryInterface>>
- interceptor_creators);
+ interceptor_creators);
} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/client/create_channel_posix.cc b/contrib/libs/grpc/src/cpp/client/create_channel_posix.cc
index a10abb759e8..db09eda8a66 100644
--- a/contrib/libs/grpc/src/cpp/client/create_channel_posix.cc
+++ b/contrib/libs/grpc/src/cpp/client/create_channel_posix.cc
@@ -35,7 +35,7 @@ std::shared_ptr<Channel> CreateInsecureChannelFromFd(const TString& target,
grpc::internal::GrpcLibrary init_lib;
init_lib.init();
return CreateChannelInternal(
- "", grpc_insecure_channel_create_from_fd(target.c_str(), fd, nullptr),
+ "", grpc_insecure_channel_create_from_fd(target.c_str(), fd, nullptr),
std::vector<
std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>());
}
@@ -47,31 +47,31 @@ std::shared_ptr<Channel> CreateCustomInsecureChannelFromFd(
grpc_channel_args channel_args;
args.SetChannelArgs(&channel_args);
return CreateChannelInternal(
- "",
- grpc_insecure_channel_create_from_fd(target.c_str(), fd, &channel_args),
+ "",
+ grpc_insecure_channel_create_from_fd(target.c_str(), fd, &channel_args),
std::vector<
std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>());
}
-namespace experimental {
-
+namespace experimental {
+
std::shared_ptr<Channel> CreateCustomInsecureChannelWithInterceptorsFromFd(
const TString& target, int fd, const ChannelArguments& args,
- std::vector<
+ std::vector<
std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
- interceptor_creators) {
+ interceptor_creators) {
grpc::internal::GrpcLibrary init_lib;
- init_lib.init();
- grpc_channel_args channel_args;
- args.SetChannelArgs(&channel_args);
+ init_lib.init();
+ grpc_channel_args channel_args;
+ args.SetChannelArgs(&channel_args);
return CreateChannelInternal(
- "",
- grpc_insecure_channel_create_from_fd(target.c_str(), fd, &channel_args),
- std::move(interceptor_creators));
-}
-
-} // namespace experimental
-
+ "",
+ grpc_insecure_channel_create_from_fd(target.c_str(), fd, &channel_args),
+ std::move(interceptor_creators));
+}
+
+} // namespace experimental
+
#endif // GPR_SUPPORT_CHANNELS_FROM_FD
} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/client/insecure_credentials.cc b/contrib/libs/grpc/src/cpp/client/insecure_credentials.cc
index ac429445200..e5bafff70af 100644
--- a/contrib/libs/grpc/src/cpp/client/insecure_credentials.cc
+++ b/contrib/libs/grpc/src/cpp/client/insecure_credentials.cc
@@ -31,23 +31,23 @@ class InsecureChannelCredentialsImpl final : public ChannelCredentials {
public:
std::shared_ptr<Channel> CreateChannelImpl(
const TString& target, const ChannelArguments& args) override {
- return CreateChannelWithInterceptors(
- target, args,
- std::vector<std::unique_ptr<
+ return CreateChannelWithInterceptors(
+ target, args,
+ std::vector<std::unique_ptr<
grpc::experimental::ClientInterceptorFactoryInterface>>());
- }
-
+ }
+
std::shared_ptr<Channel> CreateChannelWithInterceptors(
const TString& target, const ChannelArguments& args,
std::vector<std::unique_ptr<
grpc::experimental::ClientInterceptorFactoryInterface>>
- interceptor_creators) override {
+ interceptor_creators) override {
grpc_channel_args channel_args;
args.SetChannelArgs(&channel_args);
return ::grpc::CreateChannelInternal(
"",
- grpc_insecure_channel_create(target.c_str(), &channel_args, nullptr),
- std::move(interceptor_creators));
+ grpc_insecure_channel_create(target.c_str(), &channel_args, nullptr),
+ std::move(interceptor_creators));
}
SecureChannelCredentials* AsSecureCredentials() override { return nullptr; }
diff --git a/contrib/libs/grpc/src/cpp/client/secure_credentials.cc b/contrib/libs/grpc/src/cpp/client/secure_credentials.cc
index cb9ffd3e846..0f6db3caa53 100644
--- a/contrib/libs/grpc/src/cpp/client/secure_credentials.cc
+++ b/contrib/libs/grpc/src/cpp/client/secure_credentials.cc
@@ -49,25 +49,25 @@ SecureChannelCredentials::SecureChannelCredentials(
std::shared_ptr<Channel> SecureChannelCredentials::CreateChannelImpl(
const TString& target, const ChannelArguments& args) {
- return CreateChannelWithInterceptors(
- target, args,
+ return CreateChannelWithInterceptors(
+ target, args,
std::vector<std::unique_ptr<
grpc::experimental::ClientInterceptorFactoryInterface>>());
-}
-
+}
+
std::shared_ptr<Channel>
-SecureChannelCredentials::CreateChannelWithInterceptors(
+SecureChannelCredentials::CreateChannelWithInterceptors(
const TString& target, const ChannelArguments& args,
- std::vector<
+ std::vector<
std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
- interceptor_creators) {
+ interceptor_creators) {
grpc_channel_args channel_args;
args.SetChannelArgs(&channel_args);
return ::grpc::CreateChannelInternal(
args.GetSslTargetNameOverride(),
grpc_secure_channel_create(c_creds_, target.c_str(), &channel_args,
- nullptr),
- std::move(interceptor_creators));
+ nullptr),
+ std::move(interceptor_creators));
}
SecureCallCredentials::SecureCallCredentials(grpc_call_credentials* c_creds)
@@ -110,8 +110,8 @@ std::shared_ptr<ChannelCredentials> SslCredentials(
grpc_channel_credentials* c_creds = grpc_ssl_credentials_create(
options.pem_root_certs.empty() ? nullptr : options.pem_root_certs.c_str(),
- options.pem_private_key.empty() ? nullptr : &pem_key_cert_pair, nullptr,
- nullptr);
+ options.pem_private_key.empty() ? nullptr : &pem_key_cert_pair, nullptr,
+ nullptr);
return WrapChannelCredentials(c_creds);
}
@@ -280,13 +280,13 @@ std::shared_ptr<ChannelCredentials> AltsCredentials(
return WrapChannelCredentials(c_creds);
}
-// Builds Local Credentials
-std::shared_ptr<ChannelCredentials> LocalCredentials(
- grpc_local_connect_type type) {
+// Builds Local Credentials
+std::shared_ptr<ChannelCredentials> LocalCredentials(
+ grpc_local_connect_type type) {
grpc::GrpcLibraryCodegen init; // To call grpc_init().
- return WrapChannelCredentials(grpc_local_credentials_create(type));
-}
-
+ return WrapChannelCredentials(grpc_local_credentials_create(type));
+}
+
// Builds TLS Credentials given TLS options.
std::shared_ptr<ChannelCredentials> TlsCredentials(
const TlsCredentialsOptions& options) {
@@ -433,10 +433,10 @@ int MetadataCredentialsPluginWrapper::GetMetadata(
grpc_auth_metadata_context_copy(&context, &context_copy);
// Asynchronous return.
w->thread_pool_->Add([w, context_copy, cb, user_data]() mutable {
- w->MetadataCredentialsPluginWrapper::InvokePlugin(
+ w->MetadataCredentialsPluginWrapper::InvokePlugin(
context_copy, cb, user_data, nullptr, nullptr, nullptr, nullptr);
grpc_auth_metadata_context_reset(&context_copy);
- });
+ });
return 0;
} else {
// Synchronous return.
@@ -463,10 +463,10 @@ void MetadataCredentialsPluginWrapper::InvokePlugin(
grpc_status_code* status_code, const char** error_details) {
std::multimap<TString, TString> metadata;
- // const_cast is safe since the SecureAuthContext only inc/dec the refcount
- // and the object is passed as a const ref to plugin_->GetMetadata.
+ // const_cast is safe since the SecureAuthContext only inc/dec the refcount
+ // and the object is passed as a const ref to plugin_->GetMetadata.
SecureAuthContext cpp_channel_auth_context(
- const_cast<grpc_auth_context*>(context.channel_auth_context));
+ const_cast<grpc_auth_context*>(context.channel_auth_context));
Status status = plugin_->GetMetadata(context.service_url, context.method_name,
cpp_channel_auth_context, &metadata);
diff --git a/contrib/libs/grpc/src/cpp/client/secure_credentials.h b/contrib/libs/grpc/src/cpp/client/secure_credentials.h
index 329c003474f..4fc79346bf4 100644
--- a/contrib/libs/grpc/src/cpp/client/secure_credentials.h
+++ b/contrib/libs/grpc/src/cpp/client/secure_credentials.h
@@ -26,7 +26,7 @@
#include <grpcpp/support/config.h>
#include "y_absl/strings/str_cat.h"
-#include "src/core/lib/security/credentials/credentials.h"
+#include "src/core/lib/security/credentials/credentials.h"
#include "src/cpp/server/thread_pool_interface.h"
namespace grpc {
@@ -36,14 +36,14 @@ class Channel;
class SecureChannelCredentials final : public ChannelCredentials {
public:
explicit SecureChannelCredentials(grpc_channel_credentials* c_creds);
- ~SecureChannelCredentials() {
- if (c_creds_ != nullptr) c_creds_->Unref();
- }
+ ~SecureChannelCredentials() {
+ if (c_creds_ != nullptr) c_creds_->Unref();
+ }
grpc_channel_credentials* GetRawCreds() { return c_creds_; }
std::shared_ptr<Channel> CreateChannelImpl(
const TString& target, const ChannelArguments& args) override;
-
+
SecureChannelCredentials* AsSecureCredentials() override { return this; }
private:
@@ -51,16 +51,16 @@ class SecureChannelCredentials final : public ChannelCredentials {
const TString& target, const ChannelArguments& args,
std::vector<std::unique_ptr<
::grpc::experimental::ClientInterceptorFactoryInterface>>
- interceptor_creators) override;
+ interceptor_creators) override;
grpc_channel_credentials* const c_creds_;
};
class SecureCallCredentials final : public CallCredentials {
public:
explicit SecureCallCredentials(grpc_call_credentials* c_creds);
- ~SecureCallCredentials() {
- if (c_creds_ != nullptr) c_creds_->Unref();
- }
+ ~SecureCallCredentials() {
+ if (c_creds_ != nullptr) c_creds_->Unref();
+ }
grpc_call_credentials* GetRawCreds() { return c_creds_; }
bool ApplyToCall(grpc_call* call) override;
diff --git a/contrib/libs/grpc/src/cpp/common/alarm.cc b/contrib/libs/grpc/src/cpp/common/alarm.cc
index 83de2f2d3f1..a2612874b20 100644
--- a/contrib/libs/grpc/src/cpp/common/alarm.cc
+++ b/contrib/libs/grpc/src/cpp/common/alarm.cc
@@ -35,25 +35,25 @@
namespace grpc {
namespace internal {
-class AlarmImpl : public ::grpc::internal::CompletionQueueTag {
+class AlarmImpl : public ::grpc::internal::CompletionQueueTag {
public:
AlarmImpl() : cq_(nullptr), tag_(nullptr) {
gpr_ref_init(&refs_, 1);
grpc_timer_init_unset(&timer_);
- }
- ~AlarmImpl() {}
+ }
+ ~AlarmImpl() {}
bool FinalizeResult(void** tag, bool* /*status*/) override {
- *tag = tag_;
- Unref();
- return true;
- }
- void Set(::grpc::CompletionQueue* cq, gpr_timespec deadline, void* tag) {
- grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
- grpc_core::ExecCtx exec_ctx;
- GRPC_CQ_INTERNAL_REF(cq->cq(), "alarm");
- cq_ = cq->cq();
- tag_ = tag;
- GPR_ASSERT(grpc_cq_begin_op(cq_, this));
+ *tag = tag_;
+ Unref();
+ return true;
+ }
+ void Set(::grpc::CompletionQueue* cq, gpr_timespec deadline, void* tag) {
+ grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
+ grpc_core::ExecCtx exec_ctx;
+ GRPC_CQ_INTERNAL_REF(cq->cq(), "alarm");
+ cq_ = cq->cq();
+ tag_ = tag;
+ GPR_ASSERT(grpc_cq_begin_op(cq_, this));
GRPC_CLOSURE_INIT(
&on_alarm_,
[](void* arg, grpc_error* error) {
@@ -71,17 +71,17 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag {
GRPC_CQ_INTERNAL_UNREF(cq, "alarm");
},
this, grpc_schedule_on_exec_ctx);
- grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline),
- &on_alarm_);
+ grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline),
+ &on_alarm_);
}
- void Set(gpr_timespec deadline, std::function<void(bool)> f) {
- grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
+ void Set(gpr_timespec deadline, std::function<void(bool)> f) {
+ grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
- // Don't use any CQ at all. Instead just use the timer to fire the function
- callback_ = std::move(f);
- Ref();
- GRPC_CLOSURE_INIT(&on_alarm_,
- [](void* arg, grpc_error* error) {
+ // Don't use any CQ at all. Instead just use the timer to fire the function
+ callback_ = std::move(f);
+ Ref();
+ GRPC_CLOSURE_INIT(&on_alarm_,
+ [](void* arg, grpc_error* error) {
grpc_core::Executor::Run(
GRPC_CLOSURE_CREATE(
[](void* arg, grpc_error* error) {
@@ -92,13 +92,13 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag {
},
arg, nullptr),
error);
- },
- this, grpc_schedule_on_exec_ctx);
+ },
+ this, grpc_schedule_on_exec_ctx);
grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline),
&on_alarm_);
}
void Cancel() {
- grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
+ grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_timer_cancel(&timer_);
}
@@ -122,18 +122,18 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag {
// completion queue where events about this alarm will be posted
grpc_completion_queue* cq_;
void* tag_;
- std::function<void(bool)> callback_;
+ std::function<void(bool)> callback_;
};
} // namespace internal
-static ::grpc::internal::GrpcLibraryInitializer g_gli_initializer;
+static ::grpc::internal::GrpcLibraryInitializer g_gli_initializer;
Alarm::Alarm() : alarm_(new internal::AlarmImpl()) {
g_gli_initializer.summon();
}
-void Alarm::SetInternal(::grpc::CompletionQueue* cq, gpr_timespec deadline,
- void* tag) {
+void Alarm::SetInternal(::grpc::CompletionQueue* cq, gpr_timespec deadline,
+ void* tag) {
// Note that we know that alarm_ is actually an internal::AlarmImpl
// but we declared it as the base pointer to avoid a forward declaration
// or exposing core data structures in the C++ public headers.
@@ -142,15 +142,15 @@ void Alarm::SetInternal(::grpc::CompletionQueue* cq, gpr_timespec deadline,
static_cast<internal::AlarmImpl*>(alarm_)->Set(cq, deadline, tag);
}
-void Alarm::SetInternal(gpr_timespec deadline, std::function<void(bool)> f) {
- // Note that we know that alarm_ is actually an internal::AlarmImpl
- // but we declared it as the base pointer to avoid a forward declaration
- // or exposing core data structures in the C++ public headers.
- // Thus it is safe to use a static_cast to the subclass here, and the
- // C++ style guide allows us to do so in this case
- static_cast<internal::AlarmImpl*>(alarm_)->Set(deadline, std::move(f));
-}
-
+void Alarm::SetInternal(gpr_timespec deadline, std::function<void(bool)> f) {
+ // Note that we know that alarm_ is actually an internal::AlarmImpl
+ // but we declared it as the base pointer to avoid a forward declaration
+ // or exposing core data structures in the C++ public headers.
+ // Thus it is safe to use a static_cast to the subclass here, and the
+ // C++ style guide allows us to do so in this case
+ static_cast<internal::AlarmImpl*>(alarm_)->Set(deadline, std::move(f));
+}
+
Alarm::~Alarm() {
if (alarm_ != nullptr) {
static_cast<internal::AlarmImpl*>(alarm_)->Destroy();
diff --git a/contrib/libs/grpc/src/cpp/common/channel_arguments.cc b/contrib/libs/grpc/src/cpp/common/channel_arguments.cc
index 23be69f72a6..5a5dd91b5ec 100644
--- a/contrib/libs/grpc/src/cpp/common/channel_arguments.cc
+++ b/contrib/libs/grpc/src/cpp/common/channel_arguments.cc
@@ -108,7 +108,7 @@ void ChannelArguments::SetSocketMutator(grpc_socket_mutator* mutator) {
if (!replaced) {
strings_.push_back(TString(mutator_arg.key));
args_.push_back(mutator_arg);
- args_.back().key = const_cast<char*>(strings_.back().c_str());
+ args_.back().key = const_cast<char*>(strings_.back().c_str());
}
}
diff --git a/contrib/libs/grpc/src/cpp/common/channel_filter.h b/contrib/libs/grpc/src/cpp/common/channel_filter.h
index fab1c7744fe..5ce720b3075 100644
--- a/contrib/libs/grpc/src/cpp/common/channel_filter.h
+++ b/contrib/libs/grpc/src/cpp/common/channel_filter.h
@@ -207,18 +207,18 @@ class TransportStreamOpBatch {
op_->payload->context[GRPC_CONTEXT_TRACING].value);
}
- const gpr_atm* get_peer_string() const {
- if (op_->send_initial_metadata &&
- op_->payload->send_initial_metadata.peer_string != nullptr) {
- return op_->payload->send_initial_metadata.peer_string;
- } else if (op_->recv_initial_metadata &&
- op_->payload->recv_initial_metadata.peer_string != nullptr) {
- return op_->payload->recv_initial_metadata.peer_string;
- } else {
- return nullptr;
- }
- }
-
+ const gpr_atm* get_peer_string() const {
+ if (op_->send_initial_metadata &&
+ op_->payload->send_initial_metadata.peer_string != nullptr) {
+ return op_->payload->send_initial_metadata.peer_string;
+ } else if (op_->recv_initial_metadata &&
+ op_->payload->recv_initial_metadata.peer_string != nullptr) {
+ return op_->payload->recv_initial_metadata.peer_string;
+ } else {
+ return nullptr;
+ }
+ }
+
private:
grpc_transport_stream_op_batch* op_; // Not owned.
MetadataBatch send_initial_metadata_;
@@ -366,11 +366,11 @@ void ChannelFilterPluginShutdown();
/// The \a include_filter argument specifies a function that will be called
/// to determine at run-time whether or not to add the filter. If the
/// value is nullptr, the filter will be added unconditionally.
-/// If the channel stack type is GRPC_CLIENT_SUBCHANNEL, the caller should
-/// ensure that subchannels with different filter lists will always have
-/// different channel args. This requires setting a channel arg in case the
-/// registration function relies on some condition other than channel args to
-/// decide whether to add a filter or not.
+/// If the channel stack type is GRPC_CLIENT_SUBCHANNEL, the caller should
+/// ensure that subchannels with different filter lists will always have
+/// different channel args. This requires setting a channel arg in case the
+/// registration function relies on some condition other than channel args to
+/// decide whether to add a filter or not.
template <typename ChannelDataType, typename CallDataType>
void RegisterChannelFilter(
const char* name, grpc_channel_stack_type stack_type, int priority,
diff --git a/contrib/libs/grpc/src/cpp/common/completion_queue_cc.cc b/contrib/libs/grpc/src/cpp/common/completion_queue_cc.cc
index 109b2ceb02b..96a7105eaf4 100644
--- a/contrib/libs/grpc/src/cpp/common/completion_queue_cc.cc
+++ b/contrib/libs/grpc/src/cpp/common/completion_queue_cc.cc
@@ -61,8 +61,8 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
auto core_cq_tag =
static_cast<::grpc::internal::CompletionQueueTag*>(ev.tag);
*ok = ev.success != 0;
- *tag = core_cq_tag;
- if (core_cq_tag->FinalizeResult(tag, ok)) {
+ *tag = core_cq_tag;
+ if (core_cq_tag->FinalizeResult(tag, ok)) {
return GOT_EVENT;
}
break;
@@ -89,7 +89,7 @@ bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) {
auto core_cq_tag =
static_cast<::grpc::internal::CompletionQueueTag*>(res_tag);
*ok = res == 1;
- if (core_cq_tag->FinalizeResult(tag, ok)) {
+ if (core_cq_tag->FinalizeResult(tag, ok)) {
return true;
}
}
diff --git a/contrib/libs/grpc/src/cpp/common/core_codegen.cc b/contrib/libs/grpc/src/cpp/common/core_codegen.cc
index 794154f79c5..75383ed5110 100644
--- a/contrib/libs/grpc/src/cpp/common/core_codegen.cc
+++ b/contrib/libs/grpc/src/cpp/common/core_codegen.cc
@@ -59,10 +59,10 @@ grpc_completion_queue* CoreCodegen::grpc_completion_queue_create_for_pluck(
return ::grpc_completion_queue_create_for_pluck(reserved);
}
-void CoreCodegen::grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
- ::grpc_completion_queue_shutdown(cq);
-}
-
+void CoreCodegen::grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
+ ::grpc_completion_queue_shutdown(cq);
+}
+
void CoreCodegen::grpc_completion_queue_destroy(grpc_completion_queue* cq) {
::grpc_completion_queue_destroy(cq);
}
@@ -106,13 +106,13 @@ size_t CoreCodegen::grpc_byte_buffer_length(grpc_byte_buffer* bb) {
return ::grpc_byte_buffer_length(bb);
}
-grpc_call_error CoreCodegen::grpc_call_start_batch(grpc_call* call,
- const grpc_op* ops,
- size_t nops, void* tag,
- void* reserved) {
- return ::grpc_call_start_batch(call, ops, nops, tag, reserved);
-}
-
+grpc_call_error CoreCodegen::grpc_call_start_batch(grpc_call* call,
+ const grpc_op* ops,
+ size_t nops, void* tag,
+ void* reserved) {
+ return ::grpc_call_start_batch(call, ops, nops, tag, reserved);
+}
+
grpc_call_error CoreCodegen::grpc_call_cancel_with_status(
grpc_call* call, grpc_status_code status, const char* description,
void* reserved) {
diff --git a/contrib/libs/grpc/src/cpp/common/resource_quota_cc.cc b/contrib/libs/grpc/src/cpp/common/resource_quota_cc.cc
index a4afbae45a8..64abff96338 100644
--- a/contrib/libs/grpc/src/cpp/common/resource_quota_cc.cc
+++ b/contrib/libs/grpc/src/cpp/common/resource_quota_cc.cc
@@ -33,8 +33,8 @@ ResourceQuota& ResourceQuota::Resize(size_t new_size) {
return *this;
}
-ResourceQuota& ResourceQuota::SetMaxThreads(int new_max_threads) {
- grpc_resource_quota_set_max_threads(impl_, new_max_threads);
- return *this;
-}
+ResourceQuota& ResourceQuota::SetMaxThreads(int new_max_threads) {
+ grpc_resource_quota_set_max_threads(impl_, new_max_threads);
+ return *this;
+}
} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/common/secure_auth_context.cc b/contrib/libs/grpc/src/cpp/common/secure_auth_context.cc
index 96903583434..e1f97889c8e 100644
--- a/contrib/libs/grpc/src/cpp/common/secure_auth_context.cc
+++ b/contrib/libs/grpc/src/cpp/common/secure_auth_context.cc
@@ -23,11 +23,11 @@
namespace grpc {
std::vector<grpc::string_ref> SecureAuthContext::GetPeerIdentity() const {
- if (ctx_ == nullptr) {
+ if (ctx_ == nullptr) {
return std::vector<grpc::string_ref>();
}
- grpc_auth_property_iterator iter =
- grpc_auth_context_peer_identity(ctx_.get());
+ grpc_auth_property_iterator iter =
+ grpc_auth_context_peer_identity(ctx_.get());
std::vector<grpc::string_ref> identity;
const grpc_auth_property* property = nullptr;
while ((property = grpc_auth_property_iterator_next(&iter))) {
@@ -38,20 +38,20 @@ std::vector<grpc::string_ref> SecureAuthContext::GetPeerIdentity() const {
}
TString SecureAuthContext::GetPeerIdentityPropertyName() const {
- if (ctx_ == nullptr) {
+ if (ctx_ == nullptr) {
return "";
}
- const char* name = grpc_auth_context_peer_identity_property_name(ctx_.get());
+ const char* name = grpc_auth_context_peer_identity_property_name(ctx_.get());
return name == nullptr ? "" : name;
}
std::vector<grpc::string_ref> SecureAuthContext::FindPropertyValues(
const TString& name) const {
- if (ctx_ == nullptr) {
+ if (ctx_ == nullptr) {
return std::vector<grpc::string_ref>();
}
grpc_auth_property_iterator iter =
- grpc_auth_context_find_properties_by_name(ctx_.get(), name.c_str());
+ grpc_auth_context_find_properties_by_name(ctx_.get(), name.c_str());
const grpc_auth_property* property = nullptr;
std::vector<grpc::string_ref> values;
while ((property = grpc_auth_property_iterator_next(&iter))) {
@@ -61,9 +61,9 @@ std::vector<grpc::string_ref> SecureAuthContext::FindPropertyValues(
}
AuthPropertyIterator SecureAuthContext::begin() const {
- if (ctx_ != nullptr) {
+ if (ctx_ != nullptr) {
grpc_auth_property_iterator iter =
- grpc_auth_context_property_iterator(ctx_.get());
+ grpc_auth_context_property_iterator(ctx_.get());
const grpc_auth_property* property =
grpc_auth_property_iterator_next(&iter);
return AuthPropertyIterator(property, &iter);
@@ -78,20 +78,20 @@ AuthPropertyIterator SecureAuthContext::end() const {
void SecureAuthContext::AddProperty(const TString& key,
const grpc::string_ref& value) {
- if (ctx_ == nullptr) return;
- grpc_auth_context_add_property(ctx_.get(), key.c_str(), value.data(),
- value.size());
+ if (ctx_ == nullptr) return;
+ grpc_auth_context_add_property(ctx_.get(), key.c_str(), value.data(),
+ value.size());
}
bool SecureAuthContext::SetPeerIdentityPropertyName(const TString& name) {
- if (ctx_ == nullptr) return false;
- return grpc_auth_context_set_peer_identity_property_name(ctx_.get(),
+ if (ctx_ == nullptr) return false;
+ return grpc_auth_context_set_peer_identity_property_name(ctx_.get(),
name.c_str()) != 0;
}
bool SecureAuthContext::IsPeerAuthenticated() const {
- if (ctx_ == nullptr) return false;
- return grpc_auth_context_peer_is_authenticated(ctx_.get()) != 0;
+ if (ctx_ == nullptr) return false;
+ return grpc_auth_context_peer_is_authenticated(ctx_.get()) != 0;
}
} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/common/secure_auth_context.h b/contrib/libs/grpc/src/cpp/common/secure_auth_context.h
index 40d1007dd60..51013efac70 100644
--- a/contrib/libs/grpc/src/cpp/common/secure_auth_context.h
+++ b/contrib/libs/grpc/src/cpp/common/secure_auth_context.h
@@ -21,17 +21,17 @@
#include <grpcpp/security/auth_context.h>
-#include "src/core/lib/gprpp/ref_counted_ptr.h"
-#include "src/core/lib/security/context/security_context.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/security/context/security_context.h"
namespace grpc {
class SecureAuthContext final : public AuthContext {
public:
- explicit SecureAuthContext(grpc_auth_context* ctx)
- : ctx_(ctx != nullptr ? ctx->Ref() : nullptr) {}
+ explicit SecureAuthContext(grpc_auth_context* ctx)
+ : ctx_(ctx != nullptr ? ctx->Ref() : nullptr) {}
- ~SecureAuthContext() override = default;
+ ~SecureAuthContext() override = default;
bool IsPeerAuthenticated() const override;
@@ -52,7 +52,7 @@ class SecureAuthContext final : public AuthContext {
virtual bool SetPeerIdentityPropertyName(const TString& name) override;
private:
- grpc_core::RefCountedPtr<grpc_auth_context> ctx_;
+ grpc_core::RefCountedPtr<grpc_auth_context> ctx_;
};
} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/common/secure_create_auth_context.cc b/contrib/libs/grpc/src/cpp/common/secure_create_auth_context.cc
index 5883d5c3a7d..908c46629e6 100644
--- a/contrib/libs/grpc/src/cpp/common/secure_create_auth_context.cc
+++ b/contrib/libs/grpc/src/cpp/common/secure_create_auth_context.cc
@@ -20,7 +20,7 @@
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpcpp/security/auth_context.h>
-#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/cpp/common/secure_auth_context.h"
namespace grpc {
@@ -29,8 +29,8 @@ std::shared_ptr<const AuthContext> CreateAuthContext(grpc_call* call) {
if (call == nullptr) {
return std::shared_ptr<const AuthContext>();
}
- grpc_core::RefCountedPtr<grpc_auth_context> ctx(grpc_call_auth_context(call));
- return std::make_shared<SecureAuthContext>(ctx.get());
+ grpc_core::RefCountedPtr<grpc_auth_context> ctx(grpc_call_auth_context(call));
+ return std::make_shared<SecureAuthContext>(ctx.get());
}
} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc
index b707f3c4766..6dcf84bf40d 100644
--- a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc
+++ b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc
@@ -1,29 +1,29 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#include "src/cpp/server/channelz/channelz_service.h"
-
-#include <grpc/grpc.h>
-#include <grpc/support/alloc.h>
-
-namespace grpc {
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/cpp/server/channelz/channelz_service.h"
+
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+
+namespace grpc {
namespace {
@@ -33,121 +33,121 @@ grpc::protobuf::util::Status ParseJson(const char* json_str,
options.case_insensitive_enum_parsing = true;
return grpc::protobuf::json::JsonStringToMessage(json_str, message, options);
}
-
+
} // namespace
-Status ChannelzService::GetTopChannels(
+Status ChannelzService::GetTopChannels(
ServerContext* /*unused*/,
const channelz::v1::GetTopChannelsRequest* request,
- channelz::v1::GetTopChannelsResponse* response) {
- char* json_str = grpc_channelz_get_top_channels(request->start_channel_id());
- if (json_str == nullptr) {
- return Status(StatusCode::INTERNAL,
- "grpc_channelz_get_top_channels returned null");
- }
+ channelz::v1::GetTopChannelsResponse* response) {
+ char* json_str = grpc_channelz_get_top_channels(request->start_channel_id());
+ if (json_str == nullptr) {
+ return Status(StatusCode::INTERNAL,
+ "grpc_channelz_get_top_channels returned null");
+ }
grpc::protobuf::util::Status s = ParseJson(json_str, response);
- gpr_free(json_str);
- if (!s.ok()) {
- return Status(StatusCode::INTERNAL, s.ToString());
- }
- return Status::OK;
-}
-
-Status ChannelzService::GetServers(
+ gpr_free(json_str);
+ if (!s.ok()) {
+ return Status(StatusCode::INTERNAL, s.ToString());
+ }
+ return Status::OK;
+}
+
+Status ChannelzService::GetServers(
ServerContext* /*unused*/, const channelz::v1::GetServersRequest* request,
- channelz::v1::GetServersResponse* response) {
- char* json_str = grpc_channelz_get_servers(request->start_server_id());
- if (json_str == nullptr) {
- return Status(StatusCode::INTERNAL,
- "grpc_channelz_get_servers returned null");
- }
+ channelz::v1::GetServersResponse* response) {
+ char* json_str = grpc_channelz_get_servers(request->start_server_id());
+ if (json_str == nullptr) {
+ return Status(StatusCode::INTERNAL,
+ "grpc_channelz_get_servers returned null");
+ }
grpc::protobuf::util::Status s = ParseJson(json_str, response);
- gpr_free(json_str);
- if (!s.ok()) {
- return Status(StatusCode::INTERNAL, s.ToString());
- }
- return Status::OK;
-}
-
+ gpr_free(json_str);
+ if (!s.ok()) {
+ return Status(StatusCode::INTERNAL, s.ToString());
+ }
+ return Status::OK;
+}
+
Status ChannelzService::GetServer(ServerContext* /*unused*/,
- const channelz::v1::GetServerRequest* request,
- channelz::v1::GetServerResponse* response) {
- char* json_str = grpc_channelz_get_server(request->server_id());
- if (json_str == nullptr) {
- return Status(StatusCode::INTERNAL,
- "grpc_channelz_get_server returned null");
- }
+ const channelz::v1::GetServerRequest* request,
+ channelz::v1::GetServerResponse* response) {
+ char* json_str = grpc_channelz_get_server(request->server_id());
+ if (json_str == nullptr) {
+ return Status(StatusCode::INTERNAL,
+ "grpc_channelz_get_server returned null");
+ }
grpc::protobuf::util::Status s = ParseJson(json_str, response);
- gpr_free(json_str);
- if (!s.ok()) {
- return Status(StatusCode::INTERNAL, s.ToString());
- }
- return Status::OK;
-}
-
-Status ChannelzService::GetServerSockets(
+ gpr_free(json_str);
+ if (!s.ok()) {
+ return Status(StatusCode::INTERNAL, s.ToString());
+ }
+ return Status::OK;
+}
+
+Status ChannelzService::GetServerSockets(
ServerContext* /*unused*/,
const channelz::v1::GetServerSocketsRequest* request,
- channelz::v1::GetServerSocketsResponse* response) {
- char* json_str = grpc_channelz_get_server_sockets(
- request->server_id(), request->start_socket_id(), request->max_results());
- if (json_str == nullptr) {
- return Status(StatusCode::INTERNAL,
- "grpc_channelz_get_server_sockets returned null");
- }
+ channelz::v1::GetServerSocketsResponse* response) {
+ char* json_str = grpc_channelz_get_server_sockets(
+ request->server_id(), request->start_socket_id(), request->max_results());
+ if (json_str == nullptr) {
+ return Status(StatusCode::INTERNAL,
+ "grpc_channelz_get_server_sockets returned null");
+ }
grpc::protobuf::util::Status s = ParseJson(json_str, response);
- gpr_free(json_str);
- if (!s.ok()) {
- return Status(StatusCode::INTERNAL, s.ToString());
- }
- return Status::OK;
-}
-
-Status ChannelzService::GetChannel(
+ gpr_free(json_str);
+ if (!s.ok()) {
+ return Status(StatusCode::INTERNAL, s.ToString());
+ }
+ return Status::OK;
+}
+
+Status ChannelzService::GetChannel(
ServerContext* /*unused*/, const channelz::v1::GetChannelRequest* request,
- channelz::v1::GetChannelResponse* response) {
- char* json_str = grpc_channelz_get_channel(request->channel_id());
- if (json_str == nullptr) {
- return Status(StatusCode::NOT_FOUND, "No object found for that ChannelId");
- }
+ channelz::v1::GetChannelResponse* response) {
+ char* json_str = grpc_channelz_get_channel(request->channel_id());
+ if (json_str == nullptr) {
+ return Status(StatusCode::NOT_FOUND, "No object found for that ChannelId");
+ }
grpc::protobuf::util::Status s = ParseJson(json_str, response);
- gpr_free(json_str);
- if (!s.ok()) {
- return Status(StatusCode::INTERNAL, s.ToString());
- }
- return Status::OK;
-}
-
-Status ChannelzService::GetSubchannel(
+ gpr_free(json_str);
+ if (!s.ok()) {
+ return Status(StatusCode::INTERNAL, s.ToString());
+ }
+ return Status::OK;
+}
+
+Status ChannelzService::GetSubchannel(
ServerContext* /*unused*/,
const channelz::v1::GetSubchannelRequest* request,
- channelz::v1::GetSubchannelResponse* response) {
- char* json_str = grpc_channelz_get_subchannel(request->subchannel_id());
- if (json_str == nullptr) {
- return Status(StatusCode::NOT_FOUND,
- "No object found for that SubchannelId");
- }
+ channelz::v1::GetSubchannelResponse* response) {
+ char* json_str = grpc_channelz_get_subchannel(request->subchannel_id());
+ if (json_str == nullptr) {
+ return Status(StatusCode::NOT_FOUND,
+ "No object found for that SubchannelId");
+ }
grpc::protobuf::util::Status s = ParseJson(json_str, response);
- gpr_free(json_str);
- if (!s.ok()) {
- return Status(StatusCode::INTERNAL, s.ToString());
- }
- return Status::OK;
-}
-
+ gpr_free(json_str);
+ if (!s.ok()) {
+ return Status(StatusCode::INTERNAL, s.ToString());
+ }
+ return Status::OK;
+}
+
Status ChannelzService::GetSocket(ServerContext* /*unused*/,
- const channelz::v1::GetSocketRequest* request,
- channelz::v1::GetSocketResponse* response) {
- char* json_str = grpc_channelz_get_socket(request->socket_id());
- if (json_str == nullptr) {
- return Status(StatusCode::NOT_FOUND, "No object found for that SocketId");
- }
+ const channelz::v1::GetSocketRequest* request,
+ channelz::v1::GetSocketResponse* response) {
+ char* json_str = grpc_channelz_get_socket(request->socket_id());
+ if (json_str == nullptr) {
+ return Status(StatusCode::NOT_FOUND, "No object found for that SocketId");
+ }
grpc::protobuf::util::Status s = ParseJson(json_str, response);
- gpr_free(json_str);
- if (!s.ok()) {
- return Status(StatusCode::INTERNAL, s.ToString());
- }
- return Status::OK;
-}
-
-} // namespace grpc
+ gpr_free(json_str);
+ if (!s.ok()) {
+ return Status(StatusCode::INTERNAL, s.ToString());
+ }
+ return Status::OK;
+}
+
+} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.h b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.h
index 72818a0d726..b4a66ba1c66 100644
--- a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.h
+++ b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.h
@@ -1,64 +1,64 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#ifndef GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H
-#define GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H
-
-#include <grpc/support/port_platform.h>
-
-#include <grpcpp/grpcpp.h>
-#include "src/proto/grpc/channelz/channelz.grpc.pb.h"
-
-namespace grpc {
-
-class ChannelzService final : public channelz::v1::Channelz::Service {
- private:
- // implementation of GetTopChannels rpc
- Status GetTopChannels(
- ServerContext* unused, const channelz::v1::GetTopChannelsRequest* request,
- channelz::v1::GetTopChannelsResponse* response) override;
- // implementation of GetServers rpc
- Status GetServers(ServerContext* unused,
- const channelz::v1::GetServersRequest* request,
- channelz::v1::GetServersResponse* response) override;
- // implementation of GetServer rpc
- Status GetServer(ServerContext* unused,
- const channelz::v1::GetServerRequest* request,
- channelz::v1::GetServerResponse* response) override;
- // implementation of GetServerSockets rpc
- Status GetServerSockets(
- ServerContext* unused,
- const channelz::v1::GetServerSocketsRequest* request,
- channelz::v1::GetServerSocketsResponse* response) override;
- // implementation of GetChannel rpc
- Status GetChannel(ServerContext* unused,
- const channelz::v1::GetChannelRequest* request,
- channelz::v1::GetChannelResponse* response) override;
- // implementation of GetSubchannel rpc
- Status GetSubchannel(ServerContext* unused,
- const channelz::v1::GetSubchannelRequest* request,
- channelz::v1::GetSubchannelResponse* response) override;
- // implementation of GetSocket rpc
- Status GetSocket(ServerContext* unused,
- const channelz::v1::GetSocketRequest* request,
- channelz::v1::GetSocketResponse* response) override;
-};
-
-} // namespace grpc
-
-#endif // GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H
+#define GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H
+
+#include <grpc/support/port_platform.h>
+
+#include <grpcpp/grpcpp.h>
+#include "src/proto/grpc/channelz/channelz.grpc.pb.h"
+
+namespace grpc {
+
+class ChannelzService final : public channelz::v1::Channelz::Service {
+ private:
+ // implementation of GetTopChannels rpc
+ Status GetTopChannels(
+ ServerContext* unused, const channelz::v1::GetTopChannelsRequest* request,
+ channelz::v1::GetTopChannelsResponse* response) override;
+ // implementation of GetServers rpc
+ Status GetServers(ServerContext* unused,
+ const channelz::v1::GetServersRequest* request,
+ channelz::v1::GetServersResponse* response) override;
+ // implementation of GetServer rpc
+ Status GetServer(ServerContext* unused,
+ const channelz::v1::GetServerRequest* request,
+ channelz::v1::GetServerResponse* response) override;
+ // implementation of GetServerSockets rpc
+ Status GetServerSockets(
+ ServerContext* unused,
+ const channelz::v1::GetServerSocketsRequest* request,
+ channelz::v1::GetServerSocketsResponse* response) override;
+ // implementation of GetChannel rpc
+ Status GetChannel(ServerContext* unused,
+ const channelz::v1::GetChannelRequest* request,
+ channelz::v1::GetChannelResponse* response) override;
+ // implementation of GetSubchannel rpc
+ Status GetSubchannel(ServerContext* unused,
+ const channelz::v1::GetSubchannelRequest* request,
+ channelz::v1::GetSubchannelResponse* response) override;
+ // implementation of GetSocket rpc
+ Status GetSocket(ServerContext* unused,
+ const channelz::v1::GetSocketRequest* request,
+ channelz::v1::GetSocketResponse* response) override;
+};
+
+} // namespace grpc
+
+#endif // GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H
diff --git a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc
index 35ecd081255..ae26a447ab3 100644
--- a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc
+++ b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc
@@ -1,72 +1,72 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#include <grpcpp/ext/channelz_service_plugin.h>
-#include <grpcpp/impl/server_builder_plugin.h>
-#include <grpcpp/impl/server_initializer.h>
-#include <grpcpp/server.h>
-
-#include "src/cpp/server/channelz/channelz_service.h"
-
-namespace grpc {
-namespace channelz {
-namespace experimental {
-
-class ChannelzServicePlugin : public ::grpc::ServerBuilderPlugin {
- public:
- ChannelzServicePlugin() : channelz_service_(new grpc::ChannelzService()) {}
-
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include <grpcpp/ext/channelz_service_plugin.h>
+#include <grpcpp/impl/server_builder_plugin.h>
+#include <grpcpp/impl/server_initializer.h>
+#include <grpcpp/server.h>
+
+#include "src/cpp/server/channelz/channelz_service.h"
+
+namespace grpc {
+namespace channelz {
+namespace experimental {
+
+class ChannelzServicePlugin : public ::grpc::ServerBuilderPlugin {
+ public:
+ ChannelzServicePlugin() : channelz_service_(new grpc::ChannelzService()) {}
+
TString name() override { return "channelz_service"; }
-
- void InitServer(grpc::ServerInitializer* si) override {
- si->RegisterService(channelz_service_);
- }
-
+
+ void InitServer(grpc::ServerInitializer* si) override {
+ si->RegisterService(channelz_service_);
+ }
+
void Finish(grpc::ServerInitializer* /*si*/) override {}
-
+
void ChangeArguments(const TString& /*name*/, void* /*value*/) override {}
-
- bool has_sync_methods() const override {
- if (channelz_service_) {
- return channelz_service_->has_synchronous_methods();
- }
- return false;
- }
-
- bool has_async_methods() const override {
- if (channelz_service_) {
- return channelz_service_->has_async_methods();
- }
- return false;
- }
-
- private:
- std::shared_ptr<grpc::ChannelzService> channelz_service_;
-};
-
-static std::unique_ptr< ::grpc::ServerBuilderPlugin>
-CreateChannelzServicePlugin() {
- return std::unique_ptr< ::grpc::ServerBuilderPlugin>(
- new ChannelzServicePlugin());
-}
-
+
+ bool has_sync_methods() const override {
+ if (channelz_service_) {
+ return channelz_service_->has_synchronous_methods();
+ }
+ return false;
+ }
+
+ bool has_async_methods() const override {
+ if (channelz_service_) {
+ return channelz_service_->has_async_methods();
+ }
+ return false;
+ }
+
+ private:
+ std::shared_ptr<grpc::ChannelzService> channelz_service_;
+};
+
+static std::unique_ptr< ::grpc::ServerBuilderPlugin>
+CreateChannelzServicePlugin() {
+ return std::unique_ptr< ::grpc::ServerBuilderPlugin>(
+ new ChannelzServicePlugin());
+}
+
} // namespace experimental
} // namespace channelz
} // namespace grpc
@@ -74,15 +74,15 @@ namespace grpc_impl {
namespace channelz {
namespace experimental {
-void InitChannelzService() {
+void InitChannelzService() {
static struct Initializer {
Initializer() {
::grpc::ServerBuilder::InternalAddPluginFactory(
&grpc::channelz::experimental::CreateChannelzServicePlugin);
}
} initialize;
-}
-
-} // namespace experimental
-} // namespace channelz
+}
+
+} // namespace experimental
+} // namespace channelz
} // namespace grpc_impl
diff --git a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc
index 5f70ce05406..3cc508d0cbf 100644
--- a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc
+++ b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc
@@ -25,191 +25,191 @@
#include <grpc/support/log.h>
#include <grpcpp/impl/codegen/method_handler.h>
-#include "src/cpp/server/health/default_health_check_service.h"
+#include "src/cpp/server/health/default_health_check_service.h"
#include "src/proto/grpc/health/v1/health.upb.h"
#include "upb/upb.hpp"
#define MAX_SERVICE_NAME_LENGTH 200
namespace grpc {
-
-//
-// DefaultHealthCheckService
-//
-
-DefaultHealthCheckService::DefaultHealthCheckService() {
- services_map_[""].SetServingStatus(SERVING);
-}
-
-void DefaultHealthCheckService::SetServingStatus(
+
+//
+// DefaultHealthCheckService
+//
+
+DefaultHealthCheckService::DefaultHealthCheckService() {
+ services_map_[""].SetServingStatus(SERVING);
+}
+
+void DefaultHealthCheckService::SetServingStatus(
const TString& service_name, bool serving) {
grpc_core::MutexLock lock(&mu_);
- if (shutdown_) {
- // Set to NOT_SERVING in case service_name is not in the map.
- serving = false;
- }
- services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING);
-}
-
-void DefaultHealthCheckService::SetServingStatus(bool serving) {
- const ServingStatus status = serving ? SERVING : NOT_SERVING;
+ if (shutdown_) {
+ // Set to NOT_SERVING in case service_name is not in the map.
+ serving = false;
+ }
+ services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING);
+}
+
+void DefaultHealthCheckService::SetServingStatus(bool serving) {
+ const ServingStatus status = serving ? SERVING : NOT_SERVING;
grpc_core::MutexLock lock(&mu_);
- if (shutdown_) {
- return;
- }
- for (auto& p : services_map_) {
- ServiceData& service_data = p.second;
- service_data.SetServingStatus(status);
- }
-}
-
-void DefaultHealthCheckService::Shutdown() {
+ if (shutdown_) {
+ return;
+ }
+ for (auto& p : services_map_) {
+ ServiceData& service_data = p.second;
+ service_data.SetServingStatus(status);
+ }
+}
+
+void DefaultHealthCheckService::Shutdown() {
grpc_core::MutexLock lock(&mu_);
- if (shutdown_) {
- return;
- }
- shutdown_ = true;
- for (auto& p : services_map_) {
- ServiceData& service_data = p.second;
- service_data.SetServingStatus(NOT_SERVING);
- }
-}
-
-DefaultHealthCheckService::ServingStatus
-DefaultHealthCheckService::GetServingStatus(
+ if (shutdown_) {
+ return;
+ }
+ shutdown_ = true;
+ for (auto& p : services_map_) {
+ ServiceData& service_data = p.second;
+ service_data.SetServingStatus(NOT_SERVING);
+ }
+}
+
+DefaultHealthCheckService::ServingStatus
+DefaultHealthCheckService::GetServingStatus(
const TString& service_name) const {
grpc_core::MutexLock lock(&mu_);
- auto it = services_map_.find(service_name);
- if (it == services_map_.end()) {
- return NOT_FOUND;
- }
- const ServiceData& service_data = it->second;
- return service_data.GetServingStatus();
-}
-
-void DefaultHealthCheckService::RegisterCallHandler(
+ auto it = services_map_.find(service_name);
+ if (it == services_map_.end()) {
+ return NOT_FOUND;
+ }
+ const ServiceData& service_data = it->second;
+ return service_data.GetServingStatus();
+}
+
+void DefaultHealthCheckService::RegisterCallHandler(
const TString& service_name,
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
+ std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
grpc_core::MutexLock lock(&mu_);
- ServiceData& service_data = services_map_[service_name];
- service_data.AddCallHandler(handler /* copies ref */);
- HealthCheckServiceImpl::CallHandler* h = handler.get();
- h->SendHealth(std::move(handler), service_data.GetServingStatus());
-}
-
-void DefaultHealthCheckService::UnregisterCallHandler(
+ ServiceData& service_data = services_map_[service_name];
+ service_data.AddCallHandler(handler /* copies ref */);
+ HealthCheckServiceImpl::CallHandler* h = handler.get();
+ h->SendHealth(std::move(handler), service_data.GetServingStatus());
+}
+
+void DefaultHealthCheckService::UnregisterCallHandler(
const TString& service_name,
- const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
+ const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
grpc_core::MutexLock lock(&mu_);
- auto it = services_map_.find(service_name);
- if (it == services_map_.end()) return;
- ServiceData& service_data = it->second;
- service_data.RemoveCallHandler(handler);
- if (service_data.Unused()) {
- services_map_.erase(it);
- }
-}
-
-DefaultHealthCheckService::HealthCheckServiceImpl*
-DefaultHealthCheckService::GetHealthCheckService(
- std::unique_ptr<ServerCompletionQueue> cq) {
- GPR_ASSERT(impl_ == nullptr);
- impl_.reset(new HealthCheckServiceImpl(this, std::move(cq)));
- return impl_.get();
-}
-
-//
-// DefaultHealthCheckService::ServiceData
-//
-
-void DefaultHealthCheckService::ServiceData::SetServingStatus(
- ServingStatus status) {
- status_ = status;
- for (auto& call_handler : call_handlers_) {
- call_handler->SendHealth(call_handler /* copies ref */, status);
- }
-}
-
-void DefaultHealthCheckService::ServiceData::AddCallHandler(
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
- call_handlers_.insert(std::move(handler));
-}
-
-void DefaultHealthCheckService::ServiceData::RemoveCallHandler(
- const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
- call_handlers_.erase(handler);
-}
-
-//
-// DefaultHealthCheckService::HealthCheckServiceImpl
-//
-
+ auto it = services_map_.find(service_name);
+ if (it == services_map_.end()) return;
+ ServiceData& service_data = it->second;
+ service_data.RemoveCallHandler(handler);
+ if (service_data.Unused()) {
+ services_map_.erase(it);
+ }
+}
+
+DefaultHealthCheckService::HealthCheckServiceImpl*
+DefaultHealthCheckService::GetHealthCheckService(
+ std::unique_ptr<ServerCompletionQueue> cq) {
+ GPR_ASSERT(impl_ == nullptr);
+ impl_.reset(new HealthCheckServiceImpl(this, std::move(cq)));
+ return impl_.get();
+}
+
+//
+// DefaultHealthCheckService::ServiceData
+//
+
+void DefaultHealthCheckService::ServiceData::SetServingStatus(
+ ServingStatus status) {
+ status_ = status;
+ for (auto& call_handler : call_handlers_) {
+ call_handler->SendHealth(call_handler /* copies ref */, status);
+ }
+}
+
+void DefaultHealthCheckService::ServiceData::AddCallHandler(
+ std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
+ call_handlers_.insert(std::move(handler));
+}
+
+void DefaultHealthCheckService::ServiceData::RemoveCallHandler(
+ const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
+ call_handlers_.erase(handler);
+}
+
+//
+// DefaultHealthCheckService::HealthCheckServiceImpl
+//
+
namespace {
const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
-const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch";
+const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch";
} // namespace
DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
- DefaultHealthCheckService* database,
- std::unique_ptr<ServerCompletionQueue> cq)
- : database_(database), cq_(std::move(cq)) {
- // Add Check() method.
- AddMethod(new internal::RpcServiceMethod(
- kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr));
- // Add Watch() method.
- AddMethod(new internal::RpcServiceMethod(
- kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr));
- // Create serving thread.
- thread_ = std::unique_ptr<::grpc_core::Thread>(
- new ::grpc_core::Thread("grpc_health_check_service", Serve, this));
+ DefaultHealthCheckService* database,
+ std::unique_ptr<ServerCompletionQueue> cq)
+ : database_(database), cq_(std::move(cq)) {
+ // Add Check() method.
+ AddMethod(new internal::RpcServiceMethod(
+ kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr));
+ // Add Watch() method.
+ AddMethod(new internal::RpcServiceMethod(
+ kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr));
+ // Create serving thread.
+ thread_ = std::unique_ptr<::grpc_core::Thread>(
+ new ::grpc_core::Thread("grpc_health_check_service", Serve, this));
}
-DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
- // We will reach here after the server starts shutting down.
- shutdown_ = true;
- {
+DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
+ // We will reach here after the server starts shutting down.
+ shutdown_ = true;
+ {
grpc_core::MutexLock lock(&cq_shutdown_mu_);
- cq_->Shutdown();
- }
- thread_->Join();
-}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() {
- // Request the calls we're interested in.
- // We do this before starting the serving thread, so that we know it's
- // done before server startup is complete.
- CheckCallHandler::CreateAndStart(cq_.get(), database_, this);
- WatchCallHandler::CreateAndStart(cq_.get(), database_, this);
- // Start serving thread.
- thread_->Start();
-}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) {
+ cq_->Shutdown();
+ }
+ thread_->Join();
+}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() {
+ // Request the calls we're interested in.
+ // We do this before starting the serving thread, so that we know it's
+ // done before server startup is complete.
+ CheckCallHandler::CreateAndStart(cq_.get(), database_, this);
+ WatchCallHandler::CreateAndStart(cq_.get(), database_, this);
+ // Start serving thread.
+ thread_->Start();
+}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) {
HealthCheckServiceImpl* service = static_cast<HealthCheckServiceImpl*>(arg);
- void* tag;
- bool ok;
- while (true) {
- if (!service->cq_->Next(&tag, &ok)) {
- // The completion queue is shutting down.
- GPR_ASSERT(service->shutdown_);
- break;
- }
- auto* next_step = static_cast<CallableTag*>(tag);
- next_step->Run(ok);
- }
-}
-
-bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
+ void* tag;
+ bool ok;
+ while (true) {
+ if (!service->cq_->Next(&tag, &ok)) {
+ // The completion queue is shutting down.
+ GPR_ASSERT(service->shutdown_);
+ break;
+ }
+ auto* next_step = static_cast<CallableTag*>(tag);
+ next_step->Run(ok);
+ }
+}
+
+bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
const ByteBuffer& request, TString* service_name) {
std::vector<Slice> slices;
- if (!request.Dump(&slices).ok()) return false;
+ if (!request.Dump(&slices).ok()) return false;
uint8_t* request_bytes = nullptr;
size_t request_size = 0;
- if (slices.size() == 1) {
+ if (slices.size() == 1) {
request_bytes = const_cast<uint8_t*>(slices[0].begin());
request_size = slices[0].size();
- } else if (slices.size() > 1) {
- request_bytes = static_cast<uint8_t*>(gpr_malloc(request.Length()));
+ } else if (slices.size() > 1) {
+ request_bytes = static_cast<uint8_t*>(gpr_malloc(request.Length()));
uint8_t* copy_to = request_bytes;
for (size_t i = 0; i < slices.size(); i++) {
memcpy(copy_to, slices[i].begin(), slices[i].size());
@@ -220,8 +220,8 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
grpc_health_v1_HealthCheckRequest* request_struct =
grpc_health_v1_HealthCheckRequest_parse(
reinterpret_cast<char*>(request_bytes), request_size, arena.ptr());
- if (slices.size() > 1) {
- gpr_free(request_bytes);
+ if (slices.size() > 1) {
+ gpr_free(request_bytes);
}
if (request_struct == nullptr) {
return false;
@@ -232,17 +232,17 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
return false;
}
service_name->assign(service.data, service.size);
- return true;
-}
+ return true;
+}
-bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
- ServingStatus status, ByteBuffer* response) {
+bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
+ ServingStatus status, ByteBuffer* response) {
upb::Arena arena;
grpc_health_v1_HealthCheckResponse* response_struct =
grpc_health_v1_HealthCheckResponse_new(arena.ptr());
grpc_health_v1_HealthCheckResponse_set_status(
response_struct,
- status == NOT_FOUND
+ status == NOT_FOUND
? grpc_health_v1_HealthCheckResponse_SERVICE_UNKNOWN
: status == SERVING ? grpc_health_v1_HealthCheckResponse_SERVING
: grpc_health_v1_HealthCheckResponse_NOT_SERVING);
@@ -256,249 +256,249 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
Slice encoded_response(response_slice, Slice::STEAL_REF);
ByteBuffer response_buffer(&encoded_response, 1);
response->Swap(&response_buffer);
- return true;
+ return true;
}
-//
-// DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler
-//
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
- CreateAndStart(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service) {
- std::shared_ptr<CallHandler> self =
- std::make_shared<CheckCallHandler>(cq, database, service);
- CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get());
- {
+//
+// DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler
+//
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
+ CreateAndStart(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service) {
+ std::shared_ptr<CallHandler> self =
+ std::make_shared<CheckCallHandler>(cq, database, service);
+ CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get());
+ {
grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
- if (service->shutdown_) return;
- // Request a Check() call.
- handler->next_ =
- CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler,
- std::placeholders::_1, std::placeholders::_2),
- std::move(self));
- service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_,
- &handler->writer_, cq, cq, &handler->next_);
- }
+ if (service->shutdown_) return;
+ // Request a Check() call.
+ handler->next_ =
+ CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_,
+ &handler->writer_, cq, cq, &handler->next_);
+ }
}
-DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
- CheckCallHandler(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service)
- : cq_(cq), database_(database), service_(service), writer_(&ctx_) {}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
- OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
- if (!ok) {
- // The value of ok being false means that the server is shutting down.
- return;
- }
- // Spawn a new handler instance to serve the next new client. Every handler
- // instance will deallocate itself when it's done.
- CreateAndStart(cq_, database_, service_);
- // Process request.
- gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_,
- this);
+DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
+ CheckCallHandler(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service)
+ : cq_(cq), database_(database), service_(service), writer_(&ctx_) {}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
+ OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
+ if (!ok) {
+ // The value of ok being false means that the server is shutting down.
+ return;
+ }
+ // Spawn a new handler instance to serve the next new client. Every handler
+ // instance will deallocate itself when it's done.
+ CreateAndStart(cq_, database_, service_);
+ // Process request.
+ gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_,
+ this);
TString service_name;
- grpc::Status status = Status::OK;
- ByteBuffer response;
- if (!service_->DecodeRequest(request_, &service_name)) {
- status = Status(StatusCode::INVALID_ARGUMENT, "could not parse request");
- } else {
- ServingStatus serving_status = database_->GetServingStatus(service_name);
- if (serving_status == NOT_FOUND) {
- status = Status(StatusCode::NOT_FOUND, "service name unknown");
- } else if (!service_->EncodeResponse(serving_status, &response)) {
- status = Status(StatusCode::INTERNAL, "could not encode response");
- }
- }
- // Send response.
- {
+ grpc::Status status = Status::OK;
+ ByteBuffer response;
+ if (!service_->DecodeRequest(request_, &service_name)) {
+ status = Status(StatusCode::INVALID_ARGUMENT, "could not parse request");
+ } else {
+ ServingStatus serving_status = database_->GetServingStatus(service_name);
+ if (serving_status == NOT_FOUND) {
+ status = Status(StatusCode::NOT_FOUND, "service name unknown");
+ } else if (!service_->EncodeResponse(serving_status, &response)) {
+ status = Status(StatusCode::INTERNAL, "could not encode response");
+ }
+ }
+ // Send response.
+ {
grpc_core::MutexLock lock(&service_->cq_shutdown_mu_);
- if (!service_->shutdown_) {
- next_ =
- CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this,
- std::placeholders::_1, std::placeholders::_2),
- std::move(self));
- if (status.ok()) {
- writer_.Finish(response, status, &next_);
- } else {
- writer_.FinishWithError(status, &next_);
- }
- }
- }
+ if (!service_->shutdown_) {
+ next_ =
+ CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ if (status.ok()) {
+ writer_.Finish(response, status, &next_);
+ } else {
+ writer_.FinishWithError(status, &next_);
+ }
+ }
+ }
}
-void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
- OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
- if (ok) {
- gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p",
- service_, this);
+void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
+ OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
+ if (ok) {
+ gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p",
+ service_, this);
}
- self.reset(); // To appease clang-tidy.
+ self.reset(); // To appease clang-tidy.
}
-//
-// DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler
-//
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- CreateAndStart(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service) {
- std::shared_ptr<CallHandler> self =
- std::make_shared<WatchCallHandler>(cq, database, service);
- WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get());
- {
+//
+// DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler
+//
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ CreateAndStart(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service) {
+ std::shared_ptr<CallHandler> self =
+ std::make_shared<WatchCallHandler>(cq, database, service);
+ WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get());
+ {
grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
- if (service->shutdown_) return;
- // Request AsyncNotifyWhenDone().
- handler->on_done_notified_ =
- CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler,
- std::placeholders::_1, std::placeholders::_2),
- self /* copies ref */);
- handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_);
- // Request a Watch() call.
- handler->next_ =
- CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler,
- std::placeholders::_1, std::placeholders::_2),
- std::move(self));
- service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_,
- &handler->stream_, cq, cq,
- &handler->next_);
+ if (service->shutdown_) return;
+ // Request AsyncNotifyWhenDone().
+ handler->on_done_notified_ =
+ CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler,
+ std::placeholders::_1, std::placeholders::_2),
+ self /* copies ref */);
+ handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_);
+ // Request a Watch() call.
+ handler->next_ =
+ CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_,
+ &handler->stream_, cq, cq,
+ &handler->next_);
}
}
-DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- WatchCallHandler(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service)
- : cq_(cq), database_(database), service_(service), stream_(&ctx_) {}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
- if (!ok) {
- // Server shutting down.
- //
- // AsyncNotifyWhenDone() needs to be called before the call starts, but the
- // tag will not pop out if the call never starts (
- // https://github.com/grpc/grpc/issues/10136). So we need to manually
- // release the ownership of the handler in this case.
- GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
- return;
- }
- // Spawn a new handler instance to serve the next new client. Every handler
- // instance will deallocate itself when it's done.
- CreateAndStart(cq_, database_, service_);
- // Parse request.
- if (!service_->DecodeRequest(request_, &service_name_)) {
- SendFinish(std::move(self),
- Status(StatusCode::INVALID_ARGUMENT, "could not parse request"));
- return;
- }
- // Register the call for updates to the service.
- gpr_log(GPR_DEBUG,
- "[HCS %p] Health watch started for service \"%s\" (handler: %p)",
- service_, service_name_.c_str(), this);
- database_->RegisterCallHandler(service_name_, std::move(self));
+DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ WatchCallHandler(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service)
+ : cq_(cq), database_(database), service_(service), stream_(&ctx_) {}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
+ if (!ok) {
+ // Server shutting down.
+ //
+ // AsyncNotifyWhenDone() needs to be called before the call starts, but the
+ // tag will not pop out if the call never starts (
+ // https://github.com/grpc/grpc/issues/10136). So we need to manually
+ // release the ownership of the handler in this case.
+ GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
+ return;
+ }
+ // Spawn a new handler instance to serve the next new client. Every handler
+ // instance will deallocate itself when it's done.
+ CreateAndStart(cq_, database_, service_);
+ // Parse request.
+ if (!service_->DecodeRequest(request_, &service_name_)) {
+ SendFinish(std::move(self),
+ Status(StatusCode::INVALID_ARGUMENT, "could not parse request"));
+ return;
+ }
+ // Register the call for updates to the service.
+ gpr_log(GPR_DEBUG,
+ "[HCS %p] Health watch started for service \"%s\" (handler: %p)",
+ service_, service_name_.c_str(), this);
+ database_->RegisterCallHandler(service_name_, std::move(self));
}
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) {
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) {
grpc_core::MutexLock lock(&send_mu_);
- // If there's already a send in flight, cache the new status, and
- // we'll start a new send for it when the one in flight completes.
- if (send_in_flight_) {
- pending_status_ = status;
- return;
- }
- // Start a send.
- SendHealthLocked(std::move(self), status);
-}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- SendHealthLocked(std::shared_ptr<CallHandler> self, ServingStatus status) {
- send_in_flight_ = true;
- // Construct response.
- ByteBuffer response;
- bool success = service_->EncodeResponse(status, &response);
- // Grab shutdown lock and send response.
+ // If there's already a send in flight, cache the new status, and
+ // we'll start a new send for it when the one in flight completes.
+ if (send_in_flight_) {
+ pending_status_ = status;
+ return;
+ }
+ // Start a send.
+ SendHealthLocked(std::move(self), status);
+}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ SendHealthLocked(std::shared_ptr<CallHandler> self, ServingStatus status) {
+ send_in_flight_ = true;
+ // Construct response.
+ ByteBuffer response;
+ bool success = service_->EncodeResponse(status, &response);
+ // Grab shutdown lock and send response.
grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
- if (service_->shutdown_) {
- SendFinishLocked(std::move(self), Status::CANCELLED);
- return;
- }
- if (!success) {
- SendFinishLocked(std::move(self),
- Status(StatusCode::INTERNAL, "could not encode response"));
- return;
- }
- next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this,
- std::placeholders::_1, std::placeholders::_2),
- std::move(self));
- stream_.Write(response, &next_);
-}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok) {
- if (!ok) {
- SendFinish(std::move(self), Status::CANCELLED);
- return;
- }
+ if (service_->shutdown_) {
+ SendFinishLocked(std::move(self), Status::CANCELLED);
+ return;
+ }
+ if (!success) {
+ SendFinishLocked(std::move(self),
+ Status(StatusCode::INTERNAL, "could not encode response"));
+ return;
+ }
+ next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ stream_.Write(response, &next_);
+}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok) {
+ if (!ok) {
+ SendFinish(std::move(self), Status::CANCELLED);
+ return;
+ }
grpc_core::MutexLock lock(&send_mu_);
- send_in_flight_ = false;
- // If we got a new status since we started the last send, start a
- // new send for it.
- if (pending_status_ != NOT_FOUND) {
- auto status = pending_status_;
- pending_status_ = NOT_FOUND;
- SendHealthLocked(std::move(self), status);
- }
-}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- SendFinish(std::shared_ptr<CallHandler> self, const Status& status) {
- if (finish_called_) return;
+ send_in_flight_ = false;
+ // If we got a new status since we started the last send, start a
+ // new send for it.
+ if (pending_status_ != NOT_FOUND) {
+ auto status = pending_status_;
+ pending_status_ = NOT_FOUND;
+ SendHealthLocked(std::move(self), status);
+ }
+}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ SendFinish(std::shared_ptr<CallHandler> self, const Status& status) {
+ if (finish_called_) return;
grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
- if (service_->shutdown_) return;
- SendFinishLocked(std::move(self), status);
-}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- SendFinishLocked(std::shared_ptr<CallHandler> self, const Status& status) {
- on_finish_done_ =
- CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
- std::placeholders::_1, std::placeholders::_2),
- std::move(self));
- stream_.Finish(status, &on_finish_done_);
- finish_called_ = true;
-}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
- if (ok) {
- gpr_log(GPR_DEBUG,
- "[HCS %p] Health watch call finished (service_name: \"%s\", "
- "handler: %p).",
- service_, service_name_.c_str(), this);
- }
- self.reset(); // To appease clang-tidy.
-}
-
-// TODO(roth): This method currently assumes that there will be only one
-// thread polling the cq and invoking the corresponding callbacks. If
-// that changes, we will need to add synchronization here.
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) {
- GPR_ASSERT(ok);
- gpr_log(GPR_DEBUG,
- "[HCS %p] Health watch call is notified done (handler: %p, "
- "is_cancelled: %d).",
- service_, this, static_cast<int>(ctx_.IsCancelled()));
- database_->UnregisterCallHandler(service_name_, self);
- SendFinish(std::move(self), Status::CANCELLED);
-}
-
+ if (service_->shutdown_) return;
+ SendFinishLocked(std::move(self), status);
+}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ SendFinishLocked(std::shared_ptr<CallHandler> self, const Status& status) {
+ on_finish_done_ =
+ CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ stream_.Finish(status, &on_finish_done_);
+ finish_called_ = true;
+}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
+ if (ok) {
+ gpr_log(GPR_DEBUG,
+ "[HCS %p] Health watch call finished (service_name: \"%s\", "
+ "handler: %p).",
+ service_, service_name_.c_str(), this);
+ }
+ self.reset(); // To appease clang-tidy.
+}
+
+// TODO(roth): This method currently assumes that there will be only one
+// thread polling the cq and invoking the corresponding callbacks. If
+// that changes, we will need to add synchronization here.
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) {
+ GPR_ASSERT(ok);
+ gpr_log(GPR_DEBUG,
+ "[HCS %p] Health watch call is notified done (handler: %p, "
+ "is_cancelled: %d).",
+ service_, this, static_cast<int>(ctx_.IsCancelled()));
+ database_->UnregisterCallHandler(service_name_, self);
+ SendFinish(std::move(self), Status::CANCELLED);
+}
+
} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h
index 5da0ef935a4..9da1dfc15fa 100644
--- a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h
+++ b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h
@@ -19,260 +19,260 @@
#ifndef GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H
#define GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H
-#include <atomic>
-#include <set>
+#include <atomic>
+#include <set>
-#include <grpc/support/log.h>
-#include <grpcpp/grpcpp.h>
+#include <grpc/support/log.h>
+#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
-#include <grpcpp/impl/codegen/async_generic_service.h>
-#include <grpcpp/impl/codegen/async_unary_call.h>
+#include <grpcpp/impl/codegen/async_generic_service.h>
+#include <grpcpp/impl/codegen/async_unary_call.h>
#include <grpcpp/impl/codegen/completion_queue.h>
#include <grpcpp/impl/codegen/service_type.h>
#include <grpcpp/support/byte_buffer.h>
#include "src/core/lib/gprpp/sync.h"
-#include "src/core/lib/gprpp/thd.h"
-
+#include "src/core/lib/gprpp/thd.h"
+
namespace grpc {
// Default implementation of HealthCheckServiceInterface. Server will create and
// own it.
class DefaultHealthCheckService final : public HealthCheckServiceInterface {
public:
- enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING };
-
+ enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING };
+
// The service impl to register with the server.
class HealthCheckServiceImpl : public Service {
public:
- // Base class for call handlers.
- class CallHandler {
- public:
- virtual ~CallHandler() = default;
- virtual void SendHealth(std::shared_ptr<CallHandler> self,
- ServingStatus status) = 0;
- };
-
- HealthCheckServiceImpl(DefaultHealthCheckService* database,
- std::unique_ptr<ServerCompletionQueue> cq);
-
- ~HealthCheckServiceImpl();
-
- void StartServingThread();
-
+ // Base class for call handlers.
+ class CallHandler {
+ public:
+ virtual ~CallHandler() = default;
+ virtual void SendHealth(std::shared_ptr<CallHandler> self,
+ ServingStatus status) = 0;
+ };
+
+ HealthCheckServiceImpl(DefaultHealthCheckService* database,
+ std::unique_ptr<ServerCompletionQueue> cq);
+
+ ~HealthCheckServiceImpl();
+
+ void StartServingThread();
+
private:
- // A tag that can be called with a bool argument. It's tailored for
- // CallHandler's use. Before being used, it should be constructed with a
- // method of CallHandler and a shared pointer to the handler. The
- // shared pointer will be moved to the invoked function and the function
- // can only be invoked once. That makes ref counting of the handler easier,
- // because the shared pointer is not bound to the function and can be gone
- // once the invoked function returns (if not used any more).
- class CallableTag {
- public:
- using HandlerFunction =
- std::function<void(std::shared_ptr<CallHandler>, bool)>;
-
- CallableTag() {}
-
- CallableTag(HandlerFunction func, std::shared_ptr<CallHandler> handler)
- : handler_function_(std::move(func)), handler_(std::move(handler)) {
- GPR_ASSERT(handler_function_ != nullptr);
- GPR_ASSERT(handler_ != nullptr);
- }
-
- // Runs the tag. This should be called only once. The handler is no
- // longer owned by this tag after this method is invoked.
- void Run(bool ok) {
- GPR_ASSERT(handler_function_ != nullptr);
- GPR_ASSERT(handler_ != nullptr);
- handler_function_(std::move(handler_), ok);
- }
-
- // Releases and returns the shared pointer to the handler.
- std::shared_ptr<CallHandler> ReleaseHandler() {
- return std::move(handler_);
- }
-
- private:
- HandlerFunction handler_function_ = nullptr;
- std::shared_ptr<CallHandler> handler_;
- };
-
- // Call handler for Check method.
- // Each handler takes care of one call. It contains per-call data and it
- // will access the members of the parent class (i.e.,
- // DefaultHealthCheckService) for per-service health data.
- class CheckCallHandler : public CallHandler {
- public:
- // Instantiates a CheckCallHandler and requests the next health check
- // call. The handler object will manage its own lifetime, so no action is
- // needed from the caller any more regarding that object.
- static void CreateAndStart(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service);
-
- // This ctor is public because we want to use std::make_shared<> in
- // CreateAndStart(). This ctor shouldn't be used elsewhere.
- CheckCallHandler(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service);
-
- // Not used for Check.
+ // A tag that can be called with a bool argument. It's tailored for
+ // CallHandler's use. Before being used, it should be constructed with a
+ // method of CallHandler and a shared pointer to the handler. The
+ // shared pointer will be moved to the invoked function and the function
+ // can only be invoked once. That makes ref counting of the handler easier,
+ // because the shared pointer is not bound to the function and can be gone
+ // once the invoked function returns (if not used any more).
+ class CallableTag {
+ public:
+ using HandlerFunction =
+ std::function<void(std::shared_ptr<CallHandler>, bool)>;
+
+ CallableTag() {}
+
+ CallableTag(HandlerFunction func, std::shared_ptr<CallHandler> handler)
+ : handler_function_(std::move(func)), handler_(std::move(handler)) {
+ GPR_ASSERT(handler_function_ != nullptr);
+ GPR_ASSERT(handler_ != nullptr);
+ }
+
+ // Runs the tag. This should be called only once. The handler is no
+ // longer owned by this tag after this method is invoked.
+ void Run(bool ok) {
+ GPR_ASSERT(handler_function_ != nullptr);
+ GPR_ASSERT(handler_ != nullptr);
+ handler_function_(std::move(handler_), ok);
+ }
+
+ // Releases and returns the shared pointer to the handler.
+ std::shared_ptr<CallHandler> ReleaseHandler() {
+ return std::move(handler_);
+ }
+
+ private:
+ HandlerFunction handler_function_ = nullptr;
+ std::shared_ptr<CallHandler> handler_;
+ };
+
+ // Call handler for Check method.
+ // Each handler takes care of one call. It contains per-call data and it
+ // will access the members of the parent class (i.e.,
+ // DefaultHealthCheckService) for per-service health data.
+ class CheckCallHandler : public CallHandler {
+ public:
+ // Instantiates a CheckCallHandler and requests the next health check
+ // call. The handler object will manage its own lifetime, so no action is
+ // needed from the caller any more regarding that object.
+ static void CreateAndStart(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service);
+
+ // This ctor is public because we want to use std::make_shared<> in
+ // CreateAndStart(). This ctor shouldn't be used elsewhere.
+ CheckCallHandler(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service);
+
+ // Not used for Check.
void SendHealth(std::shared_ptr<CallHandler> /*self*/,
ServingStatus /*status*/) override {}
-
- private:
- // Called when we receive a call.
- // Spawns a new handler so that we can keep servicing future calls.
- void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
-
- // Called when Finish() is done.
- void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
-
- // The members passed down from HealthCheckServiceImpl.
- ServerCompletionQueue* cq_;
- DefaultHealthCheckService* database_;
- HealthCheckServiceImpl* service_;
-
- ByteBuffer request_;
- GenericServerAsyncResponseWriter writer_;
- ServerContext ctx_;
-
- CallableTag next_;
- };
-
- // Call handler for Watch method.
- // Each handler takes care of one call. It contains per-call data and it
- // will access the members of the parent class (i.e.,
- // DefaultHealthCheckService) for per-service health data.
- class WatchCallHandler : public CallHandler {
- public:
- // Instantiates a WatchCallHandler and requests the next health check
- // call. The handler object will manage its own lifetime, so no action is
- // needed from the caller any more regarding that object.
- static void CreateAndStart(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service);
-
- // This ctor is public because we want to use std::make_shared<> in
- // CreateAndStart(). This ctor shouldn't be used elsewhere.
- WatchCallHandler(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service);
-
- void SendHealth(std::shared_ptr<CallHandler> self,
- ServingStatus status) override;
-
- private:
- // Called when we receive a call.
- // Spawns a new handler so that we can keep servicing future calls.
- void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
-
- // Requires holding send_mu_.
- void SendHealthLocked(std::shared_ptr<CallHandler> self,
- ServingStatus status);
-
- // When sending a health result finishes.
- void OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok);
-
- void SendFinish(std::shared_ptr<CallHandler> self, const Status& status);
-
- // Requires holding service_->cq_shutdown_mu_.
- void SendFinishLocked(std::shared_ptr<CallHandler> self,
- const Status& status);
-
- // Called when Finish() is done.
- void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
-
- // Called when AsyncNotifyWhenDone() notifies us.
- void OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok);
-
- // The members passed down from HealthCheckServiceImpl.
- ServerCompletionQueue* cq_;
- DefaultHealthCheckService* database_;
- HealthCheckServiceImpl* service_;
-
- ByteBuffer request_;
+
+ private:
+ // Called when we receive a call.
+ // Spawns a new handler so that we can keep servicing future calls.
+ void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
+
+ // Called when Finish() is done.
+ void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
+
+ // The members passed down from HealthCheckServiceImpl.
+ ServerCompletionQueue* cq_;
+ DefaultHealthCheckService* database_;
+ HealthCheckServiceImpl* service_;
+
+ ByteBuffer request_;
+ GenericServerAsyncResponseWriter writer_;
+ ServerContext ctx_;
+
+ CallableTag next_;
+ };
+
+ // Call handler for Watch method.
+ // Each handler takes care of one call. It contains per-call data and it
+ // will access the members of the parent class (i.e.,
+ // DefaultHealthCheckService) for per-service health data.
+ class WatchCallHandler : public CallHandler {
+ public:
+ // Instantiates a WatchCallHandler and requests the next health check
+ // call. The handler object will manage its own lifetime, so no action is
+ // needed from the caller any more regarding that object.
+ static void CreateAndStart(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service);
+
+ // This ctor is public because we want to use std::make_shared<> in
+ // CreateAndStart(). This ctor shouldn't be used elsewhere.
+ WatchCallHandler(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service);
+
+ void SendHealth(std::shared_ptr<CallHandler> self,
+ ServingStatus status) override;
+
+ private:
+ // Called when we receive a call.
+ // Spawns a new handler so that we can keep servicing future calls.
+ void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
+
+ // Requires holding send_mu_.
+ void SendHealthLocked(std::shared_ptr<CallHandler> self,
+ ServingStatus status);
+
+ // When sending a health result finishes.
+ void OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok);
+
+ void SendFinish(std::shared_ptr<CallHandler> self, const Status& status);
+
+ // Requires holding service_->cq_shutdown_mu_.
+ void SendFinishLocked(std::shared_ptr<CallHandler> self,
+ const Status& status);
+
+ // Called when Finish() is done.
+ void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
+
+ // Called when AsyncNotifyWhenDone() notifies us.
+ void OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok);
+
+ // The members passed down from HealthCheckServiceImpl.
+ ServerCompletionQueue* cq_;
+ DefaultHealthCheckService* database_;
+ HealthCheckServiceImpl* service_;
+
+ ByteBuffer request_;
TString service_name_;
- GenericServerAsyncWriter stream_;
- ServerContext ctx_;
-
+ GenericServerAsyncWriter stream_;
+ ServerContext ctx_;
+
grpc_core::Mutex send_mu_;
- bool send_in_flight_ = false; // Guarded by mu_.
- ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_.
-
- bool finish_called_ = false;
- CallableTag next_;
- CallableTag on_done_notified_;
- CallableTag on_finish_done_;
- };
-
- // Handles the incoming requests and drives the completion queue in a loop.
- static void Serve(void* arg);
-
- // Returns true on success.
- static bool DecodeRequest(const ByteBuffer& request,
+ bool send_in_flight_ = false; // Guarded by mu_.
+ ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_.
+
+ bool finish_called_ = false;
+ CallableTag next_;
+ CallableTag on_done_notified_;
+ CallableTag on_finish_done_;
+ };
+
+ // Handles the incoming requests and drives the completion queue in a loop.
+ static void Serve(void* arg);
+
+ // Returns true on success.
+ static bool DecodeRequest(const ByteBuffer& request,
TString* service_name);
- static bool EncodeResponse(ServingStatus status, ByteBuffer* response);
-
- // Needed to appease Windows compilers, which don't seem to allow
- // nested classes to access protected members in the parent's
- // superclass.
- using Service::RequestAsyncServerStreaming;
- using Service::RequestAsyncUnary;
-
- DefaultHealthCheckService* database_;
- std::unique_ptr<ServerCompletionQueue> cq_;
-
- // To synchronize the operations related to shutdown state of cq_, so that
- // we don't enqueue new tags into cq_ after it is already shut down.
+ static bool EncodeResponse(ServingStatus status, ByteBuffer* response);
+
+ // Needed to appease Windows compilers, which don't seem to allow
+ // nested classes to access protected members in the parent's
+ // superclass.
+ using Service::RequestAsyncServerStreaming;
+ using Service::RequestAsyncUnary;
+
+ DefaultHealthCheckService* database_;
+ std::unique_ptr<ServerCompletionQueue> cq_;
+
+ // To synchronize the operations related to shutdown state of cq_, so that
+ // we don't enqueue new tags into cq_ after it is already shut down.
grpc_core::Mutex cq_shutdown_mu_;
- std::atomic_bool shutdown_{false};
- std::unique_ptr<::grpc_core::Thread> thread_;
+ std::atomic_bool shutdown_{false};
+ std::unique_ptr<::grpc_core::Thread> thread_;
};
DefaultHealthCheckService();
-
+
void SetServingStatus(const TString& service_name, bool serving) override;
void SetServingStatus(bool serving) override;
-
- void Shutdown() override;
-
+
+ void Shutdown() override;
+
ServingStatus GetServingStatus(const TString& service_name) const;
- HealthCheckServiceImpl* GetHealthCheckService(
- std::unique_ptr<ServerCompletionQueue> cq);
-
+ HealthCheckServiceImpl* GetHealthCheckService(
+ std::unique_ptr<ServerCompletionQueue> cq);
+
private:
- // Stores the current serving status of a service and any call
- // handlers registered for updates when the service's status changes.
- class ServiceData {
- public:
- void SetServingStatus(ServingStatus status);
- ServingStatus GetServingStatus() const { return status_; }
- void AddCallHandler(
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
- void RemoveCallHandler(
- const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler);
- bool Unused() const {
- return call_handlers_.empty() && status_ == NOT_FOUND;
- }
-
- private:
- ServingStatus status_ = NOT_FOUND;
- std::set<std::shared_ptr<HealthCheckServiceImpl::CallHandler>>
- call_handlers_;
- };
-
- void RegisterCallHandler(
+ // Stores the current serving status of a service and any call
+ // handlers registered for updates when the service's status changes.
+ class ServiceData {
+ public:
+ void SetServingStatus(ServingStatus status);
+ ServingStatus GetServingStatus() const { return status_; }
+ void AddCallHandler(
+ std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
+ void RemoveCallHandler(
+ const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler);
+ bool Unused() const {
+ return call_handlers_.empty() && status_ == NOT_FOUND;
+ }
+
+ private:
+ ServingStatus status_ = NOT_FOUND;
+ std::set<std::shared_ptr<HealthCheckServiceImpl::CallHandler>>
+ call_handlers_;
+ };
+
+ void RegisterCallHandler(
const TString& service_name,
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
-
- void UnregisterCallHandler(
+ std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
+
+ void UnregisterCallHandler(
const TString& service_name,
- const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler);
-
+ const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler);
+
mutable grpc_core::Mutex mu_;
bool shutdown_ = false; // Guarded by mu_.
std::map<TString, ServiceData> services_map_; // Guarded by mu_.
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/constants.h b/contrib/libs/grpc/src/cpp/server/load_reporter/constants.h
index eabed9711ec..00ad794a049 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/constants.h
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/constants.h
@@ -1,81 +1,81 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H
-#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H
-
-#include <grpc/impl/codegen/port_platform.h>
-
-namespace grpc {
-namespace load_reporter {
-
-// TODO(juanlishen): Update the version number with the PR number every time
-// there is any change to the server load reporter.
-constexpr uint32_t kVersion = 15853;
-
-// TODO(juanlishen): This window size is from the internal spec for the load
-// reporter. Need to ask the gRPC LB team whether we should make this and the
-// fetching interval configurable.
-constexpr uint32_t kFeedbackSampleWindowSeconds = 10;
-constexpr uint32_t kFetchAndSampleIntervalSeconds = 1;
-
-constexpr size_t kLbIdLength = 8;
-constexpr size_t kIpv4AddressLength = 8;
-constexpr size_t kIpv6AddressLength = 32;
-
-constexpr char kInvalidLbId[] = "<INVALID_LBID_238dsb234890rb>";
-
-// Call statuses.
-
-constexpr char kCallStatusOk[] = "OK";
-constexpr char kCallStatusServerError[] = "5XX";
-constexpr char kCallStatusClientError[] = "4XX";
-
-// Tag keys.
-
-constexpr char kTagKeyToken[] = "token";
-constexpr char kTagKeyHost[] = "host";
-constexpr char kTagKeyUserId[] = "user_id";
-constexpr char kTagKeyStatus[] = "status";
-constexpr char kTagKeyMetricName[] = "metric_name";
-
-// Measure names.
-
-constexpr char kMeasureStartCount[] = "grpc.io/lb/start_count";
-constexpr char kMeasureEndCount[] = "grpc.io/lb/end_count";
-constexpr char kMeasureEndBytesSent[] = "grpc.io/lb/bytes_sent";
-constexpr char kMeasureEndBytesReceived[] = "grpc.io/lb/bytes_received";
-constexpr char kMeasureEndLatencyMs[] = "grpc.io/lb/latency_ms";
-constexpr char kMeasureOtherCallMetric[] = "grpc.io/lb/other_call_metric";
-
-// View names.
-
-constexpr char kViewStartCount[] = "grpc.io/lb_view/start_count";
-constexpr char kViewEndCount[] = "grpc.io/lb_view/end_count";
-constexpr char kViewEndBytesSent[] = "grpc.io/lb_view/bytes_sent";
-constexpr char kViewEndBytesReceived[] = "grpc.io/lb_view/bytes_received";
-constexpr char kViewEndLatencyMs[] = "grpc.io/lb_view/latency_ms";
-constexpr char kViewOtherCallMetricCount[] =
- "grpc.io/lb_view/other_call_metric_count";
-constexpr char kViewOtherCallMetricValue[] =
- "grpc.io/lb_view/other_call_metric_value";
-
-} // namespace load_reporter
-} // namespace grpc
-
-#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H
+#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H
+
+#include <grpc/impl/codegen/port_platform.h>
+
+namespace grpc {
+namespace load_reporter {
+
+// TODO(juanlishen): Update the version number with the PR number every time
+// there is any change to the server load reporter.
+constexpr uint32_t kVersion = 15853;
+
+// TODO(juanlishen): This window size is from the internal spec for the load
+// reporter. Need to ask the gRPC LB team whether we should make this and the
+// fetching interval configurable.
+constexpr uint32_t kFeedbackSampleWindowSeconds = 10;
+constexpr uint32_t kFetchAndSampleIntervalSeconds = 1;
+
+constexpr size_t kLbIdLength = 8;
+constexpr size_t kIpv4AddressLength = 8;
+constexpr size_t kIpv6AddressLength = 32;
+
+constexpr char kInvalidLbId[] = "<INVALID_LBID_238dsb234890rb>";
+
+// Call statuses.
+
+constexpr char kCallStatusOk[] = "OK";
+constexpr char kCallStatusServerError[] = "5XX";
+constexpr char kCallStatusClientError[] = "4XX";
+
+// Tag keys.
+
+constexpr char kTagKeyToken[] = "token";
+constexpr char kTagKeyHost[] = "host";
+constexpr char kTagKeyUserId[] = "user_id";
+constexpr char kTagKeyStatus[] = "status";
+constexpr char kTagKeyMetricName[] = "metric_name";
+
+// Measure names.
+
+constexpr char kMeasureStartCount[] = "grpc.io/lb/start_count";
+constexpr char kMeasureEndCount[] = "grpc.io/lb/end_count";
+constexpr char kMeasureEndBytesSent[] = "grpc.io/lb/bytes_sent";
+constexpr char kMeasureEndBytesReceived[] = "grpc.io/lb/bytes_received";
+constexpr char kMeasureEndLatencyMs[] = "grpc.io/lb/latency_ms";
+constexpr char kMeasureOtherCallMetric[] = "grpc.io/lb/other_call_metric";
+
+// View names.
+
+constexpr char kViewStartCount[] = "grpc.io/lb_view/start_count";
+constexpr char kViewEndCount[] = "grpc.io/lb_view/end_count";
+constexpr char kViewEndBytesSent[] = "grpc.io/lb_view/bytes_sent";
+constexpr char kViewEndBytesReceived[] = "grpc.io/lb_view/bytes_received";
+constexpr char kViewEndLatencyMs[] = "grpc.io/lb_view/latency_ms";
+constexpr char kViewOtherCallMetricCount[] =
+ "grpc.io/lb_view/other_call_metric_count";
+constexpr char kViewOtherCallMetricValue[] =
+ "grpc.io/lb_view/other_call_metric_value";
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats.h b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats.h
index 8544b054177..f514b0752f7 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats.h
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats.h
@@ -1,36 +1,36 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H
-#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H
-
-#include <grpc/impl/codegen/port_platform.h>
-
-#include <utility>
-
-namespace grpc {
-namespace load_reporter {
-
-// Reads the CPU stats (in a pair of busy and total numbers) from the system.
-// The units of the stats should be the same.
-std::pair<uint64_t, uint64_t> GetCpuStatsImpl();
-
-} // namespace load_reporter
-} // namespace grpc
-
-#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H
+#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H
+
+#include <grpc/impl/codegen/port_platform.h>
+
+#include <utility>
+
+namespace grpc {
+namespace load_reporter {
+
+// Reads the CPU stats (in a pair of busy and total numbers) from the system.
+// The units of the stats should be the same.
+std::pair<uint64_t, uint64_t> GetCpuStatsImpl();
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc
index 8565d384a81..561d4f50482 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc
@@ -1,48 +1,48 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#ifdef GPR_LINUX
-
-#include <cstdio>
-
-#include "src/cpp/server/load_reporter/get_cpu_stats.h"
-
-namespace grpc {
-namespace load_reporter {
-
-std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
- uint64_t busy = 0, total = 0;
- FILE* fp;
- fp = fopen("/proc/stat", "r");
- uint64_t user, nice, system, idle;
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_LINUX
+
+#include <cstdio>
+
+#include "src/cpp/server/load_reporter/get_cpu_stats.h"
+
+namespace grpc {
+namespace load_reporter {
+
+std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
+ uint64_t busy = 0, total = 0;
+ FILE* fp;
+ fp = fopen("/proc/stat", "r");
+ uint64_t user, nice, system, idle;
if (fscanf(fp, "cpu %lu %lu %lu %lu", &user, &nice, &system, &idle) != 4) {
// Something bad happened with the information, so assume it's all invalid
user = nice = system = idle = 0;
}
- fclose(fp);
- busy = user + nice + system;
- total = busy + idle;
- return std::make_pair(busy, total);
-}
-
-} // namespace load_reporter
-} // namespace grpc
-
-#endif // GPR_LINUX
+ fclose(fp);
+ busy = user + nice + system;
+ total = busy + idle;
+ return std::make_pair(busy, total);
+}
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GPR_LINUX
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_macos.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_macos.cc
index 125631a3d1e..dbdde304c20 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_macos.cc
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_macos.cc
@@ -1,45 +1,45 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#ifdef GPR_APPLE
-
-#include <mach/mach.h>
-
-#include "src/cpp/server/load_reporter/get_cpu_stats.h"
-
-namespace grpc {
-namespace load_reporter {
-
-std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
- uint64_t busy = 0, total = 0;
- host_cpu_load_info_data_t cpuinfo;
- mach_msg_type_number_t count = HOST_CPU_LOAD_INFO_COUNT;
- if (host_statistics(mach_host_self(), HOST_CPU_LOAD_INFO,
- (host_info_t)&cpuinfo, &count) == KERN_SUCCESS) {
- for (int i = 0; i < CPU_STATE_MAX; i++) total += cpuinfo.cpu_ticks[i];
- busy = total - cpuinfo.cpu_ticks[CPU_STATE_IDLE];
- }
- return std::make_pair(busy, total);
-}
-
-} // namespace load_reporter
-} // namespace grpc
-
-#endif // GPR_APPLE
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_APPLE
+
+#include <mach/mach.h>
+
+#include "src/cpp/server/load_reporter/get_cpu_stats.h"
+
+namespace grpc {
+namespace load_reporter {
+
+std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
+ uint64_t busy = 0, total = 0;
+ host_cpu_load_info_data_t cpuinfo;
+ mach_msg_type_number_t count = HOST_CPU_LOAD_INFO_COUNT;
+ if (host_statistics(mach_host_self(), HOST_CPU_LOAD_INFO,
+ (host_info_t)&cpuinfo, &count) == KERN_SUCCESS) {
+ for (int i = 0; i < CPU_STATE_MAX; i++) total += cpuinfo.cpu_ticks[i];
+ busy = total - cpuinfo.cpu_ticks[CPU_STATE_IDLE];
+ }
+ return std::make_pair(busy, total);
+}
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GPR_APPLE
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_unsupported.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_unsupported.cc
index e2d61859c8e..80fb8b6da1f 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_unsupported.cc
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_unsupported.cc
@@ -1,40 +1,40 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#if !defined(GPR_LINUX) && !defined(GPR_WINDOWS) && !defined(GPR_APPLE)
-
-#include <grpc/support/log.h>
-
-#include "src/cpp/server/load_reporter/get_cpu_stats.h"
-
-namespace grpc {
-namespace load_reporter {
-
-std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
- uint64_t busy = 0, total = 0;
- gpr_log(GPR_ERROR,
- "Platforms other than Linux, Windows, and MacOS are not supported.");
- return std::make_pair(busy, total);
-}
-
-} // namespace load_reporter
-} // namespace grpc
-
-#endif // !defined(GPR_LINUX) && !defined(GPR_WINDOWS) && !defined(GPR_APPLE)
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#if !defined(GPR_LINUX) && !defined(GPR_WINDOWS) && !defined(GPR_APPLE)
+
+#include <grpc/support/log.h>
+
+#include "src/cpp/server/load_reporter/get_cpu_stats.h"
+
+namespace grpc {
+namespace load_reporter {
+
+std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
+ uint64_t busy = 0, total = 0;
+ gpr_log(GPR_ERROR,
+ "Platforms other than Linux, Windows, and MacOS are not supported.");
+ return std::make_pair(busy, total);
+}
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // !defined(GPR_LINUX) && !defined(GPR_WINDOWS) && !defined(GPR_APPLE)
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc
index bc5718a0568..0a98e848a2c 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc
@@ -1,55 +1,55 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#ifdef GPR_WINDOWS
-
-#include <windows.h>
-#include <cstdint>
-
-#include "src/cpp/server/load_reporter/get_cpu_stats.h"
-
-namespace grpc {
-namespace load_reporter {
-
-namespace {
-
-uint64_t FiletimeToInt(const FILETIME& ft) {
- ULARGE_INTEGER i;
- i.LowPart = ft.dwLowDateTime;
- i.HighPart = ft.dwHighDateTime;
- return i.QuadPart;
-}
-
-} // namespace
-
-std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
- uint64_t busy = 0, total = 0;
- FILETIME idle, kernel, user;
- if (GetSystemTimes(&idle, &kernel, &user) != 0) {
- total = FiletimeToInt(kernel) + FiletimeToInt(user);
- busy = total - FiletimeToInt(idle);
- }
- return std::make_pair(busy, total);
-}
-
-} // namespace load_reporter
-} // namespace grpc
-
-#endif // GPR_WINDOWS
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_WINDOWS
+
+#include <windows.h>
+#include <cstdint>
+
+#include "src/cpp/server/load_reporter/get_cpu_stats.h"
+
+namespace grpc {
+namespace load_reporter {
+
+namespace {
+
+uint64_t FiletimeToInt(const FILETIME& ft) {
+ ULARGE_INTEGER i;
+ i.LowPart = ft.dwLowDateTime;
+ i.HighPart = ft.dwHighDateTime;
+ return i.QuadPart;
+}
+
+} // namespace
+
+std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
+ uint64_t busy = 0, total = 0;
+ FILETIME idle, kernel, user;
+ if (GetSystemTimes(&idle, &kernel, &user) != 0) {
+ total = FiletimeToInt(kernel) + FiletimeToInt(user);
+ busy = total - FiletimeToInt(idle);
+ }
+ return std::make_pair(busy, total);
+}
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GPR_WINDOWS
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc
index 12e8203fe1b..f07fa812a7d 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc
@@ -16,15 +16,15 @@
*
*/
-#include <grpc/impl/codegen/port_platform.h>
-
-#include <stdio.h>
+#include <grpc/impl/codegen/port_platform.h>
+
+#include <stdio.h>
#include <cstdlib>
#include <set>
#include <unordered_map>
#include <vector>
-#include "src/core/lib/iomgr/socket_utils.h"
+#include "src/core/lib/iomgr/socket_utils.h"
#include "src/cpp/server/load_reporter/load_data_store.h"
namespace grpc {
@@ -79,65 +79,65 @@ const typename C::value_type* RandomElement(const C& container) {
LoadRecordKey::LoadRecordKey(const TString& client_ip_and_token,
TString user_id)
- : user_id_(std::move(user_id)) {
- GPR_ASSERT(client_ip_and_token.size() >= 2);
- int ip_hex_size;
- GPR_ASSERT(sscanf(client_ip_and_token.substr(0, 2).c_str(), "%d",
- &ip_hex_size) == 1);
- GPR_ASSERT(ip_hex_size == 0 || ip_hex_size == kIpv4AddressLength ||
- ip_hex_size == kIpv6AddressLength);
- size_t cur_pos = 2;
- client_ip_hex_ = client_ip_and_token.substr(cur_pos, ip_hex_size);
- cur_pos += ip_hex_size;
- if (client_ip_and_token.size() - cur_pos < kLbIdLength) {
- lb_id_ = kInvalidLbId;
- lb_tag_ = "";
- } else {
- lb_id_ = client_ip_and_token.substr(cur_pos, kLbIdLength);
- lb_tag_ = client_ip_and_token.substr(cur_pos + kLbIdLength);
- }
-}
-
+ : user_id_(std::move(user_id)) {
+ GPR_ASSERT(client_ip_and_token.size() >= 2);
+ int ip_hex_size;
+ GPR_ASSERT(sscanf(client_ip_and_token.substr(0, 2).c_str(), "%d",
+ &ip_hex_size) == 1);
+ GPR_ASSERT(ip_hex_size == 0 || ip_hex_size == kIpv4AddressLength ||
+ ip_hex_size == kIpv6AddressLength);
+ size_t cur_pos = 2;
+ client_ip_hex_ = client_ip_and_token.substr(cur_pos, ip_hex_size);
+ cur_pos += ip_hex_size;
+ if (client_ip_and_token.size() - cur_pos < kLbIdLength) {
+ lb_id_ = kInvalidLbId;
+ lb_tag_ = "";
+ } else {
+ lb_id_ = client_ip_and_token.substr(cur_pos, kLbIdLength);
+ lb_tag_ = client_ip_and_token.substr(cur_pos + kLbIdLength);
+ }
+}
+
TString LoadRecordKey::GetClientIpBytes() const {
- if (client_ip_hex_.empty()) {
- return "";
- } else if (client_ip_hex_.size() == kIpv4AddressLength) {
- uint32_t ip_bytes;
- if (sscanf(client_ip_hex_.c_str(), "%x", &ip_bytes) != 1) {
- gpr_log(GPR_ERROR,
- "Can't parse client IP (%s) from a hex string to an integer.",
- client_ip_hex_.c_str());
- return "";
- }
- ip_bytes = grpc_htonl(ip_bytes);
+ if (client_ip_hex_.empty()) {
+ return "";
+ } else if (client_ip_hex_.size() == kIpv4AddressLength) {
+ uint32_t ip_bytes;
+ if (sscanf(client_ip_hex_.c_str(), "%x", &ip_bytes) != 1) {
+ gpr_log(GPR_ERROR,
+ "Can't parse client IP (%s) from a hex string to an integer.",
+ client_ip_hex_.c_str());
+ return "";
+ }
+ ip_bytes = grpc_htonl(ip_bytes);
return TString(reinterpret_cast<const char*>(&ip_bytes),
sizeof(ip_bytes));
- } else if (client_ip_hex_.size() == kIpv6AddressLength) {
- uint32_t ip_bytes[4];
- for (size_t i = 0; i < 4; ++i) {
- if (sscanf(client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str(), "%x",
- ip_bytes + i) != 1) {
- gpr_log(
- GPR_ERROR,
- "Can't parse client IP part (%s) from a hex string to an integer.",
- client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str());
- return "";
- }
- ip_bytes[i] = grpc_htonl(ip_bytes[i]);
- }
+ } else if (client_ip_hex_.size() == kIpv6AddressLength) {
+ uint32_t ip_bytes[4];
+ for (size_t i = 0; i < 4; ++i) {
+ if (sscanf(client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str(), "%x",
+ ip_bytes + i) != 1) {
+ gpr_log(
+ GPR_ERROR,
+ "Can't parse client IP part (%s) from a hex string to an integer.",
+ client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str());
+ return "";
+ }
+ ip_bytes[i] = grpc_htonl(ip_bytes[i]);
+ }
return TString(reinterpret_cast<const char*>(ip_bytes),
sizeof(ip_bytes));
- } else {
- GPR_UNREACHABLE_CODE(return "");
- }
-}
-
+ } else {
+ GPR_UNREACHABLE_CODE(return "");
+ }
+}
+
LoadRecordValue::LoadRecordValue(TString metric_name, uint64_t num_calls,
- double total_metric_value) {
- call_metrics_.emplace(std::move(metric_name),
- CallMetricValue(num_calls, total_metric_value));
-}
-
+ double total_metric_value) {
+ call_metrics_.emplace(std::move(metric_name),
+ CallMetricValue(num_calls, total_metric_value));
+}
+
void PerBalancerStore::MergeRow(const LoadRecordKey& key,
const LoadRecordValue& value) {
// During suspension, the load data received will be dropped.
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.h b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.h
index 7047488c0e8..61ba618331a 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.h
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.h
@@ -28,8 +28,8 @@
#include <grpc/support/log.h>
#include <grpcpp/impl/codegen/config.h>
-#include "src/cpp/server/load_reporter/constants.h"
-
+#include "src/cpp/server/load_reporter/constants.h"
+
#include <util/string/cast.h>
namespace grpc {
@@ -76,9 +76,9 @@ class LoadRecordKey {
user_id_(std::move(user_id)),
client_ip_hex_(std::move(client_ip_hex)) {}
- // Parses the input client_ip_and_token to set client IP, LB ID, and LB tag.
+ // Parses the input client_ip_and_token to set client IP, LB ID, and LB tag.
LoadRecordKey(const TString& client_ip_and_token, TString user_id);
-
+
TString ToString() const {
return "[lb_id_=" + lb_id_ + ", lb_tag_=" + lb_tag_ +
", user_id_=" + user_id_ + ", client_ip_hex_=" + client_ip_hex_ +
@@ -90,9 +90,9 @@ class LoadRecordKey {
user_id_ == other.user_id_ && client_ip_hex_ == other.client_ip_hex_;
}
- // Gets the client IP bytes in network order (i.e., big-endian).
+ // Gets the client IP bytes in network order (i.e., big-endian).
TString GetClientIpBytes() const;
-
+
// Getters.
const TString& lb_id() const { return lb_id_; }
const TString& lb_tag() const { return lb_tag_; }
@@ -126,8 +126,8 @@ class LoadRecordKey {
class LoadRecordValue {
public:
explicit LoadRecordValue(uint64_t start_count = 0, uint64_t ok_count = 0,
- uint64_t error_count = 0, uint64_t bytes_sent = 0,
- uint64_t bytes_recv = 0, uint64_t latency_ms = 0)
+ uint64_t error_count = 0, uint64_t bytes_sent = 0,
+ uint64_t bytes_recv = 0, uint64_t latency_ms = 0)
: start_count_(start_count),
ok_count_(ok_count),
error_count_(error_count),
@@ -136,8 +136,8 @@ class LoadRecordValue {
latency_ms_(latency_ms) {}
LoadRecordValue(TString metric_name, uint64_t num_calls,
- double total_metric_value);
-
+ double total_metric_value);
+
void MergeFrom(const LoadRecordValue& other) {
start_count_ += other.start_count_;
ok_count_ += other.ok_count_;
@@ -175,9 +175,9 @@ class LoadRecordValue {
uint64_t start_count() const { return start_count_; }
uint64_t ok_count() const { return ok_count_; }
uint64_t error_count() const { return error_count_; }
- uint64_t bytes_sent() const { return bytes_sent_; }
- uint64_t bytes_recv() const { return bytes_recv_; }
- uint64_t latency_ms() const { return latency_ms_; }
+ uint64_t bytes_sent() const { return bytes_sent_; }
+ uint64_t bytes_recv() const { return bytes_recv_; }
+ uint64_t latency_ms() const { return latency_ms_; }
const std::unordered_map<TString, CallMetricValue>& call_metrics() const {
return call_metrics_;
}
@@ -186,9 +186,9 @@ class LoadRecordValue {
uint64_t start_count_ = 0;
uint64_t ok_count_ = 0;
uint64_t error_count_ = 0;
- uint64_t bytes_sent_ = 0;
- uint64_t bytes_recv_ = 0;
- uint64_t latency_ms_ = 0;
+ uint64_t bytes_sent_ = 0;
+ uint64_t bytes_recv_ = 0;
+ uint64_t latency_ms_ = 0;
std::unordered_map<TString, CallMetricValue> call_metrics_;
};
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc
index a57ddc47150..24ad9f3f248 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc
@@ -1,47 +1,47 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/impl/codegen/port_platform.h>
-
-#include <grpcpp/ext/server_load_reporting.h>
-
-#include <cmath>
-
-#include <grpc/support/log.h>
-
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/impl/codegen/port_platform.h>
+
+#include <grpcpp/ext/server_load_reporting.h>
+
+#include <cmath>
+
+#include <grpc/support/log.h>
+
namespace grpc {
-namespace load_reporter {
-namespace experimental {
-
-void AddLoadReportingCost(grpc::ServerContext* ctx,
+namespace load_reporter {
+namespace experimental {
+
+void AddLoadReportingCost(grpc::ServerContext* ctx,
const TString& cost_name, double cost_value) {
- if (std::isnormal(cost_value)) {
+ if (std::isnormal(cost_value)) {
TString buf;
- buf.resize(sizeof(cost_value) + cost_name.size());
- memcpy(&(*buf.begin()), &cost_value, sizeof(cost_value));
- memcpy(&(*buf.begin()) + sizeof(cost_value), cost_name.data(),
- cost_name.size());
- ctx->AddTrailingMetadata(GRPC_LB_COST_MD_KEY, buf);
- } else {
- gpr_log(GPR_ERROR, "Call metric value is not normal.");
- }
-}
-
-} // namespace experimental
-} // namespace load_reporter
+ buf.resize(sizeof(cost_value) + cost_name.size());
+ memcpy(&(*buf.begin()), &cost_value, sizeof(cost_value));
+ memcpy(&(*buf.begin()) + sizeof(cost_value), cost_name.data(),
+ cost_name.size());
+ ctx->AddTrailingMetadata(GRPC_LB_COST_MD_KEY, buf);
+ } else {
+ gpr_log(GPR_ERROR, "Call metric value is not normal.");
+ }
+}
+
+} // namespace experimental
+} // namespace load_reporter
} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc
index 3b734535157..732602bcb70 100644
--- a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc
+++ b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc
@@ -44,10 +44,10 @@ void AuthMetadataProcessorAyncWrapper::Process(
return;
}
if (w->processor_->IsBlocking()) {
- w->thread_pool_->Add([w, context, md, num_md, cb, user_data] {
- w->AuthMetadataProcessorAyncWrapper::InvokeProcessor(context, md, num_md,
- cb, user_data);
- });
+ w->thread_pool_->Add([w, context, md, num_md, cb, user_data] {
+ w->AuthMetadataProcessorAyncWrapper::InvokeProcessor(context, md, num_md,
+ cb, user_data);
+ });
} else {
// invoke directly.
w->InvokeProcessor(context, md, num_md, cb, user_data);
@@ -62,7 +62,7 @@ void AuthMetadataProcessorAyncWrapper::InvokeProcessor(
metadata.insert(std::make_pair(StringRefFromSlice(&md[i].key),
StringRefFromSlice(&md[i].value)));
}
- SecureAuthContext context(ctx);
+ SecureAuthContext context(ctx);
AuthMetadataProcessor::OutputMetadata consumed_metadata;
AuthMetadataProcessor::OutputMetadata response_metadata;
@@ -138,12 +138,12 @@ std::shared_ptr<ServerCredentials> AltsServerCredentials(
new SecureServerCredentials(c_creds));
}
-std::shared_ptr<ServerCredentials> LocalServerCredentials(
- grpc_local_connect_type type) {
- return std::shared_ptr<ServerCredentials>(
- new SecureServerCredentials(grpc_local_server_credentials_create(type)));
-}
-
+std::shared_ptr<ServerCredentials> LocalServerCredentials(
+ grpc_local_connect_type type) {
+ return std::shared_ptr<ServerCredentials>(
+ new SecureServerCredentials(grpc_local_server_credentials_create(type)));
+}
+
std::shared_ptr<ServerCredentials> TlsServerCredentials(
const grpc::experimental::TlsCredentialsOptions& options) {
grpc::GrpcLibraryCodegen init;
diff --git a/contrib/libs/grpc/src/cpp/server/server_builder.cc b/contrib/libs/grpc/src/cpp/server/server_builder.cc
index ec5d4eec8cd..0cc00b365ff 100644
--- a/contrib/libs/grpc/src/cpp/server/server_builder.cc
+++ b/contrib/libs/grpc/src/cpp/server/server_builder.cc
@@ -24,8 +24,8 @@
#include <grpcpp/resource_quota.h>
#include <grpcpp/server.h>
-#include <utility>
-
+#include <utility>
+
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
@@ -44,8 +44,8 @@ static void do_plugin_list_init(void) {
}
ServerBuilder::ServerBuilder()
- : max_receive_message_size_(INT_MIN),
- max_send_message_size_(INT_MIN),
+ : max_receive_message_size_(INT_MIN),
+ max_send_message_size_(INT_MIN),
sync_server_settings_(SyncServerSettings()),
resource_quota_(nullptr) {
gpr_once_init(&once_init_plugin_list, do_plugin_list_init);
@@ -71,9 +71,9 @@ ServerBuilder::~ServerBuilder() {
std::unique_ptr<grpc::ServerCompletionQueue> ServerBuilder::AddCompletionQueue(
bool is_frequently_polled) {
grpc::ServerCompletionQueue* cq = new grpc::ServerCompletionQueue(
- GRPC_CQ_NEXT,
- is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING,
- nullptr);
+ GRPC_CQ_NEXT,
+ is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING,
+ nullptr);
cqs_.push_back(cq);
return std::unique_ptr<grpc::ServerCompletionQueue>(cq);
}
@@ -180,7 +180,7 @@ ServerBuilder& ServerBuilder::SetCompressionAlgorithmSupportStatus(
ServerBuilder& ServerBuilder::SetDefaultCompressionLevel(
grpc_compression_level level) {
- maybe_default_compression_level_.is_set = true;
+ maybe_default_compression_level_.is_set = true;
maybe_default_compression_level_.level = level;
return *this;
}
@@ -212,14 +212,14 @@ ServerBuilder& ServerBuilder::AddListeningPort(
while (addr_uri[pos] == '/') ++pos; // Skip slashes.
addr = addr_uri.substr(pos);
}
- Port port = {addr, std::move(creds), selected_port};
+ Port port = {addr, std::move(creds), selected_port};
ports_.push_back(port);
return *this;
}
std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
grpc::ChannelArguments args;
- if (max_receive_message_size_ >= -1) {
+ if (max_receive_message_size_ >= -1) {
args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_);
}
if (max_send_message_size_ >= -1) {
@@ -306,14 +306,14 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
// Create completion queues to listen to incoming rpc requests
for (int i = 0; i < sync_server_settings_.num_cqs; i++) {
- sync_server_cqs->emplace_back(
+ sync_server_cqs->emplace_back(
new grpc::ServerCompletionQueue(GRPC_CQ_NEXT, polling_type, nullptr));
}
}
- // TODO(vjpai): Add a section here for plugins once they can support callback
- // methods
-
+ // TODO(vjpai): Add a section here for plugins once they can support callback
+ // methods
+
if (has_sync_methods) {
// This is a Sync server
gpr_log(GPR_INFO,
@@ -324,16 +324,16 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
sync_server_settings_.cq_timeout_msec);
}
- if (has_callback_methods) {
- gpr_log(GPR_INFO, "Callback server.");
- }
-
+ if (has_callback_methods) {
+ gpr_log(GPR_INFO, "Callback server.");
+ }
+
std::unique_ptr<grpc::Server> server(new grpc::Server(
&args, sync_server_cqs, sync_server_settings_.min_pollers,
sync_server_settings_.max_pollers, sync_server_settings_.cq_timeout_msec,
std::move(acceptors_), resource_quota_,
std::move(interceptor_creators_)));
-
+
ServerInitializer* initializer = server->initializer();
// Register all the completion queues with the server. i.e
@@ -347,10 +347,10 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
}
if (has_callback_methods || callback_generic_service_ != nullptr) {
- auto* cq = server->CallbackCQ();
- grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr);
- }
-
+ auto* cq = server->CallbackCQ();
+ grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr);
+ }
+
// cqs_ contains the completion queue added by calling the ServerBuilder's
// AddCompletionQueue() API. Some of them may not be frequently polled (i.e by
// calling Next() or AsyncNext()) and hence are not safe to be used for
diff --git a/contrib/libs/grpc/src/cpp/server/server_cc.cc b/contrib/libs/grpc/src/cpp/server/server_cc.cc
index 56bf75d730b..c2a911c7f7c 100644
--- a/contrib/libs/grpc/src/cpp/server/server_cc.cc
+++ b/contrib/libs/grpc/src/cpp/server/server_cc.cc
@@ -30,10 +30,10 @@
#include <grpcpp/generic/async_generic_service.h>
#include <grpcpp/impl/codegen/async_unary_call.h>
#include <grpcpp/impl/codegen/byte_buffer.h>
-#include <grpcpp/impl/codegen/call.h>
+#include <grpcpp/impl/codegen/call.h>
#include <grpcpp/impl/codegen/completion_queue_tag.h>
#include <grpcpp/impl/codegen/method_handler.h>
-#include <grpcpp/impl/codegen/server_interceptor.h>
+#include <grpcpp/impl/codegen/server_interceptor.h>
#include <grpcpp/impl/grpc_library.h>
#include <grpcpp/impl/rpc_service_method.h>
#include <grpcpp/impl/server_initializer.h>
@@ -43,10 +43,10 @@
#include <grpcpp/support/time.h>
#include "src/core/ext/transport/inproc/inproc_transport.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/surface/call.h"
-#include "src/core/lib/surface/completion_queue.h"
+#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/surface/server.h"
#include "src/cpp/client/create_channel_internal.h"
#include "src/cpp/server/external_connection_acceptor_impl.h"
@@ -58,13 +58,13 @@
namespace grpc {
namespace {
-// The default value for maximum number of threads that can be created in the
-// sync server. This value of INT_MAX is chosen to match the default behavior if
-// no ResourceQuota is set. To modify the max number of threads in a sync
-// server, pass a custom ResourceQuota object (with the desired number of
-// max-threads set) to the server builder.
-#define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX
-
+// The default value for maximum number of threads that can be created in the
+// sync server. This value of INT_MAX is chosen to match the default behavior if
+// no ResourceQuota is set. To modify the max number of threads in a sync
+// server, pass a custom ResourceQuota object (with the desired number of
+// max-threads set) to the server builder.
+#define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX
+
class DefaultGlobalCallbacks final : public Server::GlobalCallbacks {
public:
~DefaultGlobalCallbacks() override {}
@@ -319,10 +319,10 @@ class Server::UnimplementedAsyncResponse final
grpc::internal::CallOpSendInitialMetadata,
grpc::internal::CallOpServerSendStatus>::FinalizeResult(tag,
status)) {
- delete this;
- } else {
- // The tag was swallowed due to interception. We will see it again.
- }
+ delete this;
+ } else {
+ // The tag was swallowed due to interception. We will see it again.
+ }
return false;
}
@@ -334,7 +334,7 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
public:
SyncRequest(grpc::internal::RpcServiceMethod* method, void* method_tag)
: method_(method),
- method_tag_(method_tag),
+ method_tag_(method_tag),
in_flight_(false),
has_request_payload_(method->method_type() ==
grpc::internal::RpcMethod::NORMAL_RPC ||
@@ -362,11 +362,11 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
GPR_ASSERT(cq_ && !in_flight_);
in_flight_ = true;
- if (method_tag_) {
- if (grpc_server_request_registered_call(
- server, method_tag_, &call_, &deadline_, &request_metadata_,
+ if (method_tag_) {
+ if (grpc_server_request_registered_call(
+ server, method_tag_, &call_, &deadline_, &request_metadata_,
has_request_payload_ ? &request_payload_ : nullptr, cq_,
- notify_cq, this) != GRPC_CALL_OK) {
+ notify_cq, this) != GRPC_CALL_OK) {
TeardownRequest();
return;
}
@@ -384,21 +384,21 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
}
}
- void PostShutdownCleanup() {
- if (call_) {
- grpc_call_unref(call_);
- call_ = nullptr;
- }
- if (cq_) {
- grpc_completion_queue_destroy(cq_);
- cq_ = nullptr;
- }
- }
-
+ void PostShutdownCleanup() {
+ if (call_) {
+ grpc_call_unref(call_);
+ call_ = nullptr;
+ }
+ if (cq_) {
+ grpc_completion_queue_destroy(cq_);
+ cq_ = nullptr;
+ }
+ }
+
bool FinalizeResult(void** /*tag*/, bool* status) override {
if (!*status) {
grpc_completion_queue_destroy(cq_);
- cq_ = nullptr;
+ cq_ = nullptr;
}
if (call_details_) {
deadline_ = call_details_->deadline;
@@ -408,26 +408,26 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
return true;
}
- // The CallData class represents a call that is "active" as opposed
- // to just being requested. It wraps and takes ownership of the cq from
- // the call request
+ // The CallData class represents a call that is "active" as opposed
+ // to just being requested. It wraps and takes ownership of the cq from
+ // the call request
class CallData final {
public:
explicit CallData(Server* server, SyncRequest* mrd)
: cq_(mrd->cq_),
ctx_(mrd->deadline_, &mrd->request_metadata_),
has_request_payload_(mrd->has_request_payload_),
- request_payload_(has_request_payload_ ? mrd->request_payload_
- : nullptr),
- request_(nullptr),
- method_(mrd->method_),
- call_(
- mrd->call_, server, &cq_, server->max_receive_message_size(),
- ctx_.set_server_rpc_info(method_->name(), method_->method_type(),
- server->interceptor_creators_)),
- server_(server),
- global_callbacks_(nullptr),
- resources_(false) {
+ request_payload_(has_request_payload_ ? mrd->request_payload_
+ : nullptr),
+ request_(nullptr),
+ method_(mrd->method_),
+ call_(
+ mrd->call_, server, &cq_, server->max_receive_message_size(),
+ ctx_.set_server_rpc_info(method_->name(), method_->method_type(),
+ server->interceptor_creators_)),
+ server_(server),
+ global_callbacks_(nullptr),
+ resources_(false) {
ctx_.set_call(mrd->call_);
ctx_.cq_ = &cq_;
GPR_ASSERT(mrd->in_flight_);
@@ -441,82 +441,82 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
}
}
- void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks,
- bool resources) {
- global_callbacks_ = global_callbacks;
- resources_ = resources;
+ void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks,
+ bool resources) {
+ global_callbacks_ = global_callbacks;
+ resources_ = resources;
- interceptor_methods_.SetCall(&call_);
- interceptor_methods_.SetReverse();
- // Set interception point for RECV INITIAL METADATA
- interceptor_methods_.AddInterceptionHookPoint(
+ interceptor_methods_.SetCall(&call_);
+ interceptor_methods_.SetReverse();
+ // Set interception point for RECV INITIAL METADATA
+ interceptor_methods_.AddInterceptionHookPoint(
grpc::experimental::InterceptionHookPoints::
POST_RECV_INITIAL_METADATA);
- interceptor_methods_.SetRecvInitialMetadata(&ctx_.client_metadata_);
+ interceptor_methods_.SetRecvInitialMetadata(&ctx_.client_metadata_);
- if (has_request_payload_) {
- // Set interception point for RECV MESSAGE
- auto* handler = resources_ ? method_->handler()
- : server_->resource_exhausted_handler_.get();
- request_ = handler->Deserialize(call_.call(), request_payload_,
+ if (has_request_payload_) {
+ // Set interception point for RECV MESSAGE
+ auto* handler = resources_ ? method_->handler()
+ : server_->resource_exhausted_handler_.get();
+ request_ = handler->Deserialize(call_.call(), request_payload_,
&request_status_, nullptr);
- request_payload_ = nullptr;
- interceptor_methods_.AddInterceptionHookPoint(
+ request_payload_ = nullptr;
+ interceptor_methods_.AddInterceptionHookPoint(
grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
- interceptor_methods_.SetRecvMessage(request_, nullptr);
- }
-
- if (interceptor_methods_.RunInterceptors(
- [this]() { ContinueRunAfterInterception(); })) {
- ContinueRunAfterInterception();
- } else {
- // There were interceptors to be run, so ContinueRunAfterInterception
- // will be run when interceptors are done.
- }
+ interceptor_methods_.SetRecvMessage(request_, nullptr);
+ }
+
+ if (interceptor_methods_.RunInterceptors(
+ [this]() { ContinueRunAfterInterception(); })) {
+ ContinueRunAfterInterception();
+ } else {
+ // There were interceptors to be run, so ContinueRunAfterInterception
+ // will be run when interceptors are done.
+ }
}
- void ContinueRunAfterInterception() {
- {
- ctx_.BeginCompletionOp(&call_, nullptr, nullptr);
- global_callbacks_->PreSynchronousRequest(&ctx_);
- auto* handler = resources_ ? method_->handler()
- : server_->resource_exhausted_handler_.get();
+ void ContinueRunAfterInterception() {
+ {
+ ctx_.BeginCompletionOp(&call_, nullptr, nullptr);
+ global_callbacks_->PreSynchronousRequest(&ctx_);
+ auto* handler = resources_ ? method_->handler()
+ : server_->resource_exhausted_handler_.get();
handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter(
&call_, &ctx_, request_, request_status_, nullptr, nullptr));
- request_ = nullptr;
- global_callbacks_->PostSynchronousRequest(&ctx_);
-
- cq_.Shutdown();
-
+ request_ = nullptr;
+ global_callbacks_->PostSynchronousRequest(&ctx_);
+
+ cq_.Shutdown();
+
grpc::internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag();
- cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME));
-
- /* Ensure the cq_ is shutdown */
+ cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME));
+
+ /* Ensure the cq_ is shutdown */
grpc::DummyTag ignored_tag;
- GPR_ASSERT(cq_.Pluck(&ignored_tag) == false);
- }
- delete this;
- }
-
+ GPR_ASSERT(cq_.Pluck(&ignored_tag) == false);
+ }
+ delete this;
+ }
+
private:
grpc::CompletionQueue cq_;
grpc::ServerContext ctx_;
const bool has_request_payload_;
grpc_byte_buffer* request_payload_;
- void* request_;
+ void* request_;
grpc::Status request_status_;
grpc::internal::RpcServiceMethod* const method_;
grpc::internal::Call call_;
- Server* server_;
- std::shared_ptr<GlobalCallbacks> global_callbacks_;
- bool resources_;
+ Server* server_;
+ std::shared_ptr<GlobalCallbacks> global_callbacks_;
+ bool resources_;
grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_;
};
private:
grpc::internal::RpcServiceMethod* const method_;
- void* const method_tag_;
+ void* const method_tag_;
bool in_flight_;
const bool has_request_payload_;
grpc_call* call_;
@@ -541,19 +541,19 @@ class Server::CallbackRequest final
CallbackRequest(Server* server, grpc::internal::RpcServiceMethod* method,
grpc::CompletionQueue* cq,
grpc_core::Server::RegisteredCallAllocation* data)
- : server_(server),
- method_(method),
+ : server_(server),
+ method_(method),
has_request_payload_(method->method_type() ==
grpc::internal::RpcMethod::NORMAL_RPC ||
method->method_type() ==
grpc::internal::RpcMethod::SERVER_STREAMING),
cq_(cq),
- tag_(this) {
+ tag_(this) {
CommonSetup(server, data);
data->deadline = &deadline_;
data->optional_payload = has_request_payload_ ? &request_payload_ : nullptr;
- }
-
+ }
+
// For generic services, method is nullptr since these services don't have
// pre-defined methods.
CallbackRequest(Server* server, grpc::CompletionQueue* cq,
@@ -567,8 +567,8 @@ class Server::CallbackRequest final
CommonSetup(server, data);
grpc_call_details_init(call_details_);
data->details = call_details_;
- }
-
+ }
+
~CallbackRequest() {
delete call_details_;
grpc_metadata_array_destroy(&request_metadata_);
@@ -576,21 +576,21 @@ class Server::CallbackRequest final
grpc_byte_buffer_destroy(request_payload_);
}
server_->UnrefWithPossibleNotify();
- }
-
+ }
+
// Needs specialization to account for different processing of metadata
// in generic API
bool FinalizeResult(void** tag, bool* status) override;
-
- private:
+
+ private:
// method_name needs to be specialized between named method and generic
const char* method_name() const;
- class CallbackCallTag : public grpc_experimental_completion_queue_functor {
- public:
+ class CallbackCallTag : public grpc_experimental_completion_queue_functor {
+ public:
CallbackCallTag(Server::CallbackRequest<ServerContextType>* req)
: req_(req) {
- functor_run = &CallbackCallTag::StaticRun;
+ functor_run = &CallbackCallTag::StaticRun;
// Set inlineable to true since this callback is internally-controlled
// without taking any locks, and thus does not need to be run from the
// executor (which triggers a thread hop). This should only be used by
@@ -598,42 +598,42 @@ class Server::CallbackRequest final
// here is actually non-trivial, but there is no chance of having user
// locks conflict with each other so it's ok to run inlined.
inlineable = true;
- }
-
- // force_run can not be performed on a tag if operations using this tag
- // have been sent to PerformOpsOnCall. It is intended for error conditions
- // that are detected before the operations are internally processed.
- void force_run(bool ok) { Run(ok); }
-
- private:
+ }
+
+ // force_run can not be performed on a tag if operations using this tag
+ // have been sent to PerformOpsOnCall. It is intended for error conditions
+ // that are detected before the operations are internally processed.
+ void force_run(bool ok) { Run(ok); }
+
+ private:
Server::CallbackRequest<ServerContextType>* req_;
grpc::internal::Call* call_;
-
- static void StaticRun(grpc_experimental_completion_queue_functor* cb,
- int ok) {
- static_cast<CallbackCallTag*>(cb)->Run(static_cast<bool>(ok));
- }
- void Run(bool ok) {
- void* ignored = req_;
- bool new_ok = ok;
- GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok));
- GPR_ASSERT(ignored == req_);
-
- if (!ok) {
- // The call has been shutdown.
- // Delete its contents to free up the request.
- delete req_;
- return;
- }
-
- // Bind the call, deadline, and metadata from what we got
- req_->ctx_.set_call(req_->call_);
- req_->ctx_.cq_ = req_->cq_;
- req_->ctx_.BindDeadlineAndMetadata(req_->deadline_,
- &req_->request_metadata_);
- req_->request_metadata_.count = 0;
-
- // Create a C++ Call to control the underlying core call
+
+ static void StaticRun(grpc_experimental_completion_queue_functor* cb,
+ int ok) {
+ static_cast<CallbackCallTag*>(cb)->Run(static_cast<bool>(ok));
+ }
+ void Run(bool ok) {
+ void* ignored = req_;
+ bool new_ok = ok;
+ GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok));
+ GPR_ASSERT(ignored == req_);
+
+ if (!ok) {
+ // The call has been shutdown.
+ // Delete its contents to free up the request.
+ delete req_;
+ return;
+ }
+
+ // Bind the call, deadline, and metadata from what we got
+ req_->ctx_.set_call(req_->call_);
+ req_->ctx_.cq_ = req_->cq_;
+ req_->ctx_.BindDeadlineAndMetadata(req_->deadline_,
+ &req_->request_metadata_);
+ req_->request_metadata_.count = 0;
+
+ // Create a C++ Call to control the underlying core call
call_ =
new (grpc_call_arena_alloc(req_->call_, sizeof(grpc::internal::Call)))
grpc::internal::Call(
@@ -645,71 +645,71 @@ class Server::CallbackRequest final
? req_->method_->method_type()
: grpc::internal::RpcMethod::BIDI_STREAMING,
req_->server_->interceptor_creators_));
-
- req_->interceptor_methods_.SetCall(call_);
- req_->interceptor_methods_.SetReverse();
- // Set interception point for RECV INITIAL METADATA
- req_->interceptor_methods_.AddInterceptionHookPoint(
+
+ req_->interceptor_methods_.SetCall(call_);
+ req_->interceptor_methods_.SetReverse();
+ // Set interception point for RECV INITIAL METADATA
+ req_->interceptor_methods_.AddInterceptionHookPoint(
grpc::experimental::InterceptionHookPoints::
POST_RECV_INITIAL_METADATA);
- req_->interceptor_methods_.SetRecvInitialMetadata(
- &req_->ctx_.client_metadata_);
-
- if (req_->has_request_payload_) {
- // Set interception point for RECV MESSAGE
- req_->request_ = req_->method_->handler()->Deserialize(
+ req_->interceptor_methods_.SetRecvInitialMetadata(
+ &req_->ctx_.client_metadata_);
+
+ if (req_->has_request_payload_) {
+ // Set interception point for RECV MESSAGE
+ req_->request_ = req_->method_->handler()->Deserialize(
req_->call_, req_->request_payload_, &req_->request_status_,
&req_->handler_data_);
- req_->request_payload_ = nullptr;
- req_->interceptor_methods_.AddInterceptionHookPoint(
+ req_->request_payload_ = nullptr;
+ req_->interceptor_methods_.AddInterceptionHookPoint(
grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
- req_->interceptor_methods_.SetRecvMessage(req_->request_, nullptr);
- }
-
- if (req_->interceptor_methods_.RunInterceptors(
- [this] { ContinueRunAfterInterception(); })) {
- ContinueRunAfterInterception();
- } else {
- // There were interceptors to be run, so ContinueRunAfterInterception
- // will be run when interceptors are done.
- }
- }
- void ContinueRunAfterInterception() {
+ req_->interceptor_methods_.SetRecvMessage(req_->request_, nullptr);
+ }
+
+ if (req_->interceptor_methods_.RunInterceptors(
+ [this] { ContinueRunAfterInterception(); })) {
+ ContinueRunAfterInterception();
+ } else {
+ // There were interceptors to be run, so ContinueRunAfterInterception
+ // will be run when interceptors are done.
+ }
+ }
+ void ContinueRunAfterInterception() {
auto* handler = (req_->method_ != nullptr)
? req_->method_->handler()
: req_->server_->generic_handler_.get();
handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter(
call_, &req_->ctx_, req_->request_, req_->request_status_,
req_->handler_data_, [this] { delete req_; }));
- }
- };
-
+ }
+ };
+
template <class CallAllocation>
void CommonSetup(Server* server, CallAllocation* data) {
server->Ref();
- grpc_metadata_array_init(&request_metadata_);
+ grpc_metadata_array_init(&request_metadata_);
data->tag = &tag_;
data->call = &call_;
data->initial_metadata = &request_metadata_;
- }
-
- Server* const server_;
+ }
+
+ Server* const server_;
grpc::internal::RpcServiceMethod* const method_;
- const bool has_request_payload_;
+ const bool has_request_payload_;
grpc_byte_buffer* request_payload_ = nullptr;
void* request_ = nullptr;
void* handler_data_ = nullptr;
grpc::Status request_status_;
grpc_call_details* const call_details_ = nullptr;
- grpc_call* call_;
- gpr_timespec deadline_;
- grpc_metadata_array request_metadata_;
+ grpc_call* call_;
+ gpr_timespec deadline_;
+ grpc_metadata_array request_metadata_;
grpc::CompletionQueue* const cq_;
- CallbackCallTag tag_;
+ CallbackCallTag tag_;
ServerContextType ctx_;
grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_;
-};
-
+};
+
template <>
bool Server::CallbackRequest<grpc::CallbackServerContext>::FinalizeResult(
void** /*tag*/, bool* /*status*/) {
@@ -750,13 +750,13 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
public:
SyncRequestThreadManager(Server* server, grpc::CompletionQueue* server_cq,
std::shared_ptr<GlobalCallbacks> global_callbacks,
- grpc_resource_quota* rq, int min_pollers,
- int max_pollers, int cq_timeout_msec)
- : ThreadManager("SyncServer", rq, min_pollers, max_pollers),
+ grpc_resource_quota* rq, int min_pollers,
+ int max_pollers, int cq_timeout_msec)
+ : ThreadManager("SyncServer", rq, min_pollers, max_pollers),
server_(server),
server_cq_(server_cq),
cq_timeout_msec_(cq_timeout_msec),
- global_callbacks_(std::move(global_callbacks)) {}
+ global_callbacks_(std::move(global_callbacks)) {}
WorkStatus PollForWork(void** tag, bool* ok) override {
*tag = nullptr;
@@ -778,7 +778,7 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
GPR_UNREACHABLE_CODE(return TIMEOUT);
}
- void DoWork(void* tag, bool ok, bool resources) override {
+ void DoWork(void* tag, bool ok, bool resources) override {
SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
if (!sync_req) {
@@ -789,9 +789,9 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
}
if (ok) {
- // Calldata takes ownership of the completion queue and interceptors
- // inside sync_req
- auto* cd = new SyncRequest::CallData(server_, sync_req);
+ // Calldata takes ownership of the completion queue and interceptors
+ // inside sync_req
+ auto* cd = new SyncRequest::CallData(server_, sync_req);
// Prepare for the next request
if (!IsShutdown()) {
sync_req->SetupRequest(); // Create new completion queue for sync_req
@@ -799,7 +799,7 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
}
GPR_TIMER_SCOPE("cd.Run()", 0);
- cd->Run(global_callbacks_, resources);
+ cd->Run(global_callbacks_, resources);
}
// TODO (sreek) If ok is false here (which it isn't in case of
// grpc_request_registered_call), we should still re-queue the request
@@ -831,17 +831,17 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
void* tag;
bool ok;
while (server_cq_->Next(&tag, &ok)) {
- if (ok) {
- // If a request was pulled off the queue, it means that the thread
- // handling the request added it to the completion queue after shutdown
- // was called - because the thread had already started and checked the
- // shutdown flag before shutdown was called. In this case, we simply
- // clean it up here, *after* calling wait on all the worker threads, at
- // which point we are certain no in-flight requests will add more to the
- // queue. This fixes an intermittent memory leak on shutdown.
- SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
- sync_req->PostShutdownCleanup();
- }
+ if (ok) {
+ // If a request was pulled off the queue, it means that the thread
+ // handling the request added it to the completion queue after shutdown
+ // was called - because the thread had already started and checked the
+ // shutdown flag before shutdown was called. In this case, we simply
+ // clean it up here, *after* calling wait on all the worker threads, at
+ // which point we are certain no in-flight requests will add more to the
+ // queue. This fixes an intermittent memory leak on shutdown.
+ SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
+ sync_req->PostShutdownCleanup();
+ }
}
}
@@ -870,17 +870,17 @@ Server::Server(
grpc::ChannelArguments* args,
std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>>
sync_server_cqs,
- int min_pollers, int max_pollers, int sync_cq_timeout_msec,
+ int min_pollers, int max_pollers, int sync_cq_timeout_msec,
std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>>
acceptors,
- grpc_resource_quota* server_rq,
- std::vector<
+ grpc_resource_quota* server_rq,
+ std::vector<
std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>>
- interceptor_creators)
+ interceptor_creators)
: acceptors_(std::move(acceptors)),
interceptor_creators_(std::move(interceptor_creators)),
max_receive_message_size_(INT_MIN),
- sync_server_cqs_(std::move(sync_server_cqs)),
+ sync_server_cqs_(std::move(sync_server_cqs)),
started_(false),
shutdown_(false),
shutdown_notified_(false),
@@ -893,23 +893,23 @@ Server::Server(
global_callbacks_->UpdateArguments(args);
if (sync_server_cqs_ != nullptr) {
- bool default_rq_created = false;
- if (server_rq == nullptr) {
- server_rq = grpc_resource_quota_create("SyncServer-default-rq");
- grpc_resource_quota_set_max_threads(server_rq,
- DEFAULT_MAX_SYNC_SERVER_THREADS);
- default_rq_created = true;
- }
-
+ bool default_rq_created = false;
+ if (server_rq == nullptr) {
+ server_rq = grpc_resource_quota_create("SyncServer-default-rq");
+ grpc_resource_quota_set_max_threads(server_rq,
+ DEFAULT_MAX_SYNC_SERVER_THREADS);
+ default_rq_created = true;
+ }
+
for (const auto& it : *sync_server_cqs_) {
sync_req_mgrs_.emplace_back(new SyncRequestThreadManager(
- this, it.get(), global_callbacks_, server_rq, min_pollers,
- max_pollers, sync_cq_timeout_msec));
+ this, it.get(), global_callbacks_, server_rq, min_pollers,
+ max_pollers, sync_cq_timeout_msec));
+ }
+
+ if (default_rq_created) {
+ grpc_resource_quota_unref(server_rq);
}
-
- if (default_rq_created) {
- grpc_resource_quota_unref(server_rq);
- }
}
for (auto& acceptor : acceptors_) {
@@ -974,24 +974,24 @@ std::shared_ptr<grpc::Channel> Server::InProcessChannel(
const grpc::ChannelArguments& args) {
grpc_channel_args channel_args = args.c_channel_args();
return grpc::CreateChannelInternal(
- "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr),
+ "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr),
std::vector<std::unique_ptr<
grpc::experimental::ClientInterceptorFactoryInterface>>());
}
std::shared_ptr<grpc::Channel>
-Server::experimental_type::InProcessChannelWithInterceptors(
+Server::experimental_type::InProcessChannelWithInterceptors(
const grpc::ChannelArguments& args,
- std::vector<
+ std::vector<
std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
- interceptor_creators) {
- grpc_channel_args channel_args = args.c_channel_args();
+ interceptor_creators) {
+ grpc_channel_args channel_args = args.c_channel_args();
return grpc::CreateChannelInternal(
- "inproc",
- grpc_inproc_channel_create(server_->server_, &channel_args, nullptr),
- std::move(interceptor_creators));
-}
-
+ "inproc",
+ grpc_inproc_channel_create(server_->server_, &channel_args, nullptr),
+ std::move(interceptor_creators));
+}
+
static grpc_server_register_method_payload_handling PayloadHandlingForMethod(
grpc::internal::RpcServiceMethod* method) {
switch (method->method_type()) {
@@ -1014,29 +1014,29 @@ bool Server::RegisterService(const TString* host, grpc::Service* service) {
}
const char* method_name = nullptr;
-
+
for (const auto& method : service->methods_) {
if (method.get() == nullptr) { // Handled by generic service if any.
continue;
}
- void* method_registration_tag = grpc_server_register_method(
+ void* method_registration_tag = grpc_server_register_method(
server_, method->name(), host ? host->c_str() : nullptr,
PayloadHandlingForMethod(method.get()), 0);
- if (method_registration_tag == nullptr) {
+ if (method_registration_tag == nullptr) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
method->name());
return false;
}
- if (method->handler() == nullptr) { // Async method without handler
- method->set_server_tag(method_registration_tag);
- } else if (method->api_type() ==
+ if (method->handler() == nullptr) { // Async method without handler
+ method->set_server_tag(method_registration_tag);
+ } else if (method->api_type() ==
grpc::internal::RpcServiceMethod::ApiType::SYNC) {
for (const auto& value : sync_req_mgrs_) {
value->AddSyncMethod(method.get(), method_registration_tag);
}
- } else {
+ } else {
has_callback_methods_ = true;
grpc::internal::RpcServiceMethod* method_value = method.get();
grpc::CompletionQueue* cq = CallbackCQ();
@@ -1130,23 +1130,23 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
// explicit one.
grpc::ServerCompletionQueue* health_check_cq = nullptr;
grpc::DefaultHealthCheckService::HealthCheckServiceImpl*
- default_health_check_service_impl = nullptr;
+ default_health_check_service_impl = nullptr;
if (health_check_service_ == nullptr && !health_check_service_disabled_ &&
grpc::DefaultHealthCheckServiceEnabled()) {
auto* default_hc_service = new grpc::DefaultHealthCheckService;
- health_check_service_.reset(default_hc_service);
- // We create a non-polling CQ to avoid impacting application
- // performance. This ensures that we don't introduce thread hops
- // for application requests that wind up on this CQ, which is polled
- // in its own thread.
+ health_check_service_.reset(default_hc_service);
+ // We create a non-polling CQ to avoid impacting application
+ // performance. This ensures that we don't introduce thread hops
+ // for application requests that wind up on this CQ, which is polled
+ // in its own thread.
health_check_cq = new grpc::ServerCompletionQueue(
GRPC_CQ_NEXT, GRPC_CQ_NON_POLLING, nullptr);
- grpc_server_register_completion_queue(server_, health_check_cq->cq(),
- nullptr);
- default_health_check_service_impl =
- default_hc_service->GetHealthCheckService(
+ grpc_server_register_completion_queue(server_, health_check_cq->cq(),
+ nullptr);
+ default_health_check_service_impl =
+ default_hc_service->GetHealthCheckService(
std::unique_ptr<grpc::ServerCompletionQueue>(health_check_cq));
- RegisterService(nullptr, default_health_check_service_impl);
+ RegisterService(nullptr, default_health_check_service_impl);
}
for (auto& acceptor : acceptors_) {
@@ -1179,26 +1179,26 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
new UnimplementedAsyncRequest(this, cqs[i]);
}
}
- if (health_check_cq != nullptr) {
- new UnimplementedAsyncRequest(this, health_check_cq);
- }
+ if (health_check_cq != nullptr) {
+ new UnimplementedAsyncRequest(this, health_check_cq);
+ }
}
- // If this server has any support for synchronous methods (has any sync
- // server CQs), make sure that we have a ResourceExhausted handler
- // to deal with the case of thread exhaustion
- if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) {
+ // If this server has any support for synchronous methods (has any sync
+ // server CQs), make sure that we have a ResourceExhausted handler
+ // to deal with the case of thread exhaustion
+ if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) {
resource_exhausted_handler_.reset(
new grpc::internal::ResourceExhaustedHandler);
- }
-
+ }
+
for (const auto& value : sync_req_mgrs_) {
value->Start();
}
-
- if (default_health_check_service_impl != nullptr) {
- default_health_check_service_impl->StartServingThread();
- }
+
+ if (default_health_check_service_impl != nullptr) {
+ default_health_check_service_impl->StartServingThread();
+ }
for (auto& acceptor : acceptors_) {
acceptor->Start();
@@ -1207,50 +1207,50 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
void Server::ShutdownInternal(gpr_timespec deadline) {
grpc::internal::MutexLock lock(&mu_);
- if (shutdown_) {
- return;
- }
+ if (shutdown_) {
+ return;
+ }
- shutdown_ = true;
+ shutdown_ = true;
for (auto& acceptor : acceptors_) {
acceptor->Shutdown();
}
- /// The completion queue to use for server shutdown completion notification
+ /// The completion queue to use for server shutdown completion notification
grpc::CompletionQueue shutdown_cq;
grpc::ShutdownTag shutdown_tag; // Dummy shutdown tag
- grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
+ grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
- shutdown_cq.Shutdown();
+ shutdown_cq.Shutdown();
- void* tag;
- bool ok;
+ void* tag;
+ bool ok;
grpc::CompletionQueue::NextStatus status =
- shutdown_cq.AsyncNext(&tag, &ok, deadline);
+ shutdown_cq.AsyncNext(&tag, &ok, deadline);
- // If this timed out, it means we are done with the grace period for a clean
- // shutdown. We should force a shutdown now by cancelling all inflight calls
+ // If this timed out, it means we are done with the grace period for a clean
+ // shutdown. We should force a shutdown now by cancelling all inflight calls
if (status == grpc::CompletionQueue::NextStatus::TIMEOUT) {
- grpc_server_cancel_all_calls(server_);
- }
- // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
- // successfully shutdown
+ grpc_server_cancel_all_calls(server_);
+ }
+ // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
+ // successfully shutdown
- // Shutdown all ThreadManagers. This will try to gracefully stop all the
- // threads in the ThreadManagers (once they process any inflight requests)
+ // Shutdown all ThreadManagers. This will try to gracefully stop all the
+ // threads in the ThreadManagers (once they process any inflight requests)
for (const auto& value : sync_req_mgrs_) {
value->Shutdown(); // ThreadManager's Shutdown()
- }
+ }
- // Wait for threads in all ThreadManagers to terminate
+ // Wait for threads in all ThreadManagers to terminate
for (const auto& value : sync_req_mgrs_) {
value->Wait();
- }
+ }
// Drop the shutdown ref and wait for all other refs to drop as well.
UnrefAndWaitLocked();
-
+
// Shutdown the callback CQ. The CQ is owned by its own shutdown tag, so it
// will delete itself at true shutdown.
if (callback_cq_ != nullptr) {
@@ -1258,13 +1258,13 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
callback_cq_ = nullptr;
}
- // Drain the shutdown queue (if the previous call to AsyncNext() timed out
- // and we didn't remove the tag from the queue yet)
- while (shutdown_cq.Next(&tag, &ok)) {
- // Nothing to be done here. Just ignore ok and tag values
- }
-
- shutdown_notified_ = true;
+ // Drain the shutdown queue (if the previous call to AsyncNext() timed out
+ // and we didn't remove the tag from the queue yet)
+ while (shutdown_cq.Next(&tag, &ok)) {
+ // Nothing to be done here. Just ignore ok and tag values
+ }
+
+ shutdown_notified_ = true;
shutdown_cv_.Broadcast();
#ifndef NDEBUG
@@ -1286,23 +1286,23 @@ void Server::Wait() {
void Server::PerformOpsOnCall(grpc::internal::CallOpSetInterface* ops,
grpc::internal::Call* call) {
- ops->FillOps(call);
+ ops->FillOps(call);
}
bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
bool* status) {
- if (GenericAsyncRequest::FinalizeResult(tag, status)) {
- // We either had no interceptors run or we are done intercepting
- if (*status) {
+ if (GenericAsyncRequest::FinalizeResult(tag, status)) {
+ // We either had no interceptors run or we are done intercepting
+ if (*status) {
// Create a new request/response pair using the server and CQ values
// stored in this object's base class.
new UnimplementedAsyncRequest(server_, notification_cq_);
- new UnimplementedAsyncResponse(this);
- } else {
- delete this;
- }
+ new UnimplementedAsyncResponse(this);
+ } else {
+ delete this;
+ }
} else {
- // The tag was swallowed due to interception. We will see it again.
+ // The tag was swallowed due to interception. We will see it again.
}
return false;
}
@@ -1320,8 +1320,8 @@ grpc::ServerInitializer* Server::initializer() {
}
grpc::CompletionQueue* Server::CallbackCQ() {
- // TODO(vjpai): Consider using a single global CQ for the default CQ
- // if there is no explicit per-server CQ registered
+ // TODO(vjpai): Consider using a single global CQ for the default CQ
+ // if there is no explicit per-server CQ registered
grpc::internal::MutexLock l(&mu_);
if (callback_cq_ != nullptr) {
return callback_cq_;
@@ -1330,11 +1330,11 @@ grpc::CompletionQueue* Server::CallbackCQ() {
callback_cq_ = new grpc::CompletionQueue(grpc_completion_queue_attributes{
GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
shutdown_callback});
-
+
// Transfer ownership of the new cq to its own shutdown callback
shutdown_callback->TakeCQ(callback_cq_);
- return callback_cq_;
+ return callback_cq_;
}
-
+
} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/server/server_context.cc b/contrib/libs/grpc/src/cpp/server/server_context.cc
index fea258e6e75..458ac20d87c 100644
--- a/contrib/libs/grpc/src/cpp/server/server_context.cc
+++ b/contrib/libs/grpc/src/cpp/server/server_context.cc
@@ -43,50 +43,50 @@ class ServerContextBase::CompletionOp final
: public internal::CallOpSetInterface {
public:
// initial refs: one in the server context, one in the cq
- // must ref the call before calling constructor and after deleting this
+ // must ref the call before calling constructor and after deleting this
CompletionOp(internal::Call* call,
::grpc::internal::ServerCallbackCall* callback_controller)
- : call_(*call),
+ : call_(*call),
callback_controller_(callback_controller),
- has_tag_(false),
+ has_tag_(false),
tag_(nullptr),
- core_cq_tag_(this),
+ core_cq_tag_(this),
refs_(2),
finalized_(false),
- cancelled_(0),
- done_intercepting_(false) {}
-
- // CompletionOp isn't copyable or movable
- CompletionOp(const CompletionOp&) = delete;
- CompletionOp& operator=(const CompletionOp&) = delete;
- CompletionOp(CompletionOp&&) = delete;
- CompletionOp& operator=(CompletionOp&&) = delete;
-
- ~CompletionOp() {
- if (call_.server_rpc_info()) {
- call_.server_rpc_info()->Unref();
- }
- }
-
+ cancelled_(0),
+ done_intercepting_(false) {}
+
+ // CompletionOp isn't copyable or movable
+ CompletionOp(const CompletionOp&) = delete;
+ CompletionOp& operator=(const CompletionOp&) = delete;
+ CompletionOp(CompletionOp&&) = delete;
+ CompletionOp& operator=(CompletionOp&&) = delete;
+
+ ~CompletionOp() {
+ if (call_.server_rpc_info()) {
+ call_.server_rpc_info()->Unref();
+ }
+ }
+
void FillOps(internal::Call* call) override;
-
- // This should always be arena allocated in the call, so override delete.
- // But this class is not trivially destructible, so must actually call delete
- // before allowing the arena to be freed
+
+ // This should always be arena allocated in the call, so override delete.
+ // But this class is not trivially destructible, so must actually call delete
+ // before allowing the arena to be freed
static void operator delete(void* /*ptr*/, std::size_t size) {
// Use size to avoid unused-parameter warning since assert seems to be
// compiled out and treated as unused in some gcc optimized versions.
(void)size;
- assert(size == sizeof(CompletionOp));
- }
-
- // This operator should never be called as the memory should be freed as part
- // of the arena destruction. It only exists to provide a matching operator
- // delete to the operator new so that some compilers will not complain (see
- // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
- // there are no tests catching the compiler warning.
- static void operator delete(void*, void*) { assert(0); }
-
+ assert(size == sizeof(CompletionOp));
+ }
+
+ // This operator should never be called as the memory should be freed as part
+ // of the arena destruction. It only exists to provide a matching operator
+ // delete to the operator new so that some compilers will not complain (see
+ // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
+ // there are no tests catching the compiler warning.
+ static void operator delete(void*, void*) { assert(0); }
+
bool FinalizeResult(void** tag, bool* status) override;
bool CheckCancelled(CompletionQueue* cq) {
@@ -100,36 +100,36 @@ class ServerContextBase::CompletionOp final
tag_ = tag;
}
- void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
-
- void* core_cq_tag() override { return core_cq_tag_; }
-
+ void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
+
+ void* core_cq_tag() override { return core_cq_tag_; }
+
void Unref();
- // This will be called while interceptors are run if the RPC is a hijacked
- // RPC. This should set hijacking state for each of the ops.
- void SetHijackingState() override {
- /* Servers don't allow hijacking */
+ // This will be called while interceptors are run if the RPC is a hijacked
+ // RPC. This should set hijacking state for each of the ops.
+ void SetHijackingState() override {
+ /* Servers don't allow hijacking */
GPR_ASSERT(false);
- }
-
- /* Should be called after interceptors are done running */
- void ContinueFillOpsAfterInterception() override {}
-
- /* Should be called after interceptors are done running on the finalize result
- * path */
- void ContinueFinalizeResultAfterInterception() override {
- done_intercepting_ = true;
- if (!has_tag_) {
- /* We don't have a tag to return. */
+ }
+
+ /* Should be called after interceptors are done running */
+ void ContinueFillOpsAfterInterception() override {}
+
+ /* Should be called after interceptors are done running on the finalize result
+ * path */
+ void ContinueFinalizeResultAfterInterception() override {
+ done_intercepting_ = true;
+ if (!has_tag_) {
+ /* We don't have a tag to return. */
Unref();
- return;
- }
- /* Start a dummy op so that we can return the tag */
+ return;
+ }
+ /* Start a dummy op so that we can return the tag */
GPR_ASSERT(grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_,
nullptr) == GRPC_CALL_OK);
- }
-
+ }
+
private:
bool CheckCancelledNoPluck() {
grpc_core::MutexLock lock(&mu_);
@@ -140,37 +140,37 @@ class ServerContextBase::CompletionOp final
::grpc::internal::ServerCallbackCall* const callback_controller_;
bool has_tag_;
void* tag_;
- void* core_cq_tag_;
+ void* core_cq_tag_;
grpc_core::RefCount refs_;
grpc_core::Mutex mu_;
bool finalized_;
- int cancelled_; // This is an int (not bool) because it is passed to core
- bool done_intercepting_;
+ int cancelled_; // This is an int (not bool) because it is passed to core
+ bool done_intercepting_;
internal::InterceptorBatchMethodsImpl interceptor_methods_;
};
void ServerContextBase::CompletionOp::Unref() {
if (refs_.Unref()) {
- grpc_call* call = call_.call();
+ grpc_call* call = call_.call();
delete this;
- grpc_call_unref(call);
+ grpc_call_unref(call);
}
}
void ServerContextBase::CompletionOp::FillOps(internal::Call* call) {
- grpc_op ops;
- ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
- ops.data.recv_close_on_server.cancelled = &cancelled_;
- ops.flags = 0;
- ops.reserved = nullptr;
- interceptor_methods_.SetCall(&call_);
- interceptor_methods_.SetReverse();
- interceptor_methods_.SetCallOpSetInterface(this);
+ grpc_op ops;
+ ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ ops.data.recv_close_on_server.cancelled = &cancelled_;
+ ops.flags = 0;
+ ops.reserved = nullptr;
+ interceptor_methods_.SetCall(&call_);
+ interceptor_methods_.SetReverse();
+ interceptor_methods_.SetCallOpSetInterface(this);
// The following call_start_batch is internally-generated so no need for an
// explanatory log on failure.
GPR_ASSERT(grpc_call_start_batch(call->call(), &ops, 1, core_cq_tag_,
nullptr) == GRPC_CALL_OK);
- /* No interceptors to run here */
+ /* No interceptors to run here */
}
bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) {
@@ -187,9 +187,9 @@ bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) {
}
Unref();
return has_tag;
- }
+ }
finalized_ = true;
-
+
// If for some reason the incoming status is false, mark that as a
// cancellation.
// TODO(vjpai): does this ever happen?
@@ -200,24 +200,24 @@ bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) {
call_cancel = (cancelled_ != 0);
// Release the lock since we may call a callback and interceptors.
}
-
+
if (call_cancel && callback_controller_ != nullptr) {
callback_controller_->MaybeCallOnCancel();
}
- /* Add interception point and run through interceptors */
- interceptor_methods_.AddInterceptionHookPoint(
+ /* Add interception point and run through interceptors */
+ interceptor_methods_.AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_CLOSE);
- if (interceptor_methods_.RunInterceptors()) {
+ if (interceptor_methods_.RunInterceptors()) {
// No interceptors were run
bool has_tag = has_tag_;
if (has_tag) {
- *tag = tag_;
- }
+ *tag = tag_;
+ }
Unref();
return has_tag;
- }
+ }
// There are interceptors to be run. Return false for now.
- return false;
+ return false;
}
// ServerContextBase body
@@ -233,17 +233,17 @@ ServerContextBase::ServerContextBase(gpr_timespec deadline,
void ServerContextBase::BindDeadlineAndMetadata(gpr_timespec deadline,
grpc_metadata_array* arr) {
- deadline_ = deadline;
- std::swap(*client_metadata_.arr(), *arr);
-}
-
+ deadline_ = deadline;
+ std::swap(*client_metadata_.arr(), *arr);
+}
+
ServerContextBase::~ServerContextBase() {
if (completion_op_) {
completion_op_->Unref();
}
- if (rpc_info_) {
- rpc_info_->Unref();
- }
+ if (rpc_info_) {
+ rpc_info_->Unref();
+ }
if (default_reactor_used_.load(std::memory_order_relaxed)) {
reinterpret_cast<Reactor*>(&default_reactor_)->~Reactor();
}
@@ -261,19 +261,19 @@ void ServerContextBase::BeginCompletionOp(
internal::Call* call, std::function<void(bool)> callback,
::grpc::internal::ServerCallbackCall* callback_controller) {
GPR_ASSERT(!completion_op_);
- if (rpc_info_) {
- rpc_info_->Ref();
- }
- grpc_call_ref(call->call());
- completion_op_ =
- new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp)))
+ if (rpc_info_) {
+ rpc_info_->Ref();
+ }
+ grpc_call_ref(call->call());
+ completion_op_ =
+ new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp)))
CompletionOp(call, callback_controller);
if (callback_controller != nullptr) {
completion_tag_.Set(call->call(), std::move(callback), completion_op_,
true);
- completion_op_->set_core_cq_tag(&completion_tag_);
- completion_op_->set_tag(completion_op_);
- } else if (has_notify_when_done_tag_) {
+ completion_op_->set_core_cq_tag(&completion_tag_);
+ completion_op_->set_tag(completion_op_);
+ } else if (has_notify_when_done_tag_) {
completion_op_->set_tag(async_notify_when_done_tag_);
}
call->PerformOps(completion_op_);
@@ -295,11 +295,11 @@ void ServerContextBase::AddTrailingMetadata(const TString& key,
void ServerContextBase::TryCancel() const {
internal::CancelInterceptorBatchMethods cancel_methods;
- if (rpc_info_) {
- for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) {
- rpc_info_->RunInterceptor(&cancel_methods, i);
- }
- }
+ if (rpc_info_) {
+ for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) {
+ rpc_info_->RunInterceptor(&cancel_methods, i);
+ }
+ }
grpc_call_error err =
grpc_call_cancel_with_status(call_.call, GRPC_STATUS_CANCELLED,
"Cancelled on the server side", nullptr);
@@ -309,15 +309,15 @@ void ServerContextBase::TryCancel() const {
}
bool ServerContextBase::IsCancelled() const {
- if (completion_tag_) {
- // When using callback API, this result is always valid.
- return completion_op_->CheckCancelledAsync();
- } else if (has_notify_when_done_tag_) {
- // When using async API, the result is only valid
+ if (completion_tag_) {
+ // When using callback API, this result is always valid.
+ return completion_op_->CheckCancelledAsync();
+ } else if (has_notify_when_done_tag_) {
+ // When using async API, the result is only valid
// if the tag has already been delivered at the completion queue
return completion_op_ && completion_op_->CheckCancelledAsync();
} else {
- // when using sync API, the result is always valid
+ // when using sync API, the result is always valid
return completion_op_ && completion_op_->CheckCancelled(cq_);
}
}
diff --git a/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.cc b/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.cc
index c93129dc5f8..c8560aa81dd 100644
--- a/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.cc
+++ b/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.cc
@@ -22,7 +22,7 @@
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/thd.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
namespace grpc {
@@ -49,17 +49,17 @@ ThreadManager::WorkerThread::~WorkerThread() {
thd_.Join();
}
-ThreadManager::ThreadManager(const char* name,
- grpc_resource_quota* resource_quota,
- int min_pollers, int max_pollers)
+ThreadManager::ThreadManager(const char* name,
+ grpc_resource_quota* resource_quota,
+ int min_pollers, int max_pollers)
: shutdown_(false),
num_pollers_(0),
min_pollers_(min_pollers),
max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers),
- num_threads_(0),
- max_active_threads_sofar_(0) {
- resource_user_ = grpc_resource_user_create(resource_quota, name);
-}
+ num_threads_(0),
+ max_active_threads_sofar_(0) {
+ resource_user_ = grpc_resource_user_create(resource_quota, name);
+}
ThreadManager::~ThreadManager() {
{
@@ -67,8 +67,8 @@ ThreadManager::~ThreadManager() {
GPR_ASSERT(num_threads_ == 0);
}
- grpc_core::ExecCtx exec_ctx; // grpc_resource_user_unref needs an exec_ctx
- grpc_resource_user_unref(resource_user_);
+ grpc_core::ExecCtx exec_ctx; // grpc_resource_user_unref needs an exec_ctx
+ grpc_resource_user_unref(resource_user_);
CleanupCompletedThreads();
}
@@ -89,27 +89,27 @@ bool ThreadManager::IsShutdown() {
return shutdown_;
}
-int ThreadManager::GetMaxActiveThreadsSoFar() {
+int ThreadManager::GetMaxActiveThreadsSoFar() {
grpc_core::MutexLock list_lock(&list_mu_);
- return max_active_threads_sofar_;
-}
-
+ return max_active_threads_sofar_;
+}
+
void ThreadManager::MarkAsCompleted(WorkerThread* thd) {
{
grpc_core::MutexLock list_lock(&list_mu_);
completed_threads_.push_back(thd);
}
- {
+ {
grpc_core::MutexLock lock(&mu_);
- num_threads_--;
- if (num_threads_ == 0) {
+ num_threads_--;
+ if (num_threads_ == 0) {
shutdown_cv_.Signal();
- }
+ }
}
-
- // Give a thread back to the resource quota
- grpc_resource_user_free_threads(resource_user_, 1);
+
+ // Give a thread back to the resource quota
+ grpc_resource_user_free_threads(resource_user_, 1);
}
void ThreadManager::CleanupCompletedThreads() {
@@ -124,19 +124,19 @@ void ThreadManager::CleanupCompletedThreads() {
}
void ThreadManager::Initialize() {
- if (!grpc_resource_user_allocate_threads(resource_user_, min_pollers_)) {
- gpr_log(GPR_ERROR,
- "No thread quota available to even create the minimum required "
- "polling threads (i.e %d). Unable to start the thread manager",
- min_pollers_);
- abort();
- }
-
+ if (!grpc_resource_user_allocate_threads(resource_user_, min_pollers_)) {
+ gpr_log(GPR_ERROR,
+ "No thread quota available to even create the minimum required "
+ "polling threads (i.e %d). Unable to start the thread manager",
+ min_pollers_);
+ abort();
+ }
+
{
grpc_core::MutexLock lock(&mu_);
num_pollers_ = min_pollers_;
num_threads_ = min_pollers_;
- max_active_threads_sofar_ = min_pollers_;
+ max_active_threads_sofar_ = min_pollers_;
}
for (int i = 0; i < min_pollers_; i++) {
@@ -167,18 +167,18 @@ void ThreadManager::MainWorkLoop() {
done = true;
break;
case WORK_FOUND:
- // If we got work and there are now insufficient pollers and there is
- // quota available to create a new thread, start a new poller thread
- bool resource_exhausted = false;
+ // If we got work and there are now insufficient pollers and there is
+ // quota available to create a new thread, start a new poller thread
+ bool resource_exhausted = false;
if (!shutdown_ && num_pollers_ < min_pollers_) {
- if (grpc_resource_user_allocate_threads(resource_user_, 1)) {
- // We can allocate a new poller thread
- num_pollers_++;
- num_threads_++;
- if (num_threads_ > max_active_threads_sofar_) {
- max_active_threads_sofar_ = num_threads_;
- }
- // Drop lock before spawning thread to avoid contention
+ if (grpc_resource_user_allocate_threads(resource_user_, 1)) {
+ // We can allocate a new poller thread
+ num_pollers_++;
+ num_threads_++;
+ if (num_threads_ > max_active_threads_sofar_) {
+ max_active_threads_sofar_ = num_threads_;
+ }
+ // Drop lock before spawning thread to avoid contention
lock.Unlock();
WorkerThread* worker = new WorkerThread(this);
if (worker->created()) {
@@ -191,26 +191,26 @@ void ThreadManager::MainWorkLoop() {
resource_exhausted = true;
delete worker;
}
- } else if (num_pollers_ > 0) {
- // There is still at least some thread polling, so we can go on
- // even though we are below the number of pollers that we would
- // like to have (min_pollers_)
+ } else if (num_pollers_ > 0) {
+ // There is still at least some thread polling, so we can go on
+ // even though we are below the number of pollers that we would
+ // like to have (min_pollers_)
lock.Unlock();
- } else {
- // There are no pollers to spare and we couldn't allocate
- // a new thread, so resources are exhausted!
+ } else {
+ // There are no pollers to spare and we couldn't allocate
+ // a new thread, so resources are exhausted!
lock.Unlock();
- resource_exhausted = true;
- }
+ resource_exhausted = true;
+ }
} else {
- // There are a sufficient number of pollers available so we can do
- // the work and continue polling with our existing poller threads
+ // There are a sufficient number of pollers available so we can do
+ // the work and continue polling with our existing poller threads
lock.Unlock();
}
// Lock is always released at this point - do the application work
- // or return resource exhausted if there is new work but we couldn't
- // get a thread in which to do it.
- DoWork(tag, ok, !resource_exhausted);
+ // or return resource exhausted if there is new work but we couldn't
+ // get a thread in which to do it.
+ DoWork(tag, ok, !resource_exhausted);
// Take the lock again to check post conditions
lock.Lock();
// If we're shutdown, we should finish at this point.
@@ -254,8 +254,8 @@ void ThreadManager::MainWorkLoop() {
}
};
- // This thread is exiting. Do some cleanup work i.e delete already completed
- // worker threads
+ // This thread is exiting. Do some cleanup work i.e delete already completed
+ // worker threads
CleanupCompletedThreads();
// If we are here, either ThreadManager is shutting down or it already has
diff --git a/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.h b/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.h
index 902b237cfdf..43f1fd5585f 100644
--- a/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.h
+++ b/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.h
@@ -26,14 +26,14 @@
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/thd.h"
-#include "src/core/lib/iomgr/resource_quota.h"
+#include "src/core/lib/iomgr/resource_quota.h"
namespace grpc {
class ThreadManager {
public:
- explicit ThreadManager(const char* name, grpc_resource_quota* resource_quota,
- int min_pollers, int max_pollers);
+ explicit ThreadManager(const char* name, grpc_resource_quota* resource_quota,
+ int min_pollers, int max_pollers);
virtual ~ThreadManager();
// Initializes and Starts the Rpc Manager threads
@@ -66,14 +66,14 @@ class ThreadManager {
// The implementation of DoWork() is supposed to perform the work found by
// PollForWork(). The tag and ok parameters are the same as returned by
- // PollForWork(). The resources parameter indicates that the call actually
- // has the resources available for performing the RPC's work. If it doesn't,
- // the implementation should fail it appropriately.
+ // PollForWork(). The resources parameter indicates that the call actually
+ // has the resources available for performing the RPC's work. If it doesn't,
+ // the implementation should fail it appropriately.
//
// The implementation of DoWork() should also do any setup needed to ensure
// that the next call to PollForWork() (not necessarily by the current thread)
// actually finds some work
- virtual void DoWork(void* tag, bool ok, bool resources) = 0;
+ virtual void DoWork(void* tag, bool ok, bool resources) = 0;
// Mark the ThreadManager as shutdown and begin draining the work. This is a
// non-blocking call and the caller should call Wait(), a blocking call which
@@ -87,11 +87,11 @@ class ThreadManager {
// all the threads have drained all the outstanding work
virtual void Wait();
- // Max number of concurrent threads that were ever active in this thread
- // manager so far. This is useful for debugging purposes (and in unit tests)
- // to check if resource_quota is properly being enforced.
- int GetMaxActiveThreadsSoFar();
-
+ // Max number of concurrent threads that were ever active in this thread
+ // manager so far. This is useful for debugging purposes (and in unit tests)
+ // to check if resource_quota is properly being enforced.
+ int GetMaxActiveThreadsSoFar();
+
private:
// Helper wrapper class around grpc_core::Thread. Takes a ThreadManager object
// and starts a new grpc_core::Thread to calls the Run() function.
@@ -99,24 +99,24 @@ class ThreadManager {
// The Run() function calls ThreadManager::MainWorkLoop() function and once
// that completes, it marks the WorkerThread completed by calling
// ThreadManager::MarkAsCompleted()
- //
- // WHY IS THIS NEEDED?:
- // When a thread terminates, some other thread *must* call Join() on that
- // thread so that the resources are released. Having a WorkerThread wrapper
- // will make this easier. Once Run() completes, each thread calls the
- // following two functions:
- // ThreadManager::CleanupCompletedThreads()
- // ThreadManager::MarkAsCompleted()
- //
- // - MarkAsCompleted() puts the WorkerThread object in the ThreadManger's
- // completed_threads_ list
- // - CleanupCompletedThreads() calls "Join()" on the threads that are already
- // in the completed_threads_ list (since a thread cannot call Join() on
- // itself, it calls CleanupCompletedThreads() *before* calling
- // MarkAsCompleted())
- //
- // TODO(sreek): Consider creating the threads 'detached' so that Join() need
- // not be called (and the need for this WorkerThread class is eliminated)
+ //
+ // WHY IS THIS NEEDED?:
+ // When a thread terminates, some other thread *must* call Join() on that
+ // thread so that the resources are released. Having a WorkerThread wrapper
+ // will make this easier. Once Run() completes, each thread calls the
+ // following two functions:
+ // ThreadManager::CleanupCompletedThreads()
+ // ThreadManager::MarkAsCompleted()
+ //
+ // - MarkAsCompleted() puts the WorkerThread object in the ThreadManger's
+ // completed_threads_ list
+ // - CleanupCompletedThreads() calls "Join()" on the threads that are already
+ // in the completed_threads_ list (since a thread cannot call Join() on
+ // itself, it calls CleanupCompletedThreads() *before* calling
+ // MarkAsCompleted())
+ //
+ // TODO(sreek): Consider creating the threads 'detached' so that Join() need
+ // not be called (and the need for this WorkerThread class is eliminated)
class WorkerThread {
public:
WorkerThread(ThreadManager* thd_mgr);
@@ -141,21 +141,21 @@ class ThreadManager {
void MarkAsCompleted(WorkerThread* thd);
void CleanupCompletedThreads();
- // Protects shutdown_, num_pollers_, num_threads_ and
- // max_active_threads_sofar_
+ // Protects shutdown_, num_pollers_, num_threads_ and
+ // max_active_threads_sofar_
grpc_core::Mutex mu_;
bool shutdown_;
grpc_core::CondVar shutdown_cv_;
- // The resource user object to use when requesting quota to create threads
- //
- // Note: The user of this ThreadManager object must create grpc_resource_quota
- // object (that contains the actual max thread quota) and a grpc_resource_user
+ // The resource user object to use when requesting quota to create threads
+ //
+ // Note: The user of this ThreadManager object must create grpc_resource_quota
+ // object (that contains the actual max thread quota) and a grpc_resource_user
// object through which quota is requested whenever new threads need to be
- // created
- grpc_resource_user* resource_user_;
-
+ // created
+ grpc_resource_user* resource_user_;
+
// Number of threads doing polling
int num_pollers_;
@@ -163,15 +163,15 @@ class ThreadManager {
int min_pollers_;
int max_pollers_;
- // The total number of threads currently active (includes threads includes the
- // threads that are currently polling i.e num_pollers_)
+ // The total number of threads currently active (includes threads includes the
+ // threads that are currently polling i.e num_pollers_)
int num_threads_;
- // See GetMaxActiveThreadsSoFar()'s description.
- // To be more specific, this variable tracks the max value num_threads_ was
- // ever set so far
- int max_active_threads_sofar_;
-
+ // See GetMaxActiveThreadsSoFar()'s description.
+ // To be more specific, this variable tracks the max value num_threads_ was
+ // ever set so far
+ int max_active_threads_sofar_;
+
grpc_core::Mutex list_mu_;
std::list<WorkerThread*> completed_threads_;
};