diff options
author | arcadia-devtools <arcadia-devtools@yandex-team.ru> | 2022-03-15 21:33:41 +0300 |
---|---|---|
committer | arcadia-devtools <arcadia-devtools@yandex-team.ru> | 2022-03-15 21:33:41 +0300 |
commit | 3dd665b514943f69657b593eb51af90b99b1206b (patch) | |
tree | 0eb633e628bb1fe6c639574b1184d43def7c0a73 /contrib/libs/grpc/src/cpp | |
parent | a68afc731202027f105bc5723ee11788017c29e2 (diff) | |
download | ydb-3dd665b514943f69657b593eb51af90b99b1206b.tar.gz |
intermediate changes
ref:953ca886ec160075b38c0f3614de029b423f0a9e
Diffstat (limited to 'contrib/libs/grpc/src/cpp')
41 files changed, 813 insertions, 724 deletions
diff --git a/contrib/libs/grpc/src/cpp/README.md b/contrib/libs/grpc/src/cpp/README.md index 967a0a43b7..134f4db56c 100755 --- a/contrib/libs/grpc/src/cpp/README.md +++ b/contrib/libs/grpc/src/cpp/README.md @@ -25,10 +25,10 @@ To add gRPC as a dependency in bazel: ], strip_prefix = "grpc-YOUR_GRPC_COMMIT_SHA", ) - load("@com_github_grpc_grpc//bazel:grpc_deps.bzl", "grpc_deps") - grpc_deps() + load("@com_github_grpc_grpc//bazel:grpc_extra_deps.bzl", "grpc_extra_deps") + grpc_extra_deps() ``` ## CMake diff --git a/contrib/libs/grpc/src/cpp/client/channel_cc.cc b/contrib/libs/grpc/src/cpp/client/channel_cc.cc index ac95c29efc..1793161d6c 100644 --- a/contrib/libs/grpc/src/cpp/client/channel_cc.cc +++ b/contrib/libs/grpc/src/cpp/client/channel_cc.cc @@ -38,7 +38,9 @@ #include <grpcpp/support/channel_arguments.h> #include <grpcpp/support/config.h> #include <grpcpp/support/status.h> + #include "src/core/lib/gpr/string.h" +#include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/surface/completion_queue.h" namespace grpc { @@ -56,7 +58,12 @@ Channel::Channel(const TString& host, grpc_channel* channel, Channel::~Channel() { grpc_channel_destroy(c_channel_); if (callback_cq_ != nullptr) { - callback_cq_->Shutdown(); + if (grpc_iomgr_run_in_background()) { + // gRPC-core provides the backing needed for the preferred CQ type + callback_cq_->Shutdown(); + } else { + CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq_); + } } } @@ -238,13 +245,21 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor { // if there is no explicit per-channel CQ registered grpc::internal::MutexLock l(&mu_); if (callback_cq_ == nullptr) { - auto* shutdown_callback = new ShutdownCallback; - callback_cq_ = new ::grpc::CompletionQueue(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, - shutdown_callback}); - - // Transfer ownership of the new cq to its own shutdown callback - shutdown_callback->TakeCQ(callback_cq_); + if (grpc_iomgr_run_in_background()) { + // gRPC-core provides the backing needed for the preferred CQ type + + auto* shutdown_callback = new ShutdownCallback; + callback_cq_ = + new ::grpc::CompletionQueue(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, + GRPC_CQ_DEFAULT_POLLING, shutdown_callback}); + + // Transfer ownership of the new cq to its own shutdown callback + shutdown_callback->TakeCQ(callback_cq_); + } else { + // Otherwise we need to use the alternative CQ variant + callback_cq_ = CompletionQueue::CallbackAlternativeCQ(); + } } return callback_cq_; } diff --git a/contrib/libs/grpc/src/cpp/client/client_callback.cc b/contrib/libs/grpc/src/cpp/client/client_callback.cc index f4cbc97d34..a05761ad06 100644 --- a/contrib/libs/grpc/src/cpp/client/client_callback.cc +++ b/contrib/libs/grpc/src/cpp/client/client_callback.cc @@ -34,14 +34,14 @@ void ClientReactor::InternalScheduleOnDone(grpc::Status s) { const grpc::Status status; ClosureWithArg(ClientReactor* reactor_arg, grpc::Status s) : reactor(reactor_arg), status(std::move(s)) { - GRPC_CLOSURE_INIT(&closure, - [](void* void_arg, grpc_error*) { - ClosureWithArg* arg = - static_cast<ClosureWithArg*>(void_arg); - arg->reactor->OnDone(arg->status); - delete arg; - }, - this, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT( + &closure, + [](void* void_arg, grpc_error*) { + ClosureWithArg* arg = static_cast<ClosureWithArg*>(void_arg); + arg->reactor->OnDone(arg->status); + delete arg; + }, + this, grpc_schedule_on_exec_ctx); } }; ClosureWithArg* arg = new ClosureWithArg(this, std::move(s)); diff --git a/contrib/libs/grpc/src/cpp/client/client_context.cc b/contrib/libs/grpc/src/cpp/client/client_context.cc index b75343d089..8f18bfe724 100644 --- a/contrib/libs/grpc/src/cpp/client/client_context.cc +++ b/contrib/libs/grpc/src/cpp/client/client_context.cc @@ -62,6 +62,7 @@ ClientContext::ClientContext() propagate_from_call_(nullptr), compression_algorithm_(GRPC_COMPRESS_NONE), initial_metadata_corked_(false) { + g_gli_initializer.summon(); g_client_callbacks->DefaultConstructor(this); } diff --git a/contrib/libs/grpc/src/cpp/client/insecure_credentials.cc b/contrib/libs/grpc/src/cpp/client/insecure_credentials.cc index e5bafff70a..b7036e541c 100644 --- a/contrib/libs/grpc/src/cpp/client/insecure_credentials.cc +++ b/contrib/libs/grpc/src/cpp/client/insecure_credentials.cc @@ -51,6 +51,9 @@ class InsecureChannelCredentialsImpl final : public ChannelCredentials { } SecureChannelCredentials* AsSecureCredentials() override { return nullptr; } + + private: + bool IsInsecure() const override { return true; } }; } // namespace diff --git a/contrib/libs/grpc/src/cpp/client/secure_credentials.cc b/contrib/libs/grpc/src/cpp/client/secure_credentials.cc index 0f6db3caa5..109b50448d 100644 --- a/contrib/libs/grpc/src/cpp/client/secure_credentials.cc +++ b/contrib/libs/grpc/src/cpp/client/secure_credentials.cc @@ -28,6 +28,9 @@ #include <grpcpp/impl/grpc_library.h> #include <grpcpp/support/channel_arguments.h> +#include "y_absl/strings/str_join.h" + +// TODO(yashykt): We shouldn't be including "src/core" headers. #include "src/core/lib/gpr/env.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/executor.h" @@ -79,7 +82,8 @@ bool SecureCallCredentials::ApplyToCall(grpc_call* call) { return grpc_call_set_credentials(call, c_creds_) == GRPC_CALL_OK; } -namespace { +namespace internal { + std::shared_ptr<ChannelCredentials> WrapChannelCredentials( grpc_channel_credentials* creds) { return creds == nullptr ? nullptr @@ -87,6 +91,10 @@ std::shared_ptr<ChannelCredentials> WrapChannelCredentials( new SecureChannelCredentials(creds)); } +} // namespace internal + +namespace { + std::shared_ptr<CallCredentials> WrapCallCredentials( grpc_call_credentials* creds) { return creds == nullptr ? nullptr @@ -97,10 +105,17 @@ std::shared_ptr<CallCredentials> WrapCallCredentials( std::shared_ptr<ChannelCredentials> GoogleDefaultCredentials() { grpc::GrpcLibraryCodegen init; // To call grpc_init(). - return WrapChannelCredentials( + return internal::WrapChannelCredentials( grpc_google_default_credentials_create(nullptr)); } +std::shared_ptr<CallCredentials> ExternalAccountCredentials( + const grpc::string& json_string, const std::vector<grpc::string>& scopes) { + grpc::GrpcLibraryCodegen init; // To call grpc_init(). + return WrapCallCredentials(grpc_external_account_credentials_create( + json_string.c_str(), y_absl::StrJoin(scopes, ",").c_str())); +} + // Builds SSL Credentials given SSL specific options std::shared_ptr<ChannelCredentials> SslCredentials( const SslCredentialsOptions& options) { @@ -112,7 +127,7 @@ std::shared_ptr<ChannelCredentials> SslCredentials( options.pem_root_certs.empty() ? nullptr : options.pem_root_certs.c_str(), options.pem_private_key.empty() ? nullptr : &pem_key_cert_pair, nullptr, nullptr); - return WrapChannelCredentials(c_creds); + return internal::WrapChannelCredentials(c_creds); } namespace experimental { @@ -277,20 +292,20 @@ std::shared_ptr<ChannelCredentials> AltsCredentials( } grpc_channel_credentials* c_creds = grpc_alts_credentials_create(c_options); grpc_alts_credentials_options_destroy(c_options); - return WrapChannelCredentials(c_creds); + return internal::WrapChannelCredentials(c_creds); } // Builds Local Credentials std::shared_ptr<ChannelCredentials> LocalCredentials( grpc_local_connect_type type) { grpc::GrpcLibraryCodegen init; // To call grpc_init(). - return WrapChannelCredentials(grpc_local_credentials_create(type)); + return internal::WrapChannelCredentials(grpc_local_credentials_create(type)); } // Builds TLS Credentials given TLS options. std::shared_ptr<ChannelCredentials> TlsCredentials( - const TlsCredentialsOptions& options) { - return WrapChannelCredentials( + const TlsChannelCredentialsOptions& options) { + return internal::WrapChannelCredentials( grpc_tls_credentials_create(options.c_credentials_options())); } @@ -356,8 +371,10 @@ std::shared_ptr<ChannelCredentials> CompositeChannelCredentials( channel_creds->AsSecureCredentials(); SecureCallCredentials* s_call_creds = call_creds->AsSecureCredentials(); if (s_channel_creds && s_call_creds) { - return WrapChannelCredentials(grpc_composite_channel_credentials_create( - s_channel_creds->GetRawCreds(), s_call_creds->GetRawCreds(), nullptr)); + return internal::WrapChannelCredentials( + grpc_composite_channel_credentials_create( + s_channel_creds->GetRawCreds(), s_call_creds->GetRawCreds(), + nullptr)); } return nullptr; } diff --git a/contrib/libs/grpc/src/cpp/client/secure_credentials.h b/contrib/libs/grpc/src/cpp/client/secure_credentials.h index 4fc79346bf..9325f365b1 100644 --- a/contrib/libs/grpc/src/cpp/client/secure_credentials.h +++ b/contrib/libs/grpc/src/cpp/client/secure_credentials.h @@ -26,6 +26,7 @@ #include <grpcpp/support/config.h> #include "y_absl/strings/str_cat.h" +// TODO(yashykt): We shouldn't be including "src/core" headers. #include "src/core/lib/security/credentials/credentials.h" #include "src/cpp/server/thread_pool_interface.h" @@ -36,7 +37,7 @@ class Channel; class SecureChannelCredentials final : public ChannelCredentials { public: explicit SecureChannelCredentials(grpc_channel_credentials* c_creds); - ~SecureChannelCredentials() { + ~SecureChannelCredentials() override { if (c_creds_ != nullptr) c_creds_->Unref(); } grpc_channel_credentials* GetRawCreds() { return c_creds_; } @@ -58,7 +59,7 @@ class SecureChannelCredentials final : public ChannelCredentials { class SecureCallCredentials final : public CallCredentials { public: explicit SecureCallCredentials(grpc_call_credentials* c_creds); - ~SecureCallCredentials() { + ~SecureCallCredentials() override { if (c_creds_ != nullptr) c_creds_->Unref(); } grpc_call_credentials* GetRawCreds() { return c_creds_; } @@ -74,6 +75,13 @@ class SecureCallCredentials final : public CallCredentials { grpc_call_credentials* const c_creds_; }; +namespace internal { + +std::shared_ptr<ChannelCredentials> WrapChannelCredentials( + grpc_channel_credentials* creds); + +} // namespace internal + namespace experimental { // Transforms C++ STS Credentials options to core options. The pointers of the diff --git a/contrib/libs/grpc/src/cpp/client/xds_credentials.cc b/contrib/libs/grpc/src/cpp/client/xds_credentials.cc new file mode 100644 index 0000000000..63b48837ef --- /dev/null +++ b/contrib/libs/grpc/src/cpp/client/xds_credentials.cc @@ -0,0 +1,41 @@ +// +// +// Copyright 2020 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#include "src/cpp/client/secure_credentials.h" + +namespace grpc { +namespace experimental { + +std::shared_ptr<ChannelCredentials> XdsCredentials( + const std::shared_ptr<ChannelCredentials>& fallback_creds) { + GPR_ASSERT(fallback_creds != nullptr); + if (fallback_creds->IsInsecure()) { + grpc_channel_credentials* insecure_creds = + grpc_insecure_credentials_create(); + auto xds_creds = internal::WrapChannelCredentials( + grpc_xds_credentials_create(insecure_creds)); + grpc_channel_credentials_release(insecure_creds); + return xds_creds; + } else { + return internal::WrapChannelCredentials(grpc_xds_credentials_create( + fallback_creds->AsSecureCredentials()->GetRawCreds())); + } +} + +} // namespace experimental +} // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/common/.yandex_meta/licenses.list.txt b/contrib/libs/grpc/src/cpp/common/.yandex_meta/licenses.list.txt index a5d42d5b53..ed4a3efa9f 100644 --- a/contrib/libs/grpc/src/cpp/common/.yandex_meta/licenses.list.txt +++ b/contrib/libs/grpc/src/cpp/common/.yandex_meta/licenses.list.txt @@ -12,6 +12,20 @@ * limitations under the License. +====================Apache-2.0==================== +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + ====================COPYRIGHT==================== * Copyright 2015 gRPC authors. @@ -25,4 +39,8 @@ ====================COPYRIGHT==================== + * Copyright 2020 gRPC authors. + + +====================COPYRIGHT==================== # Copyright 2019 gRPC authors. diff --git a/contrib/libs/grpc/src/cpp/common/alarm.cc b/contrib/libs/grpc/src/cpp/common/alarm.cc index a2612874b2..ac1b747b49 100644 --- a/contrib/libs/grpc/src/cpp/common/alarm.cc +++ b/contrib/libs/grpc/src/cpp/common/alarm.cc @@ -41,7 +41,7 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag { gpr_ref_init(&refs_, 1); grpc_timer_init_unset(&timer_); } - ~AlarmImpl() {} + ~AlarmImpl() override {} bool FinalizeResult(void** tag, bool* /*status*/) override { *tag = tag_; Unref(); @@ -80,20 +80,20 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag { // Don't use any CQ at all. Instead just use the timer to fire the function callback_ = std::move(f); Ref(); - GRPC_CLOSURE_INIT(&on_alarm_, - [](void* arg, grpc_error* error) { - grpc_core::Executor::Run( - GRPC_CLOSURE_CREATE( - [](void* arg, grpc_error* error) { - AlarmImpl* alarm = - static_cast<AlarmImpl*>(arg); - alarm->callback_(error == GRPC_ERROR_NONE); - alarm->Unref(); - }, - arg, nullptr), - error); - }, - this, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT( + &on_alarm_, + [](void* arg, grpc_error* error) { + grpc_core::Executor::Run( + GRPC_CLOSURE_CREATE( + [](void* arg, grpc_error* error) { + AlarmImpl* alarm = static_cast<AlarmImpl*>(arg); + alarm->callback_(error == GRPC_ERROR_NONE); + alarm->Unref(); + }, + arg, nullptr), + error); + }, + this, grpc_schedule_on_exec_ctx); grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline), &on_alarm_); } diff --git a/contrib/libs/grpc/src/cpp/common/auth_property_iterator.cc b/contrib/libs/grpc/src/cpp/common/auth_property_iterator.cc index fbb18e9915..0f380b0950 100644 --- a/contrib/libs/grpc/src/cpp/common/auth_property_iterator.cc +++ b/contrib/libs/grpc/src/cpp/common/auth_property_iterator.cc @@ -61,7 +61,7 @@ bool AuthPropertyIterator::operator!=(const AuthPropertyIterator& rhs) const { return !operator==(rhs); } -const AuthProperty AuthPropertyIterator::operator*() { +AuthProperty AuthPropertyIterator::operator*() { return std::pair<grpc::string_ref, grpc::string_ref>( property_->name, grpc::string_ref(property_->value, property_->value_length)); diff --git a/contrib/libs/grpc/src/cpp/common/channel_filter.cc b/contrib/libs/grpc/src/cpp/common/channel_filter.cc index 8df6c7b98f..ab56d6073f 100644 --- a/contrib/libs/grpc/src/cpp/common/channel_filter.cc +++ b/contrib/libs/grpc/src/cpp/common/channel_filter.cc @@ -87,7 +87,7 @@ void ChannelFilterPluginInit() { for (size_t i = 0; i < channel_filters->size(); ++i) { FilterRecord& filter = (*channel_filters)[i]; grpc_channel_init_register_stage(filter.stack_type, filter.priority, - MaybeAddFilter, (void*)&filter); + MaybeAddFilter, &filter); } } diff --git a/contrib/libs/grpc/src/cpp/common/channel_filter.h b/contrib/libs/grpc/src/cpp/common/channel_filter.h index 5ce720b307..b16f06bddc 100644 --- a/contrib/libs/grpc/src/cpp/common/channel_filter.h +++ b/contrib/libs/grpc/src/cpp/common/channel_filter.h @@ -60,7 +60,7 @@ class MetadataBatch { const grpc_mdelem> { public: const grpc_mdelem& operator*() const { return elem_->md; } - const grpc_mdelem operator->() const { return elem_->md; } + grpc_mdelem operator->() const { return elem_->md; } const_iterator& operator++() { elem_ = elem_->next; diff --git a/contrib/libs/grpc/src/cpp/common/completion_queue_cc.cc b/contrib/libs/grpc/src/cpp/common/completion_queue_cc.cc index 96a7105eaf..6774559074 100644 --- a/contrib/libs/grpc/src/cpp/common/completion_queue_cc.cc +++ b/contrib/libs/grpc/src/cpp/common/completion_queue_cc.cc @@ -20,13 +20,106 @@ #include <memory> #include <grpc/grpc.h> +#include <grpc/support/cpu.h> #include <grpc/support/log.h> #include <grpcpp/impl/grpc_library.h> #include <grpcpp/support/time.h> +#include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/manual_constructor.h" +#include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/gprpp/thd.h" + namespace grpc { +namespace { + +internal::GrpcLibraryInitializer g_gli_initializer; + +gpr_once g_once_init_callback_alternative = GPR_ONCE_INIT; +grpc_core::Mutex* g_callback_alternative_mu; + +// Implement a ref-counted callback CQ for global use in the alternative +// implementation so that its threads are only created once. Do this using +// explicit ref-counts and raw pointers rather than a shared-ptr since that +// has a non-trivial destructor and thus can't be used for global variables. +struct CallbackAlternativeCQ { + int refs Y_ABSL_GUARDED_BY(g_callback_alternative_mu) = 0; + CompletionQueue* cq Y_ABSL_GUARDED_BY(g_callback_alternative_mu); + std::vector<grpc_core::Thread>* nexting_threads + Y_ABSL_GUARDED_BY(g_callback_alternative_mu); -static internal::GrpcLibraryInitializer g_gli_initializer; + CompletionQueue* Ref() { + grpc_core::MutexLock lock(&*g_callback_alternative_mu); + refs++; + if (refs == 1) { + cq = new CompletionQueue; + int num_nexting_threads = GPR_CLAMP(gpr_cpu_num_cores() / 2, 2, 16); + nexting_threads = new std::vector<grpc_core::Thread>; + for (int i = 0; i < num_nexting_threads; i++) { + nexting_threads->emplace_back( + "nexting_thread", + [](void* arg) { + grpc_completion_queue* cq = + static_cast<CompletionQueue*>(arg)->cq(); + while (true) { + // Use the raw Core next function rather than the C++ Next since + // Next incorporates FinalizeResult and we actually want that + // called from the callback functor itself. + // TODO(vjpai): Migrate below to next without a timeout or idle + // phase. That's currently starving out some other polling, + // though. + auto ev = grpc_completion_queue_next( + cq, + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(1000, GPR_TIMESPAN)), + nullptr); + if (ev.type == GRPC_QUEUE_SHUTDOWN) { + return; + } + if (ev.type == GRPC_QUEUE_TIMEOUT) { + gpr_sleep_until( + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(100, GPR_TIMESPAN))); + continue; + } + GPR_DEBUG_ASSERT(ev.type == GRPC_OP_COMPLETE); + // We can always execute the callback inline rather than + // pushing it to another Executor thread because this + // thread is definitely running on a background thread, does not + // hold any application locks before executing the callback, + // and cannot be entered recursively. + auto* functor = + static_cast<grpc_experimental_completion_queue_functor*>( + ev.tag); + functor->functor_run(functor, ev.success); + } + }, + cq); + } + for (auto& th : *nexting_threads) { + th.Start(); + } + } + return cq; + } + + void Unref() { + grpc_core::MutexLock lock(g_callback_alternative_mu); + refs--; + if (refs == 0) { + cq->Shutdown(); + for (auto& th : *nexting_threads) { + th.Join(); + } + delete nexting_threads; + delete cq; + } + } +}; + +CallbackAlternativeCQ g_callback_alternative_cq; + +} // namespace // 'CompletionQueue' constructor can safely call GrpcLibraryCodegen(false) here // i.e not have GrpcLibraryCodegen call grpc_init(). This is because, to create @@ -96,4 +189,19 @@ bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) { return false; } +CompletionQueue* CompletionQueue::CallbackAlternativeCQ() { + gpr_once_init(&g_once_init_callback_alternative, + [] { g_callback_alternative_mu = new grpc_core::Mutex(); }); + return g_callback_alternative_cq.Ref(); +} + +void CompletionQueue::ReleaseCallbackAlternativeCQ(CompletionQueue* cq) + Y_ABSL_NO_THREAD_SAFETY_ANALYSIS { + (void)cq; + // This accesses g_callback_alternative_cq without acquiring the mutex + // but it's considered safe because it just reads the pointer address. + GPR_DEBUG_ASSERT(cq == g_callback_alternative_cq.cq); + g_callback_alternative_cq.Unref(); +} + } // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/common/secure_auth_context.h b/contrib/libs/grpc/src/cpp/common/secure_auth_context.h index 51013efac7..0e85a7077b 100644 --- a/contrib/libs/grpc/src/cpp/common/secure_auth_context.h +++ b/contrib/libs/grpc/src/cpp/common/secure_auth_context.h @@ -49,7 +49,7 @@ class SecureAuthContext final : public AuthContext { void AddProperty(const TString& key, const grpc::string_ref& value) override; - virtual bool SetPeerIdentityPropertyName(const TString& name) override; + bool SetPeerIdentityPropertyName(const TString& name) override; private: grpc_core::RefCountedPtr<grpc_auth_context> ctx_; diff --git a/contrib/libs/grpc/src/cpp/common/tls_certificate_provider.cc b/contrib/libs/grpc/src/cpp/common/tls_certificate_provider.cc new file mode 100644 index 0000000000..8e99b571a1 --- /dev/null +++ b/contrib/libs/grpc/src/cpp/common/tls_certificate_provider.cc @@ -0,0 +1,59 @@ +// +// Copyright 2020 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include <grpc/grpc_security.h> +#include <grpc/support/alloc.h> +#include <grpcpp/security/tls_certificate_provider.h> + +#include "y_absl/container/inlined_vector.h" + +namespace grpc { +namespace experimental { + +StaticDataCertificateProvider::StaticDataCertificateProvider( + const TString& root_certificate, + const std::vector<IdentityKeyCertPair>& identity_key_cert_pairs) { + GPR_ASSERT(!root_certificate.empty() || !identity_key_cert_pairs.empty()); + grpc_tls_identity_pairs* pairs_core = grpc_tls_identity_pairs_create(); + for (const IdentityKeyCertPair& pair : identity_key_cert_pairs) { + grpc_tls_identity_pairs_add_pair(pairs_core, pair.private_key.c_str(), + pair.certificate_chain.c_str()); + } + c_provider_ = grpc_tls_certificate_provider_static_data_create( + root_certificate.c_str(), pairs_core); + GPR_ASSERT(c_provider_ != nullptr); +}; + +StaticDataCertificateProvider::~StaticDataCertificateProvider() { + grpc_tls_certificate_provider_release(c_provider_); +}; + +FileWatcherCertificateProvider::FileWatcherCertificateProvider( + const TString& private_key_path, + const TString& identity_certificate_path, + const TString& root_cert_path, unsigned int refresh_interval_sec) { + c_provider_ = grpc_tls_certificate_provider_file_watcher_create( + private_key_path.c_str(), identity_certificate_path.c_str(), + root_cert_path.c_str(), refresh_interval_sec); + GPR_ASSERT(c_provider_ != nullptr); +}; + +FileWatcherCertificateProvider::~FileWatcherCertificateProvider() { + grpc_tls_certificate_provider_release(c_provider_); +}; + +} // namespace experimental +} // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/common/tls_credentials_options.cc b/contrib/libs/grpc/src/cpp/common/tls_credentials_options.cc index 7e435ac1de..d612bd062e 100644 --- a/contrib/libs/grpc/src/cpp/common/tls_credentials_options.cc +++ b/contrib/libs/grpc/src/cpp/common/tls_credentials_options.cc @@ -16,178 +16,22 @@ * */ +#include <grpc/grpc_security.h> #include <grpc/support/alloc.h> #include <grpcpp/security/tls_credentials_options.h> #include "y_absl/container/inlined_vector.h" -#include "src/core/lib/security/credentials/tls/grpc_tls_credentials_options.h" #include "src/cpp/common/tls_credentials_options_util.h" namespace grpc { namespace experimental { -/** TLS key materials config API implementation **/ -void TlsKeyMaterialsConfig::set_pem_root_certs( - const TString& pem_root_certs) { - pem_root_certs_ = pem_root_certs; -} - -void TlsKeyMaterialsConfig::add_pem_key_cert_pair( - const PemKeyCertPair& pem_key_cert_pair) { - pem_key_cert_pair_list_.push_back(pem_key_cert_pair); -} - -void TlsKeyMaterialsConfig::set_key_materials( - const TString& pem_root_certs, - const std::vector<PemKeyCertPair>& pem_key_cert_pair_list) { - pem_key_cert_pair_list_ = pem_key_cert_pair_list; - pem_root_certs_ = pem_root_certs; -} - -/** TLS credential reload arg API implementation **/ -TlsCredentialReloadArg::TlsCredentialReloadArg( - grpc_tls_credential_reload_arg* arg) - : c_arg_(arg) { - if (c_arg_ != nullptr && c_arg_->context != nullptr) { - gpr_log(GPR_ERROR, "c_arg context has already been set"); - } - c_arg_->context = static_cast<void*>(this); - c_arg_->destroy_context = &TlsCredentialReloadArgDestroyContext; -} - -TlsCredentialReloadArg::~TlsCredentialReloadArg() {} - -void* TlsCredentialReloadArg::cb_user_data() const { - return c_arg_->cb_user_data; -} -bool TlsCredentialReloadArg::is_pem_key_cert_pair_list_empty() const { - return c_arg_->key_materials_config->pem_key_cert_pair_list().empty(); -} - -grpc_ssl_certificate_config_reload_status TlsCredentialReloadArg::status() - const { - return c_arg_->status; -} - -TString TlsCredentialReloadArg::error_details() const { - return c_arg_->error_details->error_details(); -} - -void TlsCredentialReloadArg::set_cb_user_data(void* cb_user_data) { - c_arg_->cb_user_data = cb_user_data; -} - -void TlsCredentialReloadArg::set_pem_root_certs( - const TString& pem_root_certs) { - ::grpc_core::UniquePtr<char> c_pem_root_certs( - gpr_strdup(pem_root_certs.c_str())); - c_arg_->key_materials_config->set_pem_root_certs(std::move(c_pem_root_certs)); -} - -namespace { - -::grpc_core::PemKeyCertPair ConvertToCorePemKeyCertPair( - const TlsKeyMaterialsConfig::PemKeyCertPair& pem_key_cert_pair) { - grpc_ssl_pem_key_cert_pair* ssl_pair = - (grpc_ssl_pem_key_cert_pair*)gpr_malloc( - sizeof(grpc_ssl_pem_key_cert_pair)); - ssl_pair->private_key = gpr_strdup(pem_key_cert_pair.private_key.c_str()); - ssl_pair->cert_chain = gpr_strdup(pem_key_cert_pair.cert_chain.c_str()); - return ::grpc_core::PemKeyCertPair(ssl_pair); -} - -} // namespace - -void TlsCredentialReloadArg::add_pem_key_cert_pair( - const TlsKeyMaterialsConfig::PemKeyCertPair& pem_key_cert_pair) { - c_arg_->key_materials_config->add_pem_key_cert_pair( - ConvertToCorePemKeyCertPair(pem_key_cert_pair)); -} - -void TlsCredentialReloadArg::set_key_materials( - const TString& pem_root_certs, - std::vector<TlsKeyMaterialsConfig::PemKeyCertPair> pem_key_cert_pair_list) { - /** Initialize the |key_materials_config| field of |c_arg_|, if it has not - * already been done. **/ - if (c_arg_->key_materials_config == nullptr) { - c_arg_->key_materials_config = grpc_tls_key_materials_config_create(); - } - /** Convert |pem_key_cert_pair_list| to an inlined vector of ssl pairs. **/ - ::y_absl::InlinedVector<::grpc_core::PemKeyCertPair, 1> - c_pem_key_cert_pair_list; - for (const auto& key_cert_pair : pem_key_cert_pair_list) { - c_pem_key_cert_pair_list.emplace_back( - ConvertToCorePemKeyCertPair(key_cert_pair)); - } - /** Populate the key materials config field of |c_arg_|. **/ - c_arg_->key_materials_config->set_key_materials(pem_root_certs.c_str(), - c_pem_key_cert_pair_list); -} - -void TlsCredentialReloadArg::set_key_materials_config( - const std::shared_ptr<TlsKeyMaterialsConfig>& key_materials_config) { - if (key_materials_config == nullptr) { - c_arg_->key_materials_config = nullptr; - return; - } - ::y_absl::InlinedVector<::grpc_core::PemKeyCertPair, 1> - c_pem_key_cert_pair_list; - for (const auto& key_cert_pair : - key_materials_config->pem_key_cert_pair_list()) { - grpc_ssl_pem_key_cert_pair* ssl_pair = - (grpc_ssl_pem_key_cert_pair*)gpr_malloc( - sizeof(grpc_ssl_pem_key_cert_pair)); - ssl_pair->private_key = gpr_strdup(key_cert_pair.private_key.c_str()); - ssl_pair->cert_chain = gpr_strdup(key_cert_pair.cert_chain.c_str()); - ::grpc_core::PemKeyCertPair c_pem_key_cert_pair = - ::grpc_core::PemKeyCertPair(ssl_pair); - c_pem_key_cert_pair_list.emplace_back(std::move(c_pem_key_cert_pair)); - } - ::grpc_core::UniquePtr<char> c_pem_root_certs( - gpr_strdup(key_materials_config->pem_root_certs().c_str())); - if (c_arg_->key_materials_config == nullptr) { - c_arg_->key_materials_config = grpc_tls_key_materials_config_create(); - } - c_arg_->key_materials_config->set_key_materials( - key_materials_config->pem_root_certs().c_str(), c_pem_key_cert_pair_list); - c_arg_->key_materials_config->set_version(key_materials_config->version()); -} - -void TlsCredentialReloadArg::set_status( - grpc_ssl_certificate_config_reload_status status) { - c_arg_->status = status; -} - -void TlsCredentialReloadArg::set_error_details( - const TString& error_details) { - c_arg_->error_details->set_error_details(error_details.c_str()); -} - -void TlsCredentialReloadArg::OnCredentialReloadDoneCallback() { - if (c_arg_->cb == nullptr) { - gpr_log(GPR_ERROR, "credential reload arg callback API is nullptr"); - return; - } - c_arg_->cb(c_arg_); -} - -/** gRPC TLS credential reload config API implementation **/ -TlsCredentialReloadConfig::TlsCredentialReloadConfig( - std::shared_ptr<TlsCredentialReloadInterface> credential_reload_interface) - : credential_reload_interface_(std::move(credential_reload_interface)) { - c_config_ = grpc_tls_credential_reload_config_create( - nullptr, &TlsCredentialReloadConfigCSchedule, - &TlsCredentialReloadConfigCCancel, nullptr); - c_config_->set_context(static_cast<void*>(this)); -} - -TlsCredentialReloadConfig::~TlsCredentialReloadConfig() {} - /** gRPC TLS server authorization check arg API implementation **/ TlsServerAuthorizationCheckArg::TlsServerAuthorizationCheckArg( grpc_tls_server_authorization_check_arg* arg) : c_arg_(arg) { - if (c_arg_ != nullptr && c_arg_->context != nullptr) { + GPR_ASSERT(c_arg_ != nullptr); + if (c_arg_->context != nullptr) { gpr_log(GPR_ERROR, "c_arg context has already been set"); } c_arg_->context = static_cast<void*>(this); @@ -265,7 +109,6 @@ void TlsServerAuthorizationCheckArg::OnServerAuthorizationCheckDoneCallback() { c_arg_->cb(c_arg_); } -/** gRPC TLS server authorization check config API implementation. **/ TlsServerAuthorizationCheckConfig::TlsServerAuthorizationCheckConfig( std::shared_ptr<TlsServerAuthorizationCheckInterface> server_authorization_check_interface) @@ -277,67 +120,69 @@ TlsServerAuthorizationCheckConfig::TlsServerAuthorizationCheckConfig( c_config_->set_context(static_cast<void*>(this)); } -TlsServerAuthorizationCheckConfig::~TlsServerAuthorizationCheckConfig() {} - -/** gRPC TLS credential options API implementation **/ -TlsCredentialsOptions::TlsCredentialsOptions( - grpc_tls_server_verification_option server_verification_option, - std::shared_ptr<TlsKeyMaterialsConfig> key_materials_config, - std::shared_ptr<TlsCredentialReloadConfig> credential_reload_config, - std::shared_ptr<TlsServerAuthorizationCheckConfig> - server_authorization_check_config) - : TlsCredentialsOptions( - GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE, server_verification_option, - std::move(key_materials_config), std::move(credential_reload_config), - std::move(server_authorization_check_config)) {} - -TlsCredentialsOptions::TlsCredentialsOptions( - grpc_ssl_client_certificate_request_type cert_request_type, - std::shared_ptr<TlsKeyMaterialsConfig> key_materials_config, - std::shared_ptr<TlsCredentialReloadConfig> credential_reload_config) - : TlsCredentialsOptions(cert_request_type, GRPC_TLS_SERVER_VERIFICATION, - std::move(key_materials_config), - std::move(credential_reload_config), nullptr) {} - -TlsCredentialsOptions::TlsCredentialsOptions( - grpc_ssl_client_certificate_request_type cert_request_type, - grpc_tls_server_verification_option server_verification_option, - std::shared_ptr<TlsKeyMaterialsConfig> key_materials_config, - std::shared_ptr<TlsCredentialReloadConfig> credential_reload_config, - std::shared_ptr<TlsServerAuthorizationCheckConfig> - server_authorization_check_config) - : cert_request_type_(cert_request_type), - server_verification_option_(server_verification_option), - key_materials_config_(std::move(key_materials_config)), - credential_reload_config_(std::move(credential_reload_config)), - server_authorization_check_config_( - std::move(server_authorization_check_config)) { +TlsServerAuthorizationCheckConfig::~TlsServerAuthorizationCheckConfig() { + grpc_tls_server_authorization_check_config_release(c_config_); +} + +TlsCredentialsOptions::TlsCredentialsOptions() { c_credentials_options_ = grpc_tls_credentials_options_create(); - grpc_tls_credentials_options_set_cert_request_type(c_credentials_options_, - cert_request_type_); - if (key_materials_config_ != nullptr) { - grpc_tls_credentials_options_set_key_materials_config( - c_credentials_options_, - ConvertToCKeyMaterialsConfig(key_materials_config_)); - } - if (credential_reload_config_ != nullptr) { - grpc_tls_credentials_options_set_credential_reload_config( - c_credentials_options_, credential_reload_config_->c_config()); +} + +void TlsCredentialsOptions::set_certificate_provider( + std::shared_ptr<CertificateProviderInterface> certificate_provider) { + certificate_provider_ = std::move(certificate_provider); + if (certificate_provider_ != nullptr) { + grpc_tls_credentials_options_set_certificate_provider( + c_credentials_options_, certificate_provider_->c_provider()); } - if (server_authorization_check_config_ != nullptr) { +} + +void TlsCredentialsOptions::watch_root_certs() { + grpc_tls_credentials_options_watch_root_certs(c_credentials_options_); +} + +void TlsCredentialsOptions::set_root_cert_name( + const TString& root_cert_name) { + grpc_tls_credentials_options_set_root_cert_name(c_credentials_options_, + root_cert_name.c_str()); +} + +void TlsCredentialsOptions::watch_identity_key_cert_pairs() { + grpc_tls_credentials_options_watch_identity_key_cert_pairs( + c_credentials_options_); +} + +void TlsCredentialsOptions::set_identity_cert_name( + const TString& identity_cert_name) { + grpc_tls_credentials_options_set_identity_cert_name( + c_credentials_options_, identity_cert_name.c_str()); +} + +void TlsChannelCredentialsOptions::set_server_verification_option( + grpc_tls_server_verification_option server_verification_option) { + grpc_tls_credentials_options* options = c_credentials_options(); + GPR_ASSERT(options != nullptr); + grpc_tls_credentials_options_set_server_verification_option( + options, server_verification_option); +} + +void TlsChannelCredentialsOptions::set_server_authorization_check_config( + std::shared_ptr<TlsServerAuthorizationCheckConfig> config) { + grpc_tls_credentials_options* options = c_credentials_options(); + GPR_ASSERT(options != nullptr); + if (config != nullptr) { grpc_tls_credentials_options_set_server_authorization_check_config( - c_credentials_options_, server_authorization_check_config_->c_config()); + options, config->c_config()); } - grpc_tls_credentials_options_set_server_verification_option( - c_credentials_options_, server_verification_option); } -/** Whenever a TlsCredentialsOptions instance is created, the caller takes - * ownership of the c_credentials_options_ pointer (see e.g. the implementation - * of the TlsCredentials API in secure_credentials.cc). For this reason, the - * TlsCredentialsOptions destructor is not responsible for freeing - * c_credentials_options_. **/ -TlsCredentialsOptions::~TlsCredentialsOptions() {} +void TlsServerCredentialsOptions::set_cert_request_type( + grpc_ssl_client_certificate_request_type cert_request_type) { + grpc_tls_credentials_options* options = c_credentials_options(); + GPR_ASSERT(options != nullptr); + grpc_tls_credentials_options_set_cert_request_type(options, + cert_request_type); +} } // namespace experimental } // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/common/tls_credentials_options_util.cc b/contrib/libs/grpc/src/cpp/common/tls_credentials_options_util.cc index ed84003212..920f897bd7 100644 --- a/contrib/libs/grpc/src/cpp/common/tls_credentials_options_util.cc +++ b/contrib/libs/grpc/src/cpp/common/tls_credentials_options_util.cc @@ -19,85 +19,12 @@ #include "y_absl/container/inlined_vector.h" #include <grpcpp/security/tls_credentials_options.h> + #include "src/cpp/common/tls_credentials_options_util.h" namespace grpc { namespace experimental { -/** Converts the Cpp key materials to C key materials; this allocates memory for - * the C key materials. Note that the user must free - * the underlying pointer to private key and cert chain duplicates; they are not - * freed when the grpc_core::UniquePtr<char> member variables of PemKeyCertPair - * are unused. Similarly, the user must free the underlying pointer to - * c_pem_root_certs. **/ -grpc_tls_key_materials_config* ConvertToCKeyMaterialsConfig( - const std::shared_ptr<TlsKeyMaterialsConfig>& config) { - if (config == nullptr) { - return nullptr; - } - grpc_tls_key_materials_config* c_config = - grpc_tls_key_materials_config_create(); - ::y_absl::InlinedVector<::grpc_core::PemKeyCertPair, 1> - c_pem_key_cert_pair_list; - for (const auto& key_cert_pair : config->pem_key_cert_pair_list()) { - grpc_ssl_pem_key_cert_pair* ssl_pair = - (grpc_ssl_pem_key_cert_pair*)gpr_malloc( - sizeof(grpc_ssl_pem_key_cert_pair)); - ssl_pair->private_key = gpr_strdup(key_cert_pair.private_key.c_str()); - ssl_pair->cert_chain = gpr_strdup(key_cert_pair.cert_chain.c_str()); - ::grpc_core::PemKeyCertPair c_pem_key_cert_pair = - ::grpc_core::PemKeyCertPair(ssl_pair); - c_pem_key_cert_pair_list.push_back(::std::move(c_pem_key_cert_pair)); - } - c_config->set_key_materials(config->pem_root_certs().c_str(), - c_pem_key_cert_pair_list); - c_config->set_version(config->version()); - return c_config; -} - -/** The C schedule and cancel functions for the credential reload config. - * They populate a C credential reload arg with the result of a C++ credential - * reload schedule/cancel API. **/ -int TlsCredentialReloadConfigCSchedule(void* /*config_user_data*/, - grpc_tls_credential_reload_arg* arg) { - if (arg == nullptr || arg->config == nullptr || - arg->config->context() == nullptr) { - gpr_log(GPR_ERROR, "credential reload arg was not properly initialized"); - return 1; - } - TlsCredentialReloadConfig* cpp_config = - static_cast<TlsCredentialReloadConfig*>(arg->config->context()); - TlsCredentialReloadArg* cpp_arg = new TlsCredentialReloadArg(arg); - int schedule_result = cpp_config->Schedule(cpp_arg); - return schedule_result; -} - -void TlsCredentialReloadConfigCCancel(void* /*config_user_data*/, - grpc_tls_credential_reload_arg* arg) { - if (arg == nullptr || arg->config == nullptr || - arg->config->context() == nullptr) { - gpr_log(GPR_ERROR, "credential reload arg was not properly initialized"); - return; - } - if (arg->context == nullptr) { - gpr_log(GPR_ERROR, "credential reload arg schedule has already completed"); - return; - } - TlsCredentialReloadConfig* cpp_config = - static_cast<TlsCredentialReloadConfig*>(arg->config->context()); - TlsCredentialReloadArg* cpp_arg = - static_cast<TlsCredentialReloadArg*>(arg->context); - cpp_config->Cancel(cpp_arg); -} - -void TlsCredentialReloadArgDestroyContext(void* context) { - if (context != nullptr) { - TlsCredentialReloadArg* cpp_arg = - static_cast<TlsCredentialReloadArg*>(context); - delete cpp_arg; - } -} - /** The C schedule and cancel functions for the server authorization check * config. They populate a C server authorization check arg with the result * of a C++ server authorization check schedule/cancel API. **/ diff --git a/contrib/libs/grpc/src/cpp/common/tls_credentials_options_util.h b/contrib/libs/grpc/src/cpp/common/tls_credentials_options_util.h index 4ee04d15d7..d6247219fb 100644 --- a/contrib/libs/grpc/src/cpp/common/tls_credentials_options_util.h +++ b/contrib/libs/grpc/src/cpp/common/tls_credentials_options_util.h @@ -27,29 +27,15 @@ namespace grpc { namespace experimental { -/** The following function is exposed for testing purposes. **/ -grpc_tls_key_materials_config* ConvertToCKeyMaterialsConfig( - const std::shared_ptr<TlsKeyMaterialsConfig>& config); - -/** The following 4 functions convert the user-provided schedule or cancel +/** The following 2 functions convert the user-provided schedule or cancel * functions into C style schedule or cancel functions. These are internal * functions, not meant to be accessed by the user. **/ -int TlsCredentialReloadConfigCSchedule(void* config_user_data, - grpc_tls_credential_reload_arg* arg); - -void TlsCredentialReloadConfigCCancel(void* config_user_data, - grpc_tls_credential_reload_arg* arg); - int TlsServerAuthorizationCheckConfigCSchedule( void* config_user_data, grpc_tls_server_authorization_check_arg* arg); void TlsServerAuthorizationCheckConfigCCancel( void* config_user_data, grpc_tls_server_authorization_check_arg* arg); -/** The following 2 functions cleanup data created in the above C schedule - * functions. **/ -void TlsCredentialReloadArgDestroyContext(void* context); - void TlsServerAuthorizationCheckArgDestroyContext(void* context); } // namespace experimental diff --git a/contrib/libs/grpc/src/cpp/common/version_cc.cc b/contrib/libs/grpc/src/cpp/common/version_cc.cc index 7f4228346a..cb9895b58f 100644 --- a/contrib/libs/grpc/src/cpp/common/version_cc.cc +++ b/contrib/libs/grpc/src/cpp/common/version_cc.cc @@ -22,5 +22,5 @@ #include <grpcpp/grpcpp.h> namespace grpc { -TString Version() { return "1.33.2"; } +TString Version() { return "1.37.1"; } } // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/ext/proto_server_reflection.cc b/contrib/libs/grpc/src/cpp/ext/proto_server_reflection.cc index 1b388210c0..2a610ff8a9 100644 --- a/contrib/libs/grpc/src/cpp/ext/proto_server_reflection.cc +++ b/contrib/libs/grpc/src/cpp/ext/proto_server_reflection.cc @@ -28,7 +28,6 @@ using grpc::StatusCode; using grpc::reflection::v1alpha::ErrorResponse; using grpc::reflection::v1alpha::ExtensionNumberResponse; using grpc::reflection::v1alpha::ExtensionRequest; -using grpc::reflection::v1alpha::FileDescriptorResponse; using grpc::reflection::v1alpha::ListServiceResponse; using grpc::reflection::v1alpha::ServerReflectionRequest; using grpc::reflection::v1alpha::ServerReflectionResponse; @@ -110,14 +109,14 @@ Status ProtoServerReflection::ListService(ServerContext* /*context*/, } Status ProtoServerReflection::GetFileByName( - ServerContext* /*context*/, const TString& filename, + ServerContext* /*context*/, const TString& file_name, ServerReflectionResponse* response) { if (descriptor_pool_ == nullptr) { return Status::CANCELLED; } const protobuf::FileDescriptor* file_desc = - descriptor_pool_->FindFileByName(TProtoStringType(filename)); + descriptor_pool_->FindFileByName(TProtoStringType(file_name)); if (file_desc == nullptr) { return Status(StatusCode::NOT_FOUND, "File not found."); } diff --git a/contrib/libs/grpc/src/cpp/server/channel_argument_option.cc b/contrib/libs/grpc/src/cpp/server/channel_argument_option.cc index 9aad932429..fce23e49d5 100644 --- a/contrib/libs/grpc/src/cpp/server/channel_argument_option.cc +++ b/contrib/libs/grpc/src/cpp/server/channel_argument_option.cc @@ -27,10 +27,10 @@ std::unique_ptr<ServerBuilderOption> MakeChannelArgumentOption( StringOption(const TString& name, const TString& value) : name_(name), value_(value) {} - virtual void UpdateArguments(ChannelArguments* args) override { + void UpdateArguments(ChannelArguments* args) override { args->SetString(name_, value_); } - virtual void UpdatePlugins( + void UpdatePlugins( std::vector<std::unique_ptr<ServerBuilderPlugin>>* /*plugins*/) override {} @@ -48,10 +48,10 @@ std::unique_ptr<ServerBuilderOption> MakeChannelArgumentOption( IntOption(const TString& name, int value) : name_(name), value_(value) {} - virtual void UpdateArguments(ChannelArguments* args) override { + void UpdateArguments(ChannelArguments* args) override { args->SetInt(name_, value_); } - virtual void UpdatePlugins( + void UpdatePlugins( std::vector<std::unique_ptr<ServerBuilderPlugin>>* /*plugins*/) override {} diff --git a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc index ae26a447ab..b18624e3b6 100644 --- a/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc +++ b/contrib/libs/grpc/src/cpp/server/channelz/channelz_service_plugin.cc @@ -67,13 +67,6 @@ CreateChannelzServicePlugin() { new ChannelzServicePlugin()); } -} // namespace experimental -} // namespace channelz -} // namespace grpc -namespace grpc_impl { -namespace channelz { -namespace experimental { - void InitChannelzService() { static struct Initializer { Initializer() { @@ -85,4 +78,4 @@ void InitChannelzService() { } // namespace experimental } // namespace channelz -} // namespace grpc_impl +} // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.cc b/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.cc index 77c5d6a263..e96dc4c455 100644 --- a/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.cc +++ b/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.cc @@ -27,11 +27,12 @@ namespace grpc { DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool* pool) : pool_(pool), - thd_("grpcpp_dynamic_pool", - [](void* th) { - static_cast<DynamicThreadPool::DynamicThread*>(th)->ThreadFunc(); - }, - this) { + thd_( + "grpcpp_dynamic_pool", + [](void* th) { + static_cast<DynamicThreadPool::DynamicThread*>(th)->ThreadFunc(); + }, + this) { thd_.Start(); } DynamicThreadPool::DynamicThread::~DynamicThread() { thd_.Join(); } @@ -67,7 +68,7 @@ void DynamicThreadPool::ThreadFunc() { if (!callbacks_.empty()) { auto cb = callbacks_.front(); callbacks_.pop(); - lock.Unlock(); + lock.Release(); cb(); } else if (shutdown_) { break; @@ -96,7 +97,7 @@ void DynamicThreadPool::ReapThreads(std::list<DynamicThread*>* tlist) { DynamicThreadPool::~DynamicThreadPool() { grpc_core::MutexLock lock(&mu_); shutdown_ = true; - cv_.Broadcast(); + cv_.SignalAll(); while (nthreads_ != 0) { shutdown_cv_.Wait(&mu_); } diff --git a/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.h b/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.h index 6f9f943bc3..954aaff35d 100644 --- a/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.h +++ b/contrib/libs/grpc/src/cpp/server/dynamic_thread_pool.h @@ -34,14 +34,14 @@ namespace grpc { class DynamicThreadPool final : public ThreadPoolInterface { public: explicit DynamicThreadPool(int reserve_threads); - ~DynamicThreadPool(); + ~DynamicThreadPool() override; void Add(const std::function<void()>& callback) override; private: class DynamicThread { public: - DynamicThread(DynamicThreadPool* pool); + explicit DynamicThread(DynamicThreadPool* pool); ~DynamicThread(); private: diff --git a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc index 3cc508d0cb..f39270924b 100644 --- a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc +++ b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.cc @@ -18,6 +18,7 @@ #include <memory> +#include "y_absl/memory/memory.h" #include "upb/upb.hpp" #include <grpc/slice.h> @@ -114,7 +115,7 @@ DefaultHealthCheckService::HealthCheckServiceImpl* DefaultHealthCheckService::GetHealthCheckService( std::unique_ptr<ServerCompletionQueue> cq) { GPR_ASSERT(impl_ == nullptr); - impl_.reset(new HealthCheckServiceImpl(this, std::move(cq))); + impl_ = y_absl::make_unique<HealthCheckServiceImpl>(this, std::move(cq)); return impl_.get(); } @@ -160,8 +161,8 @@ DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl( AddMethod(new internal::RpcServiceMethod( kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr)); // Create serving thread. - thread_ = std::unique_ptr<::grpc_core::Thread>( - new ::grpc_core::Thread("grpc_health_check_service", Serve, this)); + thread_ = y_absl::make_unique<::grpc_core::Thread>("grpc_health_check_service", + Serve, this); } DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() { @@ -242,10 +243,9 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse( grpc_health_v1_HealthCheckResponse_new(arena.ptr()); grpc_health_v1_HealthCheckResponse_set_status( response_struct, - status == NOT_FOUND - ? grpc_health_v1_HealthCheckResponse_SERVICE_UNKNOWN - : status == SERVING ? grpc_health_v1_HealthCheckResponse_SERVING - : grpc_health_v1_HealthCheckResponse_NOT_SERVING); + status == NOT_FOUND ? grpc_health_v1_HealthCheckResponse_SERVICE_UNKNOWN + : status == SERVING ? grpc_health_v1_HealthCheckResponse_SERVING + : grpc_health_v1_HealthCheckResponse_NOT_SERVING); size_t buf_length; char* buf = grpc_health_v1_HealthCheckResponse_serialize( response_struct, arena.ptr(), &buf_length); diff --git a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h index 9da1dfc15f..fe3d2b219a 100644 --- a/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h +++ b/contrib/libs/grpc/src/cpp/server/health/default_health_check_service.h @@ -56,7 +56,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { HealthCheckServiceImpl(DefaultHealthCheckService* database, std::unique_ptr<ServerCompletionQueue> cq); - ~HealthCheckServiceImpl(); + ~HealthCheckServiceImpl() override; void StartServingThread(); diff --git a/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc b/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc index 3f33f4e045..4eb492e073 100644 --- a/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc +++ b/contrib/libs/grpc/src/cpp/server/insecure_server_credentials.cc @@ -33,6 +33,9 @@ class InsecureServerCredentialsImpl final : public ServerCredentials { (void)processor; GPR_ASSERT(0); // Should not be called on InsecureServerCredentials. } + + private: + bool IsInsecure() const override { return true; } }; } // namespace diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/constants.h b/contrib/libs/grpc/src/cpp/server/load_reporter/constants.h index 00ad794a04..dc9fadeab0 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/constants.h +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/constants.h @@ -21,6 +21,8 @@ #include <grpc/impl/codegen/port_platform.h> +#include <stddef.h> + namespace grpc { namespace load_reporter { diff --git a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc index f07fa812a7..16542bfddf 100644 --- a/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc +++ b/contrib/libs/grpc/src/cpp/server/load_reporter/load_data_store.cc @@ -43,7 +43,7 @@ bool UnorderedMapOfSetEraseKeyValue(std::unordered_map<K, std::set<V>>& map, auto it = map.find(key); if (it != map.end()) { size_t erased = it->second.erase(value); - if (it->second.size() == 0) { + if (it->second.empty()) { map.erase(it); } return erased; diff --git a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc index 732602bcb7..b6ea87f4fe 100644 --- a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc +++ b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.cc @@ -55,18 +55,18 @@ void AuthMetadataProcessorAyncWrapper::Process( } void AuthMetadataProcessorAyncWrapper::InvokeProcessor( - grpc_auth_context* ctx, const grpc_metadata* md, size_t num_md, + grpc_auth_context* context, const grpc_metadata* md, size_t num_md, grpc_process_auth_metadata_done_cb cb, void* user_data) { AuthMetadataProcessor::InputMetadata metadata; for (size_t i = 0; i < num_md; i++) { metadata.insert(std::make_pair(StringRefFromSlice(&md[i].key), StringRefFromSlice(&md[i].value))); } - SecureAuthContext context(ctx); + SecureAuthContext ctx(context); AuthMetadataProcessor::OutputMetadata consumed_metadata; AuthMetadataProcessor::OutputMetadata response_metadata; - Status status = processor_->Process(metadata, &context, &consumed_metadata, + Status status = processor_->Process(metadata, &ctx, &consumed_metadata, &response_metadata); std::vector<grpc_metadata> consumed_md; @@ -145,8 +145,7 @@ std::shared_ptr<ServerCredentials> LocalServerCredentials( } std::shared_ptr<ServerCredentials> TlsServerCredentials( - const grpc::experimental::TlsCredentialsOptions& options) { - grpc::GrpcLibraryCodegen init; + const grpc::experimental::TlsServerCredentialsOptions& options) { return std::shared_ptr<ServerCredentials>(new SecureServerCredentials( grpc_tls_server_credentials_create(options.c_credentials_options()))); } diff --git a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h index 9e3fb3f9eb..f2b65c2864 100644 --- a/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h +++ b/contrib/libs/grpc/src/cpp/server/secure_server_credentials.h @@ -40,7 +40,7 @@ class AuthMetadataProcessorAyncWrapper final { const grpc_metadata* md, size_t num_md, grpc_process_auth_metadata_done_cb cb, void* user_data); - AuthMetadataProcessorAyncWrapper( + explicit AuthMetadataProcessorAyncWrapper( const std::shared_ptr<AuthMetadataProcessor>& processor) : processor_(processor) { if (processor && processor->IsBlocking()) { @@ -69,7 +69,11 @@ class SecureServerCredentials final : public ServerCredentials { void SetAuthMetadataProcessor( const std::shared_ptr<grpc::AuthMetadataProcessor>& processor) override; + grpc_server_credentials* c_creds() { return creds_; } + private: + SecureServerCredentials* AsSecureServerCredentials() override { return this; } + grpc_server_credentials* creds_; std::unique_ptr<grpc::AuthMetadataProcessorAyncWrapper> processor_; }; diff --git a/contrib/libs/grpc/src/cpp/server/server_builder.cc b/contrib/libs/grpc/src/cpp/server/server_builder.cc index 0cc00b365f..502d4f20ae 100644 --- a/contrib/libs/grpc/src/cpp/server/server_builder.cc +++ b/contrib/libs/grpc/src/cpp/server/server_builder.cc @@ -83,9 +83,9 @@ ServerBuilder& ServerBuilder::RegisterService(Service* service) { return *this; } -ServerBuilder& ServerBuilder::RegisterService(const TString& addr, +ServerBuilder& ServerBuilder::RegisterService(const TString& host, Service* service) { - services_.emplace_back(new NamedService(addr, service)); + services_.emplace_back(new NamedService(host, service)); return *this; } @@ -95,7 +95,7 @@ ServerBuilder& ServerBuilder::RegisterAsyncGenericService( gpr_log(GPR_ERROR, "Adding multiple generic services is unsupported for now. " "Dropping the service %p", - (void*)service); + service); } else { generic_service_ = service; } @@ -122,7 +122,7 @@ ServerBuilder& ServerBuilder::experimental_type::RegisterCallbackGenericService( gpr_log(GPR_ERROR, "Adding multiple generic services is unsupported for now. " "Dropping the service %p", - (void*)service); + service); } else { builder_->callback_generic_service_ = service; } @@ -130,6 +130,12 @@ ServerBuilder& ServerBuilder::experimental_type::RegisterCallbackGenericService( } #endif +ServerBuilder& ServerBuilder::experimental_type::SetContextAllocator( + std::unique_ptr<grpc::ContextAllocator> context_allocator) { + builder_->context_allocator_ = std::move(context_allocator); + return *builder_; +} + std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor> ServerBuilder::experimental_type::AddExternalConnectionAcceptor( experimental_type::ExternalConnectionType type, @@ -331,7 +337,7 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { std::unique_ptr<grpc::Server> server(new grpc::Server( &args, sync_server_cqs, sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, sync_server_settings_.cq_timeout_msec, - std::move(acceptors_), resource_quota_, + std::move(acceptors_), server_config_fetcher_, resource_quota_, std::move(interceptor_creators_))); ServerInitializer* initializer = server->initializer(); @@ -369,6 +375,13 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { return nullptr; } +#ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL + server->RegisterContextAllocator(std::move(context_allocator_)); +#else + server->experimental_registration()->RegisterContextAllocator( + std::move(context_allocator_)); +#endif + for (const auto& value : services_) { if (!server->RegisterService(value->host.get(), value->service)) { return nullptr; diff --git a/contrib/libs/grpc/src/cpp/server/server_callback.cc b/contrib/libs/grpc/src/cpp/server/server_callback.cc index 40aef8e735..f6b72c0fcc 100644 --- a/contrib/libs/grpc/src/cpp/server/server_callback.cc +++ b/contrib/libs/grpc/src/cpp/server/server_callback.cc @@ -35,14 +35,14 @@ void ServerCallbackCall::ScheduleOnDone(bool inline_ondone) { grpc_closure closure; ServerCallbackCall* call; explicit ClosureWithArg(ServerCallbackCall* call_arg) : call(call_arg) { - GRPC_CLOSURE_INIT(&closure, - [](void* void_arg, grpc_error*) { - ClosureWithArg* arg = - static_cast<ClosureWithArg*>(void_arg); - arg->call->CallOnDone(); - delete arg; - }, - this, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT( + &closure, + [](void* void_arg, grpc_error*) { + ClosureWithArg* arg = static_cast<ClosureWithArg*>(void_arg); + arg->call->CallOnDone(); + delete arg; + }, + this, grpc_schedule_on_exec_ctx); } }; ClosureWithArg* arg = new ClosureWithArg(this); @@ -64,15 +64,15 @@ void ServerCallbackCall::CallOnCancel(ServerReactor* reactor) { ServerReactor* reactor; ClosureWithArg(ServerCallbackCall* call_arg, ServerReactor* reactor_arg) : call(call_arg), reactor(reactor_arg) { - GRPC_CLOSURE_INIT(&closure, - [](void* void_arg, grpc_error*) { - ClosureWithArg* arg = - static_cast<ClosureWithArg*>(void_arg); - arg->reactor->OnCancel(); - arg->call->MaybeDone(); - delete arg; - }, - this, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT( + &closure, + [](void* void_arg, grpc_error*) { + ClosureWithArg* arg = static_cast<ClosureWithArg*>(void_arg); + arg->reactor->OnCancel(); + arg->call->MaybeDone(); + delete arg; + }, + this, grpc_schedule_on_exec_ctx); } }; ClosureWithArg* arg = new ClosureWithArg(this, reactor); diff --git a/contrib/libs/grpc/src/cpp/server/server_cc.cc b/contrib/libs/grpc/src/cpp/server/server_cc.cc index c2a911c7f7..dab7f488a7 100644 --- a/contrib/libs/grpc/src/cpp/server/server_cc.cc +++ b/contrib/libs/grpc/src/cpp/server/server_cc.cc @@ -42,8 +42,12 @@ #include <grpcpp/server_context.h> #include <grpcpp/support/time.h> +#include "y_absl/memory/memory.h" + #include "src/core/ext/transport/inproc/inproc_transport.h" +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/completion_queue.h" @@ -83,12 +87,16 @@ void InitGlobalCallbacks() { class ShutdownTag : public internal::CompletionQueueTag { public: - bool FinalizeResult(void** /*tag*/, bool* /*status*/) { return false; } + bool FinalizeResult(void** /*tag*/, bool* /*status*/) override { + return false; + } }; -class DummyTag : public internal::CompletionQueueTag { +class PhonyTag : public internal::CompletionQueueTag { public: - bool FinalizeResult(void** /*tag*/, bool* /*status*/) { return true; } + bool FinalizeResult(void** /*tag*/, bool* /*status*/) override { + return true; + } }; class UnimplementedAsyncRequestContext { @@ -311,8 +319,8 @@ class Server::UnimplementedAsyncResponse final grpc::internal::CallOpSendInitialMetadata, grpc::internal::CallOpServerSendStatus> { public: - UnimplementedAsyncResponse(UnimplementedAsyncRequest* request); - ~UnimplementedAsyncResponse() { delete request_; } + explicit UnimplementedAsyncResponse(UnimplementedAsyncRequest* request); + ~UnimplementedAsyncResponse() override { delete request_; } bool FinalizeResult(void** tag, bool* status) override { if (grpc::internal::CallOpSet< @@ -332,199 +340,171 @@ class Server::UnimplementedAsyncResponse final class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { public: - SyncRequest(grpc::internal::RpcServiceMethod* method, void* method_tag) - : method_(method), - method_tag_(method_tag), - in_flight_(false), - has_request_payload_(method->method_type() == - grpc::internal::RpcMethod::NORMAL_RPC || - method->method_type() == - grpc::internal::RpcMethod::SERVER_STREAMING), - call_details_(nullptr), - cq_(nullptr) { - grpc_metadata_array_init(&request_metadata_); - } - - ~SyncRequest() { - if (call_details_) { - delete call_details_; - } - grpc_metadata_array_destroy(&request_metadata_); - } - - void SetupRequest() { cq_ = grpc_completion_queue_create_for_pluck(nullptr); } - - void TeardownRequest() { - grpc_completion_queue_destroy(cq_); - cq_ = nullptr; + SyncRequest(Server* server, grpc::internal::RpcServiceMethod* method, + grpc_core::Server::RegisteredCallAllocation* data) + : SyncRequest(server, method) { + CommonSetup(data); + data->deadline = &deadline_; + data->optional_payload = has_request_payload_ ? &request_payload_ : nullptr; } - void Request(grpc_server* server, grpc_completion_queue* notify_cq) { - GPR_ASSERT(cq_ && !in_flight_); - in_flight_ = true; - if (method_tag_) { - if (grpc_server_request_registered_call( - server, method_tag_, &call_, &deadline_, &request_metadata_, - has_request_payload_ ? &request_payload_ : nullptr, cq_, - notify_cq, this) != GRPC_CALL_OK) { - TeardownRequest(); - return; - } - } else { - if (!call_details_) { - call_details_ = new grpc_call_details; - grpc_call_details_init(call_details_); - } - if (grpc_server_request_call(server, &call_, call_details_, - &request_metadata_, cq_, notify_cq, - this) != GRPC_CALL_OK) { - TeardownRequest(); - return; - } - } + SyncRequest(Server* server, grpc::internal::RpcServiceMethod* method, + grpc_core::Server::BatchCallAllocation* data) + : SyncRequest(server, method) { + CommonSetup(data); + call_details_ = new grpc_call_details; + grpc_call_details_init(call_details_); + data->details = call_details_; } - void PostShutdownCleanup() { - if (call_) { - grpc_call_unref(call_); - call_ = nullptr; + ~SyncRequest() override { + // The destructor should only cleanup those objects created in the + // constructor, since some paths may or may not actually go through the + // Run stage where other objects are allocated. + if (has_request_payload_ && request_payload_) { + grpc_byte_buffer_destroy(request_payload_); } - if (cq_) { - grpc_completion_queue_destroy(cq_); - cq_ = nullptr; + if (call_details_ != nullptr) { + grpc_call_details_destroy(call_details_); + delete call_details_; } + grpc_metadata_array_destroy(&request_metadata_); + server_->UnrefWithPossibleNotify(); } bool FinalizeResult(void** /*tag*/, bool* status) override { if (!*status) { - grpc_completion_queue_destroy(cq_); - cq_ = nullptr; + delete this; + return false; } if (call_details_) { deadline_ = call_details_->deadline; - grpc_call_details_destroy(call_details_); - grpc_call_details_init(call_details_); } return true; } - // The CallData class represents a call that is "active" as opposed - // to just being requested. It wraps and takes ownership of the cq from - // the call request - class CallData final { - public: - explicit CallData(Server* server, SyncRequest* mrd) - : cq_(mrd->cq_), - ctx_(mrd->deadline_, &mrd->request_metadata_), - has_request_payload_(mrd->has_request_payload_), - request_payload_(has_request_payload_ ? mrd->request_payload_ - : nullptr), - request_(nullptr), - method_(mrd->method_), - call_( - mrd->call_, server, &cq_, server->max_receive_message_size(), - ctx_.set_server_rpc_info(method_->name(), method_->method_type(), - server->interceptor_creators_)), - server_(server), - global_callbacks_(nullptr), - resources_(false) { - ctx_.set_call(mrd->call_); - ctx_.cq_ = &cq_; - GPR_ASSERT(mrd->in_flight_); - mrd->in_flight_ = false; - mrd->request_metadata_.count = 0; + void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks, + bool resources) { + ctx_.Init(deadline_, &request_metadata_); + wrapped_call_.Init( + call_, server_, &cq_, server_->max_receive_message_size(), + ctx_->ctx.set_server_rpc_info(method_->name(), method_->method_type(), + server_->interceptor_creators_)); + ctx_->ctx.set_call(call_); + ctx_->ctx.cq_ = &cq_; + request_metadata_.count = 0; + + global_callbacks_ = global_callbacks; + resources_ = resources; + + interceptor_methods_.SetCall(&*wrapped_call_); + interceptor_methods_.SetReverse(); + // Set interception point for RECV INITIAL METADATA + interceptor_methods_.AddInterceptionHookPoint( + grpc::experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); + interceptor_methods_.SetRecvInitialMetadata(&ctx_->ctx.client_metadata_); + + if (has_request_payload_) { + // Set interception point for RECV MESSAGE + auto* handler = resources_ ? method_->handler() + : server_->resource_exhausted_handler_.get(); + deserialized_request_ = handler->Deserialize(call_, request_payload_, + &request_status_, nullptr); + + request_payload_ = nullptr; + interceptor_methods_.AddInterceptionHookPoint( + grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); + interceptor_methods_.SetRecvMessage(deserialized_request_, nullptr); } - ~CallData() { - if (has_request_payload_ && request_payload_) { - grpc_byte_buffer_destroy(request_payload_); - } + if (interceptor_methods_.RunInterceptors( + [this]() { ContinueRunAfterInterception(); })) { + ContinueRunAfterInterception(); + } else { + // There were interceptors to be run, so ContinueRunAfterInterception + // will be run when interceptors are done. } + } - void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks, - bool resources) { - global_callbacks_ = global_callbacks; - resources_ = resources; + void ContinueRunAfterInterception() { + ctx_->ctx.BeginCompletionOp(&*wrapped_call_, nullptr, nullptr); + global_callbacks_->PreSynchronousRequest(&ctx_->ctx); + auto* handler = resources_ ? method_->handler() + : server_->resource_exhausted_handler_.get(); + handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( + &*wrapped_call_, &ctx_->ctx, deserialized_request_, request_status_, + nullptr, nullptr)); + global_callbacks_->PostSynchronousRequest(&ctx_->ctx); - interceptor_methods_.SetCall(&call_); - interceptor_methods_.SetReverse(); - // Set interception point for RECV INITIAL METADATA - interceptor_methods_.AddInterceptionHookPoint( - grpc::experimental::InterceptionHookPoints:: - POST_RECV_INITIAL_METADATA); - interceptor_methods_.SetRecvInitialMetadata(&ctx_.client_metadata_); + cq_.Shutdown(); - if (has_request_payload_) { - // Set interception point for RECV MESSAGE - auto* handler = resources_ ? method_->handler() - : server_->resource_exhausted_handler_.get(); - request_ = handler->Deserialize(call_.call(), request_payload_, - &request_status_, nullptr); + grpc::internal::CompletionQueueTag* op_tag = ctx_->ctx.GetCompletionOpTag(); + cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME)); - request_payload_ = nullptr; - interceptor_methods_.AddInterceptionHookPoint( - grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); - interceptor_methods_.SetRecvMessage(request_, nullptr); - } + // Ensure the cq_ is shutdown + grpc::PhonyTag ignored_tag; + GPR_ASSERT(cq_.Pluck(&ignored_tag) == false); - if (interceptor_methods_.RunInterceptors( - [this]() { ContinueRunAfterInterception(); })) { - ContinueRunAfterInterception(); - } else { - // There were interceptors to be run, so ContinueRunAfterInterception - // will be run when interceptors are done. - } - } + // Cleanup structures allocated during Run/ContinueRunAfterInterception + wrapped_call_.Destroy(); + ctx_.Destroy(); - void ContinueRunAfterInterception() { - { - ctx_.BeginCompletionOp(&call_, nullptr, nullptr); - global_callbacks_->PreSynchronousRequest(&ctx_); - auto* handler = resources_ ? method_->handler() - : server_->resource_exhausted_handler_.get(); - handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( - &call_, &ctx_, request_, request_status_, nullptr, nullptr)); - request_ = nullptr; - global_callbacks_->PostSynchronousRequest(&ctx_); - - cq_.Shutdown(); - - grpc::internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); - cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME)); - - /* Ensure the cq_ is shutdown */ - grpc::DummyTag ignored_tag; - GPR_ASSERT(cq_.Pluck(&ignored_tag) == false); - } - delete this; - } + delete this; + } - private: - grpc::CompletionQueue cq_; - grpc::ServerContext ctx_; - const bool has_request_payload_; - grpc_byte_buffer* request_payload_; - void* request_; - grpc::Status request_status_; - grpc::internal::RpcServiceMethod* const method_; - grpc::internal::Call call_; - Server* server_; - std::shared_ptr<GlobalCallbacks> global_callbacks_; - bool resources_; - grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; - }; + // For requests that must be only cleaned up but not actually Run + void Cleanup() { + cq_.Shutdown(); + grpc_call_unref(call_); + delete this; + } private: + SyncRequest(Server* server, grpc::internal::RpcServiceMethod* method) + : server_(server), + method_(method), + has_request_payload_(method->method_type() == + grpc::internal::RpcMethod::NORMAL_RPC || + method->method_type() == + grpc::internal::RpcMethod::SERVER_STREAMING), + cq_(grpc_completion_queue_create_for_pluck(nullptr)) {} + + template <class CallAllocation> + void CommonSetup(CallAllocation* data) { + server_->Ref(); + grpc_metadata_array_init(&request_metadata_); + data->tag = static_cast<void*>(this); + data->call = &call_; + data->initial_metadata = &request_metadata_; + data->cq = cq_.cq(); + } + + Server* const server_; grpc::internal::RpcServiceMethod* const method_; - void* const method_tag_; - bool in_flight_; const bool has_request_payload_; grpc_call* call_; - grpc_call_details* call_details_; + grpc_call_details* call_details_ = nullptr; gpr_timespec deadline_; grpc_metadata_array request_metadata_; - grpc_byte_buffer* request_payload_; - grpc_completion_queue* cq_; + grpc_byte_buffer* request_payload_ = nullptr; + grpc::CompletionQueue cq_; + grpc::Status request_status_; + std::shared_ptr<GlobalCallbacks> global_callbacks_; + bool resources_; + void* deserialized_request_ = nullptr; + grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; + + // ServerContextWrapper allows ManualConstructor while using a private + // contructor of ServerContext via this friend class. + struct ServerContextWrapper { + ServerContext ctx; + + ServerContextWrapper(gpr_timespec deadline, grpc_metadata_array* arr) + : ctx(deadline, arr) {} + }; + + grpc_core::ManualConstructor<ServerContextWrapper> ctx_; + grpc_core::ManualConstructor<internal::Call> wrapped_call_; }; template <class ServerContextType> @@ -548,7 +528,10 @@ class Server::CallbackRequest final method->method_type() == grpc::internal::RpcMethod::SERVER_STREAMING), cq_(cq), - tag_(this) { + tag_(this), + ctx_(server_->context_allocator() != nullptr + ? server_->context_allocator()->NewCallbackServerContext() + : nullptr) { CommonSetup(server, data); data->deadline = &deadline_; data->optional_payload = has_request_payload_ ? &request_payload_ : nullptr; @@ -563,18 +546,25 @@ class Server::CallbackRequest final has_request_payload_(false), call_details_(new grpc_call_details), cq_(cq), - tag_(this) { + tag_(this), + ctx_(server_->context_allocator() != nullptr + ? server_->context_allocator() + ->NewGenericCallbackServerContext() + : nullptr) { CommonSetup(server, data); grpc_call_details_init(call_details_); data->details = call_details_; } - ~CallbackRequest() { + ~CallbackRequest() override { delete call_details_; grpc_metadata_array_destroy(&request_metadata_); if (has_request_payload_ && request_payload_) { grpc_byte_buffer_destroy(request_payload_); } + if (ctx_alloc_by_default_ || server_->context_allocator() == nullptr) { + default_ctx_.Destroy(); + } server_->UnrefWithPossibleNotify(); } @@ -588,7 +578,7 @@ class Server::CallbackRequest final class CallbackCallTag : public grpc_experimental_completion_queue_functor { public: - CallbackCallTag(Server::CallbackRequest<ServerContextType>* req) + explicit CallbackCallTag(Server::CallbackRequest<ServerContextType>* req) : req_(req) { functor_run = &CallbackCallTag::StaticRun; // Set inlineable to true since this callback is internally-controlled @@ -627,10 +617,10 @@ class Server::CallbackRequest final } // Bind the call, deadline, and metadata from what we got - req_->ctx_.set_call(req_->call_); - req_->ctx_.cq_ = req_->cq_; - req_->ctx_.BindDeadlineAndMetadata(req_->deadline_, - &req_->request_metadata_); + req_->ctx_->set_call(req_->call_); + req_->ctx_->cq_ = req_->cq_; + req_->ctx_->BindDeadlineAndMetadata(req_->deadline_, + &req_->request_metadata_); req_->request_metadata_.count = 0; // Create a C++ Call to control the underlying core call @@ -639,7 +629,7 @@ class Server::CallbackRequest final grpc::internal::Call( req_->call_, req_->server_, req_->cq_, req_->server_->max_receive_message_size(), - req_->ctx_.set_server_rpc_info( + req_->ctx_->set_server_rpc_info( req_->method_name(), (req_->method_ != nullptr) ? req_->method_->method_type() @@ -653,7 +643,7 @@ class Server::CallbackRequest final grpc::experimental::InterceptionHookPoints:: POST_RECV_INITIAL_METADATA); req_->interceptor_methods_.SetRecvInitialMetadata( - &req_->ctx_.client_metadata_); + &req_->ctx_->client_metadata_); if (req_->has_request_payload_) { // Set interception point for RECV MESSAGE @@ -679,7 +669,7 @@ class Server::CallbackRequest final ? req_->method_->handler() : req_->server_->generic_handler_.get(); handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( - call_, &req_->ctx_, req_->request_, req_->request_status_, + call_, req_->ctx_, req_->request_, req_->request_status_, req_->handler_data_, [this] { delete req_; })); } }; @@ -688,9 +678,16 @@ class Server::CallbackRequest final void CommonSetup(Server* server, CallAllocation* data) { server->Ref(); grpc_metadata_array_init(&request_metadata_); - data->tag = &tag_; + data->tag = static_cast<void*>(&tag_); data->call = &call_; data->initial_metadata = &request_metadata_; + if (ctx_ == nullptr) { + default_ctx_.Init(); + ctx_ = &*default_ctx_; + ctx_alloc_by_default_ = true; + } + ctx_->set_context_allocator(server->context_allocator()); + data->cq = cq_->cq(); } Server* const server_; @@ -705,8 +702,10 @@ class Server::CallbackRequest final gpr_timespec deadline_; grpc_metadata_array request_metadata_; grpc::CompletionQueue* const cq_; + bool ctx_alloc_by_default_ = false; CallbackCallTag tag_; - ServerContextType ctx_; + ServerContextType* ctx_ = nullptr; + grpc_core::ManualConstructor<ServerContextType> default_ctx_; grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; }; @@ -723,8 +722,8 @@ bool Server::CallbackRequest< if (*status) { deadline_ = call_details_->deadline; // TODO(yangg) remove the copy here - ctx_.method_ = grpc::StringFromCopiedSlice(call_details_->method); - ctx_.host_ = grpc::StringFromCopiedSlice(call_details_->host); + ctx_->method_ = grpc::StringFromCopiedSlice(call_details_->method); + ctx_->host_ = grpc::StringFromCopiedSlice(call_details_->host); } grpc_slice_unref(call_details_->method); grpc_slice_unref(call_details_->host); @@ -740,7 +739,7 @@ const char* Server::CallbackRequest<grpc::CallbackServerContext>::method_name() template <> const char* Server::CallbackRequest< grpc::GenericCallbackServerContext>::method_name() const { - return ctx_.method().c_str(); + return ctx_->method().c_str(); } // Implementation of ThreadManager. Each instance of SyncRequestThreadManager @@ -779,44 +778,39 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { } void DoWork(void* tag, bool ok, bool resources) override { + (void)ok; SyncRequest* sync_req = static_cast<SyncRequest*>(tag); - if (!sync_req) { - // No tag. Nothing to work on. This is an unlikley scenario and possibly a - // bug in RPC Manager implementation. - gpr_log(GPR_ERROR, "Sync server. DoWork() was called with NULL tag"); - return; - } - - if (ok) { - // Calldata takes ownership of the completion queue and interceptors - // inside sync_req - auto* cd = new SyncRequest::CallData(server_, sync_req); - // Prepare for the next request - if (!IsShutdown()) { - sync_req->SetupRequest(); // Create new completion queue for sync_req - sync_req->Request(server_->c_server(), server_cq_->cq()); - } + // Under the AllocatingRequestMatcher model we will never see an invalid tag + // here. + GPR_DEBUG_ASSERT(sync_req != nullptr); + GPR_DEBUG_ASSERT(ok); - GPR_TIMER_SCOPE("cd.Run()", 0); - cd->Run(global_callbacks_, resources); - } - // TODO (sreek) If ok is false here (which it isn't in case of - // grpc_request_registered_call), we should still re-queue the request - // object + GPR_TIMER_SCOPE("sync_req->Run()", 0); + sync_req->Run(global_callbacks_, resources); } void AddSyncMethod(grpc::internal::RpcServiceMethod* method, void* tag) { - sync_requests_.emplace_back(new SyncRequest(method, tag)); + server_->server()->core_server->SetRegisteredMethodAllocator( + server_cq_->cq(), tag, [this, method] { + grpc_core::Server::RegisteredCallAllocation result; + new SyncRequest(server_, method, &result); + return result; + }); + has_sync_method_ = true; } void AddUnknownSyncMethod() { - if (!sync_requests_.empty()) { - unknown_method_.reset(new grpc::internal::RpcServiceMethod( + if (has_sync_method_) { + unknown_method_ = y_absl::make_unique<grpc::internal::RpcServiceMethod>( "unknown", grpc::internal::RpcMethod::BIDI_STREAMING, - new grpc::internal::UnknownMethodHandler)); - sync_requests_.emplace_back( - new SyncRequest(unknown_method_.get(), nullptr)); + new grpc::internal::UnknownMethodHandler); + server_->server()->core_server->SetBatchMethodAllocator( + server_cq_->cq(), [this] { + grpc_core::Server::BatchCallAllocation result; + new SyncRequest(server_, unknown_method_.get(), &result); + return result; + }); } } @@ -831,27 +825,14 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { void* tag; bool ok; while (server_cq_->Next(&tag, &ok)) { - if (ok) { - // If a request was pulled off the queue, it means that the thread - // handling the request added it to the completion queue after shutdown - // was called - because the thread had already started and checked the - // shutdown flag before shutdown was called. In this case, we simply - // clean it up here, *after* calling wait on all the worker threads, at - // which point we are certain no in-flight requests will add more to the - // queue. This fixes an intermittent memory leak on shutdown. - SyncRequest* sync_req = static_cast<SyncRequest*>(tag); - sync_req->PostShutdownCleanup(); - } + // This problem can arise if the server CQ gets a request queued to it + // before it gets shutdown but then pulls it after shutdown. + static_cast<SyncRequest*>(tag)->Cleanup(); } } void Start() { - if (!sync_requests_.empty()) { - for (const auto& value : sync_requests_) { - value->SetupRequest(); - value->Request(server_->c_server(), server_cq_->cq()); - } - + if (has_sync_method_) { Initialize(); // ThreadManager's Initialize() } } @@ -860,7 +841,7 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { Server* server_; grpc::CompletionQueue* server_cq_; int cq_timeout_msec_; - std::vector<std::unique_ptr<SyncRequest>> sync_requests_; + bool has_sync_method_ = false; std::unique_ptr<grpc::internal::RpcServiceMethod> unknown_method_; std::shared_ptr<Server::GlobalCallbacks> global_callbacks_; }; @@ -873,6 +854,7 @@ Server::Server( int min_pollers, int max_pollers, int sync_cq_timeout_msec, std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>> acceptors, + grpc_server_config_fetcher* server_config_fetcher, grpc_resource_quota* server_rq, std::vector< std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>> @@ -936,13 +918,14 @@ Server::Server( } } server_ = grpc_server_create(&channel_args, nullptr); + grpc_server_set_config_fetcher(server_, server_config_fetcher); } Server::~Server() { { grpc::internal::ReleasableMutexLock lock(&mu_); if (started_ && !shutdown_) { - lock.Unlock(); + lock.Release(); Shutdown(); } else if (!started_) { // Shutdown the completion queues @@ -950,7 +933,12 @@ Server::~Server() { value->Shutdown(); } if (callback_cq_ != nullptr) { - callback_cq_->Shutdown(); + if (grpc_iomgr_run_in_background()) { + // gRPC-core provides the backing needed for the preferred CQ type + callback_cq_->Shutdown(); + } else { + CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq_); + } callback_cq_ = nullptr; } } @@ -1005,7 +993,7 @@ static grpc_server_register_method_payload_handling PayloadHandlingForMethod( GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;); } -bool Server::RegisterService(const TString* host, grpc::Service* service) { +bool Server::RegisterService(const TString* addr, grpc::Service* service) { bool has_async_methods = service->has_async_methods(); if (has_async_methods) { GPR_ASSERT(service->server_ == nullptr && @@ -1016,12 +1004,12 @@ bool Server::RegisterService(const TString* host, grpc::Service* service) { const char* method_name = nullptr; for (const auto& method : service->methods_) { - if (method.get() == nullptr) { // Handled by generic service if any. + if (method == nullptr) { // Handled by generic service if any. continue; } void* method_registration_tag = grpc_server_register_method( - server_, method->name(), host ? host->c_str() : nullptr, + server_, method->name(), addr ? addr->c_str() : nullptr, PayloadHandlingForMethod(method.get()), 0); if (method_registration_tag == nullptr) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", @@ -1118,7 +1106,9 @@ void Server::UnrefAndWaitLocked() { shutdown_done_ = true; return; // no need to wait on CV since done condition already set } - shutdown_done_cv_.WaitUntil(&mu_, [this] { return shutdown_done_; }); + grpc::internal::WaitUntil( + &shutdown_done_cv_, &mu_, + [this]() Y_ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { return shutdown_done_; }); } void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { @@ -1157,7 +1147,7 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { // service to handle any unimplemented methods using the default reactor // creator if (has_callback_methods_ && !has_callback_generic_service_) { - unimplemented_service_.reset(new grpc::CallbackGenericService); + unimplemented_service_ = y_absl::make_unique<grpc::CallbackGenericService>(); RegisterCallbackGenericService(unimplemented_service_.get()); } @@ -1167,13 +1157,27 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { } #endif - grpc_server_start(server_); + // If we have a generic service, all unmatched method names go there. + // Otherwise, we must provide at least one RPC request for an "unimplemented" + // RPC, which covers any RPC for a method name that isn't matched. If we + // have a sync service, let it be a sync unimplemented RPC, which must be + // registered before server start (to initialize an AllocatingRequestMatcher). + // If we have an AllocatingRequestMatcher, we can't also specify other + // unimplemented RPCs via explicit async requests, so we won't do so. If we + // only have async services, we can specify unimplemented RPCs on each async + // CQ so that some user polling thread will move them along as long as some + // progress is being made on any RPCs in the system. + bool unknown_rpc_needed = + !has_async_generic_service_ && !has_callback_generic_service_; + + if (unknown_rpc_needed && !sync_req_mgrs_.empty()) { + sync_req_mgrs_[0]->AddUnknownSyncMethod(); + unknown_rpc_needed = false; + } - if (!has_async_generic_service_ && !has_callback_generic_service_) { - for (const auto& value : sync_req_mgrs_) { - value->AddUnknownSyncMethod(); - } + grpc_server_start(server_); + if (unknown_rpc_needed) { for (size_t i = 0; i < num_cqs; i++) { if (cqs[i]->IsFrequentlyPolled()) { new UnimplementedAsyncRequest(this, cqs[i]); @@ -1182,14 +1186,15 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { if (health_check_cq != nullptr) { new UnimplementedAsyncRequest(this, health_check_cq); } + unknown_rpc_needed = false; } // If this server has any support for synchronous methods (has any sync // server CQs), make sure that we have a ResourceExhausted handler // to deal with the case of thread exhaustion if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) { - resource_exhausted_handler_.reset( - new grpc::internal::ResourceExhaustedHandler); + resource_exhausted_handler_ = + y_absl::make_unique<grpc::internal::ResourceExhaustedHandler>(); } for (const auto& value : sync_req_mgrs_) { @@ -1219,7 +1224,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) { /// The completion queue to use for server shutdown completion notification grpc::CompletionQueue shutdown_cq; - grpc::ShutdownTag shutdown_tag; // Dummy shutdown tag + grpc::ShutdownTag shutdown_tag; // Phony shutdown tag grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag); shutdown_cq.Shutdown(); @@ -1237,6 +1242,9 @@ void Server::ShutdownInternal(gpr_timespec deadline) { // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has // successfully shutdown + // Drop the shutdown ref and wait for all other refs to drop as well. + UnrefAndWaitLocked(); + // Shutdown all ThreadManagers. This will try to gracefully stop all the // threads in the ThreadManagers (once they process any inflight requests) for (const auto& value : sync_req_mgrs_) { @@ -1248,13 +1256,15 @@ void Server::ShutdownInternal(gpr_timespec deadline) { value->Wait(); } - // Drop the shutdown ref and wait for all other refs to drop as well. - UnrefAndWaitLocked(); - // Shutdown the callback CQ. The CQ is owned by its own shutdown tag, so it // will delete itself at true shutdown. if (callback_cq_ != nullptr) { - callback_cq_->Shutdown(); + if (grpc_iomgr_run_in_background()) { + // gRPC-core provides the backing needed for the preferred CQ type + callback_cq_->Shutdown(); + } else { + CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq_); + } callback_cq_ = nullptr; } @@ -1265,7 +1275,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) { } shutdown_notified_ = true; - shutdown_cv_.Broadcast(); + shutdown_cv_.SignalAll(); #ifndef NDEBUG // Unregister this server with the CQs passed into it by the user so that @@ -1326,13 +1336,19 @@ grpc::CompletionQueue* Server::CallbackCQ() { if (callback_cq_ != nullptr) { return callback_cq_; } - auto* shutdown_callback = new grpc::ShutdownCallback; - callback_cq_ = new grpc::CompletionQueue(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, - shutdown_callback}); - - // Transfer ownership of the new cq to its own shutdown callback - shutdown_callback->TakeCQ(callback_cq_); + if (grpc_iomgr_run_in_background()) { + // gRPC-core provides the backing needed for the preferred CQ type + auto* shutdown_callback = new grpc::ShutdownCallback; + callback_cq_ = new grpc::CompletionQueue(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, + shutdown_callback}); + + // Transfer ownership of the new cq to its own shutdown callback + shutdown_callback->TakeCQ(callback_cq_); + } else { + // Otherwise we need to use the alternative CQ variant + callback_cq_ = CompletionQueue::CallbackAlternativeCQ(); + } return callback_cq_; } diff --git a/contrib/libs/grpc/src/cpp/server/server_context.cc b/contrib/libs/grpc/src/cpp/server/server_context.cc index 458ac20d87..df82c69ed8 100644 --- a/contrib/libs/grpc/src/cpp/server/server_context.cc +++ b/contrib/libs/grpc/src/cpp/server/server_context.cc @@ -28,6 +28,7 @@ #include <grpc/support/log.h> #include <grpcpp/impl/call.h> #include <grpcpp/impl/codegen/completion_queue.h> +#include <grpcpp/impl/grpc_library.h> #include <grpcpp/support/server_callback.h> #include <grpcpp/support/time.h> @@ -37,6 +38,8 @@ namespace grpc { +static internal::GrpcLibraryInitializer g_gli_initializer; + // CompletionOp class ServerContextBase::CompletionOp final @@ -62,7 +65,7 @@ class ServerContextBase::CompletionOp final CompletionOp(CompletionOp&&) = delete; CompletionOp& operator=(CompletionOp&&) = delete; - ~CompletionOp() { + ~CompletionOp() override { if (call_.server_rpc_info()) { call_.server_rpc_info()->Unref(); } @@ -121,11 +124,12 @@ class ServerContextBase::CompletionOp final void ContinueFinalizeResultAfterInterception() override { done_intercepting_ = true; if (!has_tag_) { - /* We don't have a tag to return. */ + // We don't have a tag to return. Unref(); + // Unref can delete this, so do not access anything from this afterward. return; } - /* Start a dummy op so that we can return the tag */ + /* Start a phony op so that we can return the tag */ GPR_ASSERT(grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_, nullptr) == GRPC_CALL_OK); } @@ -174,33 +178,41 @@ void ServerContextBase::CompletionOp::FillOps(internal::Call* call) { } bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) { - // Decide whether to call the cancel callback within the lock - bool call_cancel; + // Decide whether to do the unref or call the cancel callback within the lock + bool do_unref = false; + bool has_tag = false; + bool call_cancel = false; { grpc_core::MutexLock lock(&mu_); if (done_intercepting_) { // We are done intercepting. - bool has_tag = has_tag_; + has_tag = has_tag_; if (has_tag) { *tag = tag_; } - Unref(); - return has_tag; - } - finalized_ = true; + // Release the lock before unreffing as Unref may delete this object + do_unref = true; + } else { + finalized_ = true; + + // If for some reason the incoming status is false, mark that as a + // cancellation. + // TODO(vjpai): does this ever happen? + if (!*status) { + cancelled_ = 1; + } - // If for some reason the incoming status is false, mark that as a - // cancellation. - // TODO(vjpai): does this ever happen? - if (!*status) { - cancelled_ = 1; + call_cancel = (cancelled_ != 0); + // Release the lock since we may call a callback and interceptors. } - - call_cancel = (cancelled_ != 0); - // Release the lock since we may call a callback and interceptors. } + if (do_unref) { + Unref(); + // Unref can delete this, so do not access anything from this afterward. + return has_tag; + } if (call_cancel && callback_controller_ != nullptr) { callback_controller_->MaybeCallOnCancel(); } @@ -214,6 +226,7 @@ bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) { *tag = tag_; } Unref(); + // Unref can delete this, so do not access anything from this afterward. return has_tag; } // There are interceptors to be run. Return false for now. @@ -223,7 +236,9 @@ bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) { // ServerContextBase body ServerContextBase::ServerContextBase() - : deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)) {} + : deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)) { + g_gli_initializer.summon(); +} ServerContextBase::ServerContextBase(gpr_timespec deadline, grpc_metadata_array* arr) @@ -240,6 +255,7 @@ void ServerContextBase::BindDeadlineAndMetadata(gpr_timespec deadline, ServerContextBase::~ServerContextBase() { if (completion_op_) { completion_op_->Unref(); + // Unref can delete completion_op_, so do not access it afterward. } if (rpc_info_) { rpc_info_->Unref(); diff --git a/contrib/libs/grpc/src/cpp/server/server_credentials.cc b/contrib/libs/grpc/src/cpp/server/server_credentials.cc index c3b3a8b379..36b5a52dc7 100644 --- a/contrib/libs/grpc/src/cpp/server/server_credentials.cc +++ b/contrib/libs/grpc/src/cpp/server/server_credentials.cc @@ -18,8 +18,13 @@ #include <grpcpp/security/server_credentials.h> +#include <grpcpp/impl/grpc_library.h> + namespace grpc { +static internal::GrpcLibraryInitializer g_gli_initializer; +ServerCredentials::ServerCredentials() { g_gli_initializer.summon(); } + ServerCredentials::~ServerCredentials() {} } // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/server/xds_server_credentials.cc b/contrib/libs/grpc/src/cpp/server/xds_server_credentials.cc new file mode 100644 index 0000000000..b543f3f172 --- /dev/null +++ b/contrib/libs/grpc/src/cpp/server/xds_server_credentials.cc @@ -0,0 +1,41 @@ +// +// +// Copyright 2020 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#include "src/cpp/server/secure_server_credentials.h" + +namespace grpc { +namespace experimental { + +std::shared_ptr<ServerCredentials> XdsServerCredentials( + const std::shared_ptr<ServerCredentials>& fallback_credentials) { + GPR_ASSERT(fallback_credentials != nullptr); + if (fallback_credentials->IsInsecure()) { + grpc_server_credentials* insecure_creds = + grpc_insecure_server_credentials_create(); + auto xds_creds = std::make_shared<SecureServerCredentials>( + grpc_xds_server_credentials_create(insecure_creds)); + grpc_server_credentials_release(insecure_creds); + return xds_creds; + } + return std::make_shared<SecureServerCredentials>( + grpc_xds_server_credentials_create( + fallback_credentials->AsSecureServerCredentials()->c_creds())); +} + +} // namespace experimental +} // namespace grpc diff --git a/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.cc b/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.cc index c8560aa81d..5155f610a8 100644 --- a/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.cc +++ b/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.cc @@ -152,7 +152,7 @@ void ThreadManager::MainWorkLoop() { bool ok; WorkStatus work_status = PollForWork(&tag, &ok); - grpc_core::ReleasableMutexLock lock(&mu_); + grpc_core::LockableAndReleasableMutexLock lock(&mu_); // Reduce the number of pollers by 1 and check what happened with the poll num_pollers_--; bool done = false; @@ -179,7 +179,7 @@ void ThreadManager::MainWorkLoop() { max_active_threads_sofar_ = num_threads_; } // Drop lock before spawning thread to avoid contention - lock.Unlock(); + lock.Release(); WorkerThread* worker = new WorkerThread(this); if (worker->created()) { worker->Start(); @@ -195,17 +195,17 @@ void ThreadManager::MainWorkLoop() { // There is still at least some thread polling, so we can go on // even though we are below the number of pollers that we would // like to have (min_pollers_) - lock.Unlock(); + lock.Release(); } else { // There are no pollers to spare and we couldn't allocate // a new thread, so resources are exhausted! - lock.Unlock(); + lock.Release(); resource_exhausted = true; } } else { // There are a sufficient number of pollers available so we can do // the work and continue polling with our existing poller threads - lock.Unlock(); + lock.Release(); } // Lock is always released at this point - do the application work // or return resource exhausted if there is new work but we couldn't diff --git a/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.h b/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.h index 43f1fd5585..aae2417787 100644 --- a/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.h +++ b/contrib/libs/grpc/src/cpp/thread_manager/thread_manager.h @@ -119,7 +119,7 @@ class ThreadManager { // not be called (and the need for this WorkerThread class is eliminated) class WorkerThread { public: - WorkerThread(ThreadManager* thd_mgr); + explicit WorkerThread(ThreadManager* thd_mgr); ~WorkerThread(); bool created() const { return created_; } diff --git a/contrib/libs/grpc/src/cpp/util/error_details.cc b/contrib/libs/grpc/src/cpp/util/error_details.cc index dfd3351be1..0330f012c2 100644 --- a/contrib/libs/grpc/src/cpp/util/error_details.cc +++ b/contrib/libs/grpc/src/cpp/util/error_details.cc @@ -17,34 +17,3 @@ */ #include <grpcpp/support/error_details.h> - -#include "src/proto/grpc/status/status.pb.h" - -namespace grpc { - -grpc::Status ExtractErrorDetails(const grpc::Status& from, - ::google::rpc::Status* to) { - if (to == nullptr) { - return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, ""); - } - if (!to->ParseFromString(TProtoStringType(from.error_details()))) { - return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, ""); - } - return grpc::Status::OK; -} - -grpc::Status SetErrorDetails(const ::google::rpc::Status& from, - grpc::Status* to) { - if (to == nullptr) { - return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, ""); - } - grpc::StatusCode code = grpc::StatusCode::UNKNOWN; - if (from.code() >= grpc::StatusCode::OK && - from.code() <= grpc::StatusCode::UNAUTHENTICATED) { - code = static_cast<grpc::StatusCode>(from.code()); - } - *to = grpc::Status(code, from.message(), from.SerializeAsString()); - return grpc::Status::OK; -} - -} // namespace grpc |