aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/libs/grpc/src/cpp
diff options
context:
space:
mode:
authorarcadia-devtools <arcadia-devtools@yandex-team.ru>2022-03-15 21:33:41 +0300
committerarcadia-devtools <arcadia-devtools@yandex-team.ru>2022-03-15 21:33:41 +0300
commit3dd665b514943f69657b593eb51af90b99b1206b (patch)
tree0eb633e628bb1fe6c639574b1184d43def7c0a73 /contrib/libs/grpc/src/cpp
parenta68afc731202027f105bc5723ee11788017c29e2 (diff)
downloadydb-3dd665b514943f69657b593eb51af90b99b1206b.tar.gz
intermediate changes
ref:953ca886ec160075b38c0f3614de029b423f0a9e
Diffstat (limited to 'contrib/libs/grpc/src/cpp')
-rwxr-xr-xcontrib/libs/grpc/src/cpp/README.md4
-rw-r--r--contrib/libs/grpc/src/cpp/client/channel_cc.cc31
-rw-r--r--contrib/libs/grpc/src/cpp/client/client_callback.cc16
-rw-r--r--contrib/libs/grpc/src/cpp/client/client_context.cc1
-rw-r--r--contrib/libs/grpc/src/cpp/client/insecure_credentials.cc3
-rw-r--r--contrib/libs/grpc/src/cpp/client/secure_credentials.cc35
-rw-r--r--contrib/libs/grpc/src/cpp/client/secure_credentials.h12
-rw-r--r--contrib/libs/grpc/src/cpp/client/xds_credentials.cc41
-rw-r--r--contrib/libs/grpc/src/cpp/common/.yandex_meta/licenses.list.txt18
-rw-r--r--contrib/libs/grpc/src/cpp/common/alarm.cc30
-rw-r--r--contrib/libs/grpc/src/cpp/common/auth_property_iterator.cc2
-rw-r--r--contrib/libs/grpc/src/cpp/common/channel_filter.cc2
-rw-r--r--contrib/libs/grpc/src/cpp/common/channel_filter.h2
-rw-r--r--contrib/libs/grpc/src/cpp/common/completion_queue_cc.cc110
-rw-r--r--contrib/libs/grpc/src/cpp/common/secure_auth_context.h2
-rw-r--r--contrib/libs/grpc/src/cpp/common/tls_certificate_provider.cc59
-rw-r--r--contrib/libs/grpc/src/cpp/common/tls_credentials_options.cc275
-rw-r--r--contrib/libs/grpc/src/cpp/common/tls_credentials_options_util.cc75
-rw-r--r--contrib/libs/grpc/src/cpp/common/tls_credentials_options_util.h16
-rw-r--r--contrib/libs/grpc/src/cpp/common/version_cc.cc2
-rw-r--r--contrib/libs/grpc/src/cpp/ext/proto_server_reflection.cc5
-rw-r--r--contrib/libs/grpc/src/cpp/server/channel_argument_option.cc8
-rw-r--r--contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc9
-rw-r--r--contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.cc15
-rw-r--r--contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.h4
-rw-r--r--contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc14
-rw-r--r--contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h2
-rw-r--r--contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc3
-rw-r--r--contrib/libs/grpc/src/cpp/server/load_reporter/constants.h2
-rw-r--r--contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc2
-rw-r--r--contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc9
-rw-r--r--contrib/libs/grpc/src/cpp/server/secure_server_credentials.h6
-rw-r--r--contrib/libs/grpc/src/cpp/server/server_builder.cc23
-rw-r--r--contrib/libs/grpc/src/cpp/server/server_callback.cc34
-rw-r--r--contrib/libs/grpc/src/cpp/server/server_cc.cc522
-rw-r--r--contrib/libs/grpc/src/cpp/server/server_context.cc54
-rw-r--r--contrib/libs/grpc/src/cpp/server/server_credentials.cc5
-rw-r--r--contrib/libs/grpc/src/cpp/server/xds_server_credentials.cc41
-rw-r--r--contrib/libs/grpc/src/cpp/thread_manager/thread_manager.cc10
-rw-r--r--contrib/libs/grpc/src/cpp/thread_manager/thread_manager.h2
-rw-r--r--contrib/libs/grpc/src/cpp/util/error_details.cc31
41 files changed, 813 insertions, 724 deletions
diff --git a/contrib/libs/grpc/src/cpp/README.md b/contrib/libs/grpc/src/cpp/README.md
index 967a0a43b7..134f4db56c 100755
--- a/contrib/libs/grpc/src/cpp/README.md
+++ b/contrib/libs/grpc/src/cpp/README.md
@@ -25,10 +25,10 @@ To add gRPC as a dependency in bazel:
],
strip_prefix = "grpc-YOUR_GRPC_COMMIT_SHA",
)
-
load("@com_github_grpc_grpc//bazel:grpc_deps.bzl", "grpc_deps")
-
grpc_deps()
+ load("@com_github_grpc_grpc//bazel:grpc_extra_deps.bzl", "grpc_extra_deps")
+ grpc_extra_deps()
```
## CMake
diff --git a/contrib/libs/grpc/src/cpp/client/channel_cc.cc b/contrib/libs/grpc/src/cpp/client/channel_cc.cc
index ac95c29efc..1793161d6c 100644
--- a/contrib/libs/grpc/src/cpp/client/channel_cc.cc
+++ b/contrib/libs/grpc/src/cpp/client/channel_cc.cc
@@ -38,7 +38,9 @@
#include <grpcpp/support/channel_arguments.h>
#include <grpcpp/support/config.h>
#include <grpcpp/support/status.h>
+
#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/surface/completion_queue.h"
namespace grpc {
@@ -56,7 +58,12 @@ Channel::Channel(const TString& host, grpc_channel* channel,
Channel::~Channel() {
grpc_channel_destroy(c_channel_);
if (callback_cq_ != nullptr) {
- callback_cq_->Shutdown();
+ if (grpc_iomgr_run_in_background()) {
+ // gRPC-core provides the backing needed for the preferred CQ type
+ callback_cq_->Shutdown();
+ } else {
+ CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq_);
+ }
}
}
@@ -238,13 +245,21 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor {
// if there is no explicit per-channel CQ registered
grpc::internal::MutexLock l(&mu_);
if (callback_cq_ == nullptr) {
- auto* shutdown_callback = new ShutdownCallback;
- callback_cq_ = new ::grpc::CompletionQueue(grpc_completion_queue_attributes{
- GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
- shutdown_callback});
-
- // Transfer ownership of the new cq to its own shutdown callback
- shutdown_callback->TakeCQ(callback_cq_);
+ if (grpc_iomgr_run_in_background()) {
+ // gRPC-core provides the backing needed for the preferred CQ type
+
+ auto* shutdown_callback = new ShutdownCallback;
+ callback_cq_ =
+ new ::grpc::CompletionQueue(grpc_completion_queue_attributes{
+ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK,
+ GRPC_CQ_DEFAULT_POLLING, shutdown_callback});
+
+ // Transfer ownership of the new cq to its own shutdown callback
+ shutdown_callback->TakeCQ(callback_cq_);
+ } else {
+ // Otherwise we need to use the alternative CQ variant
+ callback_cq_ = CompletionQueue::CallbackAlternativeCQ();
+ }
}
return callback_cq_;
}
diff --git a/contrib/libs/grpc/src/cpp/client/client_callback.cc b/contrib/libs/grpc/src/cpp/client/client_callback.cc
index f4cbc97d34..a05761ad06 100644
--- a/contrib/libs/grpc/src/cpp/client/client_callback.cc
+++ b/contrib/libs/grpc/src/cpp/client/client_callback.cc
@@ -34,14 +34,14 @@ void ClientReactor::InternalScheduleOnDone(grpc::Status s) {
const grpc::Status status;
ClosureWithArg(ClientReactor* reactor_arg, grpc::Status s)
: reactor(reactor_arg), status(std::move(s)) {
- GRPC_CLOSURE_INIT(&closure,
- [](void* void_arg, grpc_error*) {
- ClosureWithArg* arg =
- static_cast<ClosureWithArg*>(void_arg);
- arg->reactor->OnDone(arg->status);
- delete arg;
- },
- this, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(
+ &closure,
+ [](void* void_arg, grpc_error*) {
+ ClosureWithArg* arg = static_cast<ClosureWithArg*>(void_arg);
+ arg->reactor->OnDone(arg->status);
+ delete arg;
+ },
+ this, grpc_schedule_on_exec_ctx);
}
};
ClosureWithArg* arg = new ClosureWithArg(this, std::move(s));
diff --git a/contrib/libs/grpc/src/cpp/client/client_context.cc b/contrib/libs/grpc/src/cpp/client/client_context.cc
index b75343d089..8f18bfe724 100644
--- a/contrib/libs/grpc/src/cpp/client/client_context.cc
+++ b/contrib/libs/grpc/src/cpp/client/client_context.cc
@@ -62,6 +62,7 @@ ClientContext::ClientContext()
propagate_from_call_(nullptr),
compression_algorithm_(GRPC_COMPRESS_NONE),
initial_metadata_corked_(false) {
+ g_gli_initializer.summon();
g_client_callbacks->DefaultConstructor(this);
}
diff --git a/contrib/libs/grpc/src/cpp/client/insecure_credentials.cc b/contrib/libs/grpc/src/cpp/client/insecure_credentials.cc
index e5bafff70a..b7036e541c 100644
--- a/contrib/libs/grpc/src/cpp/client/insecure_credentials.cc
+++ b/contrib/libs/grpc/src/cpp/client/insecure_credentials.cc
@@ -51,6 +51,9 @@ class InsecureChannelCredentialsImpl final : public ChannelCredentials {
}
SecureChannelCredentials* AsSecureCredentials() override { return nullptr; }
+
+ private:
+ bool IsInsecure() const override { return true; }
};
} // namespace
diff --git a/contrib/libs/grpc/src/cpp/client/secure_credentials.cc b/contrib/libs/grpc/src/cpp/client/secure_credentials.cc
index 0f6db3caa5..109b50448d 100644
--- a/contrib/libs/grpc/src/cpp/client/secure_credentials.cc
+++ b/contrib/libs/grpc/src/cpp/client/secure_credentials.cc
@@ -28,6 +28,9 @@
#include <grpcpp/impl/grpc_library.h>
#include <grpcpp/support/channel_arguments.h>
+#include "y_absl/strings/str_join.h"
+
+// TODO(yashykt): We shouldn't be including "src/core" headers.
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/executor.h"
@@ -79,7 +82,8 @@ bool SecureCallCredentials::ApplyToCall(grpc_call* call) {
return grpc_call_set_credentials(call, c_creds_) == GRPC_CALL_OK;
}
-namespace {
+namespace internal {
+
std::shared_ptr<ChannelCredentials> WrapChannelCredentials(
grpc_channel_credentials* creds) {
return creds == nullptr ? nullptr
@@ -87,6 +91,10 @@ std::shared_ptr<ChannelCredentials> WrapChannelCredentials(
new SecureChannelCredentials(creds));
}
+} // namespace internal
+
+namespace {
+
std::shared_ptr<CallCredentials> WrapCallCredentials(
grpc_call_credentials* creds) {
return creds == nullptr ? nullptr
@@ -97,10 +105,17 @@ std::shared_ptr<CallCredentials> WrapCallCredentials(
std::shared_ptr<ChannelCredentials> GoogleDefaultCredentials() {
grpc::GrpcLibraryCodegen init; // To call grpc_init().
- return WrapChannelCredentials(
+ return internal::WrapChannelCredentials(
grpc_google_default_credentials_create(nullptr));
}
+std::shared_ptr<CallCredentials> ExternalAccountCredentials(
+ const grpc::string& json_string, const std::vector<grpc::string>& scopes) {
+ grpc::GrpcLibraryCodegen init; // To call grpc_init().
+ return WrapCallCredentials(grpc_external_account_credentials_create(
+ json_string.c_str(), y_absl::StrJoin(scopes, ",").c_str()));
+}
+
// Builds SSL Credentials given SSL specific options
std::shared_ptr<ChannelCredentials> SslCredentials(
const SslCredentialsOptions& options) {
@@ -112,7 +127,7 @@ std::shared_ptr<ChannelCredentials> SslCredentials(
options.pem_root_certs.empty() ? nullptr : options.pem_root_certs.c_str(),
options.pem_private_key.empty() ? nullptr : &pem_key_cert_pair, nullptr,
nullptr);
- return WrapChannelCredentials(c_creds);
+ return internal::WrapChannelCredentials(c_creds);
}
namespace experimental {
@@ -277,20 +292,20 @@ std::shared_ptr<ChannelCredentials> AltsCredentials(
}
grpc_channel_credentials* c_creds = grpc_alts_credentials_create(c_options);
grpc_alts_credentials_options_destroy(c_options);
- return WrapChannelCredentials(c_creds);
+ return internal::WrapChannelCredentials(c_creds);
}
// Builds Local Credentials
std::shared_ptr<ChannelCredentials> LocalCredentials(
grpc_local_connect_type type) {
grpc::GrpcLibraryCodegen init; // To call grpc_init().
- return WrapChannelCredentials(grpc_local_credentials_create(type));
+ return internal::WrapChannelCredentials(grpc_local_credentials_create(type));
}
// Builds TLS Credentials given TLS options.
std::shared_ptr<ChannelCredentials> TlsCredentials(
- const TlsCredentialsOptions& options) {
- return WrapChannelCredentials(
+ const TlsChannelCredentialsOptions& options) {
+ return internal::WrapChannelCredentials(
grpc_tls_credentials_create(options.c_credentials_options()));
}
@@ -356,8 +371,10 @@ std::shared_ptr<ChannelCredentials> CompositeChannelCredentials(
channel_creds->AsSecureCredentials();
SecureCallCredentials* s_call_creds = call_creds->AsSecureCredentials();
if (s_channel_creds && s_call_creds) {
- return WrapChannelCredentials(grpc_composite_channel_credentials_create(
- s_channel_creds->GetRawCreds(), s_call_creds->GetRawCreds(), nullptr));
+ return internal::WrapChannelCredentials(
+ grpc_composite_channel_credentials_create(
+ s_channel_creds->GetRawCreds(), s_call_creds->GetRawCreds(),
+ nullptr));
}
return nullptr;
}
diff --git a/contrib/libs/grpc/src/cpp/client/secure_credentials.h b/contrib/libs/grpc/src/cpp/client/secure_credentials.h
index 4fc79346bf..9325f365b1 100644
--- a/contrib/libs/grpc/src/cpp/client/secure_credentials.h
+++ b/contrib/libs/grpc/src/cpp/client/secure_credentials.h
@@ -26,6 +26,7 @@
#include <grpcpp/support/config.h>
#include "y_absl/strings/str_cat.h"
+// TODO(yashykt): We shouldn't be including "src/core" headers.
#include "src/core/lib/security/credentials/credentials.h"
#include "src/cpp/server/thread_pool_interface.h"
@@ -36,7 +37,7 @@ class Channel;
class SecureChannelCredentials final : public ChannelCredentials {
public:
explicit SecureChannelCredentials(grpc_channel_credentials* c_creds);
- ~SecureChannelCredentials() {
+ ~SecureChannelCredentials() override {
if (c_creds_ != nullptr) c_creds_->Unref();
}
grpc_channel_credentials* GetRawCreds() { return c_creds_; }
@@ -58,7 +59,7 @@ class SecureChannelCredentials final : public ChannelCredentials {
class SecureCallCredentials final : public CallCredentials {
public:
explicit SecureCallCredentials(grpc_call_credentials* c_creds);
- ~SecureCallCredentials() {
+ ~SecureCallCredentials() override {
if (c_creds_ != nullptr) c_creds_->Unref();
}
grpc_call_credentials* GetRawCreds() { return c_creds_; }
@@ -74,6 +75,13 @@ class SecureCallCredentials final : public CallCredentials {
grpc_call_credentials* const c_creds_;
};
+namespace internal {
+
+std::shared_ptr<ChannelCredentials> WrapChannelCredentials(
+ grpc_channel_credentials* creds);
+
+} // namespace internal
+
namespace experimental {
// Transforms C++ STS Credentials options to core options. The pointers of the
diff --git a/contrib/libs/grpc/src/cpp/client/xds_credentials.cc b/contrib/libs/grpc/src/cpp/client/xds_credentials.cc
new file mode 100644
index 0000000000..63b48837ef
--- /dev/null
+++ b/contrib/libs/grpc/src/cpp/client/xds_credentials.cc
@@ -0,0 +1,41 @@
+//
+//
+// Copyright 2020 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+//
+
+#include "src/cpp/client/secure_credentials.h"
+
+namespace grpc {
+namespace experimental {
+
+std::shared_ptr<ChannelCredentials> XdsCredentials(
+ const std::shared_ptr<ChannelCredentials>& fallback_creds) {
+ GPR_ASSERT(fallback_creds != nullptr);
+ if (fallback_creds->IsInsecure()) {
+ grpc_channel_credentials* insecure_creds =
+ grpc_insecure_credentials_create();
+ auto xds_creds = internal::WrapChannelCredentials(
+ grpc_xds_credentials_create(insecure_creds));
+ grpc_channel_credentials_release(insecure_creds);
+ return xds_creds;
+ } else {
+ return internal::WrapChannelCredentials(grpc_xds_credentials_create(
+ fallback_creds->AsSecureCredentials()->GetRawCreds()));
+ }
+}
+
+} // namespace experimental
+} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/common/.yandex_meta/licenses.list.txt b/contrib/libs/grpc/src/cpp/common/.yandex_meta/licenses.list.txt
index a5d42d5b53..ed4a3efa9f 100644
--- a/contrib/libs/grpc/src/cpp/common/.yandex_meta/licenses.list.txt
+++ b/contrib/libs/grpc/src/cpp/common/.yandex_meta/licenses.list.txt
@@ -12,6 +12,20 @@
* limitations under the License.
+====================Apache-2.0====================
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+
====================COPYRIGHT====================
* Copyright 2015 gRPC authors.
@@ -25,4 +39,8 @@
====================COPYRIGHT====================
+ * Copyright 2020 gRPC authors.
+
+
+====================COPYRIGHT====================
# Copyright 2019 gRPC authors.
diff --git a/contrib/libs/grpc/src/cpp/common/alarm.cc b/contrib/libs/grpc/src/cpp/common/alarm.cc
index a2612874b2..ac1b747b49 100644
--- a/contrib/libs/grpc/src/cpp/common/alarm.cc
+++ b/contrib/libs/grpc/src/cpp/common/alarm.cc
@@ -41,7 +41,7 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag {
gpr_ref_init(&refs_, 1);
grpc_timer_init_unset(&timer_);
}
- ~AlarmImpl() {}
+ ~AlarmImpl() override {}
bool FinalizeResult(void** tag, bool* /*status*/) override {
*tag = tag_;
Unref();
@@ -80,20 +80,20 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag {
// Don't use any CQ at all. Instead just use the timer to fire the function
callback_ = std::move(f);
Ref();
- GRPC_CLOSURE_INIT(&on_alarm_,
- [](void* arg, grpc_error* error) {
- grpc_core::Executor::Run(
- GRPC_CLOSURE_CREATE(
- [](void* arg, grpc_error* error) {
- AlarmImpl* alarm =
- static_cast<AlarmImpl*>(arg);
- alarm->callback_(error == GRPC_ERROR_NONE);
- alarm->Unref();
- },
- arg, nullptr),
- error);
- },
- this, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(
+ &on_alarm_,
+ [](void* arg, grpc_error* error) {
+ grpc_core::Executor::Run(
+ GRPC_CLOSURE_CREATE(
+ [](void* arg, grpc_error* error) {
+ AlarmImpl* alarm = static_cast<AlarmImpl*>(arg);
+ alarm->callback_(error == GRPC_ERROR_NONE);
+ alarm->Unref();
+ },
+ arg, nullptr),
+ error);
+ },
+ this, grpc_schedule_on_exec_ctx);
grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline),
&on_alarm_);
}
diff --git a/contrib/libs/grpc/src/cpp/common/auth_property_iterator.cc b/contrib/libs/grpc/src/cpp/common/auth_property_iterator.cc
index fbb18e9915..0f380b0950 100644
--- a/contrib/libs/grpc/src/cpp/common/auth_property_iterator.cc
+++ b/contrib/libs/grpc/src/cpp/common/auth_property_iterator.cc
@@ -61,7 +61,7 @@ bool AuthPropertyIterator::operator!=(const AuthPropertyIterator& rhs) const {
return !operator==(rhs);
}
-const AuthProperty AuthPropertyIterator::operator*() {
+AuthProperty AuthPropertyIterator::operator*() {
return std::pair<grpc::string_ref, grpc::string_ref>(
property_->name,
grpc::string_ref(property_->value, property_->value_length));
diff --git a/contrib/libs/grpc/src/cpp/common/channel_filter.cc b/contrib/libs/grpc/src/cpp/common/channel_filter.cc
index 8df6c7b98f..ab56d6073f 100644
--- a/contrib/libs/grpc/src/cpp/common/channel_filter.cc
+++ b/contrib/libs/grpc/src/cpp/common/channel_filter.cc
@@ -87,7 +87,7 @@ void ChannelFilterPluginInit() {
for (size_t i = 0; i < channel_filters->size(); ++i) {
FilterRecord& filter = (*channel_filters)[i];
grpc_channel_init_register_stage(filter.stack_type, filter.priority,
- MaybeAddFilter, (void*)&filter);
+ MaybeAddFilter, &filter);
}
}
diff --git a/contrib/libs/grpc/src/cpp/common/channel_filter.h b/contrib/libs/grpc/src/cpp/common/channel_filter.h
index 5ce720b307..b16f06bddc 100644
--- a/contrib/libs/grpc/src/cpp/common/channel_filter.h
+++ b/contrib/libs/grpc/src/cpp/common/channel_filter.h
@@ -60,7 +60,7 @@ class MetadataBatch {
const grpc_mdelem> {
public:
const grpc_mdelem& operator*() const { return elem_->md; }
- const grpc_mdelem operator->() const { return elem_->md; }
+ grpc_mdelem operator->() const { return elem_->md; }
const_iterator& operator++() {
elem_ = elem_->next;
diff --git a/contrib/libs/grpc/src/cpp/common/completion_queue_cc.cc b/contrib/libs/grpc/src/cpp/common/completion_queue_cc.cc
index 96a7105eaf..6774559074 100644
--- a/contrib/libs/grpc/src/cpp/common/completion_queue_cc.cc
+++ b/contrib/libs/grpc/src/cpp/common/completion_queue_cc.cc
@@ -20,13 +20,106 @@
#include <memory>
#include <grpc/grpc.h>
+#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/grpc_library.h>
#include <grpcpp/support/time.h>
+#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
+#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/gprpp/thd.h"
+
namespace grpc {
+namespace {
+
+internal::GrpcLibraryInitializer g_gli_initializer;
+
+gpr_once g_once_init_callback_alternative = GPR_ONCE_INIT;
+grpc_core::Mutex* g_callback_alternative_mu;
+
+// Implement a ref-counted callback CQ for global use in the alternative
+// implementation so that its threads are only created once. Do this using
+// explicit ref-counts and raw pointers rather than a shared-ptr since that
+// has a non-trivial destructor and thus can't be used for global variables.
+struct CallbackAlternativeCQ {
+ int refs Y_ABSL_GUARDED_BY(g_callback_alternative_mu) = 0;
+ CompletionQueue* cq Y_ABSL_GUARDED_BY(g_callback_alternative_mu);
+ std::vector<grpc_core::Thread>* nexting_threads
+ Y_ABSL_GUARDED_BY(g_callback_alternative_mu);
-static internal::GrpcLibraryInitializer g_gli_initializer;
+ CompletionQueue* Ref() {
+ grpc_core::MutexLock lock(&*g_callback_alternative_mu);
+ refs++;
+ if (refs == 1) {
+ cq = new CompletionQueue;
+ int num_nexting_threads = GPR_CLAMP(gpr_cpu_num_cores() / 2, 2, 16);
+ nexting_threads = new std::vector<grpc_core::Thread>;
+ for (int i = 0; i < num_nexting_threads; i++) {
+ nexting_threads->emplace_back(
+ "nexting_thread",
+ [](void* arg) {
+ grpc_completion_queue* cq =
+ static_cast<CompletionQueue*>(arg)->cq();
+ while (true) {
+ // Use the raw Core next function rather than the C++ Next since
+ // Next incorporates FinalizeResult and we actually want that
+ // called from the callback functor itself.
+ // TODO(vjpai): Migrate below to next without a timeout or idle
+ // phase. That's currently starving out some other polling,
+ // though.
+ auto ev = grpc_completion_queue_next(
+ cq,
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(1000, GPR_TIMESPAN)),
+ nullptr);
+ if (ev.type == GRPC_QUEUE_SHUTDOWN) {
+ return;
+ }
+ if (ev.type == GRPC_QUEUE_TIMEOUT) {
+ gpr_sleep_until(
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(100, GPR_TIMESPAN)));
+ continue;
+ }
+ GPR_DEBUG_ASSERT(ev.type == GRPC_OP_COMPLETE);
+ // We can always execute the callback inline rather than
+ // pushing it to another Executor thread because this
+ // thread is definitely running on a background thread, does not
+ // hold any application locks before executing the callback,
+ // and cannot be entered recursively.
+ auto* functor =
+ static_cast<grpc_experimental_completion_queue_functor*>(
+ ev.tag);
+ functor->functor_run(functor, ev.success);
+ }
+ },
+ cq);
+ }
+ for (auto& th : *nexting_threads) {
+ th.Start();
+ }
+ }
+ return cq;
+ }
+
+ void Unref() {
+ grpc_core::MutexLock lock(g_callback_alternative_mu);
+ refs--;
+ if (refs == 0) {
+ cq->Shutdown();
+ for (auto& th : *nexting_threads) {
+ th.Join();
+ }
+ delete nexting_threads;
+ delete cq;
+ }
+ }
+};
+
+CallbackAlternativeCQ g_callback_alternative_cq;
+
+} // namespace
// 'CompletionQueue' constructor can safely call GrpcLibraryCodegen(false) here
// i.e not have GrpcLibraryCodegen call grpc_init(). This is because, to create
@@ -96,4 +189,19 @@ bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) {
return false;
}
+CompletionQueue* CompletionQueue::CallbackAlternativeCQ() {
+ gpr_once_init(&g_once_init_callback_alternative,
+ [] { g_callback_alternative_mu = new grpc_core::Mutex(); });
+ return g_callback_alternative_cq.Ref();
+}
+
+void CompletionQueue::ReleaseCallbackAlternativeCQ(CompletionQueue* cq)
+ Y_ABSL_NO_THREAD_SAFETY_ANALYSIS {
+ (void)cq;
+ // This accesses g_callback_alternative_cq without acquiring the mutex
+ // but it's considered safe because it just reads the pointer address.
+ GPR_DEBUG_ASSERT(cq == g_callback_alternative_cq.cq);
+ g_callback_alternative_cq.Unref();
+}
+
} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/common/secure_auth_context.h b/contrib/libs/grpc/src/cpp/common/secure_auth_context.h
index 51013efac7..0e85a7077b 100644
--- a/contrib/libs/grpc/src/cpp/common/secure_auth_context.h
+++ b/contrib/libs/grpc/src/cpp/common/secure_auth_context.h
@@ -49,7 +49,7 @@ class SecureAuthContext final : public AuthContext {
void AddProperty(const TString& key,
const grpc::string_ref& value) override;
- virtual bool SetPeerIdentityPropertyName(const TString& name) override;
+ bool SetPeerIdentityPropertyName(const TString& name) override;
private:
grpc_core::RefCountedPtr<grpc_auth_context> ctx_;
diff --git a/contrib/libs/grpc/src/cpp/common/tls_certificate_provider.cc b/contrib/libs/grpc/src/cpp/common/tls_certificate_provider.cc
new file mode 100644
index 0000000000..8e99b571a1
--- /dev/null
+++ b/contrib/libs/grpc/src/cpp/common/tls_certificate_provider.cc
@@ -0,0 +1,59 @@
+//
+// Copyright 2020 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include <grpc/grpc_security.h>
+#include <grpc/support/alloc.h>
+#include <grpcpp/security/tls_certificate_provider.h>
+
+#include "y_absl/container/inlined_vector.h"
+
+namespace grpc {
+namespace experimental {
+
+StaticDataCertificateProvider::StaticDataCertificateProvider(
+ const TString& root_certificate,
+ const std::vector<IdentityKeyCertPair>& identity_key_cert_pairs) {
+ GPR_ASSERT(!root_certificate.empty() || !identity_key_cert_pairs.empty());
+ grpc_tls_identity_pairs* pairs_core = grpc_tls_identity_pairs_create();
+ for (const IdentityKeyCertPair& pair : identity_key_cert_pairs) {
+ grpc_tls_identity_pairs_add_pair(pairs_core, pair.private_key.c_str(),
+ pair.certificate_chain.c_str());
+ }
+ c_provider_ = grpc_tls_certificate_provider_static_data_create(
+ root_certificate.c_str(), pairs_core);
+ GPR_ASSERT(c_provider_ != nullptr);
+};
+
+StaticDataCertificateProvider::~StaticDataCertificateProvider() {
+ grpc_tls_certificate_provider_release(c_provider_);
+};
+
+FileWatcherCertificateProvider::FileWatcherCertificateProvider(
+ const TString& private_key_path,
+ const TString& identity_certificate_path,
+ const TString& root_cert_path, unsigned int refresh_interval_sec) {
+ c_provider_ = grpc_tls_certificate_provider_file_watcher_create(
+ private_key_path.c_str(), identity_certificate_path.c_str(),
+ root_cert_path.c_str(), refresh_interval_sec);
+ GPR_ASSERT(c_provider_ != nullptr);
+};
+
+FileWatcherCertificateProvider::~FileWatcherCertificateProvider() {
+ grpc_tls_certificate_provider_release(c_provider_);
+};
+
+} // namespace experimental
+} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/common/tls_credentials_options.cc b/contrib/libs/grpc/src/cpp/common/tls_credentials_options.cc
index 7e435ac1de..d612bd062e 100644
--- a/contrib/libs/grpc/src/cpp/common/tls_credentials_options.cc
+++ b/contrib/libs/grpc/src/cpp/common/tls_credentials_options.cc
@@ -16,178 +16,22 @@
*
*/
+#include <grpc/grpc_security.h>
#include <grpc/support/alloc.h>
#include <grpcpp/security/tls_credentials_options.h>
#include "y_absl/container/inlined_vector.h"
-#include "src/core/lib/security/credentials/tls/grpc_tls_credentials_options.h"
#include "src/cpp/common/tls_credentials_options_util.h"
namespace grpc {
namespace experimental {
-/** TLS key materials config API implementation **/
-void TlsKeyMaterialsConfig::set_pem_root_certs(
- const TString& pem_root_certs) {
- pem_root_certs_ = pem_root_certs;
-}
-
-void TlsKeyMaterialsConfig::add_pem_key_cert_pair(
- const PemKeyCertPair& pem_key_cert_pair) {
- pem_key_cert_pair_list_.push_back(pem_key_cert_pair);
-}
-
-void TlsKeyMaterialsConfig::set_key_materials(
- const TString& pem_root_certs,
- const std::vector<PemKeyCertPair>& pem_key_cert_pair_list) {
- pem_key_cert_pair_list_ = pem_key_cert_pair_list;
- pem_root_certs_ = pem_root_certs;
-}
-
-/** TLS credential reload arg API implementation **/
-TlsCredentialReloadArg::TlsCredentialReloadArg(
- grpc_tls_credential_reload_arg* arg)
- : c_arg_(arg) {
- if (c_arg_ != nullptr && c_arg_->context != nullptr) {
- gpr_log(GPR_ERROR, "c_arg context has already been set");
- }
- c_arg_->context = static_cast<void*>(this);
- c_arg_->destroy_context = &TlsCredentialReloadArgDestroyContext;
-}
-
-TlsCredentialReloadArg::~TlsCredentialReloadArg() {}
-
-void* TlsCredentialReloadArg::cb_user_data() const {
- return c_arg_->cb_user_data;
-}
-bool TlsCredentialReloadArg::is_pem_key_cert_pair_list_empty() const {
- return c_arg_->key_materials_config->pem_key_cert_pair_list().empty();
-}
-
-grpc_ssl_certificate_config_reload_status TlsCredentialReloadArg::status()
- const {
- return c_arg_->status;
-}
-
-TString TlsCredentialReloadArg::error_details() const {
- return c_arg_->error_details->error_details();
-}
-
-void TlsCredentialReloadArg::set_cb_user_data(void* cb_user_data) {
- c_arg_->cb_user_data = cb_user_data;
-}
-
-void TlsCredentialReloadArg::set_pem_root_certs(
- const TString& pem_root_certs) {
- ::grpc_core::UniquePtr<char> c_pem_root_certs(
- gpr_strdup(pem_root_certs.c_str()));
- c_arg_->key_materials_config->set_pem_root_certs(std::move(c_pem_root_certs));
-}
-
-namespace {
-
-::grpc_core::PemKeyCertPair ConvertToCorePemKeyCertPair(
- const TlsKeyMaterialsConfig::PemKeyCertPair& pem_key_cert_pair) {
- grpc_ssl_pem_key_cert_pair* ssl_pair =
- (grpc_ssl_pem_key_cert_pair*)gpr_malloc(
- sizeof(grpc_ssl_pem_key_cert_pair));
- ssl_pair->private_key = gpr_strdup(pem_key_cert_pair.private_key.c_str());
- ssl_pair->cert_chain = gpr_strdup(pem_key_cert_pair.cert_chain.c_str());
- return ::grpc_core::PemKeyCertPair(ssl_pair);
-}
-
-} // namespace
-
-void TlsCredentialReloadArg::add_pem_key_cert_pair(
- const TlsKeyMaterialsConfig::PemKeyCertPair& pem_key_cert_pair) {
- c_arg_->key_materials_config->add_pem_key_cert_pair(
- ConvertToCorePemKeyCertPair(pem_key_cert_pair));
-}
-
-void TlsCredentialReloadArg::set_key_materials(
- const TString& pem_root_certs,
- std::vector<TlsKeyMaterialsConfig::PemKeyCertPair> pem_key_cert_pair_list) {
- /** Initialize the |key_materials_config| field of |c_arg_|, if it has not
- * already been done. **/
- if (c_arg_->key_materials_config == nullptr) {
- c_arg_->key_materials_config = grpc_tls_key_materials_config_create();
- }
- /** Convert |pem_key_cert_pair_list| to an inlined vector of ssl pairs. **/
- ::y_absl::InlinedVector<::grpc_core::PemKeyCertPair, 1>
- c_pem_key_cert_pair_list;
- for (const auto& key_cert_pair : pem_key_cert_pair_list) {
- c_pem_key_cert_pair_list.emplace_back(
- ConvertToCorePemKeyCertPair(key_cert_pair));
- }
- /** Populate the key materials config field of |c_arg_|. **/
- c_arg_->key_materials_config->set_key_materials(pem_root_certs.c_str(),
- c_pem_key_cert_pair_list);
-}
-
-void TlsCredentialReloadArg::set_key_materials_config(
- const std::shared_ptr<TlsKeyMaterialsConfig>& key_materials_config) {
- if (key_materials_config == nullptr) {
- c_arg_->key_materials_config = nullptr;
- return;
- }
- ::y_absl::InlinedVector<::grpc_core::PemKeyCertPair, 1>
- c_pem_key_cert_pair_list;
- for (const auto& key_cert_pair :
- key_materials_config->pem_key_cert_pair_list()) {
- grpc_ssl_pem_key_cert_pair* ssl_pair =
- (grpc_ssl_pem_key_cert_pair*)gpr_malloc(
- sizeof(grpc_ssl_pem_key_cert_pair));
- ssl_pair->private_key = gpr_strdup(key_cert_pair.private_key.c_str());
- ssl_pair->cert_chain = gpr_strdup(key_cert_pair.cert_chain.c_str());
- ::grpc_core::PemKeyCertPair c_pem_key_cert_pair =
- ::grpc_core::PemKeyCertPair(ssl_pair);
- c_pem_key_cert_pair_list.emplace_back(std::move(c_pem_key_cert_pair));
- }
- ::grpc_core::UniquePtr<char> c_pem_root_certs(
- gpr_strdup(key_materials_config->pem_root_certs().c_str()));
- if (c_arg_->key_materials_config == nullptr) {
- c_arg_->key_materials_config = grpc_tls_key_materials_config_create();
- }
- c_arg_->key_materials_config->set_key_materials(
- key_materials_config->pem_root_certs().c_str(), c_pem_key_cert_pair_list);
- c_arg_->key_materials_config->set_version(key_materials_config->version());
-}
-
-void TlsCredentialReloadArg::set_status(
- grpc_ssl_certificate_config_reload_status status) {
- c_arg_->status = status;
-}
-
-void TlsCredentialReloadArg::set_error_details(
- const TString& error_details) {
- c_arg_->error_details->set_error_details(error_details.c_str());
-}
-
-void TlsCredentialReloadArg::OnCredentialReloadDoneCallback() {
- if (c_arg_->cb == nullptr) {
- gpr_log(GPR_ERROR, "credential reload arg callback API is nullptr");
- return;
- }
- c_arg_->cb(c_arg_);
-}
-
-/** gRPC TLS credential reload config API implementation **/
-TlsCredentialReloadConfig::TlsCredentialReloadConfig(
- std::shared_ptr<TlsCredentialReloadInterface> credential_reload_interface)
- : credential_reload_interface_(std::move(credential_reload_interface)) {
- c_config_ = grpc_tls_credential_reload_config_create(
- nullptr, &TlsCredentialReloadConfigCSchedule,
- &TlsCredentialReloadConfigCCancel, nullptr);
- c_config_->set_context(static_cast<void*>(this));
-}
-
-TlsCredentialReloadConfig::~TlsCredentialReloadConfig() {}
-
/** gRPC TLS server authorization check arg API implementation **/
TlsServerAuthorizationCheckArg::TlsServerAuthorizationCheckArg(
grpc_tls_server_authorization_check_arg* arg)
: c_arg_(arg) {
- if (c_arg_ != nullptr && c_arg_->context != nullptr) {
+ GPR_ASSERT(c_arg_ != nullptr);
+ if (c_arg_->context != nullptr) {
gpr_log(GPR_ERROR, "c_arg context has already been set");
}
c_arg_->context = static_cast<void*>(this);
@@ -265,7 +109,6 @@ void TlsServerAuthorizationCheckArg::OnServerAuthorizationCheckDoneCallback() {
c_arg_->cb(c_arg_);
}
-/** gRPC TLS server authorization check config API implementation. **/
TlsServerAuthorizationCheckConfig::TlsServerAuthorizationCheckConfig(
std::shared_ptr<TlsServerAuthorizationCheckInterface>
server_authorization_check_interface)
@@ -277,67 +120,69 @@ TlsServerAuthorizationCheckConfig::TlsServerAuthorizationCheckConfig(
c_config_->set_context(static_cast<void*>(this));
}
-TlsServerAuthorizationCheckConfig::~TlsServerAuthorizationCheckConfig() {}
-
-/** gRPC TLS credential options API implementation **/
-TlsCredentialsOptions::TlsCredentialsOptions(
- grpc_tls_server_verification_option server_verification_option,
- std::shared_ptr<TlsKeyMaterialsConfig> key_materials_config,
- std::shared_ptr<TlsCredentialReloadConfig> credential_reload_config,
- std::shared_ptr<TlsServerAuthorizationCheckConfig>
- server_authorization_check_config)
- : TlsCredentialsOptions(
- GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE, server_verification_option,
- std::move(key_materials_config), std::move(credential_reload_config),
- std::move(server_authorization_check_config)) {}
-
-TlsCredentialsOptions::TlsCredentialsOptions(
- grpc_ssl_client_certificate_request_type cert_request_type,
- std::shared_ptr<TlsKeyMaterialsConfig> key_materials_config,
- std::shared_ptr<TlsCredentialReloadConfig> credential_reload_config)
- : TlsCredentialsOptions(cert_request_type, GRPC_TLS_SERVER_VERIFICATION,
- std::move(key_materials_config),
- std::move(credential_reload_config), nullptr) {}
-
-TlsCredentialsOptions::TlsCredentialsOptions(
- grpc_ssl_client_certificate_request_type cert_request_type,
- grpc_tls_server_verification_option server_verification_option,
- std::shared_ptr<TlsKeyMaterialsConfig> key_materials_config,
- std::shared_ptr<TlsCredentialReloadConfig> credential_reload_config,
- std::shared_ptr<TlsServerAuthorizationCheckConfig>
- server_authorization_check_config)
- : cert_request_type_(cert_request_type),
- server_verification_option_(server_verification_option),
- key_materials_config_(std::move(key_materials_config)),
- credential_reload_config_(std::move(credential_reload_config)),
- server_authorization_check_config_(
- std::move(server_authorization_check_config)) {
+TlsServerAuthorizationCheckConfig::~TlsServerAuthorizationCheckConfig() {
+ grpc_tls_server_authorization_check_config_release(c_config_);
+}
+
+TlsCredentialsOptions::TlsCredentialsOptions() {
c_credentials_options_ = grpc_tls_credentials_options_create();
- grpc_tls_credentials_options_set_cert_request_type(c_credentials_options_,
- cert_request_type_);
- if (key_materials_config_ != nullptr) {
- grpc_tls_credentials_options_set_key_materials_config(
- c_credentials_options_,
- ConvertToCKeyMaterialsConfig(key_materials_config_));
- }
- if (credential_reload_config_ != nullptr) {
- grpc_tls_credentials_options_set_credential_reload_config(
- c_credentials_options_, credential_reload_config_->c_config());
+}
+
+void TlsCredentialsOptions::set_certificate_provider(
+ std::shared_ptr<CertificateProviderInterface> certificate_provider) {
+ certificate_provider_ = std::move(certificate_provider);
+ if (certificate_provider_ != nullptr) {
+ grpc_tls_credentials_options_set_certificate_provider(
+ c_credentials_options_, certificate_provider_->c_provider());
}
- if (server_authorization_check_config_ != nullptr) {
+}
+
+void TlsCredentialsOptions::watch_root_certs() {
+ grpc_tls_credentials_options_watch_root_certs(c_credentials_options_);
+}
+
+void TlsCredentialsOptions::set_root_cert_name(
+ const TString& root_cert_name) {
+ grpc_tls_credentials_options_set_root_cert_name(c_credentials_options_,
+ root_cert_name.c_str());
+}
+
+void TlsCredentialsOptions::watch_identity_key_cert_pairs() {
+ grpc_tls_credentials_options_watch_identity_key_cert_pairs(
+ c_credentials_options_);
+}
+
+void TlsCredentialsOptions::set_identity_cert_name(
+ const TString& identity_cert_name) {
+ grpc_tls_credentials_options_set_identity_cert_name(
+ c_credentials_options_, identity_cert_name.c_str());
+}
+
+void TlsChannelCredentialsOptions::set_server_verification_option(
+ grpc_tls_server_verification_option server_verification_option) {
+ grpc_tls_credentials_options* options = c_credentials_options();
+ GPR_ASSERT(options != nullptr);
+ grpc_tls_credentials_options_set_server_verification_option(
+ options, server_verification_option);
+}
+
+void TlsChannelCredentialsOptions::set_server_authorization_check_config(
+ std::shared_ptr<TlsServerAuthorizationCheckConfig> config) {
+ grpc_tls_credentials_options* options = c_credentials_options();
+ GPR_ASSERT(options != nullptr);
+ if (config != nullptr) {
grpc_tls_credentials_options_set_server_authorization_check_config(
- c_credentials_options_, server_authorization_check_config_->c_config());
+ options, config->c_config());
}
- grpc_tls_credentials_options_set_server_verification_option(
- c_credentials_options_, server_verification_option);
}
-/** Whenever a TlsCredentialsOptions instance is created, the caller takes
- * ownership of the c_credentials_options_ pointer (see e.g. the implementation
- * of the TlsCredentials API in secure_credentials.cc). For this reason, the
- * TlsCredentialsOptions destructor is not responsible for freeing
- * c_credentials_options_. **/
-TlsCredentialsOptions::~TlsCredentialsOptions() {}
+void TlsServerCredentialsOptions::set_cert_request_type(
+ grpc_ssl_client_certificate_request_type cert_request_type) {
+ grpc_tls_credentials_options* options = c_credentials_options();
+ GPR_ASSERT(options != nullptr);
+ grpc_tls_credentials_options_set_cert_request_type(options,
+ cert_request_type);
+}
} // namespace experimental
} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/common/tls_credentials_options_util.cc b/contrib/libs/grpc/src/cpp/common/tls_credentials_options_util.cc
index ed84003212..920f897bd7 100644
--- a/contrib/libs/grpc/src/cpp/common/tls_credentials_options_util.cc
+++ b/contrib/libs/grpc/src/cpp/common/tls_credentials_options_util.cc
@@ -19,85 +19,12 @@
#include "y_absl/container/inlined_vector.h"
#include <grpcpp/security/tls_credentials_options.h>
+
#include "src/cpp/common/tls_credentials_options_util.h"
namespace grpc {
namespace experimental {
-/** Converts the Cpp key materials to C key materials; this allocates memory for
- * the C key materials. Note that the user must free
- * the underlying pointer to private key and cert chain duplicates; they are not
- * freed when the grpc_core::UniquePtr<char> member variables of PemKeyCertPair
- * are unused. Similarly, the user must free the underlying pointer to
- * c_pem_root_certs. **/
-grpc_tls_key_materials_config* ConvertToCKeyMaterialsConfig(
- const std::shared_ptr<TlsKeyMaterialsConfig>& config) {
- if (config == nullptr) {
- return nullptr;
- }
- grpc_tls_key_materials_config* c_config =
- grpc_tls_key_materials_config_create();
- ::y_absl::InlinedVector<::grpc_core::PemKeyCertPair, 1>
- c_pem_key_cert_pair_list;
- for (const auto& key_cert_pair : config->pem_key_cert_pair_list()) {
- grpc_ssl_pem_key_cert_pair* ssl_pair =
- (grpc_ssl_pem_key_cert_pair*)gpr_malloc(
- sizeof(grpc_ssl_pem_key_cert_pair));
- ssl_pair->private_key = gpr_strdup(key_cert_pair.private_key.c_str());
- ssl_pair->cert_chain = gpr_strdup(key_cert_pair.cert_chain.c_str());
- ::grpc_core::PemKeyCertPair c_pem_key_cert_pair =
- ::grpc_core::PemKeyCertPair(ssl_pair);
- c_pem_key_cert_pair_list.push_back(::std::move(c_pem_key_cert_pair));
- }
- c_config->set_key_materials(config->pem_root_certs().c_str(),
- c_pem_key_cert_pair_list);
- c_config->set_version(config->version());
- return c_config;
-}
-
-/** The C schedule and cancel functions for the credential reload config.
- * They populate a C credential reload arg with the result of a C++ credential
- * reload schedule/cancel API. **/
-int TlsCredentialReloadConfigCSchedule(void* /*config_user_data*/,
- grpc_tls_credential_reload_arg* arg) {
- if (arg == nullptr || arg->config == nullptr ||
- arg->config->context() == nullptr) {
- gpr_log(GPR_ERROR, "credential reload arg was not properly initialized");
- return 1;
- }
- TlsCredentialReloadConfig* cpp_config =
- static_cast<TlsCredentialReloadConfig*>(arg->config->context());
- TlsCredentialReloadArg* cpp_arg = new TlsCredentialReloadArg(arg);
- int schedule_result = cpp_config->Schedule(cpp_arg);
- return schedule_result;
-}
-
-void TlsCredentialReloadConfigCCancel(void* /*config_user_data*/,
- grpc_tls_credential_reload_arg* arg) {
- if (arg == nullptr || arg->config == nullptr ||
- arg->config->context() == nullptr) {
- gpr_log(GPR_ERROR, "credential reload arg was not properly initialized");
- return;
- }
- if (arg->context == nullptr) {
- gpr_log(GPR_ERROR, "credential reload arg schedule has already completed");
- return;
- }
- TlsCredentialReloadConfig* cpp_config =
- static_cast<TlsCredentialReloadConfig*>(arg->config->context());
- TlsCredentialReloadArg* cpp_arg =
- static_cast<TlsCredentialReloadArg*>(arg->context);
- cpp_config->Cancel(cpp_arg);
-}
-
-void TlsCredentialReloadArgDestroyContext(void* context) {
- if (context != nullptr) {
- TlsCredentialReloadArg* cpp_arg =
- static_cast<TlsCredentialReloadArg*>(context);
- delete cpp_arg;
- }
-}
-
/** The C schedule and cancel functions for the server authorization check
* config. They populate a C server authorization check arg with the result
* of a C++ server authorization check schedule/cancel API. **/
diff --git a/contrib/libs/grpc/src/cpp/common/tls_credentials_options_util.h b/contrib/libs/grpc/src/cpp/common/tls_credentials_options_util.h
index 4ee04d15d7..d6247219fb 100644
--- a/contrib/libs/grpc/src/cpp/common/tls_credentials_options_util.h
+++ b/contrib/libs/grpc/src/cpp/common/tls_credentials_options_util.h
@@ -27,29 +27,15 @@
namespace grpc {
namespace experimental {
-/** The following function is exposed for testing purposes. **/
-grpc_tls_key_materials_config* ConvertToCKeyMaterialsConfig(
- const std::shared_ptr<TlsKeyMaterialsConfig>& config);
-
-/** The following 4 functions convert the user-provided schedule or cancel
+/** The following 2 functions convert the user-provided schedule or cancel
* functions into C style schedule or cancel functions. These are internal
* functions, not meant to be accessed by the user. **/
-int TlsCredentialReloadConfigCSchedule(void* config_user_data,
- grpc_tls_credential_reload_arg* arg);
-
-void TlsCredentialReloadConfigCCancel(void* config_user_data,
- grpc_tls_credential_reload_arg* arg);
-
int TlsServerAuthorizationCheckConfigCSchedule(
void* config_user_data, grpc_tls_server_authorization_check_arg* arg);
void TlsServerAuthorizationCheckConfigCCancel(
void* config_user_data, grpc_tls_server_authorization_check_arg* arg);
-/** The following 2 functions cleanup data created in the above C schedule
- * functions. **/
-void TlsCredentialReloadArgDestroyContext(void* context);
-
void TlsServerAuthorizationCheckArgDestroyContext(void* context);
} // namespace experimental
diff --git a/contrib/libs/grpc/src/cpp/common/version_cc.cc b/contrib/libs/grpc/src/cpp/common/version_cc.cc
index 7f4228346a..cb9895b58f 100644
--- a/contrib/libs/grpc/src/cpp/common/version_cc.cc
+++ b/contrib/libs/grpc/src/cpp/common/version_cc.cc
@@ -22,5 +22,5 @@
#include <grpcpp/grpcpp.h>
namespace grpc {
-TString Version() { return "1.33.2"; }
+TString Version() { return "1.37.1"; }
} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/ext/proto_server_reflection.cc b/contrib/libs/grpc/src/cpp/ext/proto_server_reflection.cc
index 1b388210c0..2a610ff8a9 100644
--- a/contrib/libs/grpc/src/cpp/ext/proto_server_reflection.cc
+++ b/contrib/libs/grpc/src/cpp/ext/proto_server_reflection.cc
@@ -28,7 +28,6 @@ using grpc::StatusCode;
using grpc::reflection::v1alpha::ErrorResponse;
using grpc::reflection::v1alpha::ExtensionNumberResponse;
using grpc::reflection::v1alpha::ExtensionRequest;
-using grpc::reflection::v1alpha::FileDescriptorResponse;
using grpc::reflection::v1alpha::ListServiceResponse;
using grpc::reflection::v1alpha::ServerReflectionRequest;
using grpc::reflection::v1alpha::ServerReflectionResponse;
@@ -110,14 +109,14 @@ Status ProtoServerReflection::ListService(ServerContext* /*context*/,
}
Status ProtoServerReflection::GetFileByName(
- ServerContext* /*context*/, const TString& filename,
+ ServerContext* /*context*/, const TString& file_name,
ServerReflectionResponse* response) {
if (descriptor_pool_ == nullptr) {
return Status::CANCELLED;
}
const protobuf::FileDescriptor* file_desc =
- descriptor_pool_->FindFileByName(TProtoStringType(filename));
+ descriptor_pool_->FindFileByName(TProtoStringType(file_name));
if (file_desc == nullptr) {
return Status(StatusCode::NOT_FOUND, "File not found.");
}
diff --git a/contrib/libs/grpc/src/cpp/server/channel_argument_option.cc b/contrib/libs/grpc/src/cpp/server/channel_argument_option.cc
index 9aad932429..fce23e49d5 100644
--- a/contrib/libs/grpc/src/cpp/server/channel_argument_option.cc
+++ b/contrib/libs/grpc/src/cpp/server/channel_argument_option.cc
@@ -27,10 +27,10 @@ std::unique_ptr<ServerBuilderOption> MakeChannelArgumentOption(
StringOption(const TString& name, const TString& value)
: name_(name), value_(value) {}
- virtual void UpdateArguments(ChannelArguments* args) override {
+ void UpdateArguments(ChannelArguments* args) override {
args->SetString(name_, value_);
}
- virtual void UpdatePlugins(
+ void UpdatePlugins(
std::vector<std::unique_ptr<ServerBuilderPlugin>>* /*plugins*/)
override {}
@@ -48,10 +48,10 @@ std::unique_ptr<ServerBuilderOption> MakeChannelArgumentOption(
IntOption(const TString& name, int value)
: name_(name), value_(value) {}
- virtual void UpdateArguments(ChannelArguments* args) override {
+ void UpdateArguments(ChannelArguments* args) override {
args->SetInt(name_, value_);
}
- virtual void UpdatePlugins(
+ void UpdatePlugins(
std::vector<std::unique_ptr<ServerBuilderPlugin>>* /*plugins*/)
override {}
diff --git a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc
index ae26a447ab..b18624e3b6 100644
--- a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc
+++ b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc
@@ -67,13 +67,6 @@ CreateChannelzServicePlugin() {
new ChannelzServicePlugin());
}
-} // namespace experimental
-} // namespace channelz
-} // namespace grpc
-namespace grpc_impl {
-namespace channelz {
-namespace experimental {
-
void InitChannelzService() {
static struct Initializer {
Initializer() {
@@ -85,4 +78,4 @@ void InitChannelzService() {
} // namespace experimental
} // namespace channelz
-} // namespace grpc_impl
+} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.cc b/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.cc
index 77c5d6a263..e96dc4c455 100644
--- a/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.cc
+++ b/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.cc
@@ -27,11 +27,12 @@ namespace grpc {
DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool* pool)
: pool_(pool),
- thd_("grpcpp_dynamic_pool",
- [](void* th) {
- static_cast<DynamicThreadPool::DynamicThread*>(th)->ThreadFunc();
- },
- this) {
+ thd_(
+ "grpcpp_dynamic_pool",
+ [](void* th) {
+ static_cast<DynamicThreadPool::DynamicThread*>(th)->ThreadFunc();
+ },
+ this) {
thd_.Start();
}
DynamicThreadPool::DynamicThread::~DynamicThread() { thd_.Join(); }
@@ -67,7 +68,7 @@ void DynamicThreadPool::ThreadFunc() {
if (!callbacks_.empty()) {
auto cb = callbacks_.front();
callbacks_.pop();
- lock.Unlock();
+ lock.Release();
cb();
} else if (shutdown_) {
break;
@@ -96,7 +97,7 @@ void DynamicThreadPool::ReapThreads(std::list<DynamicThread*>* tlist) {
DynamicThreadPool::~DynamicThreadPool() {
grpc_core::MutexLock lock(&mu_);
shutdown_ = true;
- cv_.Broadcast();
+ cv_.SignalAll();
while (nthreads_ != 0) {
shutdown_cv_.Wait(&mu_);
}
diff --git a/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.h b/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.h
index 6f9f943bc3..954aaff35d 100644
--- a/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.h
+++ b/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.h
@@ -34,14 +34,14 @@ namespace grpc {
class DynamicThreadPool final : public ThreadPoolInterface {
public:
explicit DynamicThreadPool(int reserve_threads);
- ~DynamicThreadPool();
+ ~DynamicThreadPool() override;
void Add(const std::function<void()>& callback) override;
private:
class DynamicThread {
public:
- DynamicThread(DynamicThreadPool* pool);
+ explicit DynamicThread(DynamicThreadPool* pool);
~DynamicThread();
private:
diff --git a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc
index 3cc508d0cb..f39270924b 100644
--- a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc
+++ b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc
@@ -18,6 +18,7 @@
#include <memory>
+#include "y_absl/memory/memory.h"
#include "upb/upb.hpp"
#include <grpc/slice.h>
@@ -114,7 +115,7 @@ DefaultHealthCheckService::HealthCheckServiceImpl*
DefaultHealthCheckService::GetHealthCheckService(
std::unique_ptr<ServerCompletionQueue> cq) {
GPR_ASSERT(impl_ == nullptr);
- impl_.reset(new HealthCheckServiceImpl(this, std::move(cq)));
+ impl_ = y_absl::make_unique<HealthCheckServiceImpl>(this, std::move(cq));
return impl_.get();
}
@@ -160,8 +161,8 @@ DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
AddMethod(new internal::RpcServiceMethod(
kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr));
// Create serving thread.
- thread_ = std::unique_ptr<::grpc_core::Thread>(
- new ::grpc_core::Thread("grpc_health_check_service", Serve, this));
+ thread_ = y_absl::make_unique<::grpc_core::Thread>("grpc_health_check_service",
+ Serve, this);
}
DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
@@ -242,10 +243,9 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
grpc_health_v1_HealthCheckResponse_new(arena.ptr());
grpc_health_v1_HealthCheckResponse_set_status(
response_struct,
- status == NOT_FOUND
- ? grpc_health_v1_HealthCheckResponse_SERVICE_UNKNOWN
- : status == SERVING ? grpc_health_v1_HealthCheckResponse_SERVING
- : grpc_health_v1_HealthCheckResponse_NOT_SERVING);
+ status == NOT_FOUND ? grpc_health_v1_HealthCheckResponse_SERVICE_UNKNOWN
+ : status == SERVING ? grpc_health_v1_HealthCheckResponse_SERVING
+ : grpc_health_v1_HealthCheckResponse_NOT_SERVING);
size_t buf_length;
char* buf = grpc_health_v1_HealthCheckResponse_serialize(
response_struct, arena.ptr(), &buf_length);
diff --git a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h
index 9da1dfc15f..fe3d2b219a 100644
--- a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h
+++ b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h
@@ -56,7 +56,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
HealthCheckServiceImpl(DefaultHealthCheckService* database,
std::unique_ptr<ServerCompletionQueue> cq);
- ~HealthCheckServiceImpl();
+ ~HealthCheckServiceImpl() override;
void StartServingThread();
diff --git a/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc b/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc
index 3f33f4e045..4eb492e073 100644
--- a/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc
+++ b/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc
@@ -33,6 +33,9 @@ class InsecureServerCredentialsImpl final : public ServerCredentials {
(void)processor;
GPR_ASSERT(0); // Should not be called on InsecureServerCredentials.
}
+
+ private:
+ bool IsInsecure() const override { return true; }
};
} // namespace
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/constants.h b/contrib/libs/grpc/src/cpp/server/load_reporter/constants.h
index 00ad794a04..dc9fadeab0 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/constants.h
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/constants.h
@@ -21,6 +21,8 @@
#include <grpc/impl/codegen/port_platform.h>
+#include <stddef.h>
+
namespace grpc {
namespace load_reporter {
diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc
index f07fa812a7..16542bfddf 100644
--- a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc
+++ b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc
@@ -43,7 +43,7 @@ bool UnorderedMapOfSetEraseKeyValue(std::unordered_map<K, std::set<V>>& map,
auto it = map.find(key);
if (it != map.end()) {
size_t erased = it->second.erase(value);
- if (it->second.size() == 0) {
+ if (it->second.empty()) {
map.erase(it);
}
return erased;
diff --git a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc
index 732602bcb7..b6ea87f4fe 100644
--- a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc
+++ b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc
@@ -55,18 +55,18 @@ void AuthMetadataProcessorAyncWrapper::Process(
}
void AuthMetadataProcessorAyncWrapper::InvokeProcessor(
- grpc_auth_context* ctx, const grpc_metadata* md, size_t num_md,
+ grpc_auth_context* context, const grpc_metadata* md, size_t num_md,
grpc_process_auth_metadata_done_cb cb, void* user_data) {
AuthMetadataProcessor::InputMetadata metadata;
for (size_t i = 0; i < num_md; i++) {
metadata.insert(std::make_pair(StringRefFromSlice(&md[i].key),
StringRefFromSlice(&md[i].value)));
}
- SecureAuthContext context(ctx);
+ SecureAuthContext ctx(context);
AuthMetadataProcessor::OutputMetadata consumed_metadata;
AuthMetadataProcessor::OutputMetadata response_metadata;
- Status status = processor_->Process(metadata, &context, &consumed_metadata,
+ Status status = processor_->Process(metadata, &ctx, &consumed_metadata,
&response_metadata);
std::vector<grpc_metadata> consumed_md;
@@ -145,8 +145,7 @@ std::shared_ptr<ServerCredentials> LocalServerCredentials(
}
std::shared_ptr<ServerCredentials> TlsServerCredentials(
- const grpc::experimental::TlsCredentialsOptions& options) {
- grpc::GrpcLibraryCodegen init;
+ const grpc::experimental::TlsServerCredentialsOptions& options) {
return std::shared_ptr<ServerCredentials>(new SecureServerCredentials(
grpc_tls_server_credentials_create(options.c_credentials_options())));
}
diff --git a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h
index 9e3fb3f9eb..f2b65c2864 100644
--- a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h
+++ b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h
@@ -40,7 +40,7 @@ class AuthMetadataProcessorAyncWrapper final {
const grpc_metadata* md, size_t num_md,
grpc_process_auth_metadata_done_cb cb, void* user_data);
- AuthMetadataProcessorAyncWrapper(
+ explicit AuthMetadataProcessorAyncWrapper(
const std::shared_ptr<AuthMetadataProcessor>& processor)
: processor_(processor) {
if (processor && processor->IsBlocking()) {
@@ -69,7 +69,11 @@ class SecureServerCredentials final : public ServerCredentials {
void SetAuthMetadataProcessor(
const std::shared_ptr<grpc::AuthMetadataProcessor>& processor) override;
+ grpc_server_credentials* c_creds() { return creds_; }
+
private:
+ SecureServerCredentials* AsSecureServerCredentials() override { return this; }
+
grpc_server_credentials* creds_;
std::unique_ptr<grpc::AuthMetadataProcessorAyncWrapper> processor_;
};
diff --git a/contrib/libs/grpc/src/cpp/server/server_builder.cc b/contrib/libs/grpc/src/cpp/server/server_builder.cc
index 0cc00b365f..502d4f20ae 100644
--- a/contrib/libs/grpc/src/cpp/server/server_builder.cc
+++ b/contrib/libs/grpc/src/cpp/server/server_builder.cc
@@ -83,9 +83,9 @@ ServerBuilder& ServerBuilder::RegisterService(Service* service) {
return *this;
}
-ServerBuilder& ServerBuilder::RegisterService(const TString& addr,
+ServerBuilder& ServerBuilder::RegisterService(const TString& host,
Service* service) {
- services_.emplace_back(new NamedService(addr, service));
+ services_.emplace_back(new NamedService(host, service));
return *this;
}
@@ -95,7 +95,7 @@ ServerBuilder& ServerBuilder::RegisterAsyncGenericService(
gpr_log(GPR_ERROR,
"Adding multiple generic services is unsupported for now. "
"Dropping the service %p",
- (void*)service);
+ service);
} else {
generic_service_ = service;
}
@@ -122,7 +122,7 @@ ServerBuilder& ServerBuilder::experimental_type::RegisterCallbackGenericService(
gpr_log(GPR_ERROR,
"Adding multiple generic services is unsupported for now. "
"Dropping the service %p",
- (void*)service);
+ service);
} else {
builder_->callback_generic_service_ = service;
}
@@ -130,6 +130,12 @@ ServerBuilder& ServerBuilder::experimental_type::RegisterCallbackGenericService(
}
#endif
+ServerBuilder& ServerBuilder::experimental_type::SetContextAllocator(
+ std::unique_ptr<grpc::ContextAllocator> context_allocator) {
+ builder_->context_allocator_ = std::move(context_allocator);
+ return *builder_;
+}
+
std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor>
ServerBuilder::experimental_type::AddExternalConnectionAcceptor(
experimental_type::ExternalConnectionType type,
@@ -331,7 +337,7 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
std::unique_ptr<grpc::Server> server(new grpc::Server(
&args, sync_server_cqs, sync_server_settings_.min_pollers,
sync_server_settings_.max_pollers, sync_server_settings_.cq_timeout_msec,
- std::move(acceptors_), resource_quota_,
+ std::move(acceptors_), server_config_fetcher_, resource_quota_,
std::move(interceptor_creators_)));
ServerInitializer* initializer = server->initializer();
@@ -369,6 +375,13 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
return nullptr;
}
+#ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
+ server->RegisterContextAllocator(std::move(context_allocator_));
+#else
+ server->experimental_registration()->RegisterContextAllocator(
+ std::move(context_allocator_));
+#endif
+
for (const auto& value : services_) {
if (!server->RegisterService(value->host.get(), value->service)) {
return nullptr;
diff --git a/contrib/libs/grpc/src/cpp/server/server_callback.cc b/contrib/libs/grpc/src/cpp/server/server_callback.cc
index 40aef8e735..f6b72c0fcc 100644
--- a/contrib/libs/grpc/src/cpp/server/server_callback.cc
+++ b/contrib/libs/grpc/src/cpp/server/server_callback.cc
@@ -35,14 +35,14 @@ void ServerCallbackCall::ScheduleOnDone(bool inline_ondone) {
grpc_closure closure;
ServerCallbackCall* call;
explicit ClosureWithArg(ServerCallbackCall* call_arg) : call(call_arg) {
- GRPC_CLOSURE_INIT(&closure,
- [](void* void_arg, grpc_error*) {
- ClosureWithArg* arg =
- static_cast<ClosureWithArg*>(void_arg);
- arg->call->CallOnDone();
- delete arg;
- },
- this, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(
+ &closure,
+ [](void* void_arg, grpc_error*) {
+ ClosureWithArg* arg = static_cast<ClosureWithArg*>(void_arg);
+ arg->call->CallOnDone();
+ delete arg;
+ },
+ this, grpc_schedule_on_exec_ctx);
}
};
ClosureWithArg* arg = new ClosureWithArg(this);
@@ -64,15 +64,15 @@ void ServerCallbackCall::CallOnCancel(ServerReactor* reactor) {
ServerReactor* reactor;
ClosureWithArg(ServerCallbackCall* call_arg, ServerReactor* reactor_arg)
: call(call_arg), reactor(reactor_arg) {
- GRPC_CLOSURE_INIT(&closure,
- [](void* void_arg, grpc_error*) {
- ClosureWithArg* arg =
- static_cast<ClosureWithArg*>(void_arg);
- arg->reactor->OnCancel();
- arg->call->MaybeDone();
- delete arg;
- },
- this, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(
+ &closure,
+ [](void* void_arg, grpc_error*) {
+ ClosureWithArg* arg = static_cast<ClosureWithArg*>(void_arg);
+ arg->reactor->OnCancel();
+ arg->call->MaybeDone();
+ delete arg;
+ },
+ this, grpc_schedule_on_exec_ctx);
}
};
ClosureWithArg* arg = new ClosureWithArg(this, reactor);
diff --git a/contrib/libs/grpc/src/cpp/server/server_cc.cc b/contrib/libs/grpc/src/cpp/server/server_cc.cc
index c2a911c7f7..dab7f488a7 100644
--- a/contrib/libs/grpc/src/cpp/server/server_cc.cc
+++ b/contrib/libs/grpc/src/cpp/server/server_cc.cc
@@ -42,8 +42,12 @@
#include <grpcpp/server_context.h>
#include <grpcpp/support/time.h>
+#include "y_absl/memory/memory.h"
+
#include "src/core/ext/transport/inproc/inproc_transport.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/completion_queue.h"
@@ -83,12 +87,16 @@ void InitGlobalCallbacks() {
class ShutdownTag : public internal::CompletionQueueTag {
public:
- bool FinalizeResult(void** /*tag*/, bool* /*status*/) { return false; }
+ bool FinalizeResult(void** /*tag*/, bool* /*status*/) override {
+ return false;
+ }
};
-class DummyTag : public internal::CompletionQueueTag {
+class PhonyTag : public internal::CompletionQueueTag {
public:
- bool FinalizeResult(void** /*tag*/, bool* /*status*/) { return true; }
+ bool FinalizeResult(void** /*tag*/, bool* /*status*/) override {
+ return true;
+ }
};
class UnimplementedAsyncRequestContext {
@@ -311,8 +319,8 @@ class Server::UnimplementedAsyncResponse final
grpc::internal::CallOpSendInitialMetadata,
grpc::internal::CallOpServerSendStatus> {
public:
- UnimplementedAsyncResponse(UnimplementedAsyncRequest* request);
- ~UnimplementedAsyncResponse() { delete request_; }
+ explicit UnimplementedAsyncResponse(UnimplementedAsyncRequest* request);
+ ~UnimplementedAsyncResponse() override { delete request_; }
bool FinalizeResult(void** tag, bool* status) override {
if (grpc::internal::CallOpSet<
@@ -332,199 +340,171 @@ class Server::UnimplementedAsyncResponse final
class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
public:
- SyncRequest(grpc::internal::RpcServiceMethod* method, void* method_tag)
- : method_(method),
- method_tag_(method_tag),
- in_flight_(false),
- has_request_payload_(method->method_type() ==
- grpc::internal::RpcMethod::NORMAL_RPC ||
- method->method_type() ==
- grpc::internal::RpcMethod::SERVER_STREAMING),
- call_details_(nullptr),
- cq_(nullptr) {
- grpc_metadata_array_init(&request_metadata_);
- }
-
- ~SyncRequest() {
- if (call_details_) {
- delete call_details_;
- }
- grpc_metadata_array_destroy(&request_metadata_);
- }
-
- void SetupRequest() { cq_ = grpc_completion_queue_create_for_pluck(nullptr); }
-
- void TeardownRequest() {
- grpc_completion_queue_destroy(cq_);
- cq_ = nullptr;
+ SyncRequest(Server* server, grpc::internal::RpcServiceMethod* method,
+ grpc_core::Server::RegisteredCallAllocation* data)
+ : SyncRequest(server, method) {
+ CommonSetup(data);
+ data->deadline = &deadline_;
+ data->optional_payload = has_request_payload_ ? &request_payload_ : nullptr;
}
- void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
- GPR_ASSERT(cq_ && !in_flight_);
- in_flight_ = true;
- if (method_tag_) {
- if (grpc_server_request_registered_call(
- server, method_tag_, &call_, &deadline_, &request_metadata_,
- has_request_payload_ ? &request_payload_ : nullptr, cq_,
- notify_cq, this) != GRPC_CALL_OK) {
- TeardownRequest();
- return;
- }
- } else {
- if (!call_details_) {
- call_details_ = new grpc_call_details;
- grpc_call_details_init(call_details_);
- }
- if (grpc_server_request_call(server, &call_, call_details_,
- &request_metadata_, cq_, notify_cq,
- this) != GRPC_CALL_OK) {
- TeardownRequest();
- return;
- }
- }
+ SyncRequest(Server* server, grpc::internal::RpcServiceMethod* method,
+ grpc_core::Server::BatchCallAllocation* data)
+ : SyncRequest(server, method) {
+ CommonSetup(data);
+ call_details_ = new grpc_call_details;
+ grpc_call_details_init(call_details_);
+ data->details = call_details_;
}
- void PostShutdownCleanup() {
- if (call_) {
- grpc_call_unref(call_);
- call_ = nullptr;
+ ~SyncRequest() override {
+ // The destructor should only cleanup those objects created in the
+ // constructor, since some paths may or may not actually go through the
+ // Run stage where other objects are allocated.
+ if (has_request_payload_ && request_payload_) {
+ grpc_byte_buffer_destroy(request_payload_);
}
- if (cq_) {
- grpc_completion_queue_destroy(cq_);
- cq_ = nullptr;
+ if (call_details_ != nullptr) {
+ grpc_call_details_destroy(call_details_);
+ delete call_details_;
}
+ grpc_metadata_array_destroy(&request_metadata_);
+ server_->UnrefWithPossibleNotify();
}
bool FinalizeResult(void** /*tag*/, bool* status) override {
if (!*status) {
- grpc_completion_queue_destroy(cq_);
- cq_ = nullptr;
+ delete this;
+ return false;
}
if (call_details_) {
deadline_ = call_details_->deadline;
- grpc_call_details_destroy(call_details_);
- grpc_call_details_init(call_details_);
}
return true;
}
- // The CallData class represents a call that is "active" as opposed
- // to just being requested. It wraps and takes ownership of the cq from
- // the call request
- class CallData final {
- public:
- explicit CallData(Server* server, SyncRequest* mrd)
- : cq_(mrd->cq_),
- ctx_(mrd->deadline_, &mrd->request_metadata_),
- has_request_payload_(mrd->has_request_payload_),
- request_payload_(has_request_payload_ ? mrd->request_payload_
- : nullptr),
- request_(nullptr),
- method_(mrd->method_),
- call_(
- mrd->call_, server, &cq_, server->max_receive_message_size(),
- ctx_.set_server_rpc_info(method_->name(), method_->method_type(),
- server->interceptor_creators_)),
- server_(server),
- global_callbacks_(nullptr),
- resources_(false) {
- ctx_.set_call(mrd->call_);
- ctx_.cq_ = &cq_;
- GPR_ASSERT(mrd->in_flight_);
- mrd->in_flight_ = false;
- mrd->request_metadata_.count = 0;
+ void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks,
+ bool resources) {
+ ctx_.Init(deadline_, &request_metadata_);
+ wrapped_call_.Init(
+ call_, server_, &cq_, server_->max_receive_message_size(),
+ ctx_->ctx.set_server_rpc_info(method_->name(), method_->method_type(),
+ server_->interceptor_creators_));
+ ctx_->ctx.set_call(call_);
+ ctx_->ctx.cq_ = &cq_;
+ request_metadata_.count = 0;
+
+ global_callbacks_ = global_callbacks;
+ resources_ = resources;
+
+ interceptor_methods_.SetCall(&*wrapped_call_);
+ interceptor_methods_.SetReverse();
+ // Set interception point for RECV INITIAL METADATA
+ interceptor_methods_.AddInterceptionHookPoint(
+ grpc::experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA);
+ interceptor_methods_.SetRecvInitialMetadata(&ctx_->ctx.client_metadata_);
+
+ if (has_request_payload_) {
+ // Set interception point for RECV MESSAGE
+ auto* handler = resources_ ? method_->handler()
+ : server_->resource_exhausted_handler_.get();
+ deserialized_request_ = handler->Deserialize(call_, request_payload_,
+ &request_status_, nullptr);
+
+ request_payload_ = nullptr;
+ interceptor_methods_.AddInterceptionHookPoint(
+ grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
+ interceptor_methods_.SetRecvMessage(deserialized_request_, nullptr);
}
- ~CallData() {
- if (has_request_payload_ && request_payload_) {
- grpc_byte_buffer_destroy(request_payload_);
- }
+ if (interceptor_methods_.RunInterceptors(
+ [this]() { ContinueRunAfterInterception(); })) {
+ ContinueRunAfterInterception();
+ } else {
+ // There were interceptors to be run, so ContinueRunAfterInterception
+ // will be run when interceptors are done.
}
+ }
- void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks,
- bool resources) {
- global_callbacks_ = global_callbacks;
- resources_ = resources;
+ void ContinueRunAfterInterception() {
+ ctx_->ctx.BeginCompletionOp(&*wrapped_call_, nullptr, nullptr);
+ global_callbacks_->PreSynchronousRequest(&ctx_->ctx);
+ auto* handler = resources_ ? method_->handler()
+ : server_->resource_exhausted_handler_.get();
+ handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter(
+ &*wrapped_call_, &ctx_->ctx, deserialized_request_, request_status_,
+ nullptr, nullptr));
+ global_callbacks_->PostSynchronousRequest(&ctx_->ctx);
- interceptor_methods_.SetCall(&call_);
- interceptor_methods_.SetReverse();
- // Set interception point for RECV INITIAL METADATA
- interceptor_methods_.AddInterceptionHookPoint(
- grpc::experimental::InterceptionHookPoints::
- POST_RECV_INITIAL_METADATA);
- interceptor_methods_.SetRecvInitialMetadata(&ctx_.client_metadata_);
+ cq_.Shutdown();
- if (has_request_payload_) {
- // Set interception point for RECV MESSAGE
- auto* handler = resources_ ? method_->handler()
- : server_->resource_exhausted_handler_.get();
- request_ = handler->Deserialize(call_.call(), request_payload_,
- &request_status_, nullptr);
+ grpc::internal::CompletionQueueTag* op_tag = ctx_->ctx.GetCompletionOpTag();
+ cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME));
- request_payload_ = nullptr;
- interceptor_methods_.AddInterceptionHookPoint(
- grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
- interceptor_methods_.SetRecvMessage(request_, nullptr);
- }
+ // Ensure the cq_ is shutdown
+ grpc::PhonyTag ignored_tag;
+ GPR_ASSERT(cq_.Pluck(&ignored_tag) == false);
- if (interceptor_methods_.RunInterceptors(
- [this]() { ContinueRunAfterInterception(); })) {
- ContinueRunAfterInterception();
- } else {
- // There were interceptors to be run, so ContinueRunAfterInterception
- // will be run when interceptors are done.
- }
- }
+ // Cleanup structures allocated during Run/ContinueRunAfterInterception
+ wrapped_call_.Destroy();
+ ctx_.Destroy();
- void ContinueRunAfterInterception() {
- {
- ctx_.BeginCompletionOp(&call_, nullptr, nullptr);
- global_callbacks_->PreSynchronousRequest(&ctx_);
- auto* handler = resources_ ? method_->handler()
- : server_->resource_exhausted_handler_.get();
- handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter(
- &call_, &ctx_, request_, request_status_, nullptr, nullptr));
- request_ = nullptr;
- global_callbacks_->PostSynchronousRequest(&ctx_);
-
- cq_.Shutdown();
-
- grpc::internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag();
- cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME));
-
- /* Ensure the cq_ is shutdown */
- grpc::DummyTag ignored_tag;
- GPR_ASSERT(cq_.Pluck(&ignored_tag) == false);
- }
- delete this;
- }
+ delete this;
+ }
- private:
- grpc::CompletionQueue cq_;
- grpc::ServerContext ctx_;
- const bool has_request_payload_;
- grpc_byte_buffer* request_payload_;
- void* request_;
- grpc::Status request_status_;
- grpc::internal::RpcServiceMethod* const method_;
- grpc::internal::Call call_;
- Server* server_;
- std::shared_ptr<GlobalCallbacks> global_callbacks_;
- bool resources_;
- grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_;
- };
+ // For requests that must be only cleaned up but not actually Run
+ void Cleanup() {
+ cq_.Shutdown();
+ grpc_call_unref(call_);
+ delete this;
+ }
private:
+ SyncRequest(Server* server, grpc::internal::RpcServiceMethod* method)
+ : server_(server),
+ method_(method),
+ has_request_payload_(method->method_type() ==
+ grpc::internal::RpcMethod::NORMAL_RPC ||
+ method->method_type() ==
+ grpc::internal::RpcMethod::SERVER_STREAMING),
+ cq_(grpc_completion_queue_create_for_pluck(nullptr)) {}
+
+ template <class CallAllocation>
+ void CommonSetup(CallAllocation* data) {
+ server_->Ref();
+ grpc_metadata_array_init(&request_metadata_);
+ data->tag = static_cast<void*>(this);
+ data->call = &call_;
+ data->initial_metadata = &request_metadata_;
+ data->cq = cq_.cq();
+ }
+
+ Server* const server_;
grpc::internal::RpcServiceMethod* const method_;
- void* const method_tag_;
- bool in_flight_;
const bool has_request_payload_;
grpc_call* call_;
- grpc_call_details* call_details_;
+ grpc_call_details* call_details_ = nullptr;
gpr_timespec deadline_;
grpc_metadata_array request_metadata_;
- grpc_byte_buffer* request_payload_;
- grpc_completion_queue* cq_;
+ grpc_byte_buffer* request_payload_ = nullptr;
+ grpc::CompletionQueue cq_;
+ grpc::Status request_status_;
+ std::shared_ptr<GlobalCallbacks> global_callbacks_;
+ bool resources_;
+ void* deserialized_request_ = nullptr;
+ grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_;
+
+ // ServerContextWrapper allows ManualConstructor while using a private
+ // contructor of ServerContext via this friend class.
+ struct ServerContextWrapper {
+ ServerContext ctx;
+
+ ServerContextWrapper(gpr_timespec deadline, grpc_metadata_array* arr)
+ : ctx(deadline, arr) {}
+ };
+
+ grpc_core::ManualConstructor<ServerContextWrapper> ctx_;
+ grpc_core::ManualConstructor<internal::Call> wrapped_call_;
};
template <class ServerContextType>
@@ -548,7 +528,10 @@ class Server::CallbackRequest final
method->method_type() ==
grpc::internal::RpcMethod::SERVER_STREAMING),
cq_(cq),
- tag_(this) {
+ tag_(this),
+ ctx_(server_->context_allocator() != nullptr
+ ? server_->context_allocator()->NewCallbackServerContext()
+ : nullptr) {
CommonSetup(server, data);
data->deadline = &deadline_;
data->optional_payload = has_request_payload_ ? &request_payload_ : nullptr;
@@ -563,18 +546,25 @@ class Server::CallbackRequest final
has_request_payload_(false),
call_details_(new grpc_call_details),
cq_(cq),
- tag_(this) {
+ tag_(this),
+ ctx_(server_->context_allocator() != nullptr
+ ? server_->context_allocator()
+ ->NewGenericCallbackServerContext()
+ : nullptr) {
CommonSetup(server, data);
grpc_call_details_init(call_details_);
data->details = call_details_;
}
- ~CallbackRequest() {
+ ~CallbackRequest() override {
delete call_details_;
grpc_metadata_array_destroy(&request_metadata_);
if (has_request_payload_ && request_payload_) {
grpc_byte_buffer_destroy(request_payload_);
}
+ if (ctx_alloc_by_default_ || server_->context_allocator() == nullptr) {
+ default_ctx_.Destroy();
+ }
server_->UnrefWithPossibleNotify();
}
@@ -588,7 +578,7 @@ class Server::CallbackRequest final
class CallbackCallTag : public grpc_experimental_completion_queue_functor {
public:
- CallbackCallTag(Server::CallbackRequest<ServerContextType>* req)
+ explicit CallbackCallTag(Server::CallbackRequest<ServerContextType>* req)
: req_(req) {
functor_run = &CallbackCallTag::StaticRun;
// Set inlineable to true since this callback is internally-controlled
@@ -627,10 +617,10 @@ class Server::CallbackRequest final
}
// Bind the call, deadline, and metadata from what we got
- req_->ctx_.set_call(req_->call_);
- req_->ctx_.cq_ = req_->cq_;
- req_->ctx_.BindDeadlineAndMetadata(req_->deadline_,
- &req_->request_metadata_);
+ req_->ctx_->set_call(req_->call_);
+ req_->ctx_->cq_ = req_->cq_;
+ req_->ctx_->BindDeadlineAndMetadata(req_->deadline_,
+ &req_->request_metadata_);
req_->request_metadata_.count = 0;
// Create a C++ Call to control the underlying core call
@@ -639,7 +629,7 @@ class Server::CallbackRequest final
grpc::internal::Call(
req_->call_, req_->server_, req_->cq_,
req_->server_->max_receive_message_size(),
- req_->ctx_.set_server_rpc_info(
+ req_->ctx_->set_server_rpc_info(
req_->method_name(),
(req_->method_ != nullptr)
? req_->method_->method_type()
@@ -653,7 +643,7 @@ class Server::CallbackRequest final
grpc::experimental::InterceptionHookPoints::
POST_RECV_INITIAL_METADATA);
req_->interceptor_methods_.SetRecvInitialMetadata(
- &req_->ctx_.client_metadata_);
+ &req_->ctx_->client_metadata_);
if (req_->has_request_payload_) {
// Set interception point for RECV MESSAGE
@@ -679,7 +669,7 @@ class Server::CallbackRequest final
? req_->method_->handler()
: req_->server_->generic_handler_.get();
handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter(
- call_, &req_->ctx_, req_->request_, req_->request_status_,
+ call_, req_->ctx_, req_->request_, req_->request_status_,
req_->handler_data_, [this] { delete req_; }));
}
};
@@ -688,9 +678,16 @@ class Server::CallbackRequest final
void CommonSetup(Server* server, CallAllocation* data) {
server->Ref();
grpc_metadata_array_init(&request_metadata_);
- data->tag = &tag_;
+ data->tag = static_cast<void*>(&tag_);
data->call = &call_;
data->initial_metadata = &request_metadata_;
+ if (ctx_ == nullptr) {
+ default_ctx_.Init();
+ ctx_ = &*default_ctx_;
+ ctx_alloc_by_default_ = true;
+ }
+ ctx_->set_context_allocator(server->context_allocator());
+ data->cq = cq_->cq();
}
Server* const server_;
@@ -705,8 +702,10 @@ class Server::CallbackRequest final
gpr_timespec deadline_;
grpc_metadata_array request_metadata_;
grpc::CompletionQueue* const cq_;
+ bool ctx_alloc_by_default_ = false;
CallbackCallTag tag_;
- ServerContextType ctx_;
+ ServerContextType* ctx_ = nullptr;
+ grpc_core::ManualConstructor<ServerContextType> default_ctx_;
grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_;
};
@@ -723,8 +722,8 @@ bool Server::CallbackRequest<
if (*status) {
deadline_ = call_details_->deadline;
// TODO(yangg) remove the copy here
- ctx_.method_ = grpc::StringFromCopiedSlice(call_details_->method);
- ctx_.host_ = grpc::StringFromCopiedSlice(call_details_->host);
+ ctx_->method_ = grpc::StringFromCopiedSlice(call_details_->method);
+ ctx_->host_ = grpc::StringFromCopiedSlice(call_details_->host);
}
grpc_slice_unref(call_details_->method);
grpc_slice_unref(call_details_->host);
@@ -740,7 +739,7 @@ const char* Server::CallbackRequest<grpc::CallbackServerContext>::method_name()
template <>
const char* Server::CallbackRequest<
grpc::GenericCallbackServerContext>::method_name() const {
- return ctx_.method().c_str();
+ return ctx_->method().c_str();
}
// Implementation of ThreadManager. Each instance of SyncRequestThreadManager
@@ -779,44 +778,39 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
}
void DoWork(void* tag, bool ok, bool resources) override {
+ (void)ok;
SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
- if (!sync_req) {
- // No tag. Nothing to work on. This is an unlikley scenario and possibly a
- // bug in RPC Manager implementation.
- gpr_log(GPR_ERROR, "Sync server. DoWork() was called with NULL tag");
- return;
- }
-
- if (ok) {
- // Calldata takes ownership of the completion queue and interceptors
- // inside sync_req
- auto* cd = new SyncRequest::CallData(server_, sync_req);
- // Prepare for the next request
- if (!IsShutdown()) {
- sync_req->SetupRequest(); // Create new completion queue for sync_req
- sync_req->Request(server_->c_server(), server_cq_->cq());
- }
+ // Under the AllocatingRequestMatcher model we will never see an invalid tag
+ // here.
+ GPR_DEBUG_ASSERT(sync_req != nullptr);
+ GPR_DEBUG_ASSERT(ok);
- GPR_TIMER_SCOPE("cd.Run()", 0);
- cd->Run(global_callbacks_, resources);
- }
- // TODO (sreek) If ok is false here (which it isn't in case of
- // grpc_request_registered_call), we should still re-queue the request
- // object
+ GPR_TIMER_SCOPE("sync_req->Run()", 0);
+ sync_req->Run(global_callbacks_, resources);
}
void AddSyncMethod(grpc::internal::RpcServiceMethod* method, void* tag) {
- sync_requests_.emplace_back(new SyncRequest(method, tag));
+ server_->server()->core_server->SetRegisteredMethodAllocator(
+ server_cq_->cq(), tag, [this, method] {
+ grpc_core::Server::RegisteredCallAllocation result;
+ new SyncRequest(server_, method, &result);
+ return result;
+ });
+ has_sync_method_ = true;
}
void AddUnknownSyncMethod() {
- if (!sync_requests_.empty()) {
- unknown_method_.reset(new grpc::internal::RpcServiceMethod(
+ if (has_sync_method_) {
+ unknown_method_ = y_absl::make_unique<grpc::internal::RpcServiceMethod>(
"unknown", grpc::internal::RpcMethod::BIDI_STREAMING,
- new grpc::internal::UnknownMethodHandler));
- sync_requests_.emplace_back(
- new SyncRequest(unknown_method_.get(), nullptr));
+ new grpc::internal::UnknownMethodHandler);
+ server_->server()->core_server->SetBatchMethodAllocator(
+ server_cq_->cq(), [this] {
+ grpc_core::Server::BatchCallAllocation result;
+ new SyncRequest(server_, unknown_method_.get(), &result);
+ return result;
+ });
}
}
@@ -831,27 +825,14 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
void* tag;
bool ok;
while (server_cq_->Next(&tag, &ok)) {
- if (ok) {
- // If a request was pulled off the queue, it means that the thread
- // handling the request added it to the completion queue after shutdown
- // was called - because the thread had already started and checked the
- // shutdown flag before shutdown was called. In this case, we simply
- // clean it up here, *after* calling wait on all the worker threads, at
- // which point we are certain no in-flight requests will add more to the
- // queue. This fixes an intermittent memory leak on shutdown.
- SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
- sync_req->PostShutdownCleanup();
- }
+ // This problem can arise if the server CQ gets a request queued to it
+ // before it gets shutdown but then pulls it after shutdown.
+ static_cast<SyncRequest*>(tag)->Cleanup();
}
}
void Start() {
- if (!sync_requests_.empty()) {
- for (const auto& value : sync_requests_) {
- value->SetupRequest();
- value->Request(server_->c_server(), server_cq_->cq());
- }
-
+ if (has_sync_method_) {
Initialize(); // ThreadManager's Initialize()
}
}
@@ -860,7 +841,7 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
Server* server_;
grpc::CompletionQueue* server_cq_;
int cq_timeout_msec_;
- std::vector<std::unique_ptr<SyncRequest>> sync_requests_;
+ bool has_sync_method_ = false;
std::unique_ptr<grpc::internal::RpcServiceMethod> unknown_method_;
std::shared_ptr<Server::GlobalCallbacks> global_callbacks_;
};
@@ -873,6 +854,7 @@ Server::Server(
int min_pollers, int max_pollers, int sync_cq_timeout_msec,
std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>>
acceptors,
+ grpc_server_config_fetcher* server_config_fetcher,
grpc_resource_quota* server_rq,
std::vector<
std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>>
@@ -936,13 +918,14 @@ Server::Server(
}
}
server_ = grpc_server_create(&channel_args, nullptr);
+ grpc_server_set_config_fetcher(server_, server_config_fetcher);
}
Server::~Server() {
{
grpc::internal::ReleasableMutexLock lock(&mu_);
if (started_ && !shutdown_) {
- lock.Unlock();
+ lock.Release();
Shutdown();
} else if (!started_) {
// Shutdown the completion queues
@@ -950,7 +933,12 @@ Server::~Server() {
value->Shutdown();
}
if (callback_cq_ != nullptr) {
- callback_cq_->Shutdown();
+ if (grpc_iomgr_run_in_background()) {
+ // gRPC-core provides the backing needed for the preferred CQ type
+ callback_cq_->Shutdown();
+ } else {
+ CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq_);
+ }
callback_cq_ = nullptr;
}
}
@@ -1005,7 +993,7 @@ static grpc_server_register_method_payload_handling PayloadHandlingForMethod(
GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;);
}
-bool Server::RegisterService(const TString* host, grpc::Service* service) {
+bool Server::RegisterService(const TString* addr, grpc::Service* service) {
bool has_async_methods = service->has_async_methods();
if (has_async_methods) {
GPR_ASSERT(service->server_ == nullptr &&
@@ -1016,12 +1004,12 @@ bool Server::RegisterService(const TString* host, grpc::Service* service) {
const char* method_name = nullptr;
for (const auto& method : service->methods_) {
- if (method.get() == nullptr) { // Handled by generic service if any.
+ if (method == nullptr) { // Handled by generic service if any.
continue;
}
void* method_registration_tag = grpc_server_register_method(
- server_, method->name(), host ? host->c_str() : nullptr,
+ server_, method->name(), addr ? addr->c_str() : nullptr,
PayloadHandlingForMethod(method.get()), 0);
if (method_registration_tag == nullptr) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
@@ -1118,7 +1106,9 @@ void Server::UnrefAndWaitLocked() {
shutdown_done_ = true;
return; // no need to wait on CV since done condition already set
}
- shutdown_done_cv_.WaitUntil(&mu_, [this] { return shutdown_done_; });
+ grpc::internal::WaitUntil(
+ &shutdown_done_cv_, &mu_,
+ [this]() Y_ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { return shutdown_done_; });
}
void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
@@ -1157,7 +1147,7 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
// service to handle any unimplemented methods using the default reactor
// creator
if (has_callback_methods_ && !has_callback_generic_service_) {
- unimplemented_service_.reset(new grpc::CallbackGenericService);
+ unimplemented_service_ = y_absl::make_unique<grpc::CallbackGenericService>();
RegisterCallbackGenericService(unimplemented_service_.get());
}
@@ -1167,13 +1157,27 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
}
#endif
- grpc_server_start(server_);
+ // If we have a generic service, all unmatched method names go there.
+ // Otherwise, we must provide at least one RPC request for an "unimplemented"
+ // RPC, which covers any RPC for a method name that isn't matched. If we
+ // have a sync service, let it be a sync unimplemented RPC, which must be
+ // registered before server start (to initialize an AllocatingRequestMatcher).
+ // If we have an AllocatingRequestMatcher, we can't also specify other
+ // unimplemented RPCs via explicit async requests, so we won't do so. If we
+ // only have async services, we can specify unimplemented RPCs on each async
+ // CQ so that some user polling thread will move them along as long as some
+ // progress is being made on any RPCs in the system.
+ bool unknown_rpc_needed =
+ !has_async_generic_service_ && !has_callback_generic_service_;
+
+ if (unknown_rpc_needed && !sync_req_mgrs_.empty()) {
+ sync_req_mgrs_[0]->AddUnknownSyncMethod();
+ unknown_rpc_needed = false;
+ }
- if (!has_async_generic_service_ && !has_callback_generic_service_) {
- for (const auto& value : sync_req_mgrs_) {
- value->AddUnknownSyncMethod();
- }
+ grpc_server_start(server_);
+ if (unknown_rpc_needed) {
for (size_t i = 0; i < num_cqs; i++) {
if (cqs[i]->IsFrequentlyPolled()) {
new UnimplementedAsyncRequest(this, cqs[i]);
@@ -1182,14 +1186,15 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
if (health_check_cq != nullptr) {
new UnimplementedAsyncRequest(this, health_check_cq);
}
+ unknown_rpc_needed = false;
}
// If this server has any support for synchronous methods (has any sync
// server CQs), make sure that we have a ResourceExhausted handler
// to deal with the case of thread exhaustion
if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) {
- resource_exhausted_handler_.reset(
- new grpc::internal::ResourceExhaustedHandler);
+ resource_exhausted_handler_ =
+ y_absl::make_unique<grpc::internal::ResourceExhaustedHandler>();
}
for (const auto& value : sync_req_mgrs_) {
@@ -1219,7 +1224,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
/// The completion queue to use for server shutdown completion notification
grpc::CompletionQueue shutdown_cq;
- grpc::ShutdownTag shutdown_tag; // Dummy shutdown tag
+ grpc::ShutdownTag shutdown_tag; // Phony shutdown tag
grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
shutdown_cq.Shutdown();
@@ -1237,6 +1242,9 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
// Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
// successfully shutdown
+ // Drop the shutdown ref and wait for all other refs to drop as well.
+ UnrefAndWaitLocked();
+
// Shutdown all ThreadManagers. This will try to gracefully stop all the
// threads in the ThreadManagers (once they process any inflight requests)
for (const auto& value : sync_req_mgrs_) {
@@ -1248,13 +1256,15 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
value->Wait();
}
- // Drop the shutdown ref and wait for all other refs to drop as well.
- UnrefAndWaitLocked();
-
// Shutdown the callback CQ. The CQ is owned by its own shutdown tag, so it
// will delete itself at true shutdown.
if (callback_cq_ != nullptr) {
- callback_cq_->Shutdown();
+ if (grpc_iomgr_run_in_background()) {
+ // gRPC-core provides the backing needed for the preferred CQ type
+ callback_cq_->Shutdown();
+ } else {
+ CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq_);
+ }
callback_cq_ = nullptr;
}
@@ -1265,7 +1275,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
}
shutdown_notified_ = true;
- shutdown_cv_.Broadcast();
+ shutdown_cv_.SignalAll();
#ifndef NDEBUG
// Unregister this server with the CQs passed into it by the user so that
@@ -1326,13 +1336,19 @@ grpc::CompletionQueue* Server::CallbackCQ() {
if (callback_cq_ != nullptr) {
return callback_cq_;
}
- auto* shutdown_callback = new grpc::ShutdownCallback;
- callback_cq_ = new grpc::CompletionQueue(grpc_completion_queue_attributes{
- GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
- shutdown_callback});
-
- // Transfer ownership of the new cq to its own shutdown callback
- shutdown_callback->TakeCQ(callback_cq_);
+ if (grpc_iomgr_run_in_background()) {
+ // gRPC-core provides the backing needed for the preferred CQ type
+ auto* shutdown_callback = new grpc::ShutdownCallback;
+ callback_cq_ = new grpc::CompletionQueue(grpc_completion_queue_attributes{
+ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
+ shutdown_callback});
+
+ // Transfer ownership of the new cq to its own shutdown callback
+ shutdown_callback->TakeCQ(callback_cq_);
+ } else {
+ // Otherwise we need to use the alternative CQ variant
+ callback_cq_ = CompletionQueue::CallbackAlternativeCQ();
+ }
return callback_cq_;
}
diff --git a/contrib/libs/grpc/src/cpp/server/server_context.cc b/contrib/libs/grpc/src/cpp/server/server_context.cc
index 458ac20d87..df82c69ed8 100644
--- a/contrib/libs/grpc/src/cpp/server/server_context.cc
+++ b/contrib/libs/grpc/src/cpp/server/server_context.cc
@@ -28,6 +28,7 @@
#include <grpc/support/log.h>
#include <grpcpp/impl/call.h>
#include <grpcpp/impl/codegen/completion_queue.h>
+#include <grpcpp/impl/grpc_library.h>
#include <grpcpp/support/server_callback.h>
#include <grpcpp/support/time.h>
@@ -37,6 +38,8 @@
namespace grpc {
+static internal::GrpcLibraryInitializer g_gli_initializer;
+
// CompletionOp
class ServerContextBase::CompletionOp final
@@ -62,7 +65,7 @@ class ServerContextBase::CompletionOp final
CompletionOp(CompletionOp&&) = delete;
CompletionOp& operator=(CompletionOp&&) = delete;
- ~CompletionOp() {
+ ~CompletionOp() override {
if (call_.server_rpc_info()) {
call_.server_rpc_info()->Unref();
}
@@ -121,11 +124,12 @@ class ServerContextBase::CompletionOp final
void ContinueFinalizeResultAfterInterception() override {
done_intercepting_ = true;
if (!has_tag_) {
- /* We don't have a tag to return. */
+ // We don't have a tag to return.
Unref();
+ // Unref can delete this, so do not access anything from this afterward.
return;
}
- /* Start a dummy op so that we can return the tag */
+ /* Start a phony op so that we can return the tag */
GPR_ASSERT(grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_,
nullptr) == GRPC_CALL_OK);
}
@@ -174,33 +178,41 @@ void ServerContextBase::CompletionOp::FillOps(internal::Call* call) {
}
bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) {
- // Decide whether to call the cancel callback within the lock
- bool call_cancel;
+ // Decide whether to do the unref or call the cancel callback within the lock
+ bool do_unref = false;
+ bool has_tag = false;
+ bool call_cancel = false;
{
grpc_core::MutexLock lock(&mu_);
if (done_intercepting_) {
// We are done intercepting.
- bool has_tag = has_tag_;
+ has_tag = has_tag_;
if (has_tag) {
*tag = tag_;
}
- Unref();
- return has_tag;
- }
- finalized_ = true;
+ // Release the lock before unreffing as Unref may delete this object
+ do_unref = true;
+ } else {
+ finalized_ = true;
+
+ // If for some reason the incoming status is false, mark that as a
+ // cancellation.
+ // TODO(vjpai): does this ever happen?
+ if (!*status) {
+ cancelled_ = 1;
+ }
- // If for some reason the incoming status is false, mark that as a
- // cancellation.
- // TODO(vjpai): does this ever happen?
- if (!*status) {
- cancelled_ = 1;
+ call_cancel = (cancelled_ != 0);
+ // Release the lock since we may call a callback and interceptors.
}
-
- call_cancel = (cancelled_ != 0);
- // Release the lock since we may call a callback and interceptors.
}
+ if (do_unref) {
+ Unref();
+ // Unref can delete this, so do not access anything from this afterward.
+ return has_tag;
+ }
if (call_cancel && callback_controller_ != nullptr) {
callback_controller_->MaybeCallOnCancel();
}
@@ -214,6 +226,7 @@ bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) {
*tag = tag_;
}
Unref();
+ // Unref can delete this, so do not access anything from this afterward.
return has_tag;
}
// There are interceptors to be run. Return false for now.
@@ -223,7 +236,9 @@ bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) {
// ServerContextBase body
ServerContextBase::ServerContextBase()
- : deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)) {}
+ : deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)) {
+ g_gli_initializer.summon();
+}
ServerContextBase::ServerContextBase(gpr_timespec deadline,
grpc_metadata_array* arr)
@@ -240,6 +255,7 @@ void ServerContextBase::BindDeadlineAndMetadata(gpr_timespec deadline,
ServerContextBase::~ServerContextBase() {
if (completion_op_) {
completion_op_->Unref();
+ // Unref can delete completion_op_, so do not access it afterward.
}
if (rpc_info_) {
rpc_info_->Unref();
diff --git a/contrib/libs/grpc/src/cpp/server/server_credentials.cc b/contrib/libs/grpc/src/cpp/server/server_credentials.cc
index c3b3a8b379..36b5a52dc7 100644
--- a/contrib/libs/grpc/src/cpp/server/server_credentials.cc
+++ b/contrib/libs/grpc/src/cpp/server/server_credentials.cc
@@ -18,8 +18,13 @@
#include <grpcpp/security/server_credentials.h>
+#include <grpcpp/impl/grpc_library.h>
+
namespace grpc {
+static internal::GrpcLibraryInitializer g_gli_initializer;
+ServerCredentials::ServerCredentials() { g_gli_initializer.summon(); }
+
ServerCredentials::~ServerCredentials() {}
} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/server/xds_server_credentials.cc b/contrib/libs/grpc/src/cpp/server/xds_server_credentials.cc
new file mode 100644
index 0000000000..b543f3f172
--- /dev/null
+++ b/contrib/libs/grpc/src/cpp/server/xds_server_credentials.cc
@@ -0,0 +1,41 @@
+//
+//
+// Copyright 2020 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+//
+
+#include "src/cpp/server/secure_server_credentials.h"
+
+namespace grpc {
+namespace experimental {
+
+std::shared_ptr<ServerCredentials> XdsServerCredentials(
+ const std::shared_ptr<ServerCredentials>& fallback_credentials) {
+ GPR_ASSERT(fallback_credentials != nullptr);
+ if (fallback_credentials->IsInsecure()) {
+ grpc_server_credentials* insecure_creds =
+ grpc_insecure_server_credentials_create();
+ auto xds_creds = std::make_shared<SecureServerCredentials>(
+ grpc_xds_server_credentials_create(insecure_creds));
+ grpc_server_credentials_release(insecure_creds);
+ return xds_creds;
+ }
+ return std::make_shared<SecureServerCredentials>(
+ grpc_xds_server_credentials_create(
+ fallback_credentials->AsSecureServerCredentials()->c_creds()));
+}
+
+} // namespace experimental
+} // namespace grpc
diff --git a/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.cc b/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.cc
index c8560aa81d..5155f610a8 100644
--- a/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.cc
+++ b/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.cc
@@ -152,7 +152,7 @@ void ThreadManager::MainWorkLoop() {
bool ok;
WorkStatus work_status = PollForWork(&tag, &ok);
- grpc_core::ReleasableMutexLock lock(&mu_);
+ grpc_core::LockableAndReleasableMutexLock lock(&mu_);
// Reduce the number of pollers by 1 and check what happened with the poll
num_pollers_--;
bool done = false;
@@ -179,7 +179,7 @@ void ThreadManager::MainWorkLoop() {
max_active_threads_sofar_ = num_threads_;
}
// Drop lock before spawning thread to avoid contention
- lock.Unlock();
+ lock.Release();
WorkerThread* worker = new WorkerThread(this);
if (worker->created()) {
worker->Start();
@@ -195,17 +195,17 @@ void ThreadManager::MainWorkLoop() {
// There is still at least some thread polling, so we can go on
// even though we are below the number of pollers that we would
// like to have (min_pollers_)
- lock.Unlock();
+ lock.Release();
} else {
// There are no pollers to spare and we couldn't allocate
// a new thread, so resources are exhausted!
- lock.Unlock();
+ lock.Release();
resource_exhausted = true;
}
} else {
// There are a sufficient number of pollers available so we can do
// the work and continue polling with our existing poller threads
- lock.Unlock();
+ lock.Release();
}
// Lock is always released at this point - do the application work
// or return resource exhausted if there is new work but we couldn't
diff --git a/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.h b/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.h
index 43f1fd5585..aae2417787 100644
--- a/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.h
+++ b/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.h
@@ -119,7 +119,7 @@ class ThreadManager {
// not be called (and the need for this WorkerThread class is eliminated)
class WorkerThread {
public:
- WorkerThread(ThreadManager* thd_mgr);
+ explicit WorkerThread(ThreadManager* thd_mgr);
~WorkerThread();
bool created() const { return created_; }
diff --git a/contrib/libs/grpc/src/cpp/util/error_details.cc b/contrib/libs/grpc/src/cpp/util/error_details.cc
index dfd3351be1..0330f012c2 100644
--- a/contrib/libs/grpc/src/cpp/util/error_details.cc
+++ b/contrib/libs/grpc/src/cpp/util/error_details.cc
@@ -17,34 +17,3 @@
*/
#include <grpcpp/support/error_details.h>
-
-#include "src/proto/grpc/status/status.pb.h"
-
-namespace grpc {
-
-grpc::Status ExtractErrorDetails(const grpc::Status& from,
- ::google::rpc::Status* to) {
- if (to == nullptr) {
- return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, "");
- }
- if (!to->ParseFromString(TProtoStringType(from.error_details()))) {
- return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "");
- }
- return grpc::Status::OK;
-}
-
-grpc::Status SetErrorDetails(const ::google::rpc::Status& from,
- grpc::Status* to) {
- if (to == nullptr) {
- return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, "");
- }
- grpc::StatusCode code = grpc::StatusCode::UNKNOWN;
- if (from.code() >= grpc::StatusCode::OK &&
- from.code() <= grpc::StatusCode::UNAUTHENTICATED) {
- code = static_cast<grpc::StatusCode>(from.code());
- }
- *to = grpc::Status(code, from.message(), from.SerializeAsString());
- return grpc::Status::OK;
-}
-
-} // namespace grpc