summaryrefslogtreecommitdiffstats
path: root/contrib/libs/grpc/src/cpp/client/channel_cc.cc
diff options
context:
space:
mode:
authorDevtools Arcadia <[email protected]>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <[email protected]>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /contrib/libs/grpc/src/cpp/client/channel_cc.cc
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'contrib/libs/grpc/src/cpp/client/channel_cc.cc')
-rw-r--r--contrib/libs/grpc/src/cpp/client/channel_cc.cc252
1 files changed, 252 insertions, 0 deletions
diff --git a/contrib/libs/grpc/src/cpp/client/channel_cc.cc b/contrib/libs/grpc/src/cpp/client/channel_cc.cc
new file mode 100644
index 00000000000..ac95c29efcd
--- /dev/null
+++ b/contrib/libs/grpc/src/cpp/client/channel_cc.cc
@@ -0,0 +1,252 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpcpp/channel.h>
+
+#include <cstring>
+#include <memory>
+
+#include <grpc/grpc.h>
+#include <grpc/slice.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/time.h>
+#include <grpcpp/client_context.h>
+#include <grpcpp/completion_queue.h>
+#include <grpcpp/impl/call.h>
+#include <grpcpp/impl/codegen/call_op_set.h>
+#include <grpcpp/impl/codegen/completion_queue_tag.h>
+#include <grpcpp/impl/grpc_library.h>
+#include <grpcpp/impl/rpc_method.h>
+#include <grpcpp/security/credentials.h>
+#include <grpcpp/support/channel_arguments.h>
+#include <grpcpp/support/config.h>
+#include <grpcpp/support/status.h>
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/surface/completion_queue.h"
+
+namespace grpc {
+
+static ::grpc::internal::GrpcLibraryInitializer g_gli_initializer;
+Channel::Channel(const TString& host, grpc_channel* channel,
+ std::vector<std::unique_ptr<
+ ::grpc::experimental::ClientInterceptorFactoryInterface>>
+ interceptor_creators)
+ : host_(host), c_channel_(channel) {
+ interceptor_creators_ = std::move(interceptor_creators);
+ g_gli_initializer.summon();
+}
+
+Channel::~Channel() {
+ grpc_channel_destroy(c_channel_);
+ if (callback_cq_ != nullptr) {
+ callback_cq_->Shutdown();
+ }
+}
+
+namespace {
+
+inline grpc_slice SliceFromArray(const char* arr, size_t len) {
+ return g_core_codegen_interface->grpc_slice_from_copied_buffer(arr, len);
+}
+
+TString GetChannelInfoField(grpc_channel* channel,
+ grpc_channel_info* channel_info,
+ char*** channel_info_field) {
+ char* value = nullptr;
+ memset(channel_info, 0, sizeof(*channel_info));
+ *channel_info_field = &value;
+ grpc_channel_get_info(channel, channel_info);
+ if (value == nullptr) return "";
+ TString result = value;
+ gpr_free(value);
+ return result;
+}
+
+} // namespace
+
+TString Channel::GetLoadBalancingPolicyName() const {
+ grpc_channel_info channel_info;
+ return GetChannelInfoField(c_channel_, &channel_info,
+ &channel_info.lb_policy_name);
+}
+
+TString Channel::GetServiceConfigJSON() const {
+ grpc_channel_info channel_info;
+ return GetChannelInfoField(c_channel_, &channel_info,
+ &channel_info.service_config_json);
+}
+
+namespace experimental {
+
+void ChannelResetConnectionBackoff(Channel* channel) {
+ grpc_channel_reset_connect_backoff(channel->c_channel_);
+}
+
+} // namespace experimental
+
+::grpc::internal::Call Channel::CreateCallInternal(
+ const ::grpc::internal::RpcMethod& method, ::grpc::ClientContext* context,
+ ::grpc::CompletionQueue* cq, size_t interceptor_pos) {
+ const bool kRegistered = method.channel_tag() && context->authority().empty();
+ grpc_call* c_call = nullptr;
+ if (kRegistered) {
+ c_call = grpc_channel_create_registered_call(
+ c_channel_, context->propagate_from_call_,
+ context->propagation_options_.c_bitmask(), cq->cq(),
+ method.channel_tag(), context->raw_deadline(), nullptr);
+ } else {
+ const ::TString* host_str = nullptr;
+ if (!context->authority_.empty()) {
+ host_str = &context->authority_;
+ } else if (!host_.empty()) {
+ host_str = &host_;
+ }
+ grpc_slice method_slice =
+ SliceFromArray(method.name(), strlen(method.name()));
+ grpc_slice host_slice;
+ if (host_str != nullptr) {
+ host_slice = ::grpc::SliceFromCopiedString(*host_str);
+ }
+ c_call = grpc_channel_create_call(
+ c_channel_, context->propagate_from_call_,
+ context->propagation_options_.c_bitmask(), cq->cq(), method_slice,
+ host_str == nullptr ? nullptr : &host_slice, context->raw_deadline(),
+ nullptr);
+ grpc_slice_unref(method_slice);
+ if (host_str != nullptr) {
+ grpc_slice_unref(host_slice);
+ }
+ }
+ grpc_census_call_set_context(c_call, context->census_context());
+
+ // ClientRpcInfo should be set before call because set_call also checks
+ // whether the call has been cancelled, and if the call was cancelled, we
+ // should notify the interceptors too.
+ auto* info =
+ context->set_client_rpc_info(method.name(), method.method_type(), this,
+ interceptor_creators_, interceptor_pos);
+ context->set_call(c_call, shared_from_this());
+
+ return ::grpc::internal::Call(c_call, this, cq, info);
+}
+
+::grpc::internal::Call Channel::CreateCall(
+ const ::grpc::internal::RpcMethod& method, ::grpc::ClientContext* context,
+ CompletionQueue* cq) {
+ return CreateCallInternal(method, context, cq, 0);
+}
+
+void Channel::PerformOpsOnCall(::grpc::internal::CallOpSetInterface* ops,
+ ::grpc::internal::Call* call) {
+ ops->FillOps(
+ call); // Make a copy of call. It's fine since Call just has pointers
+}
+
+void* Channel::RegisterMethod(const char* method) {
+ return grpc_channel_register_call(
+ c_channel_, method, host_.empty() ? nullptr : host_.c_str(), nullptr);
+}
+
+grpc_connectivity_state Channel::GetState(bool try_to_connect) {
+ return grpc_channel_check_connectivity_state(c_channel_, try_to_connect);
+}
+
+namespace {
+
+class TagSaver final : public ::grpc::internal::CompletionQueueTag {
+ public:
+ explicit TagSaver(void* tag) : tag_(tag) {}
+ ~TagSaver() override {}
+ bool FinalizeResult(void** tag, bool* /*status*/) override {
+ *tag = tag_;
+ delete this;
+ return true;
+ }
+
+ private:
+ void* tag_;
+};
+
+} // namespace
+
+void Channel::NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
+ gpr_timespec deadline,
+ ::grpc::CompletionQueue* cq, void* tag) {
+ TagSaver* tag_saver = new TagSaver(tag);
+ grpc_channel_watch_connectivity_state(c_channel_, last_observed, deadline,
+ cq->cq(), tag_saver);
+}
+
+bool Channel::WaitForStateChangeImpl(grpc_connectivity_state last_observed,
+ gpr_timespec deadline) {
+ ::grpc::CompletionQueue cq;
+ bool ok = false;
+ void* tag = nullptr;
+ NotifyOnStateChangeImpl(last_observed, deadline, &cq, nullptr);
+ cq.Next(&tag, &ok);
+ GPR_ASSERT(tag == nullptr);
+ return ok;
+}
+
+namespace {
+class ShutdownCallback : public grpc_experimental_completion_queue_functor {
+ public:
+ ShutdownCallback() {
+ functor_run = &ShutdownCallback::Run;
+ // Set inlineable to true since this callback is trivial and thus does not
+ // need to be run from the executor (triggering a thread hop). This should
+ // only be used by internal callbacks like this and not by user application
+ // code.
+ inlineable = true;
+ }
+ // TakeCQ takes ownership of the cq into the shutdown callback
+ // so that the shutdown callback will be responsible for destroying it
+ void TakeCQ(::grpc::CompletionQueue* cq) { cq_ = cq; }
+
+ // The Run function will get invoked by the completion queue library
+ // when the shutdown is actually complete
+ static void Run(grpc_experimental_completion_queue_functor* cb, int) {
+ auto* callback = static_cast<ShutdownCallback*>(cb);
+ delete callback->cq_;
+ delete callback;
+ }
+
+ private:
+ ::grpc::CompletionQueue* cq_ = nullptr;
+};
+} // namespace
+
+::grpc::CompletionQueue* Channel::CallbackCQ() {
+ // TODO(vjpai): Consider using a single global CQ for the default CQ
+ // if there is no explicit per-channel CQ registered
+ grpc::internal::MutexLock l(&mu_);
+ if (callback_cq_ == nullptr) {
+ auto* shutdown_callback = new ShutdownCallback;
+ callback_cq_ = new ::grpc::CompletionQueue(grpc_completion_queue_attributes{
+ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
+ shutdown_callback});
+
+ // Transfer ownership of the new cq to its own shutdown callback
+ shutdown_callback->TakeCQ(callback_cq_);
+ }
+ return callback_cq_;
+}
+
+} // namespace grpc