aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/libs/grpc/src/cpp/server
diff options
context:
space:
mode:
authorheretic <heretic@yandex-team.ru>2022-02-10 16:45:43 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:43 +0300
commit397cbe258b9e064f49c4ca575279f02f39fef76e (patch)
treea0b0eb3cca6a14e4e8ea715393637672fa651284 /contrib/libs/grpc/src/cpp/server
parent43f5a35593ebc9f6bcea619bb170394ea7ae468e (diff)
downloadydb-397cbe258b9e064f49c4ca575279f02f39fef76e.tar.gz
Restoring authorship annotation for <heretic@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/libs/grpc/src/cpp/server')
-rw-r--r--contrib/libs/grpc/src/cpp/server/async_generic_service.cc4
-rw-r--r--contrib/libs/grpc/src/cpp/server/channel_argument_option.cc14
-rw-r--r--contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc10
-rw-r--r--contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc16
-rw-r--r--contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.cc4
-rw-r--r--contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.h4
-rw-r--r--contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc22
-rw-r--r--contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h16
-rw-r--r--contrib/libs/grpc/src/cpp/server/health/health_check_service.cc6
-rw-r--r--contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc6
-rw-r--r--contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc50
-rw-r--r--contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.h114
-rw-r--r--contrib/libs/grpc/src/cpp/server/load_reporter/util.cc8
-rw-r--r--contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc14
-rw-r--r--contrib/libs/grpc/src/cpp/server/secure_server_credentials.h6
-rw-r--r--contrib/libs/grpc/src/cpp/server/server_builder.cc128
-rw-r--r--contrib/libs/grpc/src/cpp/server/server_callback.cc94
-rw-r--r--contrib/libs/grpc/src/cpp/server/server_cc.cc336
-rw-r--r--contrib/libs/grpc/src/cpp/server/server_context.cc152
-rw-r--r--contrib/libs/grpc/src/cpp/server/server_credentials.cc6
-rw-r--r--contrib/libs/grpc/src/cpp/server/server_posix.cc4
21 files changed, 507 insertions, 507 deletions
diff --git a/contrib/libs/grpc/src/cpp/server/async_generic_service.cc b/contrib/libs/grpc/src/cpp/server/async_generic_service.cc
index 07697a52d1..0e4e95465f 100644
--- a/contrib/libs/grpc/src/cpp/server/async_generic_service.cc
+++ b/contrib/libs/grpc/src/cpp/server/async_generic_service.cc
@@ -24,8 +24,8 @@ namespace grpc {
void AsyncGenericService::RequestCall(
GenericServerContext* ctx, GenericServerAsyncReaderWriter* reader_writer,
- ::grpc::CompletionQueue* call_cq,
- ::grpc::ServerCompletionQueue* notification_cq, void* tag) {
+ ::grpc::CompletionQueue* call_cq,
+ ::grpc::ServerCompletionQueue* notification_cq, void* tag) {
server_->RequestAsyncGenericCall(ctx, reader_writer, call_cq, notification_cq,
tag);
}
diff --git a/contrib/libs/grpc/src/cpp/server/channel_argument_option.cc b/contrib/libs/grpc/src/cpp/server/channel_argument_option.cc
index 9aad932429..69ee6eed68 100644
--- a/contrib/libs/grpc/src/cpp/server/channel_argument_option.cc
+++ b/contrib/libs/grpc/src/cpp/server/channel_argument_option.cc
@@ -21,10 +21,10 @@
namespace grpc {
std::unique_ptr<ServerBuilderOption> MakeChannelArgumentOption(
- const TString& name, const TString& value) {
+ const TString& name, const TString& value) {
class StringOption final : public ServerBuilderOption {
public:
- StringOption(const TString& name, const TString& value)
+ StringOption(const TString& name, const TString& value)
: name_(name), value_(value) {}
virtual void UpdateArguments(ChannelArguments* args) override {
@@ -35,17 +35,17 @@ std::unique_ptr<ServerBuilderOption> MakeChannelArgumentOption(
override {}
private:
- const TString name_;
- const TString value_;
+ const TString name_;
+ const TString value_;
};
return std::unique_ptr<ServerBuilderOption>(new StringOption(name, value));
}
std::unique_ptr<ServerBuilderOption> MakeChannelArgumentOption(
- const TString& name, int value) {
+ const TString& name, int value) {
class IntOption final : public ServerBuilderOption {
public:
- IntOption(const TString& name, int value)
+ IntOption(const TString& name, int value)
: name_(name), value_(value) {}
virtual void UpdateArguments(ChannelArguments* args) override {
@@ -56,7 +56,7 @@ std::unique_ptr<ServerBuilderOption> MakeChannelArgumentOption(
override {}
private:
- const TString name_;
+ const TString name_;
const int value_;
};
return std::unique_ptr<ServerBuilderOption>(new IntOption(name, value));
diff --git a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc
index 6dcf84bf40..1f402ef53d 100644
--- a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc
+++ b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service.cc
@@ -24,9 +24,9 @@
#include <grpc/support/alloc.h>
namespace grpc {
-
-namespace {
-
+
+namespace {
+
grpc::protobuf::util::Status ParseJson(const char* json_str,
grpc::protobuf::Message* message) {
grpc::protobuf::json::JsonParseOptions options;
@@ -34,8 +34,8 @@ grpc::protobuf::util::Status ParseJson(const char* json_str,
return grpc::protobuf::json::JsonStringToMessage(json_str, message, options);
}
-} // namespace
-
+} // namespace
+
Status ChannelzService::GetTopChannels(
ServerContext* /*unused*/,
const channelz::v1::GetTopChannelsRequest* request,
diff --git a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc
index ae26a447ab..bae1b59544 100644
--- a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc
+++ b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc
@@ -33,7 +33,7 @@ class ChannelzServicePlugin : public ::grpc::ServerBuilderPlugin {
public:
ChannelzServicePlugin() : channelz_service_(new grpc::ChannelzService()) {}
- TString name() override { return "channelz_service"; }
+ TString name() override { return "channelz_service"; }
void InitServer(grpc::ServerInitializer* si) override {
si->RegisterService(channelz_service_);
@@ -41,7 +41,7 @@ class ChannelzServicePlugin : public ::grpc::ServerBuilderPlugin {
void Finish(grpc::ServerInitializer* /*si*/) override {}
- void ChangeArguments(const TString& /*name*/, void* /*value*/) override {}
+ void ChangeArguments(const TString& /*name*/, void* /*value*/) override {}
bool has_sync_methods() const override {
if (channelz_service_) {
@@ -75,12 +75,12 @@ namespace channelz {
namespace experimental {
void InitChannelzService() {
- static struct Initializer {
- Initializer() {
- ::grpc::ServerBuilder::InternalAddPluginFactory(
- &grpc::channelz::experimental::CreateChannelzServicePlugin);
- }
- } initialize;
+ static struct Initializer {
+ Initializer() {
+ ::grpc::ServerBuilder::InternalAddPluginFactory(
+ &grpc::channelz::experimental::CreateChannelzServicePlugin);
+ }
+ } initialize;
}
} // namespace experimental
diff --git a/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.cc b/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.cc
index 09d2a9d3b5..9641301ab7 100644
--- a/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.cc
+++ b/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.cc
@@ -20,7 +20,7 @@
#include <memory>
-#include <grpcpp/server_builder.h>
+#include <grpcpp/server_builder.h>
#include <grpcpp/support/channel_arguments.h>
namespace grpc {
@@ -42,7 +42,7 @@ class AcceptorWrapper : public experimental::ExternalConnectionAcceptor {
} // namespace
ExternalConnectionAcceptorImpl::ExternalConnectionAcceptorImpl(
- const TString& name,
+ const TString& name,
ServerBuilder::experimental_type::ExternalConnectionType type,
std::shared_ptr<ServerCredentials> creds)
: name_(name), creds_(std::move(creds)) {
diff --git a/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.h b/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.h
index 430c72862e..8a1179bef2 100644
--- a/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.h
+++ b/contrib/libs/grpc/src/cpp/server/external_connection_acceptor_impl.h
@@ -36,7 +36,7 @@ class ExternalConnectionAcceptorImpl
: public std::enable_shared_from_this<ExternalConnectionAcceptorImpl> {
public:
ExternalConnectionAcceptorImpl(
- const TString& name,
+ const TString& name,
ServerBuilder::experimental_type::ExternalConnectionType type,
std::shared_ptr<ServerCredentials> creds);
// Should only be called once.
@@ -56,7 +56,7 @@ class ExternalConnectionAcceptorImpl
void SetToChannelArgs(::grpc::ChannelArguments* args);
private:
- const TString name_;
+ const TString name_;
std::shared_ptr<ServerCredentials> creds_;
grpc_core::TcpServerFdHandler* handler_ = nullptr; // not owned
grpc_core::Mutex mu_;
diff --git a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc
index 3cc508d0cb..059c97a414 100644
--- a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc
+++ b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc
@@ -18,16 +18,16 @@
#include <memory>
-#include "upb/upb.hpp"
-
+#include "upb/upb.hpp"
+
#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <grpcpp/impl/codegen/method_handler.h>
+#include <grpcpp/impl/codegen/method_handler.h>
#include "src/cpp/server/health/default_health_check_service.h"
#include "src/proto/grpc/health/v1/health.upb.h"
-#include "upb/upb.hpp"
+#include "upb/upb.hpp"
#define MAX_SERVICE_NAME_LENGTH 200
@@ -42,7 +42,7 @@ DefaultHealthCheckService::DefaultHealthCheckService() {
}
void DefaultHealthCheckService::SetServingStatus(
- const TString& service_name, bool serving) {
+ const TString& service_name, bool serving) {
grpc_core::MutexLock lock(&mu_);
if (shutdown_) {
// Set to NOT_SERVING in case service_name is not in the map.
@@ -77,7 +77,7 @@ void DefaultHealthCheckService::Shutdown() {
DefaultHealthCheckService::ServingStatus
DefaultHealthCheckService::GetServingStatus(
- const TString& service_name) const {
+ const TString& service_name) const {
grpc_core::MutexLock lock(&mu_);
auto it = services_map_.find(service_name);
if (it == services_map_.end()) {
@@ -88,7 +88,7 @@ DefaultHealthCheckService::GetServingStatus(
}
void DefaultHealthCheckService::RegisterCallHandler(
- const TString& service_name,
+ const TString& service_name,
std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
grpc_core::MutexLock lock(&mu_);
ServiceData& service_data = services_map_[service_name];
@@ -98,7 +98,7 @@ void DefaultHealthCheckService::RegisterCallHandler(
}
void DefaultHealthCheckService::UnregisterCallHandler(
- const TString& service_name,
+ const TString& service_name,
const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
grpc_core::MutexLock lock(&mu_);
auto it = services_map_.find(service_name);
@@ -185,7 +185,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() {
}
void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) {
- HealthCheckServiceImpl* service = static_cast<HealthCheckServiceImpl*>(arg);
+ HealthCheckServiceImpl* service = static_cast<HealthCheckServiceImpl*>(arg);
void* tag;
bool ok;
while (true) {
@@ -200,7 +200,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) {
}
bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
- const ByteBuffer& request, TString* service_name) {
+ const ByteBuffer& request, TString* service_name) {
std::vector<Slice> slices;
if (!request.Dump(&slices).ok()) return false;
uint8_t* request_bytes = nullptr;
@@ -301,7 +301,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
// Process request.
gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_,
this);
- TString service_name;
+ TString service_name;
grpc::Status status = Status::OK;
ByteBuffer response;
if (!service_->DecodeRequest(request_, &service_name)) {
diff --git a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h
index 9da1dfc15f..6d7479b016 100644
--- a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h
+++ b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h
@@ -194,7 +194,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
HealthCheckServiceImpl* service_;
ByteBuffer request_;
- TString service_name_;
+ TString service_name_;
GenericServerAsyncWriter stream_;
ServerContext ctx_;
@@ -213,7 +213,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
// Returns true on success.
static bool DecodeRequest(const ByteBuffer& request,
- TString* service_name);
+ TString* service_name);
static bool EncodeResponse(ServingStatus status, ByteBuffer* response);
// Needed to appease Windows compilers, which don't seem to allow
@@ -234,12 +234,12 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
DefaultHealthCheckService();
- void SetServingStatus(const TString& service_name, bool serving) override;
+ void SetServingStatus(const TString& service_name, bool serving) override;
void SetServingStatus(bool serving) override;
void Shutdown() override;
- ServingStatus GetServingStatus(const TString& service_name) const;
+ ServingStatus GetServingStatus(const TString& service_name) const;
HealthCheckServiceImpl* GetHealthCheckService(
std::unique_ptr<ServerCompletionQueue> cq);
@@ -266,16 +266,16 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
};
void RegisterCallHandler(
- const TString& service_name,
+ const TString& service_name,
std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
void UnregisterCallHandler(
- const TString& service_name,
+ const TString& service_name,
const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler);
mutable grpc_core::Mutex mu_;
- bool shutdown_ = false; // Guarded by mu_.
- std::map<TString, ServiceData> services_map_; // Guarded by mu_.
+ bool shutdown_ = false; // Guarded by mu_.
+ std::map<TString, ServiceData> services_map_; // Guarded by mu_.
std::unique_ptr<HealthCheckServiceImpl> impl_;
};
diff --git a/contrib/libs/grpc/src/cpp/server/health/health_check_service.cc b/contrib/libs/grpc/src/cpp/server/health/health_check_service.cc
index a0fa2d62f5..35c194cb1b 100644
--- a/contrib/libs/grpc/src/cpp/server/health/health_check_service.cc
+++ b/contrib/libs/grpc/src/cpp/server/health/health_check_service.cc
@@ -16,9 +16,9 @@
*
*/
-#include <grpcpp/health_check_service_interface.h>
+#include <grpcpp/health_check_service_interface.h>
-namespace grpc {
+namespace grpc {
namespace {
bool g_grpc_default_health_check_service_enabled = false;
} // namespace
@@ -31,4 +31,4 @@ void EnableDefaultHealthCheckService(bool enable) {
g_grpc_default_health_check_service_enabled = enable;
}
-} // namespace grpc
+} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc b/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc
index 3f33f4e045..f78a4daf07 100644
--- a/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc
+++ b/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc
@@ -21,11 +21,11 @@
#include <grpc/grpc.h>
#include <grpc/support/log.h>
-namespace grpc {
+namespace grpc {
namespace {
class InsecureServerCredentialsImpl final : public ServerCredentials {
public:
- int AddPortToServer(const TString& addr, grpc_server* server) override {
+ int AddPortToServer(const TString& addr, grpc_server* server) override {
return grpc_server_add_insecure_http2_port(server, addr.c_str());
}
void SetAuthMetadataProcessor(
@@ -41,4 +41,4 @@ std::shared_ptr<ServerCredentials> InsecureServerCredentials() {
new InsecureServerCredentialsImpl());
}
-} // namespace grpc
+} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc
index f07fa812a7..46a368da91 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc
@@ -77,8 +77,8 @@ const typename C::value_type* RandomElement(const C& container) {
} // namespace
-LoadRecordKey::LoadRecordKey(const TString& client_ip_and_token,
- TString user_id)
+LoadRecordKey::LoadRecordKey(const TString& client_ip_and_token,
+ TString user_id)
: user_id_(std::move(user_id)) {
GPR_ASSERT(client_ip_and_token.size() >= 2);
int ip_hex_size;
@@ -98,7 +98,7 @@ LoadRecordKey::LoadRecordKey(const TString& client_ip_and_token,
}
}
-TString LoadRecordKey::GetClientIpBytes() const {
+TString LoadRecordKey::GetClientIpBytes() const {
if (client_ip_hex_.empty()) {
return "";
} else if (client_ip_hex_.size() == kIpv4AddressLength) {
@@ -110,8 +110,8 @@ TString LoadRecordKey::GetClientIpBytes() const {
return "";
}
ip_bytes = grpc_htonl(ip_bytes);
- return TString(reinterpret_cast<const char*>(&ip_bytes),
- sizeof(ip_bytes));
+ return TString(reinterpret_cast<const char*>(&ip_bytes),
+ sizeof(ip_bytes));
} else if (client_ip_hex_.size() == kIpv6AddressLength) {
uint32_t ip_bytes[4];
for (size_t i = 0; i < 4; ++i) {
@@ -125,14 +125,14 @@ TString LoadRecordKey::GetClientIpBytes() const {
}
ip_bytes[i] = grpc_htonl(ip_bytes[i]);
}
- return TString(reinterpret_cast<const char*>(ip_bytes),
- sizeof(ip_bytes));
+ return TString(reinterpret_cast<const char*>(ip_bytes),
+ sizeof(ip_bytes));
} else {
GPR_UNREACHABLE_CODE(return "");
}
}
-LoadRecordValue::LoadRecordValue(TString metric_name, uint64_t num_calls,
+LoadRecordValue::LoadRecordValue(TString metric_name, uint64_t num_calls,
double total_metric_value) {
call_metrics_.emplace(std::move(metric_name),
CallMetricValue(num_calls, total_metric_value));
@@ -177,8 +177,8 @@ uint64_t PerBalancerStore::GetNumCallsInProgressForReport() {
return num_calls_in_progress_;
}
-void PerHostStore::ReportStreamCreated(const TString& lb_id,
- const TString& load_key) {
+void PerHostStore::ReportStreamCreated(const TString& lb_id,
+ const TString& load_key) {
GPR_ASSERT(lb_id != kInvalidLbId);
SetUpForNewLbId(lb_id, load_key);
// Prior to this one, there was no load balancer receiving report, so we may
@@ -188,7 +188,7 @@ void PerHostStore::ReportStreamCreated(const TString& lb_id,
// this stream. Need to discuss with LB team.
if (assigned_stores_.size() == 1) {
for (const auto& p : per_balancer_stores_) {
- const TString& other_lb_id = p.first;
+ const TString& other_lb_id = p.first;
const std::unique_ptr<PerBalancerStore>& orphaned_store = p.second;
if (other_lb_id != lb_id) {
orphaned_store->Resume();
@@ -203,7 +203,7 @@ void PerHostStore::ReportStreamCreated(const TString& lb_id,
}
}
-void PerHostStore::ReportStreamClosed(const TString& lb_id) {
+void PerHostStore::ReportStreamClosed(const TString& lb_id) {
auto it_store_for_gone_lb = per_balancer_stores_.find(lb_id);
GPR_ASSERT(it_store_for_gone_lb != per_balancer_stores_.end());
// Remove this closed stream from our records.
@@ -215,7 +215,7 @@ void PerHostStore::ReportStreamClosed(const TString& lb_id) {
// The stores that were assigned to this balancer are orphaned now. They
// should be re-assigned to other balancers which are still receiving reports.
for (PerBalancerStore* orphaned_store : orphaned_stores) {
- const TString* new_receiver = nullptr;
+ const TString* new_receiver = nullptr;
auto it = load_key_to_receiving_lb_ids_.find(orphaned_store->load_key());
if (it != load_key_to_receiving_lb_ids_.end()) {
// First, try to pick from the active balancers with the same load key.
@@ -235,21 +235,21 @@ void PerHostStore::ReportStreamClosed(const TString& lb_id) {
}
PerBalancerStore* PerHostStore::FindPerBalancerStore(
- const TString& lb_id) const {
+ const TString& lb_id) const {
return per_balancer_stores_.find(lb_id) != per_balancer_stores_.end()
? per_balancer_stores_.find(lb_id)->second.get()
: nullptr;
}
const std::set<PerBalancerStore*>* PerHostStore::GetAssignedStores(
- const TString& lb_id) const {
+ const TString& lb_id) const {
auto it = assigned_stores_.find(lb_id);
if (it == assigned_stores_.end()) return nullptr;
return &(it->second);
}
void PerHostStore::AssignOrphanedStore(PerBalancerStore* orphaned_store,
- const TString& new_receiver) {
+ const TString& new_receiver) {
auto it = assigned_stores_.find(new_receiver);
GPR_ASSERT(it != assigned_stores_.end());
it->second.insert(orphaned_store);
@@ -260,8 +260,8 @@ void PerHostStore::AssignOrphanedStore(PerBalancerStore* orphaned_store,
new_receiver.c_str());
}
-void PerHostStore::SetUpForNewLbId(const TString& lb_id,
- const TString& load_key) {
+void PerHostStore::SetUpForNewLbId(const TString& lb_id,
+ const TString& load_key) {
// The top-level caller (i.e., LoadReportService) should guarantee the
// lb_id is unique for each reporting stream.
GPR_ASSERT(per_balancer_stores_.find(lb_id) == per_balancer_stores_.end());
@@ -284,7 +284,7 @@ PerBalancerStore* LoadDataStore::FindPerBalancerStore(
}
}
-void LoadDataStore::MergeRow(const TString& hostname,
+void LoadDataStore::MergeRow(const TString& hostname,
const LoadRecordKey& key,
const LoadRecordValue& value) {
PerBalancerStore* per_balancer_store =
@@ -315,20 +315,20 @@ void LoadDataStore::MergeRow(const TString& hostname,
}
const std::set<PerBalancerStore*>* LoadDataStore::GetAssignedStores(
- const TString& hostname, const TString& lb_id) {
+ const TString& hostname, const TString& lb_id) {
auto it = per_host_stores_.find(hostname);
if (it == per_host_stores_.end()) return nullptr;
return it->second.GetAssignedStores(lb_id);
}
-void LoadDataStore::ReportStreamCreated(const TString& hostname,
- const TString& lb_id,
- const TString& load_key) {
+void LoadDataStore::ReportStreamCreated(const TString& hostname,
+ const TString& lb_id,
+ const TString& load_key) {
per_host_stores_[hostname].ReportStreamCreated(lb_id, load_key);
}
-void LoadDataStore::ReportStreamClosed(const TString& hostname,
- const TString& lb_id) {
+void LoadDataStore::ReportStreamClosed(const TString& hostname,
+ const TString& lb_id) {
auto it_per_host_store = per_host_stores_.find(hostname);
GPR_ASSERT(it_per_host_store != per_host_stores_.end());
it_per_host_store->second.ReportStreamClosed(lb_id);
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.h b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.h
index 61ba618331..f6a31b87bc 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.h
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.h
@@ -30,8 +30,8 @@
#include "src/cpp/server/load_reporter/constants.h"
-#include <util/string/cast.h>
-
+#include <util/string/cast.h>
+
namespace grpc {
namespace load_reporter {
@@ -69,17 +69,17 @@ class CallMetricValue {
// The key of a load record.
class LoadRecordKey {
public:
- LoadRecordKey(TString lb_id, TString lb_tag, TString user_id,
- TString client_ip_hex)
+ LoadRecordKey(TString lb_id, TString lb_tag, TString user_id,
+ TString client_ip_hex)
: lb_id_(std::move(lb_id)),
lb_tag_(std::move(lb_tag)),
user_id_(std::move(user_id)),
client_ip_hex_(std::move(client_ip_hex)) {}
// Parses the input client_ip_and_token to set client IP, LB ID, and LB tag.
- LoadRecordKey(const TString& client_ip_and_token, TString user_id);
+ LoadRecordKey(const TString& client_ip_and_token, TString user_id);
- TString ToString() const {
+ TString ToString() const {
return "[lb_id_=" + lb_id_ + ", lb_tag_=" + lb_tag_ +
", user_id_=" + user_id_ + ", client_ip_hex_=" + client_ip_hex_ +
"]";
@@ -91,17 +91,17 @@ class LoadRecordKey {
}
// Gets the client IP bytes in network order (i.e., big-endian).
- TString GetClientIpBytes() const;
+ TString GetClientIpBytes() const;
// Getters.
- const TString& lb_id() const { return lb_id_; }
- const TString& lb_tag() const { return lb_tag_; }
- const TString& user_id() const { return user_id_; }
- const TString& client_ip_hex() const { return client_ip_hex_; }
+ const TString& lb_id() const { return lb_id_; }
+ const TString& lb_tag() const { return lb_tag_; }
+ const TString& user_id() const { return user_id_; }
+ const TString& client_ip_hex() const { return client_ip_hex_; }
struct Hasher {
- void hash_combine(size_t* seed, const TString& k) const {
- *seed ^= std::hash<TString>()(k) + 0x9e3779b9 + (*seed << 6) +
+ void hash_combine(size_t* seed, const TString& k) const {
+ *seed ^= std::hash<TString>()(k) + 0x9e3779b9 + (*seed << 6) +
(*seed >> 2);
}
@@ -116,10 +116,10 @@ class LoadRecordKey {
};
private:
- TString lb_id_;
- TString lb_tag_;
- TString user_id_;
- TString client_ip_hex_;
+ TString lb_id_;
+ TString lb_tag_;
+ TString user_id_;
+ TString client_ip_hex_;
};
// The value of a load record.
@@ -135,7 +135,7 @@ class LoadRecordValue {
bytes_recv_(bytes_recv),
latency_ms_(latency_ms) {}
- LoadRecordValue(TString metric_name, uint64_t num_calls,
+ LoadRecordValue(TString metric_name, uint64_t num_calls,
double total_metric_value);
void MergeFrom(const LoadRecordValue& other) {
@@ -146,7 +146,7 @@ class LoadRecordValue {
bytes_recv_ += other.bytes_recv_;
latency_ms_ += other.latency_ms_;
for (const auto& p : other.call_metrics_) {
- const TString& key = p.first;
+ const TString& key = p.first;
const CallMetricValue& value = p.second;
call_metrics_[key].MergeFrom(value);
}
@@ -156,17 +156,17 @@ class LoadRecordValue {
return static_cast<int64_t>(start_count_ - ok_count_ - error_count_);
}
- TString ToString() const {
- return "[start_count_=" + ::ToString(start_count_) +
- ", ok_count_=" + ::ToString(ok_count_) +
- ", error_count_=" + ::ToString(error_count_) +
- ", bytes_sent_=" + ::ToString(bytes_sent_) +
- ", bytes_recv_=" + ::ToString(bytes_recv_) +
- ", latency_ms_=" + ::ToString(latency_ms_) + ", " +
- ::ToString(call_metrics_.size()) + " other call metric(s)]";
+ TString ToString() const {
+ return "[start_count_=" + ::ToString(start_count_) +
+ ", ok_count_=" + ::ToString(ok_count_) +
+ ", error_count_=" + ::ToString(error_count_) +
+ ", bytes_sent_=" + ::ToString(bytes_sent_) +
+ ", bytes_recv_=" + ::ToString(bytes_recv_) +
+ ", latency_ms_=" + ::ToString(latency_ms_) + ", " +
+ ::ToString(call_metrics_.size()) + " other call metric(s)]";
}
- bool InsertCallMetric(const TString& metric_name,
+ bool InsertCallMetric(const TString& metric_name,
const CallMetricValue& metric_value) {
return call_metrics_.insert({metric_name, metric_value}).second;
}
@@ -178,7 +178,7 @@ class LoadRecordValue {
uint64_t bytes_sent() const { return bytes_sent_; }
uint64_t bytes_recv() const { return bytes_recv_; }
uint64_t latency_ms() const { return latency_ms_; }
- const std::unordered_map<TString, CallMetricValue>& call_metrics() const {
+ const std::unordered_map<TString, CallMetricValue>& call_metrics() const {
return call_metrics_;
}
@@ -189,7 +189,7 @@ class LoadRecordValue {
uint64_t bytes_sent_ = 0;
uint64_t bytes_recv_ = 0;
uint64_t latency_ms_ = 0;
- std::unordered_map<TString, CallMetricValue> call_metrics_;
+ std::unordered_map<TString, CallMetricValue> call_metrics_;
};
// Stores the data associated with a particular LB ID.
@@ -198,7 +198,7 @@ class PerBalancerStore {
using LoadRecordMap =
std::unordered_map<LoadRecordKey, LoadRecordValue, LoadRecordKey::Hasher>;
- PerBalancerStore(TString lb_id, TString load_key)
+ PerBalancerStore(TString lb_id, TString load_key)
: lb_id_(std::move(lb_id)), load_key_(std::move(load_key)) {}
// Merge a load record with the given key and value if the store is not
@@ -218,7 +218,7 @@ class PerBalancerStore {
uint64_t GetNumCallsInProgressForReport();
- TString ToString() {
+ TString ToString() {
return "[PerBalancerStore lb_id_=" + lb_id_ + " load_key_=" + load_key_ +
"]";
}
@@ -226,14 +226,14 @@ class PerBalancerStore {
void ClearLoadRecordMap() { load_record_map_.clear(); }
// Getters.
- const TString& lb_id() const { return lb_id_; }
- const TString& load_key() const { return load_key_; }
+ const TString& lb_id() const { return lb_id_; }
+ const TString& load_key() const { return load_key_; }
const LoadRecordMap& load_record_map() const { return load_record_map_; }
private:
- TString lb_id_;
+ TString lb_id_;
// TODO(juanlishen): Use bytestring protobuf type?
- TString load_key_;
+ TString load_key_;
LoadRecordMap load_record_map_;
uint64_t num_calls_in_progress_ = 0;
uint64_t last_reported_num_calls_in_progress_ = 0;
@@ -247,39 +247,39 @@ class PerHostStore {
// LB ID (guaranteed unique) associated with that stream. If it is the only
// active store, adopt all the orphaned stores. If it is the first created
// store, adopt the store of kInvalidLbId.
- void ReportStreamCreated(const TString& lb_id,
- const TString& load_key);
+ void ReportStreamCreated(const TString& lb_id,
+ const TString& load_key);
// When a report stream is closed, the PerBalancerStores assigned to the
// associate LB ID need to be re-assigned to other active balancers,
// ideally with the same load key. If there is no active balancer, we have
// to suspend those stores and drop the incoming load data until they are
// resumed.
- void ReportStreamClosed(const TString& lb_id);
+ void ReportStreamClosed(const TString& lb_id);
// Returns null if not found. Caller doesn't own the returned store.
- PerBalancerStore* FindPerBalancerStore(const TString& lb_id) const;
+ PerBalancerStore* FindPerBalancerStore(const TString& lb_id) const;
// Returns null if lb_id is not found. The returned pointer points to the
// underlying data structure, which is not owned by the caller.
const std::set<PerBalancerStore*>* GetAssignedStores(
- const TString& lb_id) const;
+ const TString& lb_id) const;
private:
// Creates a PerBalancerStore for the given LB ID, assigns the store to
// itself, and records the LB ID to the load key.
- void SetUpForNewLbId(const TString& lb_id, const TString& load_key);
+ void SetUpForNewLbId(const TString& lb_id, const TString& load_key);
void AssignOrphanedStore(PerBalancerStore* orphaned_store,
- const TString& new_receiver);
+ const TString& new_receiver);
- std::unordered_map<TString, std::set<TString>>
+ std::unordered_map<TString, std::set<TString>>
load_key_to_receiving_lb_ids_;
// Key: LB ID. The key set includes all the LB IDs that have been
// allocated for reporting streams so far.
// Value: the unique pointer to the PerBalancerStore of the LB ID.
- std::unordered_map<TString, std::unique_ptr<PerBalancerStore>>
+ std::unordered_map<TString, std::unique_ptr<PerBalancerStore>>
per_balancer_stores_;
// Key: LB ID. The key set includes the LB IDs of the balancers that are
@@ -287,7 +287,7 @@ class PerHostStore {
// Value: the set of raw pointers to the PerBalancerStores assigned to the LB
// ID. Note that the sets in assigned_stores_ form a division of the value set
// of per_balancer_stores_.
- std::unordered_map<TString, std::set<PerBalancerStore*>> assigned_stores_;
+ std::unordered_map<TString, std::set<PerBalancerStore*>> assigned_stores_;
};
// Thread-unsafe two-level bookkeeper of all the load data.
@@ -302,8 +302,8 @@ class PerHostStore {
class LoadDataStore {
public:
// Returns null if not found. Caller doesn't own the returned store.
- PerBalancerStore* FindPerBalancerStore(const TString& hostname,
- const TString& lb_id) const;
+ PerBalancerStore* FindPerBalancerStore(const TString& hostname,
+ const TString& lb_id) const;
// Returns null if hostname or lb_id is not found. The returned pointer points
// to the underlying data structure, which is not owned by the caller.
@@ -313,33 +313,33 @@ class LoadDataStore {
// If a PerBalancerStore can be found by the hostname and LB ID in
// LoadRecordKey, the load data will be merged to that store. Otherwise,
// only track the number of the in-progress calls for this unknown LB ID.
- void MergeRow(const TString& hostname, const LoadRecordKey& key,
+ void MergeRow(const TString& hostname, const LoadRecordKey& key,
const LoadRecordValue& value);
// Is the given lb_id a tracked unknown LB ID (i.e., the LB ID was associated
// with some received load data but unknown to this load data store)?
- bool IsTrackedUnknownBalancerId(const TString& lb_id) const {
+ bool IsTrackedUnknownBalancerId(const TString& lb_id) const {
return unknown_balancer_id_trackers_.find(lb_id) !=
unknown_balancer_id_trackers_.end();
}
// Wrapper around PerHostStore::ReportStreamCreated.
- void ReportStreamCreated(const TString& hostname,
- const TString& lb_id,
- const TString& load_key);
+ void ReportStreamCreated(const TString& hostname,
+ const TString& lb_id,
+ const TString& load_key);
// Wrapper around PerHostStore::ReportStreamClosed.
- void ReportStreamClosed(const TString& hostname,
- const TString& lb_id);
+ void ReportStreamClosed(const TString& hostname,
+ const TString& lb_id);
private:
// Buffered data that was fetched from Census but hasn't been sent to
// balancer. We need to keep this data ourselves because Census will
// delete the data once it's returned.
- std::unordered_map<TString, PerHostStore> per_host_stores_;
+ std::unordered_map<TString, PerHostStore> per_host_stores_;
// Tracks the number of in-progress calls for each unknown LB ID.
- std::unordered_map<TString, uint64_t> unknown_balancer_id_trackers_;
+ std::unordered_map<TString, uint64_t> unknown_balancer_id_trackers_;
};
} // namespace load_reporter
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc
index 24ad9f3f24..b1acde1010 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/util.cc
@@ -24,14 +24,14 @@
#include <grpc/support/log.h>
-namespace grpc {
+namespace grpc {
namespace load_reporter {
namespace experimental {
void AddLoadReportingCost(grpc::ServerContext* ctx,
- const TString& cost_name, double cost_value) {
+ const TString& cost_name, double cost_value) {
if (std::isnormal(cost_value)) {
- TString buf;
+ TString buf;
buf.resize(sizeof(cost_value) + cost_name.size());
memcpy(&(*buf.begin()), &cost_value, sizeof(cost_value));
memcpy(&(*buf.begin()) + sizeof(cost_value), cost_name.data(),
@@ -44,4 +44,4 @@ void AddLoadReportingCost(grpc::ServerContext* ctx,
} // namespace experimental
} // namespace load_reporter
-} // namespace grpc
+} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc
index 732602bcb7..b084b01836 100644
--- a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc
+++ b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc
@@ -21,7 +21,7 @@
#include <memory>
#include <grpcpp/impl/codegen/slice.h>
-#include <grpcpp/impl/grpc_library.h>
+#include <grpcpp/impl/grpc_library.h>
#include <grpcpp/security/auth_metadata_processor.h>
#include "src/cpp/common/secure_auth_context.h"
@@ -92,7 +92,7 @@ void AuthMetadataProcessorAyncWrapper::InvokeProcessor(
status.error_message().c_str());
}
-int SecureServerCredentials::AddPortToServer(const TString& addr,
+int SecureServerCredentials::AddPortToServer(const TString& addr,
grpc_server* server) {
return grpc_server_add_secure_http2_port(server, addr.c_str(), creds_);
}
@@ -145,11 +145,11 @@ std::shared_ptr<ServerCredentials> LocalServerCredentials(
}
std::shared_ptr<ServerCredentials> TlsServerCredentials(
- const grpc::experimental::TlsCredentialsOptions& options) {
- grpc::GrpcLibraryCodegen init;
- return std::shared_ptr<ServerCredentials>(new SecureServerCredentials(
- grpc_tls_server_credentials_create(options.c_credentials_options())));
+ const grpc::experimental::TlsCredentialsOptions& options) {
+ grpc::GrpcLibraryCodegen init;
+ return std::shared_ptr<ServerCredentials>(new SecureServerCredentials(
+ grpc_tls_server_credentials_create(options.c_credentials_options())));
}
} // namespace experimental
-} // namespace grpc
+} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h
index 9e3fb3f9eb..9a09c2314f 100644
--- a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h
+++ b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h
@@ -28,7 +28,7 @@
#include "src/cpp/server/thread_pool_interface.h"
-namespace grpc {
+namespace grpc {
class SecureServerCredentials;
@@ -64,7 +64,7 @@ class SecureServerCredentials final : public ServerCredentials {
grpc_server_credentials_release(creds_);
}
- int AddPortToServer(const TString& addr, grpc_server* server) override;
+ int AddPortToServer(const TString& addr, grpc_server* server) override;
void SetAuthMetadataProcessor(
const std::shared_ptr<grpc::AuthMetadataProcessor>& processor) override;
@@ -74,6 +74,6 @@ class SecureServerCredentials final : public ServerCredentials {
std::unique_ptr<grpc::AuthMetadataProcessorAyncWrapper> processor_;
};
-} // namespace grpc
+} // namespace grpc
#endif // GRPC_INTERNAL_CPP_SERVER_SECURE_SERVER_CREDENTIALS_H
diff --git a/contrib/libs/grpc/src/cpp/server/server_builder.cc b/contrib/libs/grpc/src/cpp/server/server_builder.cc
index 0cc00b365f..e3a9f4e4a6 100644
--- a/contrib/libs/grpc/src/cpp/server/server_builder.cc
+++ b/contrib/libs/grpc/src/cpp/server/server_builder.cc
@@ -26,21 +26,21 @@
#include <utility>
-#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/cpp/server/external_connection_acceptor_impl.h"
#include "src/cpp/server/thread_pool_interface.h"
-namespace grpc {
+namespace grpc {
-static std::vector<std::unique_ptr<ServerBuilderPlugin> (*)()>*
+static std::vector<std::unique_ptr<ServerBuilderPlugin> (*)()>*
g_plugin_factory_list;
static gpr_once once_init_plugin_list = GPR_ONCE_INIT;
static void do_plugin_list_init(void) {
g_plugin_factory_list =
- new std::vector<std::unique_ptr<ServerBuilderPlugin> (*)()>();
+ new std::vector<std::unique_ptr<ServerBuilderPlugin> (*)()>();
}
ServerBuilder::ServerBuilder()
@@ -68,29 +68,29 @@ ServerBuilder::~ServerBuilder() {
}
}
-std::unique_ptr<grpc::ServerCompletionQueue> ServerBuilder::AddCompletionQueue(
+std::unique_ptr<grpc::ServerCompletionQueue> ServerBuilder::AddCompletionQueue(
bool is_frequently_polled) {
- grpc::ServerCompletionQueue* cq = new grpc::ServerCompletionQueue(
+ grpc::ServerCompletionQueue* cq = new grpc::ServerCompletionQueue(
GRPC_CQ_NEXT,
is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING,
nullptr);
cqs_.push_back(cq);
- return std::unique_ptr<grpc::ServerCompletionQueue>(cq);
+ return std::unique_ptr<grpc::ServerCompletionQueue>(cq);
}
-ServerBuilder& ServerBuilder::RegisterService(Service* service) {
+ServerBuilder& ServerBuilder::RegisterService(Service* service) {
services_.emplace_back(new NamedService(service));
return *this;
}
-ServerBuilder& ServerBuilder::RegisterService(const TString& addr,
- Service* service) {
+ServerBuilder& ServerBuilder::RegisterService(const TString& addr,
+ Service* service) {
services_.emplace_back(new NamedService(addr, service));
return *this;
}
ServerBuilder& ServerBuilder::RegisterAsyncGenericService(
- AsyncGenericService* service) {
+ AsyncGenericService* service) {
if (generic_service_ || callback_generic_service_) {
gpr_log(GPR_ERROR,
"Adding multiple generic services is unsupported for now. "
@@ -102,22 +102,22 @@ ServerBuilder& ServerBuilder::RegisterAsyncGenericService(
return *this;
}
-#ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
-ServerBuilder& ServerBuilder::RegisterCallbackGenericService(
- CallbackGenericService* service) {
- if (generic_service_ || callback_generic_service_) {
- gpr_log(GPR_ERROR,
- "Adding multiple generic services is unsupported for now. "
- "Dropping the service %p",
- (void*)service);
- } else {
- callback_generic_service_ = service;
- }
- return *this;
-}
-#else
+#ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
+ServerBuilder& ServerBuilder::RegisterCallbackGenericService(
+ CallbackGenericService* service) {
+ if (generic_service_ || callback_generic_service_) {
+ gpr_log(GPR_ERROR,
+ "Adding multiple generic services is unsupported for now. "
+ "Dropping the service %p",
+ (void*)service);
+ } else {
+ callback_generic_service_ = service;
+ }
+ return *this;
+}
+#else
ServerBuilder& ServerBuilder::experimental_type::RegisterCallbackGenericService(
- experimental::CallbackGenericService* service) {
+ experimental::CallbackGenericService* service) {
if (builder_->generic_service_ || builder_->callback_generic_service_) {
gpr_log(GPR_ERROR,
"Adding multiple generic services is unsupported for now. "
@@ -128,13 +128,13 @@ ServerBuilder& ServerBuilder::experimental_type::RegisterCallbackGenericService(
}
return *builder_;
}
-#endif
+#endif
std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor>
ServerBuilder::experimental_type::AddExternalConnectionAcceptor(
experimental_type::ExternalConnectionType type,
std::shared_ptr<ServerCredentials> creds) {
- TString name_prefix("external:");
+ TString name_prefix("external:");
char count_str[GPR_LTOA_MIN_BUFSIZE];
gpr_ltoa(static_cast<long>(builder_->acceptors_.size()), count_str);
builder_->acceptors_.emplace_back(
@@ -144,7 +144,7 @@ ServerBuilder::experimental_type::AddExternalConnectionAcceptor(
}
ServerBuilder& ServerBuilder::SetOption(
- std::unique_ptr<ServerBuilderOption> option) {
+ std::unique_ptr<ServerBuilderOption> option) {
options_.push_back(std::move(option));
return *this;
}
@@ -193,7 +193,7 @@ ServerBuilder& ServerBuilder::SetDefaultCompressionAlgorithm(
}
ServerBuilder& ServerBuilder::SetResourceQuota(
- const grpc::ResourceQuota& resource_quota) {
+ const grpc::ResourceQuota& resource_quota) {
if (resource_quota_ != nullptr) {
grpc_resource_quota_unref(resource_quota_);
}
@@ -203,10 +203,10 @@ ServerBuilder& ServerBuilder::SetResourceQuota(
}
ServerBuilder& ServerBuilder::AddListeningPort(
- const TString& addr_uri, std::shared_ptr<ServerCredentials> creds,
- int* selected_port) {
- const TString uri_scheme = "dns:";
- TString addr = addr_uri;
+ const TString& addr_uri, std::shared_ptr<ServerCredentials> creds,
+ int* selected_port) {
+ const TString uri_scheme = "dns:";
+ TString addr = addr_uri;
if (addr_uri.compare(0, uri_scheme.size(), uri_scheme) == 0) {
size_t pos = uri_scheme.size();
while (addr_uri[pos] == '/') ++pos; // Skip slashes.
@@ -222,13 +222,13 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
if (max_receive_message_size_ >= -1) {
args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_);
}
- if (max_send_message_size_ >= -1) {
+ if (max_send_message_size_ >= -1) {
args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, max_send_message_size_);
}
- for (const auto& option : options_) {
- option->UpdateArguments(&args);
- option->UpdatePlugins(&plugins_);
- }
+ for (const auto& option : options_) {
+ option->UpdateArguments(&args);
+ option->UpdatePlugins(&plugins_);
+ }
args.SetInt(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
enabled_compression_algorithms_bitset_);
if (maybe_default_compression_level_.is_set) {
@@ -245,11 +245,11 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
grpc_resource_quota_arg_vtable());
}
- for (const auto& plugin : plugins_) {
- plugin->UpdateServerBuilder(this);
- plugin->UpdateChannelArguments(&args);
- }
-
+ for (const auto& plugin : plugins_) {
+ plugin->UpdateServerBuilder(this);
+ plugin->UpdateChannelArguments(&args);
+ }
+
// == Determine if the server has any syncrhonous methods ==
bool has_sync_methods = false;
for (const auto& value : services_) {
@@ -275,10 +275,10 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
// This is different from the completion queues added to the server via
// ServerBuilder's AddCompletionQueue() method (those completion queues
// are in 'cqs_' member variable of ServerBuilder object)
- std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>>
- sync_server_cqs(
- std::make_shared<
- std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>>());
+ std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>>
+ sync_server_cqs(
+ std::make_shared<
+ std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>>());
bool has_frequently_polled_cqs = false;
for (const auto& cq : cqs_) {
@@ -307,7 +307,7 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
// Create completion queues to listen to incoming rpc requests
for (int i = 0; i < sync_server_settings_.num_cqs; i++) {
sync_server_cqs->emplace_back(
- new grpc::ServerCompletionQueue(GRPC_CQ_NEXT, polling_type, nullptr));
+ new grpc::ServerCompletionQueue(GRPC_CQ_NEXT, polling_type, nullptr));
}
}
@@ -329,20 +329,20 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
}
std::unique_ptr<grpc::Server> server(new grpc::Server(
- &args, sync_server_cqs, sync_server_settings_.min_pollers,
- sync_server_settings_.max_pollers, sync_server_settings_.cq_timeout_msec,
- std::move(acceptors_), resource_quota_,
- std::move(interceptor_creators_)));
+ &args, sync_server_cqs, sync_server_settings_.min_pollers,
+ sync_server_settings_.max_pollers, sync_server_settings_.cq_timeout_msec,
+ std::move(acceptors_), resource_quota_,
+ std::move(interceptor_creators_)));
- ServerInitializer* initializer = server->initializer();
+ ServerInitializer* initializer = server->initializer();
// Register all the completion queues with the server. i.e
// 1. sync_server_cqs: internal completion queues created IF this is a sync
// server
// 2. cqs_: Completion queues added via AddCompletionQueue() call
- for (const auto& cq : *sync_server_cqs) {
- grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr);
+ for (const auto& cq : *sync_server_cqs) {
+ grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr);
has_frequently_polled_cqs = true;
}
@@ -355,12 +355,12 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
// AddCompletionQueue() API. Some of them may not be frequently polled (i.e by
// calling Next() or AsyncNext()) and hence are not safe to be used for
// listening to incoming channels. Such completion queues must be registered
- // as non-listening queues. In debug mode, these should have their server list
- // tracked since these are provided the user and must be Shutdown by the user
- // after the server is shutdown.
- for (const auto& cq : cqs_) {
- grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr);
- cq->RegisterServer(server.get());
+ // as non-listening queues. In debug mode, these should have their server list
+ // tracked since these are provided the user and must be Shutdown by the user
+ // after the server is shutdown.
+ for (const auto& cq : cqs_) {
+ grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr);
+ cq->RegisterServer(server.get());
}
if (!has_frequently_polled_cqs) {
@@ -416,7 +416,7 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
}
void ServerBuilder::InternalAddPluginFactory(
- std::unique_ptr<ServerBuilderPlugin> (*CreatePlugin)()) {
+ std::unique_ptr<ServerBuilderPlugin> (*CreatePlugin)()) {
gpr_once_init(&once_init_plugin_list, do_plugin_list_init);
(*g_plugin_factory_list).push_back(CreatePlugin);
}
@@ -431,4 +431,4 @@ ServerBuilder& ServerBuilder::EnableWorkaround(grpc_workaround_list id) {
}
}
-} // namespace grpc
+} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/server/server_callback.cc b/contrib/libs/grpc/src/cpp/server/server_callback.cc
index 40aef8e735..d2963851dc 100644
--- a/contrib/libs/grpc/src/cpp/server/server_callback.cc
+++ b/contrib/libs/grpc/src/cpp/server/server_callback.cc
@@ -15,70 +15,70 @@
*
*/
-#include <grpcpp/impl/codegen/server_callback.h>
+#include <grpcpp/impl/codegen/server_callback.h>
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/executor.h"
-namespace grpc {
+namespace grpc {
namespace internal {
-void ServerCallbackCall::ScheduleOnDone(bool inline_ondone) {
- if (inline_ondone) {
- CallOnDone();
- } else {
- // Unlike other uses of closure, do not Ref or Unref here since at this
- // point, all the Ref'fing and Unref'fing is done for this call.
- grpc_core::ExecCtx exec_ctx;
- struct ClosureWithArg {
- grpc_closure closure;
- ServerCallbackCall* call;
- explicit ClosureWithArg(ServerCallbackCall* call_arg) : call(call_arg) {
- GRPC_CLOSURE_INIT(&closure,
- [](void* void_arg, grpc_error*) {
- ClosureWithArg* arg =
- static_cast<ClosureWithArg*>(void_arg);
- arg->call->CallOnDone();
- delete arg;
- },
- this, grpc_schedule_on_exec_ctx);
- }
- };
- ClosureWithArg* arg = new ClosureWithArg(this);
- grpc_core::Executor::Run(&arg->closure, GRPC_ERROR_NONE);
- }
-}
-
+void ServerCallbackCall::ScheduleOnDone(bool inline_ondone) {
+ if (inline_ondone) {
+ CallOnDone();
+ } else {
+ // Unlike other uses of closure, do not Ref or Unref here since at this
+ // point, all the Ref'fing and Unref'fing is done for this call.
+ grpc_core::ExecCtx exec_ctx;
+ struct ClosureWithArg {
+ grpc_closure closure;
+ ServerCallbackCall* call;
+ explicit ClosureWithArg(ServerCallbackCall* call_arg) : call(call_arg) {
+ GRPC_CLOSURE_INIT(&closure,
+ [](void* void_arg, grpc_error*) {
+ ClosureWithArg* arg =
+ static_cast<ClosureWithArg*>(void_arg);
+ arg->call->CallOnDone();
+ delete arg;
+ },
+ this, grpc_schedule_on_exec_ctx);
+ }
+ };
+ ClosureWithArg* arg = new ClosureWithArg(this);
+ grpc_core::Executor::Run(&arg->closure, GRPC_ERROR_NONE);
+ }
+}
+
void ServerCallbackCall::CallOnCancel(ServerReactor* reactor) {
if (reactor->InternalInlineable()) {
reactor->OnCancel();
} else {
- // Ref to make sure that the closure executes before the whole call gets
- // destructed, and Unref within the closure.
+ // Ref to make sure that the closure executes before the whole call gets
+ // destructed, and Unref within the closure.
Ref();
grpc_core::ExecCtx exec_ctx;
- struct ClosureWithArg {
- grpc_closure closure;
+ struct ClosureWithArg {
+ grpc_closure closure;
ServerCallbackCall* call;
ServerReactor* reactor;
- ClosureWithArg(ServerCallbackCall* call_arg, ServerReactor* reactor_arg)
- : call(call_arg), reactor(reactor_arg) {
- GRPC_CLOSURE_INIT(&closure,
- [](void* void_arg, grpc_error*) {
- ClosureWithArg* arg =
- static_cast<ClosureWithArg*>(void_arg);
- arg->reactor->OnCancel();
- arg->call->MaybeDone();
- delete arg;
- },
- this, grpc_schedule_on_exec_ctx);
- }
+ ClosureWithArg(ServerCallbackCall* call_arg, ServerReactor* reactor_arg)
+ : call(call_arg), reactor(reactor_arg) {
+ GRPC_CLOSURE_INIT(&closure,
+ [](void* void_arg, grpc_error*) {
+ ClosureWithArg* arg =
+ static_cast<ClosureWithArg*>(void_arg);
+ arg->reactor->OnCancel();
+ arg->call->MaybeDone();
+ delete arg;
+ },
+ this, grpc_schedule_on_exec_ctx);
+ }
};
- ClosureWithArg* arg = new ClosureWithArg(this, reactor);
- grpc_core::Executor::Run(&arg->closure, GRPC_ERROR_NONE);
+ ClosureWithArg* arg = new ClosureWithArg(this, reactor);
+ grpc_core::Executor::Run(&arg->closure, GRPC_ERROR_NONE);
}
}
} // namespace internal
-} // namespace grpc
+} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/server/server_cc.cc b/contrib/libs/grpc/src/cpp/server/server_cc.cc
index c2a911c7f7..d8725fe0eb 100644
--- a/contrib/libs/grpc/src/cpp/server/server_cc.cc
+++ b/contrib/libs/grpc/src/cpp/server/server_cc.cc
@@ -23,7 +23,7 @@
#include <utility>
#include <grpc/grpc.h>
-#include <grpc/impl/codegen/grpc_types.h>
+#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpcpp/completion_queue.h>
@@ -47,14 +47,14 @@
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/completion_queue.h"
-#include "src/core/lib/surface/server.h"
+#include "src/core/lib/surface/server.h"
#include "src/cpp/client/create_channel_internal.h"
#include "src/cpp/server/external_connection_acceptor_impl.h"
#include "src/cpp/server/health/default_health_check_service.h"
#include "src/cpp/thread_manager/thread_manager.h"
-#include <util/stream/str.h>
-
+#include <util/stream/str.h>
+
namespace grpc {
namespace {
@@ -99,15 +99,15 @@ class UnimplementedAsyncRequestContext {
GenericServerAsyncReaderWriter generic_stream_;
};
-// TODO(vjpai): Just for this file, use some contents of the experimental
-// namespace here to make the code easier to read below. Remove this when
-// de-experimentalized fully.
-#ifndef GRPC_CALLBACK_API_NONEXPERIMENTAL
-using ::grpc::experimental::CallbackGenericService;
-using ::grpc::experimental::CallbackServerContext;
-using ::grpc::experimental::GenericCallbackServerContext;
-#endif
-
+// TODO(vjpai): Just for this file, use some contents of the experimental
+// namespace here to make the code easier to read below. Remove this when
+// de-experimentalized fully.
+#ifndef GRPC_CALLBACK_API_NONEXPERIMENTAL
+using ::grpc::experimental::CallbackGenericService;
+using ::grpc::experimental::CallbackServerContext;
+using ::grpc::experimental::GenericCallbackServerContext;
+#endif
+
} // namespace
ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
@@ -293,10 +293,10 @@ class Server::UnimplementedAsyncRequest final
: private grpc::UnimplementedAsyncRequestContext,
public GenericAsyncRequest {
public:
- UnimplementedAsyncRequest(ServerInterface* server,
- grpc::ServerCompletionQueue* cq)
+ UnimplementedAsyncRequest(ServerInterface* server,
+ grpc::ServerCompletionQueue* cq)
: GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq,
- nullptr, false) {}
+ nullptr, false) {}
bool FinalizeResult(void** tag, bool* status) override;
@@ -528,54 +528,54 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
};
template <class ServerContextType>
-class Server::CallbackRequest final
- : public grpc::internal::CompletionQueueTag {
+class Server::CallbackRequest final
+ : public grpc::internal::CompletionQueueTag {
public:
- static_assert(
- std::is_base_of<grpc::CallbackServerContext, ServerContextType>::value,
- "ServerContextType must be derived from CallbackServerContext");
-
- // For codegen services, the value of method represents the defined
- // characteristics of the method being requested. For generic services, method
- // is nullptr since these services don't have pre-defined methods.
- CallbackRequest(Server* server, grpc::internal::RpcServiceMethod* method,
- grpc::CompletionQueue* cq,
- grpc_core::Server::RegisteredCallAllocation* data)
+ static_assert(
+ std::is_base_of<grpc::CallbackServerContext, ServerContextType>::value,
+ "ServerContextType must be derived from CallbackServerContext");
+
+ // For codegen services, the value of method represents the defined
+ // characteristics of the method being requested. For generic services, method
+ // is nullptr since these services don't have pre-defined methods.
+ CallbackRequest(Server* server, grpc::internal::RpcServiceMethod* method,
+ grpc::CompletionQueue* cq,
+ grpc_core::Server::RegisteredCallAllocation* data)
: server_(server),
method_(method),
- has_request_payload_(method->method_type() ==
- grpc::internal::RpcMethod::NORMAL_RPC ||
- method->method_type() ==
- grpc::internal::RpcMethod::SERVER_STREAMING),
- cq_(cq),
+ has_request_payload_(method->method_type() ==
+ grpc::internal::RpcMethod::NORMAL_RPC ||
+ method->method_type() ==
+ grpc::internal::RpcMethod::SERVER_STREAMING),
+ cq_(cq),
tag_(this) {
- CommonSetup(server, data);
- data->deadline = &deadline_;
- data->optional_payload = has_request_payload_ ? &request_payload_ : nullptr;
+ CommonSetup(server, data);
+ data->deadline = &deadline_;
+ data->optional_payload = has_request_payload_ ? &request_payload_ : nullptr;
}
- // For generic services, method is nullptr since these services don't have
- // pre-defined methods.
- CallbackRequest(Server* server, grpc::CompletionQueue* cq,
- grpc_core::Server::BatchCallAllocation* data)
- : server_(server),
- method_(nullptr),
- has_request_payload_(false),
- call_details_(new grpc_call_details),
- cq_(cq),
- tag_(this) {
- CommonSetup(server, data);
- grpc_call_details_init(call_details_);
- data->details = call_details_;
+ // For generic services, method is nullptr since these services don't have
+ // pre-defined methods.
+ CallbackRequest(Server* server, grpc::CompletionQueue* cq,
+ grpc_core::Server::BatchCallAllocation* data)
+ : server_(server),
+ method_(nullptr),
+ has_request_payload_(false),
+ call_details_(new grpc_call_details),
+ cq_(cq),
+ tag_(this) {
+ CommonSetup(server, data);
+ grpc_call_details_init(call_details_);
+ data->details = call_details_;
}
- ~CallbackRequest() {
- delete call_details_;
- grpc_metadata_array_destroy(&request_metadata_);
- if (has_request_payload_ && request_payload_) {
- grpc_byte_buffer_destroy(request_payload_);
- }
- server_->UnrefWithPossibleNotify();
+ ~CallbackRequest() {
+ delete call_details_;
+ grpc_metadata_array_destroy(&request_metadata_);
+ if (has_request_payload_ && request_payload_) {
+ grpc_byte_buffer_destroy(request_payload_);
+ }
+ server_->UnrefWithPossibleNotify();
}
// Needs specialization to account for different processing of metadata
@@ -680,48 +680,48 @@ class Server::CallbackRequest final
: req_->server_->generic_handler_.get();
handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter(
call_, &req_->ctx_, req_->request_, req_->request_status_,
- req_->handler_data_, [this] { delete req_; }));
+ req_->handler_data_, [this] { delete req_; }));
}
};
- template <class CallAllocation>
- void CommonSetup(Server* server, CallAllocation* data) {
- server->Ref();
+ template <class CallAllocation>
+ void CommonSetup(Server* server, CallAllocation* data) {
+ server->Ref();
grpc_metadata_array_init(&request_metadata_);
- data->tag = &tag_;
- data->call = &call_;
- data->initial_metadata = &request_metadata_;
+ data->tag = &tag_;
+ data->call = &call_;
+ data->initial_metadata = &request_metadata_;
}
Server* const server_;
grpc::internal::RpcServiceMethod* const method_;
const bool has_request_payload_;
- grpc_byte_buffer* request_payload_ = nullptr;
- void* request_ = nullptr;
- void* handler_data_ = nullptr;
+ grpc_byte_buffer* request_payload_ = nullptr;
+ void* request_ = nullptr;
+ void* handler_data_ = nullptr;
grpc::Status request_status_;
- grpc_call_details* const call_details_ = nullptr;
+ grpc_call_details* const call_details_ = nullptr;
grpc_call* call_;
gpr_timespec deadline_;
grpc_metadata_array request_metadata_;
- grpc::CompletionQueue* const cq_;
+ grpc::CompletionQueue* const cq_;
CallbackCallTag tag_;
ServerContextType ctx_;
grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_;
};
template <>
-bool Server::CallbackRequest<grpc::CallbackServerContext>::FinalizeResult(
- void** /*tag*/, bool* /*status*/) {
+bool Server::CallbackRequest<grpc::CallbackServerContext>::FinalizeResult(
+ void** /*tag*/, bool* /*status*/) {
return false;
}
template <>
-bool Server::CallbackRequest<
- grpc::GenericCallbackServerContext>::FinalizeResult(void** /*tag*/,
- bool* status) {
+bool Server::CallbackRequest<
+ grpc::GenericCallbackServerContext>::FinalizeResult(void** /*tag*/,
+ bool* status) {
if (*status) {
- deadline_ = call_details_->deadline;
+ deadline_ = call_details_->deadline;
// TODO(yangg) remove the copy here
ctx_.method_ = grpc::StringFromCopiedSlice(call_details_->method);
ctx_.host_ = grpc::StringFromCopiedSlice(call_details_->host);
@@ -732,14 +732,14 @@ bool Server::CallbackRequest<
}
template <>
-const char* Server::CallbackRequest<grpc::CallbackServerContext>::method_name()
- const {
+const char* Server::CallbackRequest<grpc::CallbackServerContext>::method_name()
+ const {
return method_->name();
}
template <>
const char* Server::CallbackRequest<
- grpc::GenericCallbackServerContext>::method_name() const {
+ grpc::GenericCallbackServerContext>::method_name() const {
return ctx_.method().c_str();
}
@@ -867,7 +867,7 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
static grpc::internal::GrpcLibraryInitializer g_gli_initializer;
Server::Server(
- grpc::ChannelArguments* args,
+ grpc::ChannelArguments* args,
std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>>
sync_server_cqs,
int min_pollers, int max_pollers, int sync_cq_timeout_msec,
@@ -879,13 +879,13 @@ Server::Server(
interceptor_creators)
: acceptors_(std::move(acceptors)),
interceptor_creators_(std::move(interceptor_creators)),
- max_receive_message_size_(INT_MIN),
+ max_receive_message_size_(INT_MIN),
sync_server_cqs_(std::move(sync_server_cqs)),
started_(false),
shutdown_(false),
shutdown_notified_(false),
server_(nullptr),
- server_initializer_(new ServerInitializer(this)),
+ server_initializer_(new ServerInitializer(this)),
health_check_service_disabled_(false) {
g_gli_initializer.summon();
gpr_once_init(&grpc::g_once_init_callbacks, grpc::InitGlobalCallbacks);
@@ -930,10 +930,10 @@ Server::Server(
channel_args.args[i].value.pointer.p));
}
}
- if (0 ==
- strcmp(channel_args.args[i].key, GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH)) {
- max_receive_message_size_ = channel_args.args[i].value.integer;
- }
+ if (0 ==
+ strcmp(channel_args.args[i].key, GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH)) {
+ max_receive_message_size_ = channel_args.args[i].value.integer;
+ }
}
server_ = grpc_server_create(&channel_args, nullptr);
}
@@ -955,10 +955,10 @@ Server::~Server() {
}
}
}
- // Destroy health check service before we destroy the C server so that
- // it does not call grpc_server_request_registered_call() after the C
- // server has been destroyed.
- health_check_service_.reset();
+ // Destroy health check service before we destroy the C server so that
+ // it does not call grpc_server_request_registered_call() after the C
+ // server has been destroyed.
+ health_check_service_.reset();
grpc_server_destroy(server_);
}
@@ -1005,7 +1005,7 @@ static grpc_server_register_method_payload_handling PayloadHandlingForMethod(
GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;);
}
-bool Server::RegisterService(const TString* host, grpc::Service* service) {
+bool Server::RegisterService(const TString* host, grpc::Service* service) {
bool has_async_methods = service->has_async_methods();
if (has_async_methods) {
GPR_ASSERT(service->server_ == nullptr &&
@@ -1037,16 +1037,16 @@ bool Server::RegisterService(const TString* host, grpc::Service* service) {
value->AddSyncMethod(method.get(), method_registration_tag);
}
} else {
- has_callback_methods_ = true;
- grpc::internal::RpcServiceMethod* method_value = method.get();
- grpc::CompletionQueue* cq = CallbackCQ();
- server_->core_server->SetRegisteredMethodAllocator(
- cq->cq(), method_registration_tag, [this, cq, method_value] {
- grpc_core::Server::RegisteredCallAllocation result;
- new CallbackRequest<grpc::CallbackServerContext>(this, method_value,
- cq, &result);
- return result;
- });
+ has_callback_methods_ = true;
+ grpc::internal::RpcServiceMethod* method_value = method.get();
+ grpc::CompletionQueue* cq = CallbackCQ();
+ server_->core_server->SetRegisteredMethodAllocator(
+ cq->cq(), method_registration_tag, [this, cq, method_value] {
+ grpc_core::Server::RegisteredCallAllocation result;
+ new CallbackRequest<grpc::CallbackServerContext>(this, method_value,
+ cq, &result);
+ return result;
+ });
}
method_name = method->name();
@@ -1055,10 +1055,10 @@ bool Server::RegisterService(const TString* host, grpc::Service* service) {
// Parse service name.
if (method_name != nullptr) {
std::stringstream ss(method_name);
- std::string service_name;
+ std::string service_name;
if (std::getline(ss, service_name, '/') &&
std::getline(ss, service_name, '/')) {
- services_.push_back(service_name.c_str());
+ services_.push_back(service_name.c_str());
}
}
return true;
@@ -1072,7 +1072,7 @@ void Server::RegisterAsyncGenericService(grpc::AsyncGenericService* service) {
}
void Server::RegisterCallbackGenericService(
- grpc::CallbackGenericService* service) {
+ grpc::CallbackGenericService* service) {
GPR_ASSERT(
service->server_ == nullptr &&
"Can only register a callback generic service against one server.");
@@ -1080,15 +1080,15 @@ void Server::RegisterCallbackGenericService(
has_callback_generic_service_ = true;
generic_handler_.reset(service->Handler());
- grpc::CompletionQueue* cq = CallbackCQ();
- server_->core_server->SetBatchMethodAllocator(cq->cq(), [this, cq] {
- grpc_core::Server::BatchCallAllocation result;
- new CallbackRequest<grpc::GenericCallbackServerContext>(this, cq, &result);
- return result;
- });
+ grpc::CompletionQueue* cq = CallbackCQ();
+ server_->core_server->SetBatchMethodAllocator(cq->cq(), [this, cq] {
+ grpc_core::Server::BatchCallAllocation result;
+ new CallbackRequest<grpc::GenericCallbackServerContext>(this, cq, &result);
+ return result;
+ });
}
-int Server::AddListeningPort(const TString& addr,
+int Server::AddListeningPort(const TString& addr,
grpc::ServerCredentials* creds) {
GPR_ASSERT(!started_);
int port = creds->AddPortToServer(addr, server_);
@@ -1096,31 +1096,31 @@ int Server::AddListeningPort(const TString& addr,
return port;
}
-void Server::Ref() {
- shutdown_refs_outstanding_.fetch_add(1, std::memory_order_relaxed);
-}
-
-void Server::UnrefWithPossibleNotify() {
- if (GPR_UNLIKELY(shutdown_refs_outstanding_.fetch_sub(
- 1, std::memory_order_acq_rel) == 1)) {
- // No refs outstanding means that shutdown has been initiated and no more
- // callback requests are outstanding.
- grpc::internal::MutexLock lock(&mu_);
- GPR_ASSERT(shutdown_);
- shutdown_done_ = true;
- shutdown_done_cv_.Signal();
- }
-}
-
-void Server::UnrefAndWaitLocked() {
- if (GPR_UNLIKELY(shutdown_refs_outstanding_.fetch_sub(
- 1, std::memory_order_acq_rel) == 1)) {
- shutdown_done_ = true;
- return; // no need to wait on CV since done condition already set
- }
- shutdown_done_cv_.WaitUntil(&mu_, [this] { return shutdown_done_; });
-}
-
+void Server::Ref() {
+ shutdown_refs_outstanding_.fetch_add(1, std::memory_order_relaxed);
+}
+
+void Server::UnrefWithPossibleNotify() {
+ if (GPR_UNLIKELY(shutdown_refs_outstanding_.fetch_sub(
+ 1, std::memory_order_acq_rel) == 1)) {
+ // No refs outstanding means that shutdown has been initiated and no more
+ // callback requests are outstanding.
+ grpc::internal::MutexLock lock(&mu_);
+ GPR_ASSERT(shutdown_);
+ shutdown_done_ = true;
+ shutdown_done_cv_.Signal();
+ }
+}
+
+void Server::UnrefAndWaitLocked() {
+ if (GPR_UNLIKELY(shutdown_refs_outstanding_.fetch_sub(
+ 1, std::memory_order_acq_rel) == 1)) {
+ shutdown_done_ = true;
+ return; // no need to wait on CV since done condition already set
+ }
+ shutdown_done_cv_.WaitUntil(&mu_, [this] { return shutdown_done_; });
+}
+
void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
GPR_ASSERT(!started_);
global_callbacks_->PreServerStart(this);
@@ -1156,17 +1156,17 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
// If this server uses callback methods, then create a callback generic
// service to handle any unimplemented methods using the default reactor
// creator
- if (has_callback_methods_ && !has_callback_generic_service_) {
- unimplemented_service_.reset(new grpc::CallbackGenericService);
+ if (has_callback_methods_ && !has_callback_generic_service_) {
+ unimplemented_service_.reset(new grpc::CallbackGenericService);
RegisterCallbackGenericService(unimplemented_service_.get());
}
-#ifndef NDEBUG
- for (size_t i = 0; i < num_cqs; i++) {
- cq_list_.push_back(cqs[i]);
- }
-#endif
-
+#ifndef NDEBUG
+ for (size_t i = 0; i < num_cqs; i++) {
+ cq_list_.push_back(cqs[i]);
+ }
+#endif
+
grpc_server_start(server_);
if (!has_async_generic_service_ && !has_callback_generic_service_) {
@@ -1248,8 +1248,8 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
value->Wait();
}
- // Drop the shutdown ref and wait for all other refs to drop as well.
- UnrefAndWaitLocked();
+ // Drop the shutdown ref and wait for all other refs to drop as well.
+ UnrefAndWaitLocked();
// Shutdown the callback CQ. The CQ is owned by its own shutdown tag, so it
// will delete itself at true shutdown.
@@ -1266,15 +1266,15 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
shutdown_notified_ = true;
shutdown_cv_.Broadcast();
-
-#ifndef NDEBUG
- // Unregister this server with the CQs passed into it by the user so that
- // those can be checked for properly-ordered shutdown.
- for (auto* cq : cq_list_) {
- cq->UnregisterServer(this);
- }
- cq_list_.clear();
-#endif
+
+#ifndef NDEBUG
+ // Unregister this server with the CQs passed into it by the user so that
+ // those can be checked for properly-ordered shutdown.
+ for (auto* cq : cq_list_) {
+ cq->UnregisterServer(this);
+ }
+ cq_list_.clear();
+#endif
}
void Server::Wait() {
@@ -1294,9 +1294,9 @@ bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
if (GenericAsyncRequest::FinalizeResult(tag, status)) {
// We either had no interceptors run or we are done intercepting
if (*status) {
- // Create a new request/response pair using the server and CQ values
- // stored in this object's base class.
- new UnimplementedAsyncRequest(server_, notification_cq_);
+ // Create a new request/response pair using the server and CQ values
+ // stored in this object's base class.
+ new UnimplementedAsyncRequest(server_, notification_cq_);
new UnimplementedAsyncResponse(this);
} else {
delete this;
@@ -1323,18 +1323,18 @@ grpc::CompletionQueue* Server::CallbackCQ() {
// TODO(vjpai): Consider using a single global CQ for the default CQ
// if there is no explicit per-server CQ registered
grpc::internal::MutexLock l(&mu_);
- if (callback_cq_ != nullptr) {
- return callback_cq_;
- }
- auto* shutdown_callback = new grpc::ShutdownCallback;
- callback_cq_ = new grpc::CompletionQueue(grpc_completion_queue_attributes{
- GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
- shutdown_callback});
-
- // Transfer ownership of the new cq to its own shutdown callback
- shutdown_callback->TakeCQ(callback_cq_);
-
+ if (callback_cq_ != nullptr) {
+ return callback_cq_;
+ }
+ auto* shutdown_callback = new grpc::ShutdownCallback;
+ callback_cq_ = new grpc::CompletionQueue(grpc_completion_queue_attributes{
+ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
+ shutdown_callback});
+
+ // Transfer ownership of the new cq to its own shutdown callback
+ shutdown_callback->TakeCQ(callback_cq_);
+
return callback_cq_;
}
-} // namespace grpc
+} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/server/server_context.cc b/contrib/libs/grpc/src/cpp/server/server_context.cc
index 458ac20d87..1b93e3229a 100644
--- a/contrib/libs/grpc/src/cpp/server/server_context.cc
+++ b/contrib/libs/grpc/src/cpp/server/server_context.cc
@@ -16,7 +16,7 @@
*
*/
-#include <grpcpp/impl/codegen/server_context.h>
+#include <grpcpp/impl/codegen/server_context.h>
#include <algorithm>
#include <utility>
@@ -27,7 +27,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/call.h>
-#include <grpcpp/impl/codegen/completion_queue.h>
+#include <grpcpp/impl/codegen/completion_queue.h>
#include <grpcpp/support/server_callback.h>
#include <grpcpp/support/time.h>
@@ -35,17 +35,17 @@
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/surface/call.h"
-namespace grpc {
+namespace grpc {
// CompletionOp
class ServerContextBase::CompletionOp final
- : public internal::CallOpSetInterface {
+ : public internal::CallOpSetInterface {
public:
// initial refs: one in the server context, one in the cq
// must ref the call before calling constructor and after deleting this
- CompletionOp(internal::Call* call,
- ::grpc::internal::ServerCallbackCall* callback_controller)
+ CompletionOp(internal::Call* call,
+ ::grpc::internal::ServerCallbackCall* callback_controller)
: call_(*call),
callback_controller_(callback_controller),
has_tag_(false),
@@ -68,7 +68,7 @@ class ServerContextBase::CompletionOp final
}
}
- void FillOps(internal::Call* call) override;
+ void FillOps(internal::Call* call) override;
// This should always be arena allocated in the call, so override delete.
// But this class is not trivially destructible, so must actually call delete
@@ -136,8 +136,8 @@ class ServerContextBase::CompletionOp final
return finalized_ ? (cancelled_ != 0) : false;
}
- internal::Call call_;
- ::grpc::internal::ServerCallbackCall* const callback_controller_;
+ internal::Call call_;
+ ::grpc::internal::ServerCallbackCall* const callback_controller_;
bool has_tag_;
void* tag_;
void* core_cq_tag_;
@@ -146,7 +146,7 @@ class ServerContextBase::CompletionOp final
bool finalized_;
int cancelled_; // This is an int (not bool) because it is passed to core
bool done_intercepting_;
- internal::InterceptorBatchMethodsImpl interceptor_methods_;
+ internal::InterceptorBatchMethodsImpl interceptor_methods_;
};
void ServerContextBase::CompletionOp::Unref() {
@@ -157,7 +157,7 @@ void ServerContextBase::CompletionOp::Unref() {
}
}
-void ServerContextBase::CompletionOp::FillOps(internal::Call* call) {
+void ServerContextBase::CompletionOp::FillOps(internal::Call* call) {
grpc_op ops;
ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
ops.data.recv_close_on_server.cancelled = &cancelled_;
@@ -174,31 +174,31 @@ void ServerContextBase::CompletionOp::FillOps(internal::Call* call) {
}
bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) {
- // Decide whether to call the cancel callback within the lock
- bool call_cancel;
-
- {
- grpc_core::MutexLock lock(&mu_);
- if (done_intercepting_) {
- // We are done intercepting.
- bool has_tag = has_tag_;
- if (has_tag) {
- *tag = tag_;
- }
- Unref();
- return has_tag;
- }
- finalized_ = true;
-
- // If for some reason the incoming status is false, mark that as a
- // cancellation.
- // TODO(vjpai): does this ever happen?
- if (!*status) {
- cancelled_ = 1;
+ // Decide whether to call the cancel callback within the lock
+ bool call_cancel;
+
+ {
+ grpc_core::MutexLock lock(&mu_);
+ if (done_intercepting_) {
+ // We are done intercepting.
+ bool has_tag = has_tag_;
+ if (has_tag) {
+ *tag = tag_;
+ }
+ Unref();
+ return has_tag;
}
-
- call_cancel = (cancelled_ != 0);
- // Release the lock since we may call a callback and interceptors.
+ finalized_ = true;
+
+ // If for some reason the incoming status is false, mark that as a
+ // cancellation.
+ // TODO(vjpai): does this ever happen?
+ if (!*status) {
+ cancelled_ = 1;
+ }
+
+ call_cancel = (cancelled_ != 0);
+ // Release the lock since we may call a callback and interceptors.
}
if (call_cancel && callback_controller_ != nullptr) {
@@ -206,28 +206,28 @@ bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) {
}
/* Add interception point and run through interceptors */
interceptor_methods_.AddInterceptionHookPoint(
- experimental::InterceptionHookPoints::POST_RECV_CLOSE);
+ experimental::InterceptionHookPoints::POST_RECV_CLOSE);
if (interceptor_methods_.RunInterceptors()) {
- // No interceptors were run
- bool has_tag = has_tag_;
- if (has_tag) {
+ // No interceptors were run
+ bool has_tag = has_tag_;
+ if (has_tag) {
*tag = tag_;
}
Unref();
- return has_tag;
+ return has_tag;
}
- // There are interceptors to be run. Return false for now.
+ // There are interceptors to be run. Return false for now.
return false;
}
// ServerContextBase body
-ServerContextBase::ServerContextBase()
- : deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)) {}
+ServerContextBase::ServerContextBase()
+ : deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)) {}
ServerContextBase::ServerContextBase(gpr_timespec deadline,
- grpc_metadata_array* arr)
- : deadline_(deadline) {
+ grpc_metadata_array* arr)
+ : deadline_(deadline) {
std::swap(*client_metadata_.arr(), *arr);
}
@@ -237,7 +237,7 @@ void ServerContextBase::BindDeadlineAndMetadata(gpr_timespec deadline,
std::swap(*client_metadata_.arr(), *arr);
}
-ServerContextBase::~ServerContextBase() {
+ServerContextBase::~ServerContextBase() {
if (completion_op_) {
completion_op_->Unref();
}
@@ -245,21 +245,21 @@ ServerContextBase::~ServerContextBase() {
rpc_info_->Unref();
}
if (default_reactor_used_.load(std::memory_order_relaxed)) {
- reinterpret_cast<Reactor*>(&default_reactor_)->~Reactor();
- }
-}
-
-ServerContextBase::CallWrapper::~CallWrapper() {
- if (call) {
- // If the ServerContext is part of the call's arena, this could free the
- // object itself.
- grpc_call_unref(call);
+ reinterpret_cast<Reactor*>(&default_reactor_)->~Reactor();
}
}
+ServerContextBase::CallWrapper::~CallWrapper() {
+ if (call) {
+ // If the ServerContext is part of the call's arena, this could free the
+ // object itself.
+ grpc_call_unref(call);
+ }
+}
+
void ServerContextBase::BeginCompletionOp(
- internal::Call* call, std::function<void(bool)> callback,
- ::grpc::internal::ServerCallbackCall* callback_controller) {
+ internal::Call* call, std::function<void(bool)> callback,
+ ::grpc::internal::ServerCallbackCall* callback_controller) {
GPR_ASSERT(!completion_op_);
if (rpc_info_) {
rpc_info_->Ref();
@@ -279,30 +279,30 @@ void ServerContextBase::BeginCompletionOp(
call->PerformOps(completion_op_);
}
-internal::CompletionQueueTag* ServerContextBase::GetCompletionOpTag() {
- return static_cast<internal::CompletionQueueTag*>(completion_op_);
+internal::CompletionQueueTag* ServerContextBase::GetCompletionOpTag() {
+ return static_cast<internal::CompletionQueueTag*>(completion_op_);
}
-void ServerContextBase::AddInitialMetadata(const TString& key,
- const TString& value) {
+void ServerContextBase::AddInitialMetadata(const TString& key,
+ const TString& value) {
initial_metadata_.insert(std::make_pair(key, value));
}
-void ServerContextBase::AddTrailingMetadata(const TString& key,
- const TString& value) {
+void ServerContextBase::AddTrailingMetadata(const TString& key,
+ const TString& value) {
trailing_metadata_.insert(std::make_pair(key, value));
}
void ServerContextBase::TryCancel() const {
- internal::CancelInterceptorBatchMethods cancel_methods;
+ internal::CancelInterceptorBatchMethods cancel_methods;
if (rpc_info_) {
for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) {
rpc_info_->RunInterceptor(&cancel_methods, i);
}
}
- grpc_call_error err =
- grpc_call_cancel_with_status(call_.call, GRPC_STATUS_CANCELLED,
- "Cancelled on the server side", nullptr);
+ grpc_call_error err =
+ grpc_call_cancel_with_status(call_.call, GRPC_STATUS_CANCELLED,
+ "Cancelled on the server side", nullptr);
if (err != GRPC_CALL_OK) {
gpr_log(GPR_ERROR, "TryCancel failed with: %d", err);
}
@@ -335,10 +335,10 @@ void ServerContextBase::set_compression_algorithm(
AddInitialMetadata(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, algorithm_name);
}
-TString ServerContextBase::peer() const {
- TString peer;
- if (call_.call) {
- char* c_peer = grpc_call_get_peer(call_.call);
+TString ServerContextBase::peer() const {
+ TString peer;
+ if (call_.call) {
+ char* c_peer = grpc_call_get_peer(call_.call);
peer = c_peer;
gpr_free(c_peer);
}
@@ -346,16 +346,16 @@ TString ServerContextBase::peer() const {
}
const struct census_context* ServerContextBase::census_context() const {
- return call_.call == nullptr ? nullptr
- : grpc_census_call_get_context(call_.call);
+ return call_.call == nullptr ? nullptr
+ : grpc_census_call_get_context(call_.call);
}
void ServerContextBase::SetLoadReportingCosts(
- const std::vector<TString>& cost_data) {
- if (call_.call == nullptr) return;
+ const std::vector<TString>& cost_data) {
+ if (call_.call == nullptr) return;
for (const auto& cost_datum : cost_data) {
AddTrailingMetadata(GRPC_LB_COST_MD_KEY, cost_datum);
}
}
-} // namespace grpc
+} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/server/server_credentials.cc b/contrib/libs/grpc/src/cpp/server/server_credentials.cc
index c3b3a8b379..449e583abd 100644
--- a/contrib/libs/grpc/src/cpp/server/server_credentials.cc
+++ b/contrib/libs/grpc/src/cpp/server/server_credentials.cc
@@ -16,10 +16,10 @@
*
*/
-#include <grpcpp/security/server_credentials.h>
+#include <grpcpp/security/server_credentials.h>
-namespace grpc {
+namespace grpc {
ServerCredentials::~ServerCredentials() {}
-} // namespace grpc
+} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/server/server_posix.cc b/contrib/libs/grpc/src/cpp/server/server_posix.cc
index c3d40d4fa2..3aaa6db0ea 100644
--- a/contrib/libs/grpc/src/cpp/server/server_posix.cc
+++ b/contrib/libs/grpc/src/cpp/server/server_posix.cc
@@ -20,7 +20,7 @@
#include <grpc/grpc_posix.h>
-namespace grpc {
+namespace grpc {
#ifdef GPR_SUPPORT_CHANNELS_FROM_FD
@@ -30,4 +30,4 @@ void AddInsecureChannelFromFd(grpc::Server* server, int fd) {
#endif // GPR_SUPPORT_CHANNELS_FROM_FD
-} // namespace grpc
+} // namespace grpc