aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/libs/grpc/src/cpp/server
diff options
context:
space:
mode:
authorneksard <neksard@yandex-team.ru>2022-02-10 16:45:23 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:23 +0300
commit8f7cf138264e0caa318144bf8a2c950e0b0a8593 (patch)
tree83bf5c8c8047c42d8475e6095df90ccdc3d1b57f /contrib/libs/grpc/src/cpp/server
parentd3a398281c6fd1d3672036cb2d63f842d2cb28c5 (diff)
downloadydb-8f7cf138264e0caa318144bf8a2c950e0b0a8593.tar.gz
Restoring authorship annotation for <neksard@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/libs/grpc/src/cpp/server')
-rw-r--r--contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc250
-rw-r--r--contrib/libs/grpc/src/cpp/server/channelz/channelz_service.h128
-rw-r--r--contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc142
-rw-r--r--contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc768
-rw-r--r--contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h442
-rw-r--r--contrib/libs/grpc/src/cpp/server/load_reporter/constants.h162
-rw-r--r--contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats.h72
-rw-r--r--contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc88
-rw-r--r--contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_macos.cc90
-rw-r--r--contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_unsupported.cc80
-rw-r--r--contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc110
-rw-r--r--contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc114
-rw-r--r--contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.h32
-rw-r--r--contrib/libs/grpc/src/cpp/server/load_reporter/util.cc86
-rw-r--r--contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc22
-rw-r--r--contrib/libs/grpc/src/cpp/server/server_builder.cc46
-rw-r--r--contrib/libs/grpc/src/cpp/server/server_cc.cc612
-rw-r--r--contrib/libs/grpc/src/cpp/server/server_context.cc216
18 files changed, 1730 insertions, 1730 deletions
diff --git a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc
index 6dcf84bf40..b707f3c476 100644
--- a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc
+++ b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc
@@ -1,29 +1,29 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#include "src/cpp/server/channelz/channelz_service.h"
-
-#include <grpc/grpc.h>
-#include <grpc/support/alloc.h>
-
-namespace grpc {
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/cpp/server/channelz/channelz_service.h"
+
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+
+namespace grpc {
namespace {
@@ -33,121 +33,121 @@ grpc::protobuf::util::Status ParseJson(const char* json_str,
options.case_insensitive_enum_parsing = true;
return grpc::protobuf::json::JsonStringToMessage(json_str, message, options);
}
-
+
} // namespace
-Status ChannelzService::GetTopChannels(
+Status ChannelzService::GetTopChannels(
ServerContext* /*unused*/,
const channelz::v1::GetTopChannelsRequest* request,
- channelz::v1::GetTopChannelsResponse* response) {
- char* json_str = grpc_channelz_get_top_channels(request->start_channel_id());
- if (json_str == nullptr) {
- return Status(StatusCode::INTERNAL,
- "grpc_channelz_get_top_channels returned null");
- }
+ channelz::v1::GetTopChannelsResponse* response) {
+ char* json_str = grpc_channelz_get_top_channels(request->start_channel_id());
+ if (json_str == nullptr) {
+ return Status(StatusCode::INTERNAL,
+ "grpc_channelz_get_top_channels returned null");
+ }
grpc::protobuf::util::Status s = ParseJson(json_str, response);
- gpr_free(json_str);
- if (!s.ok()) {
- return Status(StatusCode::INTERNAL, s.ToString());
- }
- return Status::OK;
-}
-
-Status ChannelzService::GetServers(
+ gpr_free(json_str);
+ if (!s.ok()) {
+ return Status(StatusCode::INTERNAL, s.ToString());
+ }
+ return Status::OK;
+}
+
+Status ChannelzService::GetServers(
ServerContext* /*unused*/, const channelz::v1::GetServersRequest* request,
- channelz::v1::GetServersResponse* response) {
- char* json_str = grpc_channelz_get_servers(request->start_server_id());
- if (json_str == nullptr) {
- return Status(StatusCode::INTERNAL,
- "grpc_channelz_get_servers returned null");
- }
+ channelz::v1::GetServersResponse* response) {
+ char* json_str = grpc_channelz_get_servers(request->start_server_id());
+ if (json_str == nullptr) {
+ return Status(StatusCode::INTERNAL,
+ "grpc_channelz_get_servers returned null");
+ }
grpc::protobuf::util::Status s = ParseJson(json_str, response);
- gpr_free(json_str);
- if (!s.ok()) {
- return Status(StatusCode::INTERNAL, s.ToString());
- }
- return Status::OK;
-}
-
+ gpr_free(json_str);
+ if (!s.ok()) {
+ return Status(StatusCode::INTERNAL, s.ToString());
+ }
+ return Status::OK;
+}
+
Status ChannelzService::GetServer(ServerContext* /*unused*/,
- const channelz::v1::GetServerRequest* request,
- channelz::v1::GetServerResponse* response) {
- char* json_str = grpc_channelz_get_server(request->server_id());
- if (json_str == nullptr) {
- return Status(StatusCode::INTERNAL,
- "grpc_channelz_get_server returned null");
- }
+ const channelz::v1::GetServerRequest* request,
+ channelz::v1::GetServerResponse* response) {
+ char* json_str = grpc_channelz_get_server(request->server_id());
+ if (json_str == nullptr) {
+ return Status(StatusCode::INTERNAL,
+ "grpc_channelz_get_server returned null");
+ }
grpc::protobuf::util::Status s = ParseJson(json_str, response);
- gpr_free(json_str);
- if (!s.ok()) {
- return Status(StatusCode::INTERNAL, s.ToString());
- }
- return Status::OK;
-}
-
-Status ChannelzService::GetServerSockets(
+ gpr_free(json_str);
+ if (!s.ok()) {
+ return Status(StatusCode::INTERNAL, s.ToString());
+ }
+ return Status::OK;
+}
+
+Status ChannelzService::GetServerSockets(
ServerContext* /*unused*/,
const channelz::v1::GetServerSocketsRequest* request,
- channelz::v1::GetServerSocketsResponse* response) {
- char* json_str = grpc_channelz_get_server_sockets(
- request->server_id(), request->start_socket_id(), request->max_results());
- if (json_str == nullptr) {
- return Status(StatusCode::INTERNAL,
- "grpc_channelz_get_server_sockets returned null");
- }
+ channelz::v1::GetServerSocketsResponse* response) {
+ char* json_str = grpc_channelz_get_server_sockets(
+ request->server_id(), request->start_socket_id(), request->max_results());
+ if (json_str == nullptr) {
+ return Status(StatusCode::INTERNAL,
+ "grpc_channelz_get_server_sockets returned null");
+ }
grpc::protobuf::util::Status s = ParseJson(json_str, response);
- gpr_free(json_str);
- if (!s.ok()) {
- return Status(StatusCode::INTERNAL, s.ToString());
- }
- return Status::OK;
-}
-
-Status ChannelzService::GetChannel(
+ gpr_free(json_str);
+ if (!s.ok()) {
+ return Status(StatusCode::INTERNAL, s.ToString());
+ }
+ return Status::OK;
+}
+
+Status ChannelzService::GetChannel(
ServerContext* /*unused*/, const channelz::v1::GetChannelRequest* request,
- channelz::v1::GetChannelResponse* response) {
- char* json_str = grpc_channelz_get_channel(request->channel_id());
- if (json_str == nullptr) {
- return Status(StatusCode::NOT_FOUND, "No object found for that ChannelId");
- }
+ channelz::v1::GetChannelResponse* response) {
+ char* json_str = grpc_channelz_get_channel(request->channel_id());
+ if (json_str == nullptr) {
+ return Status(StatusCode::NOT_FOUND, "No object found for that ChannelId");
+ }
grpc::protobuf::util::Status s = ParseJson(json_str, response);
- gpr_free(json_str);
- if (!s.ok()) {
- return Status(StatusCode::INTERNAL, s.ToString());
- }
- return Status::OK;
-}
-
-Status ChannelzService::GetSubchannel(
+ gpr_free(json_str);
+ if (!s.ok()) {
+ return Status(StatusCode::INTERNAL, s.ToString());
+ }
+ return Status::OK;
+}
+
+Status ChannelzService::GetSubchannel(
ServerContext* /*unused*/,
const channelz::v1::GetSubchannelRequest* request,
- channelz::v1::GetSubchannelResponse* response) {
- char* json_str = grpc_channelz_get_subchannel(request->subchannel_id());
- if (json_str == nullptr) {
- return Status(StatusCode::NOT_FOUND,
- "No object found for that SubchannelId");
- }
+ channelz::v1::GetSubchannelResponse* response) {
+ char* json_str = grpc_channelz_get_subchannel(request->subchannel_id());
+ if (json_str == nullptr) {
+ return Status(StatusCode::NOT_FOUND,
+ "No object found for that SubchannelId");
+ }
grpc::protobuf::util::Status s = ParseJson(json_str, response);
- gpr_free(json_str);
- if (!s.ok()) {
- return Status(StatusCode::INTERNAL, s.ToString());
- }
- return Status::OK;
-}
-
+ gpr_free(json_str);
+ if (!s.ok()) {
+ return Status(StatusCode::INTERNAL, s.ToString());
+ }
+ return Status::OK;
+}
+
Status ChannelzService::GetSocket(ServerContext* /*unused*/,
- const channelz::v1::GetSocketRequest* request,
- channelz::v1::GetSocketResponse* response) {
- char* json_str = grpc_channelz_get_socket(request->socket_id());
- if (json_str == nullptr) {
- return Status(StatusCode::NOT_FOUND, "No object found for that SocketId");
- }
+ const channelz::v1::GetSocketRequest* request,
+ channelz::v1::GetSocketResponse* response) {
+ char* json_str = grpc_channelz_get_socket(request->socket_id());
+ if (json_str == nullptr) {
+ return Status(StatusCode::NOT_FOUND, "No object found for that SocketId");
+ }
grpc::protobuf::util::Status s = ParseJson(json_str, response);
- gpr_free(json_str);
- if (!s.ok()) {
- return Status(StatusCode::INTERNAL, s.ToString());
- }
- return Status::OK;
-}
-
-} // namespace grpc
+ gpr_free(json_str);
+ if (!s.ok()) {
+ return Status(StatusCode::INTERNAL, s.ToString());
+ }
+ return Status::OK;
+}
+
+} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.h b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.h
index b4a66ba1c6..72818a0d72 100644
--- a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.h
+++ b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.h
@@ -1,64 +1,64 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#ifndef GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H
-#define GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H
-
-#include <grpc/support/port_platform.h>
-
-#include <grpcpp/grpcpp.h>
-#include "src/proto/grpc/channelz/channelz.grpc.pb.h"
-
-namespace grpc {
-
-class ChannelzService final : public channelz::v1::Channelz::Service {
- private:
- // implementation of GetTopChannels rpc
- Status GetTopChannels(
- ServerContext* unused, const channelz::v1::GetTopChannelsRequest* request,
- channelz::v1::GetTopChannelsResponse* response) override;
- // implementation of GetServers rpc
- Status GetServers(ServerContext* unused,
- const channelz::v1::GetServersRequest* request,
- channelz::v1::GetServersResponse* response) override;
- // implementation of GetServer rpc
- Status GetServer(ServerContext* unused,
- const channelz::v1::GetServerRequest* request,
- channelz::v1::GetServerResponse* response) override;
- // implementation of GetServerSockets rpc
- Status GetServerSockets(
- ServerContext* unused,
- const channelz::v1::GetServerSocketsRequest* request,
- channelz::v1::GetServerSocketsResponse* response) override;
- // implementation of GetChannel rpc
- Status GetChannel(ServerContext* unused,
- const channelz::v1::GetChannelRequest* request,
- channelz::v1::GetChannelResponse* response) override;
- // implementation of GetSubchannel rpc
- Status GetSubchannel(ServerContext* unused,
- const channelz::v1::GetSubchannelRequest* request,
- channelz::v1::GetSubchannelResponse* response) override;
- // implementation of GetSocket rpc
- Status GetSocket(ServerContext* unused,
- const channelz::v1::GetSocketRequest* request,
- channelz::v1::GetSocketResponse* response) override;
-};
-
-} // namespace grpc
-
-#endif // GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H
+#define GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H
+
+#include <grpc/support/port_platform.h>
+
+#include <grpcpp/grpcpp.h>
+#include "src/proto/grpc/channelz/channelz.grpc.pb.h"
+
+namespace grpc {
+
+class ChannelzService final : public channelz::v1::Channelz::Service {
+ private:
+ // implementation of GetTopChannels rpc
+ Status GetTopChannels(
+ ServerContext* unused, const channelz::v1::GetTopChannelsRequest* request,
+ channelz::v1::GetTopChannelsResponse* response) override;
+ // implementation of GetServers rpc
+ Status GetServers(ServerContext* unused,
+ const channelz::v1::GetServersRequest* request,
+ channelz::v1::GetServersResponse* response) override;
+ // implementation of GetServer rpc
+ Status GetServer(ServerContext* unused,
+ const channelz::v1::GetServerRequest* request,
+ channelz::v1::GetServerResponse* response) override;
+ // implementation of GetServerSockets rpc
+ Status GetServerSockets(
+ ServerContext* unused,
+ const channelz::v1::GetServerSocketsRequest* request,
+ channelz::v1::GetServerSocketsResponse* response) override;
+ // implementation of GetChannel rpc
+ Status GetChannel(ServerContext* unused,
+ const channelz::v1::GetChannelRequest* request,
+ channelz::v1::GetChannelResponse* response) override;
+ // implementation of GetSubchannel rpc
+ Status GetSubchannel(ServerContext* unused,
+ const channelz::v1::GetSubchannelRequest* request,
+ channelz::v1::GetSubchannelResponse* response) override;
+ // implementation of GetSocket rpc
+ Status GetSocket(ServerContext* unused,
+ const channelz::v1::GetSocketRequest* request,
+ channelz::v1::GetSocketResponse* response) override;
+};
+
+} // namespace grpc
+
+#endif // GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H
diff --git a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc
index ae26a447ab..35ecd08125 100644
--- a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc
+++ b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc
@@ -1,72 +1,72 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#include <grpcpp/ext/channelz_service_plugin.h>
-#include <grpcpp/impl/server_builder_plugin.h>
-#include <grpcpp/impl/server_initializer.h>
-#include <grpcpp/server.h>
-
-#include "src/cpp/server/channelz/channelz_service.h"
-
-namespace grpc {
-namespace channelz {
-namespace experimental {
-
-class ChannelzServicePlugin : public ::grpc::ServerBuilderPlugin {
- public:
- ChannelzServicePlugin() : channelz_service_(new grpc::ChannelzService()) {}
-
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include <grpcpp/ext/channelz_service_plugin.h>
+#include <grpcpp/impl/server_builder_plugin.h>
+#include <grpcpp/impl/server_initializer.h>
+#include <grpcpp/server.h>
+
+#include "src/cpp/server/channelz/channelz_service.h"
+
+namespace grpc {
+namespace channelz {
+namespace experimental {
+
+class ChannelzServicePlugin : public ::grpc::ServerBuilderPlugin {
+ public:
+ ChannelzServicePlugin() : channelz_service_(new grpc::ChannelzService()) {}
+
TString name() override { return "channelz_service"; }
-
- void InitServer(grpc::ServerInitializer* si) override {
- si->RegisterService(channelz_service_);
- }
-
+
+ void InitServer(grpc::ServerInitializer* si) override {
+ si->RegisterService(channelz_service_);
+ }
+
void Finish(grpc::ServerInitializer* /*si*/) override {}
-
+
void ChangeArguments(const TString& /*name*/, void* /*value*/) override {}
-
- bool has_sync_methods() const override {
- if (channelz_service_) {
- return channelz_service_->has_synchronous_methods();
- }
- return false;
- }
-
- bool has_async_methods() const override {
- if (channelz_service_) {
- return channelz_service_->has_async_methods();
- }
- return false;
- }
-
- private:
- std::shared_ptr<grpc::ChannelzService> channelz_service_;
-};
-
-static std::unique_ptr< ::grpc::ServerBuilderPlugin>
-CreateChannelzServicePlugin() {
- return std::unique_ptr< ::grpc::ServerBuilderPlugin>(
- new ChannelzServicePlugin());
-}
-
+
+ bool has_sync_methods() const override {
+ if (channelz_service_) {
+ return channelz_service_->has_synchronous_methods();
+ }
+ return false;
+ }
+
+ bool has_async_methods() const override {
+ if (channelz_service_) {
+ return channelz_service_->has_async_methods();
+ }
+ return false;
+ }
+
+ private:
+ std::shared_ptr<grpc::ChannelzService> channelz_service_;
+};
+
+static std::unique_ptr< ::grpc::ServerBuilderPlugin>
+CreateChannelzServicePlugin() {
+ return std::unique_ptr< ::grpc::ServerBuilderPlugin>(
+ new ChannelzServicePlugin());
+}
+
} // namespace experimental
} // namespace channelz
} // namespace grpc
@@ -74,15 +74,15 @@ namespace grpc_impl {
namespace channelz {
namespace experimental {
-void InitChannelzService() {
+void InitChannelzService() {
static struct Initializer {
Initializer() {
::grpc::ServerBuilder::InternalAddPluginFactory(
&grpc::channelz::experimental::CreateChannelzServicePlugin);
}
} initialize;
-}
-
-} // namespace experimental
-} // namespace channelz
+}
+
+} // namespace experimental
+} // namespace channelz
} // namespace grpc_impl
diff --git a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc
index 3cc508d0cb..5f70ce0540 100644
--- a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc
+++ b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc
@@ -25,191 +25,191 @@
#include <grpc/support/log.h>
#include <grpcpp/impl/codegen/method_handler.h>
-#include "src/cpp/server/health/default_health_check_service.h"
+#include "src/cpp/server/health/default_health_check_service.h"
#include "src/proto/grpc/health/v1/health.upb.h"
#include "upb/upb.hpp"
#define MAX_SERVICE_NAME_LENGTH 200
namespace grpc {
-
-//
-// DefaultHealthCheckService
-//
-
-DefaultHealthCheckService::DefaultHealthCheckService() {
- services_map_[""].SetServingStatus(SERVING);
-}
-
-void DefaultHealthCheckService::SetServingStatus(
+
+//
+// DefaultHealthCheckService
+//
+
+DefaultHealthCheckService::DefaultHealthCheckService() {
+ services_map_[""].SetServingStatus(SERVING);
+}
+
+void DefaultHealthCheckService::SetServingStatus(
const TString& service_name, bool serving) {
grpc_core::MutexLock lock(&mu_);
- if (shutdown_) {
- // Set to NOT_SERVING in case service_name is not in the map.
- serving = false;
- }
- services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING);
-}
-
-void DefaultHealthCheckService::SetServingStatus(bool serving) {
- const ServingStatus status = serving ? SERVING : NOT_SERVING;
+ if (shutdown_) {
+ // Set to NOT_SERVING in case service_name is not in the map.
+ serving = false;
+ }
+ services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING);
+}
+
+void DefaultHealthCheckService::SetServingStatus(bool serving) {
+ const ServingStatus status = serving ? SERVING : NOT_SERVING;
grpc_core::MutexLock lock(&mu_);
- if (shutdown_) {
- return;
- }
- for (auto& p : services_map_) {
- ServiceData& service_data = p.second;
- service_data.SetServingStatus(status);
- }
-}
-
-void DefaultHealthCheckService::Shutdown() {
+ if (shutdown_) {
+ return;
+ }
+ for (auto& p : services_map_) {
+ ServiceData& service_data = p.second;
+ service_data.SetServingStatus(status);
+ }
+}
+
+void DefaultHealthCheckService::Shutdown() {
grpc_core::MutexLock lock(&mu_);
- if (shutdown_) {
- return;
- }
- shutdown_ = true;
- for (auto& p : services_map_) {
- ServiceData& service_data = p.second;
- service_data.SetServingStatus(NOT_SERVING);
- }
-}
-
-DefaultHealthCheckService::ServingStatus
-DefaultHealthCheckService::GetServingStatus(
+ if (shutdown_) {
+ return;
+ }
+ shutdown_ = true;
+ for (auto& p : services_map_) {
+ ServiceData& service_data = p.second;
+ service_data.SetServingStatus(NOT_SERVING);
+ }
+}
+
+DefaultHealthCheckService::ServingStatus
+DefaultHealthCheckService::GetServingStatus(
const TString& service_name) const {
grpc_core::MutexLock lock(&mu_);
- auto it = services_map_.find(service_name);
- if (it == services_map_.end()) {
- return NOT_FOUND;
- }
- const ServiceData& service_data = it->second;
- return service_data.GetServingStatus();
-}
-
-void DefaultHealthCheckService::RegisterCallHandler(
+ auto it = services_map_.find(service_name);
+ if (it == services_map_.end()) {
+ return NOT_FOUND;
+ }
+ const ServiceData& service_data = it->second;
+ return service_data.GetServingStatus();
+}
+
+void DefaultHealthCheckService::RegisterCallHandler(
const TString& service_name,
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
+ std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
grpc_core::MutexLock lock(&mu_);
- ServiceData& service_data = services_map_[service_name];
- service_data.AddCallHandler(handler /* copies ref */);
- HealthCheckServiceImpl::CallHandler* h = handler.get();
- h->SendHealth(std::move(handler), service_data.GetServingStatus());
-}
-
-void DefaultHealthCheckService::UnregisterCallHandler(
+ ServiceData& service_data = services_map_[service_name];
+ service_data.AddCallHandler(handler /* copies ref */);
+ HealthCheckServiceImpl::CallHandler* h = handler.get();
+ h->SendHealth(std::move(handler), service_data.GetServingStatus());
+}
+
+void DefaultHealthCheckService::UnregisterCallHandler(
const TString& service_name,
- const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
+ const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
grpc_core::MutexLock lock(&mu_);
- auto it = services_map_.find(service_name);
- if (it == services_map_.end()) return;
- ServiceData& service_data = it->second;
- service_data.RemoveCallHandler(handler);
- if (service_data.Unused()) {
- services_map_.erase(it);
- }
-}
-
-DefaultHealthCheckService::HealthCheckServiceImpl*
-DefaultHealthCheckService::GetHealthCheckService(
- std::unique_ptr<ServerCompletionQueue> cq) {
- GPR_ASSERT(impl_ == nullptr);
- impl_.reset(new HealthCheckServiceImpl(this, std::move(cq)));
- return impl_.get();
-}
-
-//
-// DefaultHealthCheckService::ServiceData
-//
-
-void DefaultHealthCheckService::ServiceData::SetServingStatus(
- ServingStatus status) {
- status_ = status;
- for (auto& call_handler : call_handlers_) {
- call_handler->SendHealth(call_handler /* copies ref */, status);
- }
-}
-
-void DefaultHealthCheckService::ServiceData::AddCallHandler(
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
- call_handlers_.insert(std::move(handler));
-}
-
-void DefaultHealthCheckService::ServiceData::RemoveCallHandler(
- const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
- call_handlers_.erase(handler);
-}
-
-//
-// DefaultHealthCheckService::HealthCheckServiceImpl
-//
-
+ auto it = services_map_.find(service_name);
+ if (it == services_map_.end()) return;
+ ServiceData& service_data = it->second;
+ service_data.RemoveCallHandler(handler);
+ if (service_data.Unused()) {
+ services_map_.erase(it);
+ }
+}
+
+DefaultHealthCheckService::HealthCheckServiceImpl*
+DefaultHealthCheckService::GetHealthCheckService(
+ std::unique_ptr<ServerCompletionQueue> cq) {
+ GPR_ASSERT(impl_ == nullptr);
+ impl_.reset(new HealthCheckServiceImpl(this, std::move(cq)));
+ return impl_.get();
+}
+
+//
+// DefaultHealthCheckService::ServiceData
+//
+
+void DefaultHealthCheckService::ServiceData::SetServingStatus(
+ ServingStatus status) {
+ status_ = status;
+ for (auto& call_handler : call_handlers_) {
+ call_handler->SendHealth(call_handler /* copies ref */, status);
+ }
+}
+
+void DefaultHealthCheckService::ServiceData::AddCallHandler(
+ std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
+ call_handlers_.insert(std::move(handler));
+}
+
+void DefaultHealthCheckService::ServiceData::RemoveCallHandler(
+ const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
+ call_handlers_.erase(handler);
+}
+
+//
+// DefaultHealthCheckService::HealthCheckServiceImpl
+//
+
namespace {
const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
-const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch";
+const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch";
} // namespace
DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
- DefaultHealthCheckService* database,
- std::unique_ptr<ServerCompletionQueue> cq)
- : database_(database), cq_(std::move(cq)) {
- // Add Check() method.
- AddMethod(new internal::RpcServiceMethod(
- kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr));
- // Add Watch() method.
- AddMethod(new internal::RpcServiceMethod(
- kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr));
- // Create serving thread.
- thread_ = std::unique_ptr<::grpc_core::Thread>(
- new ::grpc_core::Thread("grpc_health_check_service", Serve, this));
+ DefaultHealthCheckService* database,
+ std::unique_ptr<ServerCompletionQueue> cq)
+ : database_(database), cq_(std::move(cq)) {
+ // Add Check() method.
+ AddMethod(new internal::RpcServiceMethod(
+ kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr));
+ // Add Watch() method.
+ AddMethod(new internal::RpcServiceMethod(
+ kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr));
+ // Create serving thread.
+ thread_ = std::unique_ptr<::grpc_core::Thread>(
+ new ::grpc_core::Thread("grpc_health_check_service", Serve, this));
}
-DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
- // We will reach here after the server starts shutting down.
- shutdown_ = true;
- {
+DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
+ // We will reach here after the server starts shutting down.
+ shutdown_ = true;
+ {
grpc_core::MutexLock lock(&cq_shutdown_mu_);
- cq_->Shutdown();
- }
- thread_->Join();
-}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() {
- // Request the calls we're interested in.
- // We do this before starting the serving thread, so that we know it's
- // done before server startup is complete.
- CheckCallHandler::CreateAndStart(cq_.get(), database_, this);
- WatchCallHandler::CreateAndStart(cq_.get(), database_, this);
- // Start serving thread.
- thread_->Start();
-}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) {
+ cq_->Shutdown();
+ }
+ thread_->Join();
+}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() {
+ // Request the calls we're interested in.
+ // We do this before starting the serving thread, so that we know it's
+ // done before server startup is complete.
+ CheckCallHandler::CreateAndStart(cq_.get(), database_, this);
+ WatchCallHandler::CreateAndStart(cq_.get(), database_, this);
+ // Start serving thread.
+ thread_->Start();
+}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) {
HealthCheckServiceImpl* service = static_cast<HealthCheckServiceImpl*>(arg);
- void* tag;
- bool ok;
- while (true) {
- if (!service->cq_->Next(&tag, &ok)) {
- // The completion queue is shutting down.
- GPR_ASSERT(service->shutdown_);
- break;
- }
- auto* next_step = static_cast<CallableTag*>(tag);
- next_step->Run(ok);
- }
-}
-
-bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
+ void* tag;
+ bool ok;
+ while (true) {
+ if (!service->cq_->Next(&tag, &ok)) {
+ // The completion queue is shutting down.
+ GPR_ASSERT(service->shutdown_);
+ break;
+ }
+ auto* next_step = static_cast<CallableTag*>(tag);
+ next_step->Run(ok);
+ }
+}
+
+bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
const ByteBuffer& request, TString* service_name) {
std::vector<Slice> slices;
- if (!request.Dump(&slices).ok()) return false;
+ if (!request.Dump(&slices).ok()) return false;
uint8_t* request_bytes = nullptr;
size_t request_size = 0;
- if (slices.size() == 1) {
+ if (slices.size() == 1) {
request_bytes = const_cast<uint8_t*>(slices[0].begin());
request_size = slices[0].size();
- } else if (slices.size() > 1) {
- request_bytes = static_cast<uint8_t*>(gpr_malloc(request.Length()));
+ } else if (slices.size() > 1) {
+ request_bytes = static_cast<uint8_t*>(gpr_malloc(request.Length()));
uint8_t* copy_to = request_bytes;
for (size_t i = 0; i < slices.size(); i++) {
memcpy(copy_to, slices[i].begin(), slices[i].size());
@@ -220,8 +220,8 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
grpc_health_v1_HealthCheckRequest* request_struct =
grpc_health_v1_HealthCheckRequest_parse(
reinterpret_cast<char*>(request_bytes), request_size, arena.ptr());
- if (slices.size() > 1) {
- gpr_free(request_bytes);
+ if (slices.size() > 1) {
+ gpr_free(request_bytes);
}
if (request_struct == nullptr) {
return false;
@@ -232,17 +232,17 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
return false;
}
service_name->assign(service.data, service.size);
- return true;
-}
+ return true;
+}
-bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
- ServingStatus status, ByteBuffer* response) {
+bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
+ ServingStatus status, ByteBuffer* response) {
upb::Arena arena;
grpc_health_v1_HealthCheckResponse* response_struct =
grpc_health_v1_HealthCheckResponse_new(arena.ptr());
grpc_health_v1_HealthCheckResponse_set_status(
response_struct,
- status == NOT_FOUND
+ status == NOT_FOUND
? grpc_health_v1_HealthCheckResponse_SERVICE_UNKNOWN
: status == SERVING ? grpc_health_v1_HealthCheckResponse_SERVING
: grpc_health_v1_HealthCheckResponse_NOT_SERVING);
@@ -256,249 +256,249 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
Slice encoded_response(response_slice, Slice::STEAL_REF);
ByteBuffer response_buffer(&encoded_response, 1);
response->Swap(&response_buffer);
- return true;
+ return true;
}
-//
-// DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler
-//
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
- CreateAndStart(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service) {
- std::shared_ptr<CallHandler> self =
- std::make_shared<CheckCallHandler>(cq, database, service);
- CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get());
- {
+//
+// DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler
+//
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
+ CreateAndStart(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service) {
+ std::shared_ptr<CallHandler> self =
+ std::make_shared<CheckCallHandler>(cq, database, service);
+ CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get());
+ {
grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
- if (service->shutdown_) return;
- // Request a Check() call.
- handler->next_ =
- CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler,
- std::placeholders::_1, std::placeholders::_2),
- std::move(self));
- service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_,
- &handler->writer_, cq, cq, &handler->next_);
- }
+ if (service->shutdown_) return;
+ // Request a Check() call.
+ handler->next_ =
+ CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_,
+ &handler->writer_, cq, cq, &handler->next_);
+ }
}
-DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
- CheckCallHandler(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service)
- : cq_(cq), database_(database), service_(service), writer_(&ctx_) {}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
- OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
- if (!ok) {
- // The value of ok being false means that the server is shutting down.
- return;
- }
- // Spawn a new handler instance to serve the next new client. Every handler
- // instance will deallocate itself when it's done.
- CreateAndStart(cq_, database_, service_);
- // Process request.
- gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_,
- this);
+DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
+ CheckCallHandler(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service)
+ : cq_(cq), database_(database), service_(service), writer_(&ctx_) {}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
+ OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
+ if (!ok) {
+ // The value of ok being false means that the server is shutting down.
+ return;
+ }
+ // Spawn a new handler instance to serve the next new client. Every handler
+ // instance will deallocate itself when it's done.
+ CreateAndStart(cq_, database_, service_);
+ // Process request.
+ gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_,
+ this);
TString service_name;
- grpc::Status status = Status::OK;
- ByteBuffer response;
- if (!service_->DecodeRequest(request_, &service_name)) {
- status = Status(StatusCode::INVALID_ARGUMENT, "could not parse request");
- } else {
- ServingStatus serving_status = database_->GetServingStatus(service_name);
- if (serving_status == NOT_FOUND) {
- status = Status(StatusCode::NOT_FOUND, "service name unknown");
- } else if (!service_->EncodeResponse(serving_status, &response)) {
- status = Status(StatusCode::INTERNAL, "could not encode response");
- }
- }
- // Send response.
- {
+ grpc::Status status = Status::OK;
+ ByteBuffer response;
+ if (!service_->DecodeRequest(request_, &service_name)) {
+ status = Status(StatusCode::INVALID_ARGUMENT, "could not parse request");
+ } else {
+ ServingStatus serving_status = database_->GetServingStatus(service_name);
+ if (serving_status == NOT_FOUND) {
+ status = Status(StatusCode::NOT_FOUND, "service name unknown");
+ } else if (!service_->EncodeResponse(serving_status, &response)) {
+ status = Status(StatusCode::INTERNAL, "could not encode response");
+ }
+ }
+ // Send response.
+ {
grpc_core::MutexLock lock(&service_->cq_shutdown_mu_);
- if (!service_->shutdown_) {
- next_ =
- CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this,
- std::placeholders::_1, std::placeholders::_2),
- std::move(self));
- if (status.ok()) {
- writer_.Finish(response, status, &next_);
- } else {
- writer_.FinishWithError(status, &next_);
- }
- }
- }
+ if (!service_->shutdown_) {
+ next_ =
+ CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ if (status.ok()) {
+ writer_.Finish(response, status, &next_);
+ } else {
+ writer_.FinishWithError(status, &next_);
+ }
+ }
+ }
}
-void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
- OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
- if (ok) {
- gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p",
- service_, this);
+void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
+ OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
+ if (ok) {
+ gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p",
+ service_, this);
}
- self.reset(); // To appease clang-tidy.
+ self.reset(); // To appease clang-tidy.
}
-//
-// DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler
-//
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- CreateAndStart(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service) {
- std::shared_ptr<CallHandler> self =
- std::make_shared<WatchCallHandler>(cq, database, service);
- WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get());
- {
+//
+// DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler
+//
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ CreateAndStart(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service) {
+ std::shared_ptr<CallHandler> self =
+ std::make_shared<WatchCallHandler>(cq, database, service);
+ WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get());
+ {
grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
- if (service->shutdown_) return;
- // Request AsyncNotifyWhenDone().
- handler->on_done_notified_ =
- CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler,
- std::placeholders::_1, std::placeholders::_2),
- self /* copies ref */);
- handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_);
- // Request a Watch() call.
- handler->next_ =
- CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler,
- std::placeholders::_1, std::placeholders::_2),
- std::move(self));
- service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_,
- &handler->stream_, cq, cq,
- &handler->next_);
+ if (service->shutdown_) return;
+ // Request AsyncNotifyWhenDone().
+ handler->on_done_notified_ =
+ CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler,
+ std::placeholders::_1, std::placeholders::_2),
+ self /* copies ref */);
+ handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_);
+ // Request a Watch() call.
+ handler->next_ =
+ CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_,
+ &handler->stream_, cq, cq,
+ &handler->next_);
}
}
-DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- WatchCallHandler(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service)
- : cq_(cq), database_(database), service_(service), stream_(&ctx_) {}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
- if (!ok) {
- // Server shutting down.
- //
- // AsyncNotifyWhenDone() needs to be called before the call starts, but the
- // tag will not pop out if the call never starts (
- // https://github.com/grpc/grpc/issues/10136). So we need to manually
- // release the ownership of the handler in this case.
- GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
- return;
- }
- // Spawn a new handler instance to serve the next new client. Every handler
- // instance will deallocate itself when it's done.
- CreateAndStart(cq_, database_, service_);
- // Parse request.
- if (!service_->DecodeRequest(request_, &service_name_)) {
- SendFinish(std::move(self),
- Status(StatusCode::INVALID_ARGUMENT, "could not parse request"));
- return;
- }
- // Register the call for updates to the service.
- gpr_log(GPR_DEBUG,
- "[HCS %p] Health watch started for service \"%s\" (handler: %p)",
- service_, service_name_.c_str(), this);
- database_->RegisterCallHandler(service_name_, std::move(self));
+DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ WatchCallHandler(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service)
+ : cq_(cq), database_(database), service_(service), stream_(&ctx_) {}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
+ if (!ok) {
+ // Server shutting down.
+ //
+ // AsyncNotifyWhenDone() needs to be called before the call starts, but the
+ // tag will not pop out if the call never starts (
+ // https://github.com/grpc/grpc/issues/10136). So we need to manually
+ // release the ownership of the handler in this case.
+ GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
+ return;
+ }
+ // Spawn a new handler instance to serve the next new client. Every handler
+ // instance will deallocate itself when it's done.
+ CreateAndStart(cq_, database_, service_);
+ // Parse request.
+ if (!service_->DecodeRequest(request_, &service_name_)) {
+ SendFinish(std::move(self),
+ Status(StatusCode::INVALID_ARGUMENT, "could not parse request"));
+ return;
+ }
+ // Register the call for updates to the service.
+ gpr_log(GPR_DEBUG,
+ "[HCS %p] Health watch started for service \"%s\" (handler: %p)",
+ service_, service_name_.c_str(), this);
+ database_->RegisterCallHandler(service_name_, std::move(self));
}
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) {
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) {
grpc_core::MutexLock lock(&send_mu_);
- // If there's already a send in flight, cache the new status, and
- // we'll start a new send for it when the one in flight completes.
- if (send_in_flight_) {
- pending_status_ = status;
- return;
- }
- // Start a send.
- SendHealthLocked(std::move(self), status);
-}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- SendHealthLocked(std::shared_ptr<CallHandler> self, ServingStatus status) {
- send_in_flight_ = true;
- // Construct response.
- ByteBuffer response;
- bool success = service_->EncodeResponse(status, &response);
- // Grab shutdown lock and send response.
+ // If there's already a send in flight, cache the new status, and
+ // we'll start a new send for it when the one in flight completes.
+ if (send_in_flight_) {
+ pending_status_ = status;
+ return;
+ }
+ // Start a send.
+ SendHealthLocked(std::move(self), status);
+}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ SendHealthLocked(std::shared_ptr<CallHandler> self, ServingStatus status) {
+ send_in_flight_ = true;
+ // Construct response.
+ ByteBuffer response;
+ bool success = service_->EncodeResponse(status, &response);
+ // Grab shutdown lock and send response.
grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
- if (service_->shutdown_) {
- SendFinishLocked(std::move(self), Status::CANCELLED);
- return;
- }
- if (!success) {
- SendFinishLocked(std::move(self),
- Status(StatusCode::INTERNAL, "could not encode response"));
- return;
- }
- next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this,
- std::placeholders::_1, std::placeholders::_2),
- std::move(self));
- stream_.Write(response, &next_);
-}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok) {
- if (!ok) {
- SendFinish(std::move(self), Status::CANCELLED);
- return;
- }
+ if (service_->shutdown_) {
+ SendFinishLocked(std::move(self), Status::CANCELLED);
+ return;
+ }
+ if (!success) {
+ SendFinishLocked(std::move(self),
+ Status(StatusCode::INTERNAL, "could not encode response"));
+ return;
+ }
+ next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ stream_.Write(response, &next_);
+}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok) {
+ if (!ok) {
+ SendFinish(std::move(self), Status::CANCELLED);
+ return;
+ }
grpc_core::MutexLock lock(&send_mu_);
- send_in_flight_ = false;
- // If we got a new status since we started the last send, start a
- // new send for it.
- if (pending_status_ != NOT_FOUND) {
- auto status = pending_status_;
- pending_status_ = NOT_FOUND;
- SendHealthLocked(std::move(self), status);
- }
-}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- SendFinish(std::shared_ptr<CallHandler> self, const Status& status) {
- if (finish_called_) return;
+ send_in_flight_ = false;
+ // If we got a new status since we started the last send, start a
+ // new send for it.
+ if (pending_status_ != NOT_FOUND) {
+ auto status = pending_status_;
+ pending_status_ = NOT_FOUND;
+ SendHealthLocked(std::move(self), status);
+ }
+}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ SendFinish(std::shared_ptr<CallHandler> self, const Status& status) {
+ if (finish_called_) return;
grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
- if (service_->shutdown_) return;
- SendFinishLocked(std::move(self), status);
-}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- SendFinishLocked(std::shared_ptr<CallHandler> self, const Status& status) {
- on_finish_done_ =
- CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
- std::placeholders::_1, std::placeholders::_2),
- std::move(self));
- stream_.Finish(status, &on_finish_done_);
- finish_called_ = true;
-}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
- if (ok) {
- gpr_log(GPR_DEBUG,
- "[HCS %p] Health watch call finished (service_name: \"%s\", "
- "handler: %p).",
- service_, service_name_.c_str(), this);
- }
- self.reset(); // To appease clang-tidy.
-}
-
-// TODO(roth): This method currently assumes that there will be only one
-// thread polling the cq and invoking the corresponding callbacks. If
-// that changes, we will need to add synchronization here.
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) {
- GPR_ASSERT(ok);
- gpr_log(GPR_DEBUG,
- "[HCS %p] Health watch call is notified done (handler: %p, "
- "is_cancelled: %d).",
- service_, this, static_cast<int>(ctx_.IsCancelled()));
- database_->UnregisterCallHandler(service_name_, self);
- SendFinish(std::move(self), Status::CANCELLED);
-}
-
+ if (service_->shutdown_) return;
+ SendFinishLocked(std::move(self), status);
+}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ SendFinishLocked(std::shared_ptr<CallHandler> self, const Status& status) {
+ on_finish_done_ =
+ CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
+ std::placeholders::_1, std::placeholders::_2),
+ std::move(self));
+ stream_.Finish(status, &on_finish_done_);
+ finish_called_ = true;
+}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
+ if (ok) {
+ gpr_log(GPR_DEBUG,
+ "[HCS %p] Health watch call finished (service_name: \"%s\", "
+ "handler: %p).",
+ service_, service_name_.c_str(), this);
+ }
+ self.reset(); // To appease clang-tidy.
+}
+
+// TODO(roth): This method currently assumes that there will be only one
+// thread polling the cq and invoking the corresponding callbacks. If
+// that changes, we will need to add synchronization here.
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+ OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) {
+ GPR_ASSERT(ok);
+ gpr_log(GPR_DEBUG,
+ "[HCS %p] Health watch call is notified done (handler: %p, "
+ "is_cancelled: %d).",
+ service_, this, static_cast<int>(ctx_.IsCancelled()));
+ database_->UnregisterCallHandler(service_name_, self);
+ SendFinish(std::move(self), Status::CANCELLED);
+}
+
} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h
index 9da1dfc15f..5da0ef935a 100644
--- a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h
+++ b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h
@@ -19,260 +19,260 @@
#ifndef GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H
#define GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H
-#include <atomic>
-#include <set>
+#include <atomic>
+#include <set>
-#include <grpc/support/log.h>
-#include <grpcpp/grpcpp.h>
+#include <grpc/support/log.h>
+#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
-#include <grpcpp/impl/codegen/async_generic_service.h>
-#include <grpcpp/impl/codegen/async_unary_call.h>
+#include <grpcpp/impl/codegen/async_generic_service.h>
+#include <grpcpp/impl/codegen/async_unary_call.h>
#include <grpcpp/impl/codegen/completion_queue.h>
#include <grpcpp/impl/codegen/service_type.h>
#include <grpcpp/support/byte_buffer.h>
#include "src/core/lib/gprpp/sync.h"
-#include "src/core/lib/gprpp/thd.h"
-
+#include "src/core/lib/gprpp/thd.h"
+
namespace grpc {
// Default implementation of HealthCheckServiceInterface. Server will create and
// own it.
class DefaultHealthCheckService final : public HealthCheckServiceInterface {
public:
- enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING };
-
+ enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING };
+
// The service impl to register with the server.
class HealthCheckServiceImpl : public Service {
public:
- // Base class for call handlers.
- class CallHandler {
- public:
- virtual ~CallHandler() = default;
- virtual void SendHealth(std::shared_ptr<CallHandler> self,
- ServingStatus status) = 0;
- };
-
- HealthCheckServiceImpl(DefaultHealthCheckService* database,
- std::unique_ptr<ServerCompletionQueue> cq);
-
- ~HealthCheckServiceImpl();
-
- void StartServingThread();
-
+ // Base class for call handlers.
+ class CallHandler {
+ public:
+ virtual ~CallHandler() = default;
+ virtual void SendHealth(std::shared_ptr<CallHandler> self,
+ ServingStatus status) = 0;
+ };
+
+ HealthCheckServiceImpl(DefaultHealthCheckService* database,
+ std::unique_ptr<ServerCompletionQueue> cq);
+
+ ~HealthCheckServiceImpl();
+
+ void StartServingThread();
+
private:
- // A tag that can be called with a bool argument. It's tailored for
- // CallHandler's use. Before being used, it should be constructed with a
- // method of CallHandler and a shared pointer to the handler. The
- // shared pointer will be moved to the invoked function and the function
- // can only be invoked once. That makes ref counting of the handler easier,
- // because the shared pointer is not bound to the function and can be gone
- // once the invoked function returns (if not used any more).
- class CallableTag {
- public:
- using HandlerFunction =
- std::function<void(std::shared_ptr<CallHandler>, bool)>;
-
- CallableTag() {}
-
- CallableTag(HandlerFunction func, std::shared_ptr<CallHandler> handler)
- : handler_function_(std::move(func)), handler_(std::move(handler)) {
- GPR_ASSERT(handler_function_ != nullptr);
- GPR_ASSERT(handler_ != nullptr);
- }
-
- // Runs the tag. This should be called only once. The handler is no
- // longer owned by this tag after this method is invoked.
- void Run(bool ok) {
- GPR_ASSERT(handler_function_ != nullptr);
- GPR_ASSERT(handler_ != nullptr);
- handler_function_(std::move(handler_), ok);
- }
-
- // Releases and returns the shared pointer to the handler.
- std::shared_ptr<CallHandler> ReleaseHandler() {
- return std::move(handler_);
- }
-
- private:
- HandlerFunction handler_function_ = nullptr;
- std::shared_ptr<CallHandler> handler_;
- };
-
- // Call handler for Check method.
- // Each handler takes care of one call. It contains per-call data and it
- // will access the members of the parent class (i.e.,
- // DefaultHealthCheckService) for per-service health data.
- class CheckCallHandler : public CallHandler {
- public:
- // Instantiates a CheckCallHandler and requests the next health check
- // call. The handler object will manage its own lifetime, so no action is
- // needed from the caller any more regarding that object.
- static void CreateAndStart(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service);
-
- // This ctor is public because we want to use std::make_shared<> in
- // CreateAndStart(). This ctor shouldn't be used elsewhere.
- CheckCallHandler(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service);
-
- // Not used for Check.
+ // A tag that can be called with a bool argument. It's tailored for
+ // CallHandler's use. Before being used, it should be constructed with a
+ // method of CallHandler and a shared pointer to the handler. The
+ // shared pointer will be moved to the invoked function and the function
+ // can only be invoked once. That makes ref counting of the handler easier,
+ // because the shared pointer is not bound to the function and can be gone
+ // once the invoked function returns (if not used any more).
+ class CallableTag {
+ public:
+ using HandlerFunction =
+ std::function<void(std::shared_ptr<CallHandler>, bool)>;
+
+ CallableTag() {}
+
+ CallableTag(HandlerFunction func, std::shared_ptr<CallHandler> handler)
+ : handler_function_(std::move(func)), handler_(std::move(handler)) {
+ GPR_ASSERT(handler_function_ != nullptr);
+ GPR_ASSERT(handler_ != nullptr);
+ }
+
+ // Runs the tag. This should be called only once. The handler is no
+ // longer owned by this tag after this method is invoked.
+ void Run(bool ok) {
+ GPR_ASSERT(handler_function_ != nullptr);
+ GPR_ASSERT(handler_ != nullptr);
+ handler_function_(std::move(handler_), ok);
+ }
+
+ // Releases and returns the shared pointer to the handler.
+ std::shared_ptr<CallHandler> ReleaseHandler() {
+ return std::move(handler_);
+ }
+
+ private:
+ HandlerFunction handler_function_ = nullptr;
+ std::shared_ptr<CallHandler> handler_;
+ };
+
+ // Call handler for Check method.
+ // Each handler takes care of one call. It contains per-call data and it
+ // will access the members of the parent class (i.e.,
+ // DefaultHealthCheckService) for per-service health data.
+ class CheckCallHandler : public CallHandler {
+ public:
+ // Instantiates a CheckCallHandler and requests the next health check
+ // call. The handler object will manage its own lifetime, so no action is
+ // needed from the caller any more regarding that object.
+ static void CreateAndStart(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service);
+
+ // This ctor is public because we want to use std::make_shared<> in
+ // CreateAndStart(). This ctor shouldn't be used elsewhere.
+ CheckCallHandler(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service);
+
+ // Not used for Check.
void SendHealth(std::shared_ptr<CallHandler> /*self*/,
ServingStatus /*status*/) override {}
-
- private:
- // Called when we receive a call.
- // Spawns a new handler so that we can keep servicing future calls.
- void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
-
- // Called when Finish() is done.
- void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
-
- // The members passed down from HealthCheckServiceImpl.
- ServerCompletionQueue* cq_;
- DefaultHealthCheckService* database_;
- HealthCheckServiceImpl* service_;
-
- ByteBuffer request_;
- GenericServerAsyncResponseWriter writer_;
- ServerContext ctx_;
-
- CallableTag next_;
- };
-
- // Call handler for Watch method.
- // Each handler takes care of one call. It contains per-call data and it
- // will access the members of the parent class (i.e.,
- // DefaultHealthCheckService) for per-service health data.
- class WatchCallHandler : public CallHandler {
- public:
- // Instantiates a WatchCallHandler and requests the next health check
- // call. The handler object will manage its own lifetime, so no action is
- // needed from the caller any more regarding that object.
- static void CreateAndStart(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service);
-
- // This ctor is public because we want to use std::make_shared<> in
- // CreateAndStart(). This ctor shouldn't be used elsewhere.
- WatchCallHandler(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service);
-
- void SendHealth(std::shared_ptr<CallHandler> self,
- ServingStatus status) override;
-
- private:
- // Called when we receive a call.
- // Spawns a new handler so that we can keep servicing future calls.
- void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
-
- // Requires holding send_mu_.
- void SendHealthLocked(std::shared_ptr<CallHandler> self,
- ServingStatus status);
-
- // When sending a health result finishes.
- void OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok);
-
- void SendFinish(std::shared_ptr<CallHandler> self, const Status& status);
-
- // Requires holding service_->cq_shutdown_mu_.
- void SendFinishLocked(std::shared_ptr<CallHandler> self,
- const Status& status);
-
- // Called when Finish() is done.
- void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
-
- // Called when AsyncNotifyWhenDone() notifies us.
- void OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok);
-
- // The members passed down from HealthCheckServiceImpl.
- ServerCompletionQueue* cq_;
- DefaultHealthCheckService* database_;
- HealthCheckServiceImpl* service_;
-
- ByteBuffer request_;
+
+ private:
+ // Called when we receive a call.
+ // Spawns a new handler so that we can keep servicing future calls.
+ void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
+
+ // Called when Finish() is done.
+ void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
+
+ // The members passed down from HealthCheckServiceImpl.
+ ServerCompletionQueue* cq_;
+ DefaultHealthCheckService* database_;
+ HealthCheckServiceImpl* service_;
+
+ ByteBuffer request_;
+ GenericServerAsyncResponseWriter writer_;
+ ServerContext ctx_;
+
+ CallableTag next_;
+ };
+
+ // Call handler for Watch method.
+ // Each handler takes care of one call. It contains per-call data and it
+ // will access the members of the parent class (i.e.,
+ // DefaultHealthCheckService) for per-service health data.
+ class WatchCallHandler : public CallHandler {
+ public:
+ // Instantiates a WatchCallHandler and requests the next health check
+ // call. The handler object will manage its own lifetime, so no action is
+ // needed from the caller any more regarding that object.
+ static void CreateAndStart(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service);
+
+ // This ctor is public because we want to use std::make_shared<> in
+ // CreateAndStart(). This ctor shouldn't be used elsewhere.
+ WatchCallHandler(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service);
+
+ void SendHealth(std::shared_ptr<CallHandler> self,
+ ServingStatus status) override;
+
+ private:
+ // Called when we receive a call.
+ // Spawns a new handler so that we can keep servicing future calls.
+ void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
+
+ // Requires holding send_mu_.
+ void SendHealthLocked(std::shared_ptr<CallHandler> self,
+ ServingStatus status);
+
+ // When sending a health result finishes.
+ void OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok);
+
+ void SendFinish(std::shared_ptr<CallHandler> self, const Status& status);
+
+ // Requires holding service_->cq_shutdown_mu_.
+ void SendFinishLocked(std::shared_ptr<CallHandler> self,
+ const Status& status);
+
+ // Called when Finish() is done.
+ void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
+
+ // Called when AsyncNotifyWhenDone() notifies us.
+ void OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok);
+
+ // The members passed down from HealthCheckServiceImpl.
+ ServerCompletionQueue* cq_;
+ DefaultHealthCheckService* database_;
+ HealthCheckServiceImpl* service_;
+
+ ByteBuffer request_;
TString service_name_;
- GenericServerAsyncWriter stream_;
- ServerContext ctx_;
-
+ GenericServerAsyncWriter stream_;
+ ServerContext ctx_;
+
grpc_core::Mutex send_mu_;
- bool send_in_flight_ = false; // Guarded by mu_.
- ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_.
-
- bool finish_called_ = false;
- CallableTag next_;
- CallableTag on_done_notified_;
- CallableTag on_finish_done_;
- };
-
- // Handles the incoming requests and drives the completion queue in a loop.
- static void Serve(void* arg);
-
- // Returns true on success.
- static bool DecodeRequest(const ByteBuffer& request,
+ bool send_in_flight_ = false; // Guarded by mu_.
+ ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_.
+
+ bool finish_called_ = false;
+ CallableTag next_;
+ CallableTag on_done_notified_;
+ CallableTag on_finish_done_;
+ };
+
+ // Handles the incoming requests and drives the completion queue in a loop.
+ static void Serve(void* arg);
+
+ // Returns true on success.
+ static bool DecodeRequest(const ByteBuffer& request,
TString* service_name);
- static bool EncodeResponse(ServingStatus status, ByteBuffer* response);
-
- // Needed to appease Windows compilers, which don't seem to allow
- // nested classes to access protected members in the parent's
- // superclass.
- using Service::RequestAsyncServerStreaming;
- using Service::RequestAsyncUnary;
-
- DefaultHealthCheckService* database_;
- std::unique_ptr<ServerCompletionQueue> cq_;
-
- // To synchronize the operations related to shutdown state of cq_, so that
- // we don't enqueue new tags into cq_ after it is already shut down.
+ static bool EncodeResponse(ServingStatus status, ByteBuffer* response);
+
+ // Needed to appease Windows compilers, which don't seem to allow
+ // nested classes to access protected members in the parent's
+ // superclass.
+ using Service::RequestAsyncServerStreaming;
+ using Service::RequestAsyncUnary;
+
+ DefaultHealthCheckService* database_;
+ std::unique_ptr<ServerCompletionQueue> cq_;
+
+ // To synchronize the operations related to shutdown state of cq_, so that
+ // we don't enqueue new tags into cq_ after it is already shut down.
grpc_core::Mutex cq_shutdown_mu_;
- std::atomic_bool shutdown_{false};
- std::unique_ptr<::grpc_core::Thread> thread_;
+ std::atomic_bool shutdown_{false};
+ std::unique_ptr<::grpc_core::Thread> thread_;
};
DefaultHealthCheckService();
-
+
void SetServingStatus(const TString& service_name, bool serving) override;
void SetServingStatus(bool serving) override;
-
- void Shutdown() override;
-
+
+ void Shutdown() override;
+
ServingStatus GetServingStatus(const TString& service_name) const;
- HealthCheckServiceImpl* GetHealthCheckService(
- std::unique_ptr<ServerCompletionQueue> cq);
-
+ HealthCheckServiceImpl* GetHealthCheckService(
+ std::unique_ptr<ServerCompletionQueue> cq);
+
private:
- // Stores the current serving status of a service and any call
- // handlers registered for updates when the service's status changes.
- class ServiceData {
- public:
- void SetServingStatus(ServingStatus status);
- ServingStatus GetServingStatus() const { return status_; }
- void AddCallHandler(
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
- void RemoveCallHandler(
- const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler);
- bool Unused() const {
- return call_handlers_.empty() && status_ == NOT_FOUND;
- }
-
- private:
- ServingStatus status_ = NOT_FOUND;
- std::set<std::shared_ptr<HealthCheckServiceImpl::CallHandler>>
- call_handlers_;
- };
-
- void RegisterCallHandler(
+ // Stores the current serving status of a service and any call
+ // handlers registered for updates when the service's status changes.
+ class ServiceData {
+ public:
+ void SetServingStatus(ServingStatus status);
+ ServingStatus GetServingStatus() const { return status_; }
+ void AddCallHandler(
+ std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
+ void RemoveCallHandler(
+ const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler);
+ bool Unused() const {
+ return call_handlers_.empty() && status_ == NOT_FOUND;
+ }
+
+ private:
+ ServingStatus status_ = NOT_FOUND;
+ std::set<std::shared_ptr<HealthCheckServiceImpl::CallHandler>>
+ call_handlers_;
+ };
+
+ void RegisterCallHandler(
const TString& service_name,
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
-
- void UnregisterCallHandler(
+ std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
+
+ void UnregisterCallHandler(
const TString& service_name,
- const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler);
-
+ const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler);
+
mutable grpc_core::Mutex mu_;
bool shutdown_ = false; // Guarded by mu_.
std::map<TString, ServiceData> services_map_; // Guarded by mu_.
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/constants.h b/contrib/libs/grpc/src/cpp/server/load_reporter/constants.h
index 00ad794a04..eabed9711e 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/constants.h
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/constants.h
@@ -1,81 +1,81 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H
-#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H
-
-#include <grpc/impl/codegen/port_platform.h>
-
-namespace grpc {
-namespace load_reporter {
-
-// TODO(juanlishen): Update the version number with the PR number every time
-// there is any change to the server load reporter.
-constexpr uint32_t kVersion = 15853;
-
-// TODO(juanlishen): This window size is from the internal spec for the load
-// reporter. Need to ask the gRPC LB team whether we should make this and the
-// fetching interval configurable.
-constexpr uint32_t kFeedbackSampleWindowSeconds = 10;
-constexpr uint32_t kFetchAndSampleIntervalSeconds = 1;
-
-constexpr size_t kLbIdLength = 8;
-constexpr size_t kIpv4AddressLength = 8;
-constexpr size_t kIpv6AddressLength = 32;
-
-constexpr char kInvalidLbId[] = "<INVALID_LBID_238dsb234890rb>";
-
-// Call statuses.
-
-constexpr char kCallStatusOk[] = "OK";
-constexpr char kCallStatusServerError[] = "5XX";
-constexpr char kCallStatusClientError[] = "4XX";
-
-// Tag keys.
-
-constexpr char kTagKeyToken[] = "token";
-constexpr char kTagKeyHost[] = "host";
-constexpr char kTagKeyUserId[] = "user_id";
-constexpr char kTagKeyStatus[] = "status";
-constexpr char kTagKeyMetricName[] = "metric_name";
-
-// Measure names.
-
-constexpr char kMeasureStartCount[] = "grpc.io/lb/start_count";
-constexpr char kMeasureEndCount[] = "grpc.io/lb/end_count";
-constexpr char kMeasureEndBytesSent[] = "grpc.io/lb/bytes_sent";
-constexpr char kMeasureEndBytesReceived[] = "grpc.io/lb/bytes_received";
-constexpr char kMeasureEndLatencyMs[] = "grpc.io/lb/latency_ms";
-constexpr char kMeasureOtherCallMetric[] = "grpc.io/lb/other_call_metric";
-
-// View names.
-
-constexpr char kViewStartCount[] = "grpc.io/lb_view/start_count";
-constexpr char kViewEndCount[] = "grpc.io/lb_view/end_count";
-constexpr char kViewEndBytesSent[] = "grpc.io/lb_view/bytes_sent";
-constexpr char kViewEndBytesReceived[] = "grpc.io/lb_view/bytes_received";
-constexpr char kViewEndLatencyMs[] = "grpc.io/lb_view/latency_ms";
-constexpr char kViewOtherCallMetricCount[] =
- "grpc.io/lb_view/other_call_metric_count";
-constexpr char kViewOtherCallMetricValue[] =
- "grpc.io/lb_view/other_call_metric_value";
-
-} // namespace load_reporter
-} // namespace grpc
-
-#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H
+#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H
+
+#include <grpc/impl/codegen/port_platform.h>
+
+namespace grpc {
+namespace load_reporter {
+
+// TODO(juanlishen): Update the version number with the PR number every time
+// there is any change to the server load reporter.
+constexpr uint32_t kVersion = 15853;
+
+// TODO(juanlishen): This window size is from the internal spec for the load
+// reporter. Need to ask the gRPC LB team whether we should make this and the
+// fetching interval configurable.
+constexpr uint32_t kFeedbackSampleWindowSeconds = 10;
+constexpr uint32_t kFetchAndSampleIntervalSeconds = 1;
+
+constexpr size_t kLbIdLength = 8;
+constexpr size_t kIpv4AddressLength = 8;
+constexpr size_t kIpv6AddressLength = 32;
+
+constexpr char kInvalidLbId[] = "<INVALID_LBID_238dsb234890rb>";
+
+// Call statuses.
+
+constexpr char kCallStatusOk[] = "OK";
+constexpr char kCallStatusServerError[] = "5XX";
+constexpr char kCallStatusClientError[] = "4XX";
+
+// Tag keys.
+
+constexpr char kTagKeyToken[] = "token";
+constexpr char kTagKeyHost[] = "host";
+constexpr char kTagKeyUserId[] = "user_id";
+constexpr char kTagKeyStatus[] = "status";
+constexpr char kTagKeyMetricName[] = "metric_name";
+
+// Measure names.
+
+constexpr char kMeasureStartCount[] = "grpc.io/lb/start_count";
+constexpr char kMeasureEndCount[] = "grpc.io/lb/end_count";
+constexpr char kMeasureEndBytesSent[] = "grpc.io/lb/bytes_sent";
+constexpr char kMeasureEndBytesReceived[] = "grpc.io/lb/bytes_received";
+constexpr char kMeasureEndLatencyMs[] = "grpc.io/lb/latency_ms";
+constexpr char kMeasureOtherCallMetric[] = "grpc.io/lb/other_call_metric";
+
+// View names.
+
+constexpr char kViewStartCount[] = "grpc.io/lb_view/start_count";
+constexpr char kViewEndCount[] = "grpc.io/lb_view/end_count";
+constexpr char kViewEndBytesSent[] = "grpc.io/lb_view/bytes_sent";
+constexpr char kViewEndBytesReceived[] = "grpc.io/lb_view/bytes_received";
+constexpr char kViewEndLatencyMs[] = "grpc.io/lb_view/latency_ms";
+constexpr char kViewOtherCallMetricCount[] =
+ "grpc.io/lb_view/other_call_metric_count";
+constexpr char kViewOtherCallMetricValue[] =
+ "grpc.io/lb_view/other_call_metric_value";
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_UTIL_H
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats.h b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats.h
index f514b0752f..8544b05417 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats.h
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats.h
@@ -1,36 +1,36 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H
-#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H
-
-#include <grpc/impl/codegen/port_platform.h>
-
-#include <utility>
-
-namespace grpc {
-namespace load_reporter {
-
-// Reads the CPU stats (in a pair of busy and total numbers) from the system.
-// The units of the stats should be the same.
-std::pair<uint64_t, uint64_t> GetCpuStatsImpl();
-
-} // namespace load_reporter
-} // namespace grpc
-
-#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H
+#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H
+
+#include <grpc/impl/codegen/port_platform.h>
+
+#include <utility>
+
+namespace grpc {
+namespace load_reporter {
+
+// Reads the CPU stats (in a pair of busy and total numbers) from the system.
+// The units of the stats should be the same.
+std::pair<uint64_t, uint64_t> GetCpuStatsImpl();
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_GET_CPU_STATS_H
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc
index 561d4f5048..8565d384a8 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_linux.cc
@@ -1,48 +1,48 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#ifdef GPR_LINUX
-
-#include <cstdio>
-
-#include "src/cpp/server/load_reporter/get_cpu_stats.h"
-
-namespace grpc {
-namespace load_reporter {
-
-std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
- uint64_t busy = 0, total = 0;
- FILE* fp;
- fp = fopen("/proc/stat", "r");
- uint64_t user, nice, system, idle;
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_LINUX
+
+#include <cstdio>
+
+#include "src/cpp/server/load_reporter/get_cpu_stats.h"
+
+namespace grpc {
+namespace load_reporter {
+
+std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
+ uint64_t busy = 0, total = 0;
+ FILE* fp;
+ fp = fopen("/proc/stat", "r");
+ uint64_t user, nice, system, idle;
if (fscanf(fp, "cpu %lu %lu %lu %lu", &user, &nice, &system, &idle) != 4) {
// Something bad happened with the information, so assume it's all invalid
user = nice = system = idle = 0;
}
- fclose(fp);
- busy = user + nice + system;
- total = busy + idle;
- return std::make_pair(busy, total);
-}
-
-} // namespace load_reporter
-} // namespace grpc
-
-#endif // GPR_LINUX
+ fclose(fp);
+ busy = user + nice + system;
+ total = busy + idle;
+ return std::make_pair(busy, total);
+}
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GPR_LINUX
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_macos.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_macos.cc
index dbdde304c2..125631a3d1 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_macos.cc
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_macos.cc
@@ -1,45 +1,45 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#ifdef GPR_APPLE
-
-#include <mach/mach.h>
-
-#include "src/cpp/server/load_reporter/get_cpu_stats.h"
-
-namespace grpc {
-namespace load_reporter {
-
-std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
- uint64_t busy = 0, total = 0;
- host_cpu_load_info_data_t cpuinfo;
- mach_msg_type_number_t count = HOST_CPU_LOAD_INFO_COUNT;
- if (host_statistics(mach_host_self(), HOST_CPU_LOAD_INFO,
- (host_info_t)&cpuinfo, &count) == KERN_SUCCESS) {
- for (int i = 0; i < CPU_STATE_MAX; i++) total += cpuinfo.cpu_ticks[i];
- busy = total - cpuinfo.cpu_ticks[CPU_STATE_IDLE];
- }
- return std::make_pair(busy, total);
-}
-
-} // namespace load_reporter
-} // namespace grpc
-
-#endif // GPR_APPLE
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_APPLE
+
+#include <mach/mach.h>
+
+#include "src/cpp/server/load_reporter/get_cpu_stats.h"
+
+namespace grpc {
+namespace load_reporter {
+
+std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
+ uint64_t busy = 0, total = 0;
+ host_cpu_load_info_data_t cpuinfo;
+ mach_msg_type_number_t count = HOST_CPU_LOAD_INFO_COUNT;
+ if (host_statistics(mach_host_self(), HOST_CPU_LOAD_INFO,
+ (host_info_t)&cpuinfo, &count) == KERN_SUCCESS) {
+ for (int i = 0; i < CPU_STATE_MAX; i++) total += cpuinfo.cpu_ticks[i];
+ busy = total - cpuinfo.cpu_ticks[CPU_STATE_IDLE];
+ }
+ return std::make_pair(busy, total);
+}
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GPR_APPLE
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_unsupported.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_unsupported.cc
index 80fb8b6da1..e2d61859c8 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_unsupported.cc
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_unsupported.cc
@@ -1,40 +1,40 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#if !defined(GPR_LINUX) && !defined(GPR_WINDOWS) && !defined(GPR_APPLE)
-
-#include <grpc/support/log.h>
-
-#include "src/cpp/server/load_reporter/get_cpu_stats.h"
-
-namespace grpc {
-namespace load_reporter {
-
-std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
- uint64_t busy = 0, total = 0;
- gpr_log(GPR_ERROR,
- "Platforms other than Linux, Windows, and MacOS are not supported.");
- return std::make_pair(busy, total);
-}
-
-} // namespace load_reporter
-} // namespace grpc
-
-#endif // !defined(GPR_LINUX) && !defined(GPR_WINDOWS) && !defined(GPR_APPLE)
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#if !defined(GPR_LINUX) && !defined(GPR_WINDOWS) && !defined(GPR_APPLE)
+
+#include <grpc/support/log.h>
+
+#include "src/cpp/server/load_reporter/get_cpu_stats.h"
+
+namespace grpc {
+namespace load_reporter {
+
+std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
+ uint64_t busy = 0, total = 0;
+ gpr_log(GPR_ERROR,
+ "Platforms other than Linux, Windows, and MacOS are not supported.");
+ return std::make_pair(busy, total);
+}
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // !defined(GPR_LINUX) && !defined(GPR_WINDOWS) && !defined(GPR_APPLE)
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc
index 0a98e848a2..bc5718a056 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/get_cpu_stats_windows.cc
@@ -1,55 +1,55 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#ifdef GPR_WINDOWS
-
-#include <windows.h>
-#include <cstdint>
-
-#include "src/cpp/server/load_reporter/get_cpu_stats.h"
-
-namespace grpc {
-namespace load_reporter {
-
-namespace {
-
-uint64_t FiletimeToInt(const FILETIME& ft) {
- ULARGE_INTEGER i;
- i.LowPart = ft.dwLowDateTime;
- i.HighPart = ft.dwHighDateTime;
- return i.QuadPart;
-}
-
-} // namespace
-
-std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
- uint64_t busy = 0, total = 0;
- FILETIME idle, kernel, user;
- if (GetSystemTimes(&idle, &kernel, &user) != 0) {
- total = FiletimeToInt(kernel) + FiletimeToInt(user);
- busy = total - FiletimeToInt(idle);
- }
- return std::make_pair(busy, total);
-}
-
-} // namespace load_reporter
-} // namespace grpc
-
-#endif // GPR_WINDOWS
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_WINDOWS
+
+#include <windows.h>
+#include <cstdint>
+
+#include "src/cpp/server/load_reporter/get_cpu_stats.h"
+
+namespace grpc {
+namespace load_reporter {
+
+namespace {
+
+uint64_t FiletimeToInt(const FILETIME& ft) {
+ ULARGE_INTEGER i;
+ i.LowPart = ft.dwLowDateTime;
+ i.HighPart = ft.dwHighDateTime;
+ return i.QuadPart;
+}
+
+} // namespace
+
+std::pair<uint64_t, uint64_t> GetCpuStatsImpl() {
+ uint64_t busy = 0, total = 0;
+ FILETIME idle, kernel, user;
+ if (GetSystemTimes(&idle, &kernel, &user) != 0) {
+ total = FiletimeToInt(kernel) + FiletimeToInt(user);
+ busy = total - FiletimeToInt(idle);
+ }
+ return std::make_pair(busy, total);
+}
+
+} // namespace load_reporter
+} // namespace grpc
+
+#endif // GPR_WINDOWS
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc
index f07fa812a7..12e8203fe1 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc
@@ -16,15 +16,15 @@
*
*/
-#include <grpc/impl/codegen/port_platform.h>
-
-#include <stdio.h>
+#include <grpc/impl/codegen/port_platform.h>
+
+#include <stdio.h>
#include <cstdlib>
#include <set>
#include <unordered_map>
#include <vector>
-#include "src/core/lib/iomgr/socket_utils.h"
+#include "src/core/lib/iomgr/socket_utils.h"
#include "src/cpp/server/load_reporter/load_data_store.h"
namespace grpc {
@@ -79,65 +79,65 @@ const typename C::value_type* RandomElement(const C& container) {
LoadRecordKey::LoadRecordKey(const TString& client_ip_and_token,
TString user_id)
- : user_id_(std::move(user_id)) {
- GPR_ASSERT(client_ip_and_token.size() >= 2);
- int ip_hex_size;
- GPR_ASSERT(sscanf(client_ip_and_token.substr(0, 2).c_str(), "%d",
- &ip_hex_size) == 1);
- GPR_ASSERT(ip_hex_size == 0 || ip_hex_size == kIpv4AddressLength ||
- ip_hex_size == kIpv6AddressLength);
- size_t cur_pos = 2;
- client_ip_hex_ = client_ip_and_token.substr(cur_pos, ip_hex_size);
- cur_pos += ip_hex_size;
- if (client_ip_and_token.size() - cur_pos < kLbIdLength) {
- lb_id_ = kInvalidLbId;
- lb_tag_ = "";
- } else {
- lb_id_ = client_ip_and_token.substr(cur_pos, kLbIdLength);
- lb_tag_ = client_ip_and_token.substr(cur_pos + kLbIdLength);
- }
-}
-
+ : user_id_(std::move(user_id)) {
+ GPR_ASSERT(client_ip_and_token.size() >= 2);
+ int ip_hex_size;
+ GPR_ASSERT(sscanf(client_ip_and_token.substr(0, 2).c_str(), "%d",
+ &ip_hex_size) == 1);
+ GPR_ASSERT(ip_hex_size == 0 || ip_hex_size == kIpv4AddressLength ||
+ ip_hex_size == kIpv6AddressLength);
+ size_t cur_pos = 2;
+ client_ip_hex_ = client_ip_and_token.substr(cur_pos, ip_hex_size);
+ cur_pos += ip_hex_size;
+ if (client_ip_and_token.size() - cur_pos < kLbIdLength) {
+ lb_id_ = kInvalidLbId;
+ lb_tag_ = "";
+ } else {
+ lb_id_ = client_ip_and_token.substr(cur_pos, kLbIdLength);
+ lb_tag_ = client_ip_and_token.substr(cur_pos + kLbIdLength);
+ }
+}
+
TString LoadRecordKey::GetClientIpBytes() const {
- if (client_ip_hex_.empty()) {
- return "";
- } else if (client_ip_hex_.size() == kIpv4AddressLength) {
- uint32_t ip_bytes;
- if (sscanf(client_ip_hex_.c_str(), "%x", &ip_bytes) != 1) {
- gpr_log(GPR_ERROR,
- "Can't parse client IP (%s) from a hex string to an integer.",
- client_ip_hex_.c_str());
- return "";
- }
- ip_bytes = grpc_htonl(ip_bytes);
+ if (client_ip_hex_.empty()) {
+ return "";
+ } else if (client_ip_hex_.size() == kIpv4AddressLength) {
+ uint32_t ip_bytes;
+ if (sscanf(client_ip_hex_.c_str(), "%x", &ip_bytes) != 1) {
+ gpr_log(GPR_ERROR,
+ "Can't parse client IP (%s) from a hex string to an integer.",
+ client_ip_hex_.c_str());
+ return "";
+ }
+ ip_bytes = grpc_htonl(ip_bytes);
return TString(reinterpret_cast<const char*>(&ip_bytes),
sizeof(ip_bytes));
- } else if (client_ip_hex_.size() == kIpv6AddressLength) {
- uint32_t ip_bytes[4];
- for (size_t i = 0; i < 4; ++i) {
- if (sscanf(client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str(), "%x",
- ip_bytes + i) != 1) {
- gpr_log(
- GPR_ERROR,
- "Can't parse client IP part (%s) from a hex string to an integer.",
- client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str());
- return "";
- }
- ip_bytes[i] = grpc_htonl(ip_bytes[i]);
- }
+ } else if (client_ip_hex_.size() == kIpv6AddressLength) {
+ uint32_t ip_bytes[4];
+ for (size_t i = 0; i < 4; ++i) {
+ if (sscanf(client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str(), "%x",
+ ip_bytes + i) != 1) {
+ gpr_log(
+ GPR_ERROR,
+ "Can't parse client IP part (%s) from a hex string to an integer.",
+ client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str());
+ return "";
+ }
+ ip_bytes[i] = grpc_htonl(ip_bytes[i]);
+ }
return TString(reinterpret_cast<const char*>(ip_bytes),
sizeof(ip_bytes));
- } else {
- GPR_UNREACHABLE_CODE(return "");
- }
-}
-
+ } else {
+ GPR_UNREACHABLE_CODE(return "");
+ }
+}
+
LoadRecordValue::LoadRecordValue(TString metric_name, uint64_t num_calls,
- double total_metric_value) {
- call_metrics_.emplace(std::move(metric_name),
- CallMetricValue(num_calls, total_metric_value));
-}
-
+ double total_metric_value) {
+ call_metrics_.emplace(std::move(metric_name),
+ CallMetricValue(num_calls, total_metric_value));
+}
+
void PerBalancerStore::MergeRow(const LoadRecordKey& key,
const LoadRecordValue& value) {
// During suspension, the load data received will be dropped.
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.h b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.h
index 61ba618331..7047488c0e 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.h
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.h
@@ -28,8 +28,8 @@
#include <grpc/support/log.h>
#include <grpcpp/impl/codegen/config.h>
-#include "src/cpp/server/load_reporter/constants.h"
-
+#include "src/cpp/server/load_reporter/constants.h"
+
#include <util/string/cast.h>
namespace grpc {
@@ -76,9 +76,9 @@ class LoadRecordKey {
user_id_(std::move(user_id)),
client_ip_hex_(std::move(client_ip_hex)) {}
- // Parses the input client_ip_and_token to set client IP, LB ID, and LB tag.
+ // Parses the input client_ip_and_token to set client IP, LB ID, and LB tag.
LoadRecordKey(const TString& client_ip_and_token, TString user_id);
-
+
TString ToString() const {
return "[lb_id_=" + lb_id_ + ", lb_tag_=" + lb_tag_ +
", user_id_=" + user_id_ + ", client_ip_hex_=" + client_ip_hex_ +
@@ -90,9 +90,9 @@ class LoadRecordKey {
user_id_ == other.user_id_ && client_ip_hex_ == other.client_ip_hex_;
}
- // Gets the client IP bytes in network order (i.e., big-endian).
+ // Gets the client IP bytes in network order (i.e., big-endian).
TString GetClientIpBytes() const;
-
+
// Getters.
const TString& lb_id() const { return lb_id_; }
const TString& lb_tag() const { return lb_tag_; }
@@ -126,8 +126,8 @@ class LoadRecordKey {
class LoadRecordValue {
public:
explicit LoadRecordValue(uint64_t start_count = 0, uint64_t ok_count = 0,
- uint64_t error_count = 0, uint64_t bytes_sent = 0,
- uint64_t bytes_recv = 0, uint64_t latency_ms = 0)
+ uint64_t error_count = 0, uint64_t bytes_sent = 0,
+ uint64_t bytes_recv = 0, uint64_t latency_ms = 0)
: start_count_(start_count),
ok_count_(ok_count),
error_count_(error_count),
@@ -136,8 +136,8 @@ class LoadRecordValue {
latency_ms_(latency_ms) {}
LoadRecordValue(TString metric_name, uint64_t num_calls,
- double total_metric_value);
-
+ double total_metric_value);
+
void MergeFrom(const LoadRecordValue& other) {
start_count_ += other.start_count_;
ok_count_ += other.ok_count_;
@@ -175,9 +175,9 @@ class LoadRecordValue {
uint64_t start_count() const { return start_count_; }
uint64_t ok_count() const { return ok_count_; }
uint64_t error_count() const { return error_count_; }
- uint64_t bytes_sent() const { return bytes_sent_; }
- uint64_t bytes_recv() const { return bytes_recv_; }
- uint64_t latency_ms() const { return latency_ms_; }
+ uint64_t bytes_sent() const { return bytes_sent_; }
+ uint64_t bytes_recv() const { return bytes_recv_; }
+ uint64_t latency_ms() const { return latency_ms_; }
const std::unordered_map<TString, CallMetricValue>& call_metrics() const {
return call_metrics_;
}
@@ -186,9 +186,9 @@ class LoadRecordValue {
uint64_t start_count_ = 0;
uint64_t ok_count_ = 0;
uint64_t error_count_ = 0;
- uint64_t bytes_sent_ = 0;
- uint64_t bytes_recv_ = 0;
- uint64_t latency_ms_ = 0;
+ uint64_t bytes_sent_ = 0;
+ uint64_t bytes_recv_ = 0;
+ uint64_t latency_ms_ = 0;
std::unordered_map<TString, CallMetricValue> call_metrics_;
};
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc
index 24ad9f3f24..a57ddc4715 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc
@@ -1,47 +1,47 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/impl/codegen/port_platform.h>
-
-#include <grpcpp/ext/server_load_reporting.h>
-
-#include <cmath>
-
-#include <grpc/support/log.h>
-
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/impl/codegen/port_platform.h>
+
+#include <grpcpp/ext/server_load_reporting.h>
+
+#include <cmath>
+
+#include <grpc/support/log.h>
+
namespace grpc {
-namespace load_reporter {
-namespace experimental {
-
-void AddLoadReportingCost(grpc::ServerContext* ctx,
+namespace load_reporter {
+namespace experimental {
+
+void AddLoadReportingCost(grpc::ServerContext* ctx,
const TString& cost_name, double cost_value) {
- if (std::isnormal(cost_value)) {
+ if (std::isnormal(cost_value)) {
TString buf;
- buf.resize(sizeof(cost_value) + cost_name.size());
- memcpy(&(*buf.begin()), &cost_value, sizeof(cost_value));
- memcpy(&(*buf.begin()) + sizeof(cost_value), cost_name.data(),
- cost_name.size());
- ctx->AddTrailingMetadata(GRPC_LB_COST_MD_KEY, buf);
- } else {
- gpr_log(GPR_ERROR, "Call metric value is not normal.");
- }
-}
-
-} // namespace experimental
-} // namespace load_reporter
+ buf.resize(sizeof(cost_value) + cost_name.size());
+ memcpy(&(*buf.begin()), &cost_value, sizeof(cost_value));
+ memcpy(&(*buf.begin()) + sizeof(cost_value), cost_name.data(),
+ cost_name.size());
+ ctx->AddTrailingMetadata(GRPC_LB_COST_MD_KEY, buf);
+ } else {
+ gpr_log(GPR_ERROR, "Call metric value is not normal.");
+ }
+}
+
+} // namespace experimental
+} // namespace load_reporter
} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc
index 732602bcb7..3b73453515 100644
--- a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc
+++ b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc
@@ -44,10 +44,10 @@ void AuthMetadataProcessorAyncWrapper::Process(
return;
}
if (w->processor_->IsBlocking()) {
- w->thread_pool_->Add([w, context, md, num_md, cb, user_data] {
- w->AuthMetadataProcessorAyncWrapper::InvokeProcessor(context, md, num_md,
- cb, user_data);
- });
+ w->thread_pool_->Add([w, context, md, num_md, cb, user_data] {
+ w->AuthMetadataProcessorAyncWrapper::InvokeProcessor(context, md, num_md,
+ cb, user_data);
+ });
} else {
// invoke directly.
w->InvokeProcessor(context, md, num_md, cb, user_data);
@@ -62,7 +62,7 @@ void AuthMetadataProcessorAyncWrapper::InvokeProcessor(
metadata.insert(std::make_pair(StringRefFromSlice(&md[i].key),
StringRefFromSlice(&md[i].value)));
}
- SecureAuthContext context(ctx);
+ SecureAuthContext context(ctx);
AuthMetadataProcessor::OutputMetadata consumed_metadata;
AuthMetadataProcessor::OutputMetadata response_metadata;
@@ -138,12 +138,12 @@ std::shared_ptr<ServerCredentials> AltsServerCredentials(
new SecureServerCredentials(c_creds));
}
-std::shared_ptr<ServerCredentials> LocalServerCredentials(
- grpc_local_connect_type type) {
- return std::shared_ptr<ServerCredentials>(
- new SecureServerCredentials(grpc_local_server_credentials_create(type)));
-}
-
+std::shared_ptr<ServerCredentials> LocalServerCredentials(
+ grpc_local_connect_type type) {
+ return std::shared_ptr<ServerCredentials>(
+ new SecureServerCredentials(grpc_local_server_credentials_create(type)));
+}
+
std::shared_ptr<ServerCredentials> TlsServerCredentials(
const grpc::experimental::TlsCredentialsOptions& options) {
grpc::GrpcLibraryCodegen init;
diff --git a/contrib/libs/grpc/src/cpp/server/server_builder.cc b/contrib/libs/grpc/src/cpp/server/server_builder.cc
index 0cc00b365f..ec5d4eec8c 100644
--- a/contrib/libs/grpc/src/cpp/server/server_builder.cc
+++ b/contrib/libs/grpc/src/cpp/server/server_builder.cc
@@ -24,8 +24,8 @@
#include <grpcpp/resource_quota.h>
#include <grpcpp/server.h>
-#include <utility>
-
+#include <utility>
+
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
@@ -44,8 +44,8 @@ static void do_plugin_list_init(void) {
}
ServerBuilder::ServerBuilder()
- : max_receive_message_size_(INT_MIN),
- max_send_message_size_(INT_MIN),
+ : max_receive_message_size_(INT_MIN),
+ max_send_message_size_(INT_MIN),
sync_server_settings_(SyncServerSettings()),
resource_quota_(nullptr) {
gpr_once_init(&once_init_plugin_list, do_plugin_list_init);
@@ -71,9 +71,9 @@ ServerBuilder::~ServerBuilder() {
std::unique_ptr<grpc::ServerCompletionQueue> ServerBuilder::AddCompletionQueue(
bool is_frequently_polled) {
grpc::ServerCompletionQueue* cq = new grpc::ServerCompletionQueue(
- GRPC_CQ_NEXT,
- is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING,
- nullptr);
+ GRPC_CQ_NEXT,
+ is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING,
+ nullptr);
cqs_.push_back(cq);
return std::unique_ptr<grpc::ServerCompletionQueue>(cq);
}
@@ -180,7 +180,7 @@ ServerBuilder& ServerBuilder::SetCompressionAlgorithmSupportStatus(
ServerBuilder& ServerBuilder::SetDefaultCompressionLevel(
grpc_compression_level level) {
- maybe_default_compression_level_.is_set = true;
+ maybe_default_compression_level_.is_set = true;
maybe_default_compression_level_.level = level;
return *this;
}
@@ -212,14 +212,14 @@ ServerBuilder& ServerBuilder::AddListeningPort(
while (addr_uri[pos] == '/') ++pos; // Skip slashes.
addr = addr_uri.substr(pos);
}
- Port port = {addr, std::move(creds), selected_port};
+ Port port = {addr, std::move(creds), selected_port};
ports_.push_back(port);
return *this;
}
std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
grpc::ChannelArguments args;
- if (max_receive_message_size_ >= -1) {
+ if (max_receive_message_size_ >= -1) {
args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_);
}
if (max_send_message_size_ >= -1) {
@@ -306,14 +306,14 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
// Create completion queues to listen to incoming rpc requests
for (int i = 0; i < sync_server_settings_.num_cqs; i++) {
- sync_server_cqs->emplace_back(
+ sync_server_cqs->emplace_back(
new grpc::ServerCompletionQueue(GRPC_CQ_NEXT, polling_type, nullptr));
}
}
- // TODO(vjpai): Add a section here for plugins once they can support callback
- // methods
-
+ // TODO(vjpai): Add a section here for plugins once they can support callback
+ // methods
+
if (has_sync_methods) {
// This is a Sync server
gpr_log(GPR_INFO,
@@ -324,16 +324,16 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
sync_server_settings_.cq_timeout_msec);
}
- if (has_callback_methods) {
- gpr_log(GPR_INFO, "Callback server.");
- }
-
+ if (has_callback_methods) {
+ gpr_log(GPR_INFO, "Callback server.");
+ }
+
std::unique_ptr<grpc::Server> server(new grpc::Server(
&args, sync_server_cqs, sync_server_settings_.min_pollers,
sync_server_settings_.max_pollers, sync_server_settings_.cq_timeout_msec,
std::move(acceptors_), resource_quota_,
std::move(interceptor_creators_)));
-
+
ServerInitializer* initializer = server->initializer();
// Register all the completion queues with the server. i.e
@@ -347,10 +347,10 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
}
if (has_callback_methods || callback_generic_service_ != nullptr) {
- auto* cq = server->CallbackCQ();
- grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr);
- }
-
+ auto* cq = server->CallbackCQ();
+ grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr);
+ }
+
// cqs_ contains the completion queue added by calling the ServerBuilder's
// AddCompletionQueue() API. Some of them may not be frequently polled (i.e by
// calling Next() or AsyncNext()) and hence are not safe to be used for
diff --git a/contrib/libs/grpc/src/cpp/server/server_cc.cc b/contrib/libs/grpc/src/cpp/server/server_cc.cc
index c2a911c7f7..56bf75d730 100644
--- a/contrib/libs/grpc/src/cpp/server/server_cc.cc
+++ b/contrib/libs/grpc/src/cpp/server/server_cc.cc
@@ -30,10 +30,10 @@
#include <grpcpp/generic/async_generic_service.h>
#include <grpcpp/impl/codegen/async_unary_call.h>
#include <grpcpp/impl/codegen/byte_buffer.h>
-#include <grpcpp/impl/codegen/call.h>
+#include <grpcpp/impl/codegen/call.h>
#include <grpcpp/impl/codegen/completion_queue_tag.h>
#include <grpcpp/impl/codegen/method_handler.h>
-#include <grpcpp/impl/codegen/server_interceptor.h>
+#include <grpcpp/impl/codegen/server_interceptor.h>
#include <grpcpp/impl/grpc_library.h>
#include <grpcpp/impl/rpc_service_method.h>
#include <grpcpp/impl/server_initializer.h>
@@ -43,10 +43,10 @@
#include <grpcpp/support/time.h>
#include "src/core/ext/transport/inproc/inproc_transport.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/surface/call.h"
-#include "src/core/lib/surface/completion_queue.h"
+#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/surface/server.h"
#include "src/cpp/client/create_channel_internal.h"
#include "src/cpp/server/external_connection_acceptor_impl.h"
@@ -58,13 +58,13 @@
namespace grpc {
namespace {
-// The default value for maximum number of threads that can be created in the
-// sync server. This value of INT_MAX is chosen to match the default behavior if
-// no ResourceQuota is set. To modify the max number of threads in a sync
-// server, pass a custom ResourceQuota object (with the desired number of
-// max-threads set) to the server builder.
-#define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX
-
+// The default value for maximum number of threads that can be created in the
+// sync server. This value of INT_MAX is chosen to match the default behavior if
+// no ResourceQuota is set. To modify the max number of threads in a sync
+// server, pass a custom ResourceQuota object (with the desired number of
+// max-threads set) to the server builder.
+#define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX
+
class DefaultGlobalCallbacks final : public Server::GlobalCallbacks {
public:
~DefaultGlobalCallbacks() override {}
@@ -319,10 +319,10 @@ class Server::UnimplementedAsyncResponse final
grpc::internal::CallOpSendInitialMetadata,
grpc::internal::CallOpServerSendStatus>::FinalizeResult(tag,
status)) {
- delete this;
- } else {
- // The tag was swallowed due to interception. We will see it again.
- }
+ delete this;
+ } else {
+ // The tag was swallowed due to interception. We will see it again.
+ }
return false;
}
@@ -334,7 +334,7 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
public:
SyncRequest(grpc::internal::RpcServiceMethod* method, void* method_tag)
: method_(method),
- method_tag_(method_tag),
+ method_tag_(method_tag),
in_flight_(false),
has_request_payload_(method->method_type() ==
grpc::internal::RpcMethod::NORMAL_RPC ||
@@ -362,11 +362,11 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
GPR_ASSERT(cq_ && !in_flight_);
in_flight_ = true;
- if (method_tag_) {
- if (grpc_server_request_registered_call(
- server, method_tag_, &call_, &deadline_, &request_metadata_,
+ if (method_tag_) {
+ if (grpc_server_request_registered_call(
+ server, method_tag_, &call_, &deadline_, &request_metadata_,
has_request_payload_ ? &request_payload_ : nullptr, cq_,
- notify_cq, this) != GRPC_CALL_OK) {
+ notify_cq, this) != GRPC_CALL_OK) {
TeardownRequest();
return;
}
@@ -384,21 +384,21 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
}
}
- void PostShutdownCleanup() {
- if (call_) {
- grpc_call_unref(call_);
- call_ = nullptr;
- }
- if (cq_) {
- grpc_completion_queue_destroy(cq_);
- cq_ = nullptr;
- }
- }
-
+ void PostShutdownCleanup() {
+ if (call_) {
+ grpc_call_unref(call_);
+ call_ = nullptr;
+ }
+ if (cq_) {
+ grpc_completion_queue_destroy(cq_);
+ cq_ = nullptr;
+ }
+ }
+
bool FinalizeResult(void** /*tag*/, bool* status) override {
if (!*status) {
grpc_completion_queue_destroy(cq_);
- cq_ = nullptr;
+ cq_ = nullptr;
}
if (call_details_) {
deadline_ = call_details_->deadline;
@@ -408,26 +408,26 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
return true;
}
- // The CallData class represents a call that is "active" as opposed
- // to just being requested. It wraps and takes ownership of the cq from
- // the call request
+ // The CallData class represents a call that is "active" as opposed
+ // to just being requested. It wraps and takes ownership of the cq from
+ // the call request
class CallData final {
public:
explicit CallData(Server* server, SyncRequest* mrd)
: cq_(mrd->cq_),
ctx_(mrd->deadline_, &mrd->request_metadata_),
has_request_payload_(mrd->has_request_payload_),
- request_payload_(has_request_payload_ ? mrd->request_payload_
- : nullptr),
- request_(nullptr),
- method_(mrd->method_),
- call_(
- mrd->call_, server, &cq_, server->max_receive_message_size(),
- ctx_.set_server_rpc_info(method_->name(), method_->method_type(),
- server->interceptor_creators_)),
- server_(server),
- global_callbacks_(nullptr),
- resources_(false) {
+ request_payload_(has_request_payload_ ? mrd->request_payload_
+ : nullptr),
+ request_(nullptr),
+ method_(mrd->method_),
+ call_(
+ mrd->call_, server, &cq_, server->max_receive_message_size(),
+ ctx_.set_server_rpc_info(method_->name(), method_->method_type(),
+ server->interceptor_creators_)),
+ server_(server),
+ global_callbacks_(nullptr),
+ resources_(false) {
ctx_.set_call(mrd->call_);
ctx_.cq_ = &cq_;
GPR_ASSERT(mrd->in_flight_);
@@ -441,82 +441,82 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
}
}
- void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks,
- bool resources) {
- global_callbacks_ = global_callbacks;
- resources_ = resources;
+ void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks,
+ bool resources) {
+ global_callbacks_ = global_callbacks;
+ resources_ = resources;
- interceptor_methods_.SetCall(&call_);
- interceptor_methods_.SetReverse();
- // Set interception point for RECV INITIAL METADATA
- interceptor_methods_.AddInterceptionHookPoint(
+ interceptor_methods_.SetCall(&call_);
+ interceptor_methods_.SetReverse();
+ // Set interception point for RECV INITIAL METADATA
+ interceptor_methods_.AddInterceptionHookPoint(
grpc::experimental::InterceptionHookPoints::
POST_RECV_INITIAL_METADATA);
- interceptor_methods_.SetRecvInitialMetadata(&ctx_.client_metadata_);
+ interceptor_methods_.SetRecvInitialMetadata(&ctx_.client_metadata_);
- if (has_request_payload_) {
- // Set interception point for RECV MESSAGE
- auto* handler = resources_ ? method_->handler()
- : server_->resource_exhausted_handler_.get();
- request_ = handler->Deserialize(call_.call(), request_payload_,
+ if (has_request_payload_) {
+ // Set interception point for RECV MESSAGE
+ auto* handler = resources_ ? method_->handler()
+ : server_->resource_exhausted_handler_.get();
+ request_ = handler->Deserialize(call_.call(), request_payload_,
&request_status_, nullptr);
- request_payload_ = nullptr;
- interceptor_methods_.AddInterceptionHookPoint(
+ request_payload_ = nullptr;
+ interceptor_methods_.AddInterceptionHookPoint(
grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
- interceptor_methods_.SetRecvMessage(request_, nullptr);
- }
-
- if (interceptor_methods_.RunInterceptors(
- [this]() { ContinueRunAfterInterception(); })) {
- ContinueRunAfterInterception();
- } else {
- // There were interceptors to be run, so ContinueRunAfterInterception
- // will be run when interceptors are done.
- }
+ interceptor_methods_.SetRecvMessage(request_, nullptr);
+ }
+
+ if (interceptor_methods_.RunInterceptors(
+ [this]() { ContinueRunAfterInterception(); })) {
+ ContinueRunAfterInterception();
+ } else {
+ // There were interceptors to be run, so ContinueRunAfterInterception
+ // will be run when interceptors are done.
+ }
}
- void ContinueRunAfterInterception() {
- {
- ctx_.BeginCompletionOp(&call_, nullptr, nullptr);
- global_callbacks_->PreSynchronousRequest(&ctx_);
- auto* handler = resources_ ? method_->handler()
- : server_->resource_exhausted_handler_.get();
+ void ContinueRunAfterInterception() {
+ {
+ ctx_.BeginCompletionOp(&call_, nullptr, nullptr);
+ global_callbacks_->PreSynchronousRequest(&ctx_);
+ auto* handler = resources_ ? method_->handler()
+ : server_->resource_exhausted_handler_.get();
handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter(
&call_, &ctx_, request_, request_status_, nullptr, nullptr));
- request_ = nullptr;
- global_callbacks_->PostSynchronousRequest(&ctx_);
-
- cq_.Shutdown();
-
+ request_ = nullptr;
+ global_callbacks_->PostSynchronousRequest(&ctx_);
+
+ cq_.Shutdown();
+
grpc::internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag();
- cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME));
-
- /* Ensure the cq_ is shutdown */
+ cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME));
+
+ /* Ensure the cq_ is shutdown */
grpc::DummyTag ignored_tag;
- GPR_ASSERT(cq_.Pluck(&ignored_tag) == false);
- }
- delete this;
- }
-
+ GPR_ASSERT(cq_.Pluck(&ignored_tag) == false);
+ }
+ delete this;
+ }
+
private:
grpc::CompletionQueue cq_;
grpc::ServerContext ctx_;
const bool has_request_payload_;
grpc_byte_buffer* request_payload_;
- void* request_;
+ void* request_;
grpc::Status request_status_;
grpc::internal::RpcServiceMethod* const method_;
grpc::internal::Call call_;
- Server* server_;
- std::shared_ptr<GlobalCallbacks> global_callbacks_;
- bool resources_;
+ Server* server_;
+ std::shared_ptr<GlobalCallbacks> global_callbacks_;
+ bool resources_;
grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_;
};
private:
grpc::internal::RpcServiceMethod* const method_;
- void* const method_tag_;
+ void* const method_tag_;
bool in_flight_;
const bool has_request_payload_;
grpc_call* call_;
@@ -541,19 +541,19 @@ class Server::CallbackRequest final
CallbackRequest(Server* server, grpc::internal::RpcServiceMethod* method,
grpc::CompletionQueue* cq,
grpc_core::Server::RegisteredCallAllocation* data)
- : server_(server),
- method_(method),
+ : server_(server),
+ method_(method),
has_request_payload_(method->method_type() ==
grpc::internal::RpcMethod::NORMAL_RPC ||
method->method_type() ==
grpc::internal::RpcMethod::SERVER_STREAMING),
cq_(cq),
- tag_(this) {
+ tag_(this) {
CommonSetup(server, data);
data->deadline = &deadline_;
data->optional_payload = has_request_payload_ ? &request_payload_ : nullptr;
- }
-
+ }
+
// For generic services, method is nullptr since these services don't have
// pre-defined methods.
CallbackRequest(Server* server, grpc::CompletionQueue* cq,
@@ -567,8 +567,8 @@ class Server::CallbackRequest final
CommonSetup(server, data);
grpc_call_details_init(call_details_);
data->details = call_details_;
- }
-
+ }
+
~CallbackRequest() {
delete call_details_;
grpc_metadata_array_destroy(&request_metadata_);
@@ -576,21 +576,21 @@ class Server::CallbackRequest final
grpc_byte_buffer_destroy(request_payload_);
}
server_->UnrefWithPossibleNotify();
- }
-
+ }
+
// Needs specialization to account for different processing of metadata
// in generic API
bool FinalizeResult(void** tag, bool* status) override;
-
- private:
+
+ private:
// method_name needs to be specialized between named method and generic
const char* method_name() const;
- class CallbackCallTag : public grpc_experimental_completion_queue_functor {
- public:
+ class CallbackCallTag : public grpc_experimental_completion_queue_functor {
+ public:
CallbackCallTag(Server::CallbackRequest<ServerContextType>* req)
: req_(req) {
- functor_run = &CallbackCallTag::StaticRun;
+ functor_run = &CallbackCallTag::StaticRun;
// Set inlineable to true since this callback is internally-controlled
// without taking any locks, and thus does not need to be run from the
// executor (which triggers a thread hop). This should only be used by
@@ -598,42 +598,42 @@ class Server::CallbackRequest final
// here is actually non-trivial, but there is no chance of having user
// locks conflict with each other so it's ok to run inlined.
inlineable = true;
- }
-
- // force_run can not be performed on a tag if operations using this tag
- // have been sent to PerformOpsOnCall. It is intended for error conditions
- // that are detected before the operations are internally processed.
- void force_run(bool ok) { Run(ok); }
-
- private:
+ }
+
+ // force_run can not be performed on a tag if operations using this tag
+ // have been sent to PerformOpsOnCall. It is intended for error conditions
+ // that are detected before the operations are internally processed.
+ void force_run(bool ok) { Run(ok); }
+
+ private:
Server::CallbackRequest<ServerContextType>* req_;
grpc::internal::Call* call_;
-
- static void StaticRun(grpc_experimental_completion_queue_functor* cb,
- int ok) {
- static_cast<CallbackCallTag*>(cb)->Run(static_cast<bool>(ok));
- }
- void Run(bool ok) {
- void* ignored = req_;
- bool new_ok = ok;
- GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok));
- GPR_ASSERT(ignored == req_);
-
- if (!ok) {
- // The call has been shutdown.
- // Delete its contents to free up the request.
- delete req_;
- return;
- }
-
- // Bind the call, deadline, and metadata from what we got
- req_->ctx_.set_call(req_->call_);
- req_->ctx_.cq_ = req_->cq_;
- req_->ctx_.BindDeadlineAndMetadata(req_->deadline_,
- &req_->request_metadata_);
- req_->request_metadata_.count = 0;
-
- // Create a C++ Call to control the underlying core call
+
+ static void StaticRun(grpc_experimental_completion_queue_functor* cb,
+ int ok) {
+ static_cast<CallbackCallTag*>(cb)->Run(static_cast<bool>(ok));
+ }
+ void Run(bool ok) {
+ void* ignored = req_;
+ bool new_ok = ok;
+ GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok));
+ GPR_ASSERT(ignored == req_);
+
+ if (!ok) {
+ // The call has been shutdown.
+ // Delete its contents to free up the request.
+ delete req_;
+ return;
+ }
+
+ // Bind the call, deadline, and metadata from what we got
+ req_->ctx_.set_call(req_->call_);
+ req_->ctx_.cq_ = req_->cq_;
+ req_->ctx_.BindDeadlineAndMetadata(req_->deadline_,
+ &req_->request_metadata_);
+ req_->request_metadata_.count = 0;
+
+ // Create a C++ Call to control the underlying core call
call_ =
new (grpc_call_arena_alloc(req_->call_, sizeof(grpc::internal::Call)))
grpc::internal::Call(
@@ -645,71 +645,71 @@ class Server::CallbackRequest final
? req_->method_->method_type()
: grpc::internal::RpcMethod::BIDI_STREAMING,
req_->server_->interceptor_creators_));
-
- req_->interceptor_methods_.SetCall(call_);
- req_->interceptor_methods_.SetReverse();
- // Set interception point for RECV INITIAL METADATA
- req_->interceptor_methods_.AddInterceptionHookPoint(
+
+ req_->interceptor_methods_.SetCall(call_);
+ req_->interceptor_methods_.SetReverse();
+ // Set interception point for RECV INITIAL METADATA
+ req_->interceptor_methods_.AddInterceptionHookPoint(
grpc::experimental::InterceptionHookPoints::
POST_RECV_INITIAL_METADATA);
- req_->interceptor_methods_.SetRecvInitialMetadata(
- &req_->ctx_.client_metadata_);
-
- if (req_->has_request_payload_) {
- // Set interception point for RECV MESSAGE
- req_->request_ = req_->method_->handler()->Deserialize(
+ req_->interceptor_methods_.SetRecvInitialMetadata(
+ &req_->ctx_.client_metadata_);
+
+ if (req_->has_request_payload_) {
+ // Set interception point for RECV MESSAGE
+ req_->request_ = req_->method_->handler()->Deserialize(
req_->call_, req_->request_payload_, &req_->request_status_,
&req_->handler_data_);
- req_->request_payload_ = nullptr;
- req_->interceptor_methods_.AddInterceptionHookPoint(
+ req_->request_payload_ = nullptr;
+ req_->interceptor_methods_.AddInterceptionHookPoint(
grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
- req_->interceptor_methods_.SetRecvMessage(req_->request_, nullptr);
- }
-
- if (req_->interceptor_methods_.RunInterceptors(
- [this] { ContinueRunAfterInterception(); })) {
- ContinueRunAfterInterception();
- } else {
- // There were interceptors to be run, so ContinueRunAfterInterception
- // will be run when interceptors are done.
- }
- }
- void ContinueRunAfterInterception() {
+ req_->interceptor_methods_.SetRecvMessage(req_->request_, nullptr);
+ }
+
+ if (req_->interceptor_methods_.RunInterceptors(
+ [this] { ContinueRunAfterInterception(); })) {
+ ContinueRunAfterInterception();
+ } else {
+ // There were interceptors to be run, so ContinueRunAfterInterception
+ // will be run when interceptors are done.
+ }
+ }
+ void ContinueRunAfterInterception() {
auto* handler = (req_->method_ != nullptr)
? req_->method_->handler()
: req_->server_->generic_handler_.get();
handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter(
call_, &req_->ctx_, req_->request_, req_->request_status_,
req_->handler_data_, [this] { delete req_; }));
- }
- };
-
+ }
+ };
+
template <class CallAllocation>
void CommonSetup(Server* server, CallAllocation* data) {
server->Ref();
- grpc_metadata_array_init(&request_metadata_);
+ grpc_metadata_array_init(&request_metadata_);
data->tag = &tag_;
data->call = &call_;
data->initial_metadata = &request_metadata_;
- }
-
- Server* const server_;
+ }
+
+ Server* const server_;
grpc::internal::RpcServiceMethod* const method_;
- const bool has_request_payload_;
+ const bool has_request_payload_;
grpc_byte_buffer* request_payload_ = nullptr;
void* request_ = nullptr;
void* handler_data_ = nullptr;
grpc::Status request_status_;
grpc_call_details* const call_details_ = nullptr;
- grpc_call* call_;
- gpr_timespec deadline_;
- grpc_metadata_array request_metadata_;
+ grpc_call* call_;
+ gpr_timespec deadline_;
+ grpc_metadata_array request_metadata_;
grpc::CompletionQueue* const cq_;
- CallbackCallTag tag_;
+ CallbackCallTag tag_;
ServerContextType ctx_;
grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_;
-};
-
+};
+
template <>
bool Server::CallbackRequest<grpc::CallbackServerContext>::FinalizeResult(
void** /*tag*/, bool* /*status*/) {
@@ -750,13 +750,13 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
public:
SyncRequestThreadManager(Server* server, grpc::CompletionQueue* server_cq,
std::shared_ptr<GlobalCallbacks> global_callbacks,
- grpc_resource_quota* rq, int min_pollers,
- int max_pollers, int cq_timeout_msec)
- : ThreadManager("SyncServer", rq, min_pollers, max_pollers),
+ grpc_resource_quota* rq, int min_pollers,
+ int max_pollers, int cq_timeout_msec)
+ : ThreadManager("SyncServer", rq, min_pollers, max_pollers),
server_(server),
server_cq_(server_cq),
cq_timeout_msec_(cq_timeout_msec),
- global_callbacks_(std::move(global_callbacks)) {}
+ global_callbacks_(std::move(global_callbacks)) {}
WorkStatus PollForWork(void** tag, bool* ok) override {
*tag = nullptr;
@@ -778,7 +778,7 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
GPR_UNREACHABLE_CODE(return TIMEOUT);
}
- void DoWork(void* tag, bool ok, bool resources) override {
+ void DoWork(void* tag, bool ok, bool resources) override {
SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
if (!sync_req) {
@@ -789,9 +789,9 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
}
if (ok) {
- // Calldata takes ownership of the completion queue and interceptors
- // inside sync_req
- auto* cd = new SyncRequest::CallData(server_, sync_req);
+ // Calldata takes ownership of the completion queue and interceptors
+ // inside sync_req
+ auto* cd = new SyncRequest::CallData(server_, sync_req);
// Prepare for the next request
if (!IsShutdown()) {
sync_req->SetupRequest(); // Create new completion queue for sync_req
@@ -799,7 +799,7 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
}
GPR_TIMER_SCOPE("cd.Run()", 0);
- cd->Run(global_callbacks_, resources);
+ cd->Run(global_callbacks_, resources);
}
// TODO (sreek) If ok is false here (which it isn't in case of
// grpc_request_registered_call), we should still re-queue the request
@@ -831,17 +831,17 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
void* tag;
bool ok;
while (server_cq_->Next(&tag, &ok)) {
- if (ok) {
- // If a request was pulled off the queue, it means that the thread
- // handling the request added it to the completion queue after shutdown
- // was called - because the thread had already started and checked the
- // shutdown flag before shutdown was called. In this case, we simply
- // clean it up here, *after* calling wait on all the worker threads, at
- // which point we are certain no in-flight requests will add more to the
- // queue. This fixes an intermittent memory leak on shutdown.
- SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
- sync_req->PostShutdownCleanup();
- }
+ if (ok) {
+ // If a request was pulled off the queue, it means that the thread
+ // handling the request added it to the completion queue after shutdown
+ // was called - because the thread had already started and checked the
+ // shutdown flag before shutdown was called. In this case, we simply
+ // clean it up here, *after* calling wait on all the worker threads, at
+ // which point we are certain no in-flight requests will add more to the
+ // queue. This fixes an intermittent memory leak on shutdown.
+ SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
+ sync_req->PostShutdownCleanup();
+ }
}
}
@@ -870,17 +870,17 @@ Server::Server(
grpc::ChannelArguments* args,
std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>>
sync_server_cqs,
- int min_pollers, int max_pollers, int sync_cq_timeout_msec,
+ int min_pollers, int max_pollers, int sync_cq_timeout_msec,
std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>>
acceptors,
- grpc_resource_quota* server_rq,
- std::vector<
+ grpc_resource_quota* server_rq,
+ std::vector<
std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>>
- interceptor_creators)
+ interceptor_creators)
: acceptors_(std::move(acceptors)),
interceptor_creators_(std::move(interceptor_creators)),
max_receive_message_size_(INT_MIN),
- sync_server_cqs_(std::move(sync_server_cqs)),
+ sync_server_cqs_(std::move(sync_server_cqs)),
started_(false),
shutdown_(false),
shutdown_notified_(false),
@@ -893,23 +893,23 @@ Server::Server(
global_callbacks_->UpdateArguments(args);
if (sync_server_cqs_ != nullptr) {
- bool default_rq_created = false;
- if (server_rq == nullptr) {
- server_rq = grpc_resource_quota_create("SyncServer-default-rq");
- grpc_resource_quota_set_max_threads(server_rq,
- DEFAULT_MAX_SYNC_SERVER_THREADS);
- default_rq_created = true;
- }
-
+ bool default_rq_created = false;
+ if (server_rq == nullptr) {
+ server_rq = grpc_resource_quota_create("SyncServer-default-rq");
+ grpc_resource_quota_set_max_threads(server_rq,
+ DEFAULT_MAX_SYNC_SERVER_THREADS);
+ default_rq_created = true;
+ }
+
for (const auto& it : *sync_server_cqs_) {
sync_req_mgrs_.emplace_back(new SyncRequestThreadManager(
- this, it.get(), global_callbacks_, server_rq, min_pollers,
- max_pollers, sync_cq_timeout_msec));
- }
-
- if (default_rq_created) {
- grpc_resource_quota_unref(server_rq);
+ this, it.get(), global_callbacks_, server_rq, min_pollers,
+ max_pollers, sync_cq_timeout_msec));
}
+
+ if (default_rq_created) {
+ grpc_resource_quota_unref(server_rq);
+ }
}
for (auto& acceptor : acceptors_) {
@@ -974,24 +974,24 @@ std::shared_ptr<grpc::Channel> Server::InProcessChannel(
const grpc::ChannelArguments& args) {
grpc_channel_args channel_args = args.c_channel_args();
return grpc::CreateChannelInternal(
- "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr),
+ "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr),
std::vector<std::unique_ptr<
grpc::experimental::ClientInterceptorFactoryInterface>>());
}
std::shared_ptr<grpc::Channel>
-Server::experimental_type::InProcessChannelWithInterceptors(
+Server::experimental_type::InProcessChannelWithInterceptors(
const grpc::ChannelArguments& args,
- std::vector<
+ std::vector<
std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
- interceptor_creators) {
- grpc_channel_args channel_args = args.c_channel_args();
+ interceptor_creators) {
+ grpc_channel_args channel_args = args.c_channel_args();
return grpc::CreateChannelInternal(
- "inproc",
- grpc_inproc_channel_create(server_->server_, &channel_args, nullptr),
- std::move(interceptor_creators));
-}
-
+ "inproc",
+ grpc_inproc_channel_create(server_->server_, &channel_args, nullptr),
+ std::move(interceptor_creators));
+}
+
static grpc_server_register_method_payload_handling PayloadHandlingForMethod(
grpc::internal::RpcServiceMethod* method) {
switch (method->method_type()) {
@@ -1014,29 +1014,29 @@ bool Server::RegisterService(const TString* host, grpc::Service* service) {
}
const char* method_name = nullptr;
-
+
for (const auto& method : service->methods_) {
if (method.get() == nullptr) { // Handled by generic service if any.
continue;
}
- void* method_registration_tag = grpc_server_register_method(
+ void* method_registration_tag = grpc_server_register_method(
server_, method->name(), host ? host->c_str() : nullptr,
PayloadHandlingForMethod(method.get()), 0);
- if (method_registration_tag == nullptr) {
+ if (method_registration_tag == nullptr) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
method->name());
return false;
}
- if (method->handler() == nullptr) { // Async method without handler
- method->set_server_tag(method_registration_tag);
- } else if (method->api_type() ==
+ if (method->handler() == nullptr) { // Async method without handler
+ method->set_server_tag(method_registration_tag);
+ } else if (method->api_type() ==
grpc::internal::RpcServiceMethod::ApiType::SYNC) {
for (const auto& value : sync_req_mgrs_) {
value->AddSyncMethod(method.get(), method_registration_tag);
}
- } else {
+ } else {
has_callback_methods_ = true;
grpc::internal::RpcServiceMethod* method_value = method.get();
grpc::CompletionQueue* cq = CallbackCQ();
@@ -1130,23 +1130,23 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
// explicit one.
grpc::ServerCompletionQueue* health_check_cq = nullptr;
grpc::DefaultHealthCheckService::HealthCheckServiceImpl*
- default_health_check_service_impl = nullptr;
+ default_health_check_service_impl = nullptr;
if (health_check_service_ == nullptr && !health_check_service_disabled_ &&
grpc::DefaultHealthCheckServiceEnabled()) {
auto* default_hc_service = new grpc::DefaultHealthCheckService;
- health_check_service_.reset(default_hc_service);
- // We create a non-polling CQ to avoid impacting application
- // performance. This ensures that we don't introduce thread hops
- // for application requests that wind up on this CQ, which is polled
- // in its own thread.
+ health_check_service_.reset(default_hc_service);
+ // We create a non-polling CQ to avoid impacting application
+ // performance. This ensures that we don't introduce thread hops
+ // for application requests that wind up on this CQ, which is polled
+ // in its own thread.
health_check_cq = new grpc::ServerCompletionQueue(
GRPC_CQ_NEXT, GRPC_CQ_NON_POLLING, nullptr);
- grpc_server_register_completion_queue(server_, health_check_cq->cq(),
- nullptr);
- default_health_check_service_impl =
- default_hc_service->GetHealthCheckService(
+ grpc_server_register_completion_queue(server_, health_check_cq->cq(),
+ nullptr);
+ default_health_check_service_impl =
+ default_hc_service->GetHealthCheckService(
std::unique_ptr<grpc::ServerCompletionQueue>(health_check_cq));
- RegisterService(nullptr, default_health_check_service_impl);
+ RegisterService(nullptr, default_health_check_service_impl);
}
for (auto& acceptor : acceptors_) {
@@ -1179,26 +1179,26 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
new UnimplementedAsyncRequest(this, cqs[i]);
}
}
- if (health_check_cq != nullptr) {
- new UnimplementedAsyncRequest(this, health_check_cq);
- }
+ if (health_check_cq != nullptr) {
+ new UnimplementedAsyncRequest(this, health_check_cq);
+ }
}
- // If this server has any support for synchronous methods (has any sync
- // server CQs), make sure that we have a ResourceExhausted handler
- // to deal with the case of thread exhaustion
- if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) {
+ // If this server has any support for synchronous methods (has any sync
+ // server CQs), make sure that we have a ResourceExhausted handler
+ // to deal with the case of thread exhaustion
+ if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) {
resource_exhausted_handler_.reset(
new grpc::internal::ResourceExhaustedHandler);
- }
-
+ }
+
for (const auto& value : sync_req_mgrs_) {
value->Start();
}
-
- if (default_health_check_service_impl != nullptr) {
- default_health_check_service_impl->StartServingThread();
- }
+
+ if (default_health_check_service_impl != nullptr) {
+ default_health_check_service_impl->StartServingThread();
+ }
for (auto& acceptor : acceptors_) {
acceptor->Start();
@@ -1207,50 +1207,50 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
void Server::ShutdownInternal(gpr_timespec deadline) {
grpc::internal::MutexLock lock(&mu_);
- if (shutdown_) {
- return;
- }
+ if (shutdown_) {
+ return;
+ }
- shutdown_ = true;
+ shutdown_ = true;
for (auto& acceptor : acceptors_) {
acceptor->Shutdown();
}
- /// The completion queue to use for server shutdown completion notification
+ /// The completion queue to use for server shutdown completion notification
grpc::CompletionQueue shutdown_cq;
grpc::ShutdownTag shutdown_tag; // Dummy shutdown tag
- grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
+ grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
- shutdown_cq.Shutdown();
+ shutdown_cq.Shutdown();
- void* tag;
- bool ok;
+ void* tag;
+ bool ok;
grpc::CompletionQueue::NextStatus status =
- shutdown_cq.AsyncNext(&tag, &ok, deadline);
+ shutdown_cq.AsyncNext(&tag, &ok, deadline);
- // If this timed out, it means we are done with the grace period for a clean
- // shutdown. We should force a shutdown now by cancelling all inflight calls
+ // If this timed out, it means we are done with the grace period for a clean
+ // shutdown. We should force a shutdown now by cancelling all inflight calls
if (status == grpc::CompletionQueue::NextStatus::TIMEOUT) {
- grpc_server_cancel_all_calls(server_);
- }
- // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
- // successfully shutdown
+ grpc_server_cancel_all_calls(server_);
+ }
+ // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
+ // successfully shutdown
- // Shutdown all ThreadManagers. This will try to gracefully stop all the
- // threads in the ThreadManagers (once they process any inflight requests)
+ // Shutdown all ThreadManagers. This will try to gracefully stop all the
+ // threads in the ThreadManagers (once they process any inflight requests)
for (const auto& value : sync_req_mgrs_) {
value->Shutdown(); // ThreadManager's Shutdown()
- }
+ }
- // Wait for threads in all ThreadManagers to terminate
+ // Wait for threads in all ThreadManagers to terminate
for (const auto& value : sync_req_mgrs_) {
value->Wait();
- }
+ }
// Drop the shutdown ref and wait for all other refs to drop as well.
UnrefAndWaitLocked();
-
+
// Shutdown the callback CQ. The CQ is owned by its own shutdown tag, so it
// will delete itself at true shutdown.
if (callback_cq_ != nullptr) {
@@ -1258,13 +1258,13 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
callback_cq_ = nullptr;
}
- // Drain the shutdown queue (if the previous call to AsyncNext() timed out
- // and we didn't remove the tag from the queue yet)
- while (shutdown_cq.Next(&tag, &ok)) {
- // Nothing to be done here. Just ignore ok and tag values
- }
-
- shutdown_notified_ = true;
+ // Drain the shutdown queue (if the previous call to AsyncNext() timed out
+ // and we didn't remove the tag from the queue yet)
+ while (shutdown_cq.Next(&tag, &ok)) {
+ // Nothing to be done here. Just ignore ok and tag values
+ }
+
+ shutdown_notified_ = true;
shutdown_cv_.Broadcast();
#ifndef NDEBUG
@@ -1286,23 +1286,23 @@ void Server::Wait() {
void Server::PerformOpsOnCall(grpc::internal::CallOpSetInterface* ops,
grpc::internal::Call* call) {
- ops->FillOps(call);
+ ops->FillOps(call);
}
bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
bool* status) {
- if (GenericAsyncRequest::FinalizeResult(tag, status)) {
- // We either had no interceptors run or we are done intercepting
- if (*status) {
+ if (GenericAsyncRequest::FinalizeResult(tag, status)) {
+ // We either had no interceptors run or we are done intercepting
+ if (*status) {
// Create a new request/response pair using the server and CQ values
// stored in this object's base class.
new UnimplementedAsyncRequest(server_, notification_cq_);
- new UnimplementedAsyncResponse(this);
- } else {
- delete this;
- }
+ new UnimplementedAsyncResponse(this);
+ } else {
+ delete this;
+ }
} else {
- // The tag was swallowed due to interception. We will see it again.
+ // The tag was swallowed due to interception. We will see it again.
}
return false;
}
@@ -1320,8 +1320,8 @@ grpc::ServerInitializer* Server::initializer() {
}
grpc::CompletionQueue* Server::CallbackCQ() {
- // TODO(vjpai): Consider using a single global CQ for the default CQ
- // if there is no explicit per-server CQ registered
+ // TODO(vjpai): Consider using a single global CQ for the default CQ
+ // if there is no explicit per-server CQ registered
grpc::internal::MutexLock l(&mu_);
if (callback_cq_ != nullptr) {
return callback_cq_;
@@ -1330,11 +1330,11 @@ grpc::CompletionQueue* Server::CallbackCQ() {
callback_cq_ = new grpc::CompletionQueue(grpc_completion_queue_attributes{
GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
shutdown_callback});
-
+
// Transfer ownership of the new cq to its own shutdown callback
shutdown_callback->TakeCQ(callback_cq_);
- return callback_cq_;
+ return callback_cq_;
}
-
+
} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/server/server_context.cc b/contrib/libs/grpc/src/cpp/server/server_context.cc
index 458ac20d87..fea258e6e7 100644
--- a/contrib/libs/grpc/src/cpp/server/server_context.cc
+++ b/contrib/libs/grpc/src/cpp/server/server_context.cc
@@ -43,50 +43,50 @@ class ServerContextBase::CompletionOp final
: public internal::CallOpSetInterface {
public:
// initial refs: one in the server context, one in the cq
- // must ref the call before calling constructor and after deleting this
+ // must ref the call before calling constructor and after deleting this
CompletionOp(internal::Call* call,
::grpc::internal::ServerCallbackCall* callback_controller)
- : call_(*call),
+ : call_(*call),
callback_controller_(callback_controller),
- has_tag_(false),
+ has_tag_(false),
tag_(nullptr),
- core_cq_tag_(this),
+ core_cq_tag_(this),
refs_(2),
finalized_(false),
- cancelled_(0),
- done_intercepting_(false) {}
-
- // CompletionOp isn't copyable or movable
- CompletionOp(const CompletionOp&) = delete;
- CompletionOp& operator=(const CompletionOp&) = delete;
- CompletionOp(CompletionOp&&) = delete;
- CompletionOp& operator=(CompletionOp&&) = delete;
-
- ~CompletionOp() {
- if (call_.server_rpc_info()) {
- call_.server_rpc_info()->Unref();
- }
- }
-
+ cancelled_(0),
+ done_intercepting_(false) {}
+
+ // CompletionOp isn't copyable or movable
+ CompletionOp(const CompletionOp&) = delete;
+ CompletionOp& operator=(const CompletionOp&) = delete;
+ CompletionOp(CompletionOp&&) = delete;
+ CompletionOp& operator=(CompletionOp&&) = delete;
+
+ ~CompletionOp() {
+ if (call_.server_rpc_info()) {
+ call_.server_rpc_info()->Unref();
+ }
+ }
+
void FillOps(internal::Call* call) override;
-
- // This should always be arena allocated in the call, so override delete.
- // But this class is not trivially destructible, so must actually call delete
- // before allowing the arena to be freed
+
+ // This should always be arena allocated in the call, so override delete.
+ // But this class is not trivially destructible, so must actually call delete
+ // before allowing the arena to be freed
static void operator delete(void* /*ptr*/, std::size_t size) {
// Use size to avoid unused-parameter warning since assert seems to be
// compiled out and treated as unused in some gcc optimized versions.
(void)size;
- assert(size == sizeof(CompletionOp));
- }
-
- // This operator should never be called as the memory should be freed as part
- // of the arena destruction. It only exists to provide a matching operator
- // delete to the operator new so that some compilers will not complain (see
- // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
- // there are no tests catching the compiler warning.
- static void operator delete(void*, void*) { assert(0); }
-
+ assert(size == sizeof(CompletionOp));
+ }
+
+ // This operator should never be called as the memory should be freed as part
+ // of the arena destruction. It only exists to provide a matching operator
+ // delete to the operator new so that some compilers will not complain (see
+ // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
+ // there are no tests catching the compiler warning.
+ static void operator delete(void*, void*) { assert(0); }
+
bool FinalizeResult(void** tag, bool* status) override;
bool CheckCancelled(CompletionQueue* cq) {
@@ -100,36 +100,36 @@ class ServerContextBase::CompletionOp final
tag_ = tag;
}
- void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
-
- void* core_cq_tag() override { return core_cq_tag_; }
-
+ void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
+
+ void* core_cq_tag() override { return core_cq_tag_; }
+
void Unref();
- // This will be called while interceptors are run if the RPC is a hijacked
- // RPC. This should set hijacking state for each of the ops.
- void SetHijackingState() override {
- /* Servers don't allow hijacking */
+ // This will be called while interceptors are run if the RPC is a hijacked
+ // RPC. This should set hijacking state for each of the ops.
+ void SetHijackingState() override {
+ /* Servers don't allow hijacking */
GPR_ASSERT(false);
- }
-
- /* Should be called after interceptors are done running */
- void ContinueFillOpsAfterInterception() override {}
-
- /* Should be called after interceptors are done running on the finalize result
- * path */
- void ContinueFinalizeResultAfterInterception() override {
- done_intercepting_ = true;
- if (!has_tag_) {
- /* We don't have a tag to return. */
+ }
+
+ /* Should be called after interceptors are done running */
+ void ContinueFillOpsAfterInterception() override {}
+
+ /* Should be called after interceptors are done running on the finalize result
+ * path */
+ void ContinueFinalizeResultAfterInterception() override {
+ done_intercepting_ = true;
+ if (!has_tag_) {
+ /* We don't have a tag to return. */
Unref();
- return;
- }
- /* Start a dummy op so that we can return the tag */
+ return;
+ }
+ /* Start a dummy op so that we can return the tag */
GPR_ASSERT(grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_,
nullptr) == GRPC_CALL_OK);
- }
-
+ }
+
private:
bool CheckCancelledNoPluck() {
grpc_core::MutexLock lock(&mu_);
@@ -140,37 +140,37 @@ class ServerContextBase::CompletionOp final
::grpc::internal::ServerCallbackCall* const callback_controller_;
bool has_tag_;
void* tag_;
- void* core_cq_tag_;
+ void* core_cq_tag_;
grpc_core::RefCount refs_;
grpc_core::Mutex mu_;
bool finalized_;
- int cancelled_; // This is an int (not bool) because it is passed to core
- bool done_intercepting_;
+ int cancelled_; // This is an int (not bool) because it is passed to core
+ bool done_intercepting_;
internal::InterceptorBatchMethodsImpl interceptor_methods_;
};
void ServerContextBase::CompletionOp::Unref() {
if (refs_.Unref()) {
- grpc_call* call = call_.call();
+ grpc_call* call = call_.call();
delete this;
- grpc_call_unref(call);
+ grpc_call_unref(call);
}
}
void ServerContextBase::CompletionOp::FillOps(internal::Call* call) {
- grpc_op ops;
- ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
- ops.data.recv_close_on_server.cancelled = &cancelled_;
- ops.flags = 0;
- ops.reserved = nullptr;
- interceptor_methods_.SetCall(&call_);
- interceptor_methods_.SetReverse();
- interceptor_methods_.SetCallOpSetInterface(this);
+ grpc_op ops;
+ ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ ops.data.recv_close_on_server.cancelled = &cancelled_;
+ ops.flags = 0;
+ ops.reserved = nullptr;
+ interceptor_methods_.SetCall(&call_);
+ interceptor_methods_.SetReverse();
+ interceptor_methods_.SetCallOpSetInterface(this);
// The following call_start_batch is internally-generated so no need for an
// explanatory log on failure.
GPR_ASSERT(grpc_call_start_batch(call->call(), &ops, 1, core_cq_tag_,
nullptr) == GRPC_CALL_OK);
- /* No interceptors to run here */
+ /* No interceptors to run here */
}
bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) {
@@ -187,9 +187,9 @@ bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) {
}
Unref();
return has_tag;
- }
+ }
finalized_ = true;
-
+
// If for some reason the incoming status is false, mark that as a
// cancellation.
// TODO(vjpai): does this ever happen?
@@ -200,24 +200,24 @@ bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) {
call_cancel = (cancelled_ != 0);
// Release the lock since we may call a callback and interceptors.
}
-
+
if (call_cancel && callback_controller_ != nullptr) {
callback_controller_->MaybeCallOnCancel();
}
- /* Add interception point and run through interceptors */
- interceptor_methods_.AddInterceptionHookPoint(
+ /* Add interception point and run through interceptors */
+ interceptor_methods_.AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_CLOSE);
- if (interceptor_methods_.RunInterceptors()) {
+ if (interceptor_methods_.RunInterceptors()) {
// No interceptors were run
bool has_tag = has_tag_;
if (has_tag) {
- *tag = tag_;
- }
+ *tag = tag_;
+ }
Unref();
return has_tag;
- }
+ }
// There are interceptors to be run. Return false for now.
- return false;
+ return false;
}
// ServerContextBase body
@@ -233,17 +233,17 @@ ServerContextBase::ServerContextBase(gpr_timespec deadline,
void ServerContextBase::BindDeadlineAndMetadata(gpr_timespec deadline,
grpc_metadata_array* arr) {
- deadline_ = deadline;
- std::swap(*client_metadata_.arr(), *arr);
-}
-
+ deadline_ = deadline;
+ std::swap(*client_metadata_.arr(), *arr);
+}
+
ServerContextBase::~ServerContextBase() {
if (completion_op_) {
completion_op_->Unref();
}
- if (rpc_info_) {
- rpc_info_->Unref();
- }
+ if (rpc_info_) {
+ rpc_info_->Unref();
+ }
if (default_reactor_used_.load(std::memory_order_relaxed)) {
reinterpret_cast<Reactor*>(&default_reactor_)->~Reactor();
}
@@ -261,19 +261,19 @@ void ServerContextBase::BeginCompletionOp(
internal::Call* call, std::function<void(bool)> callback,
::grpc::internal::ServerCallbackCall* callback_controller) {
GPR_ASSERT(!completion_op_);
- if (rpc_info_) {
- rpc_info_->Ref();
- }
- grpc_call_ref(call->call());
- completion_op_ =
- new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp)))
+ if (rpc_info_) {
+ rpc_info_->Ref();
+ }
+ grpc_call_ref(call->call());
+ completion_op_ =
+ new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp)))
CompletionOp(call, callback_controller);
if (callback_controller != nullptr) {
completion_tag_.Set(call->call(), std::move(callback), completion_op_,
true);
- completion_op_->set_core_cq_tag(&completion_tag_);
- completion_op_->set_tag(completion_op_);
- } else if (has_notify_when_done_tag_) {
+ completion_op_->set_core_cq_tag(&completion_tag_);
+ completion_op_->set_tag(completion_op_);
+ } else if (has_notify_when_done_tag_) {
completion_op_->set_tag(async_notify_when_done_tag_);
}
call->PerformOps(completion_op_);
@@ -295,11 +295,11 @@ void ServerContextBase::AddTrailingMetadata(const TString& key,
void ServerContextBase::TryCancel() const {
internal::CancelInterceptorBatchMethods cancel_methods;
- if (rpc_info_) {
- for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) {
- rpc_info_->RunInterceptor(&cancel_methods, i);
- }
- }
+ if (rpc_info_) {
+ for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) {
+ rpc_info_->RunInterceptor(&cancel_methods, i);
+ }
+ }
grpc_call_error err =
grpc_call_cancel_with_status(call_.call, GRPC_STATUS_CANCELLED,
"Cancelled on the server side", nullptr);
@@ -309,15 +309,15 @@ void ServerContextBase::TryCancel() const {
}
bool ServerContextBase::IsCancelled() const {
- if (completion_tag_) {
- // When using callback API, this result is always valid.
- return completion_op_->CheckCancelledAsync();
- } else if (has_notify_when_done_tag_) {
- // When using async API, the result is only valid
+ if (completion_tag_) {
+ // When using callback API, this result is always valid.
+ return completion_op_->CheckCancelledAsync();
+ } else if (has_notify_when_done_tag_) {
+ // When using async API, the result is only valid
// if the tag has already been delivered at the completion queue
return completion_op_ && completion_op_->CheckCancelledAsync();
} else {
- // when using sync API, the result is always valid
+ // when using sync API, the result is always valid
return completion_op_ && completion_op_->CheckCancelled(cq_);
}
}