diff options
author | leonidlazarev <leonidlazarev@yandex-team.com> | 2023-07-17 19:35:29 +0300 |
---|---|---|
committer | leonidlazarev <leonidlazarev@yandex-team.com> | 2023-07-17 19:35:29 +0300 |
commit | cb8e9a6330e4e5d9a0e2f8506e7469bbd641ec63 (patch) | |
tree | eddb7b81e7d1f5a7ac8078591799509e95572f4e /contrib/libs/grpc/include/grpcpp/support | |
parent | 029cf29f3669091012394221f00dfa0f3631d91b (diff) | |
download | ydb-cb8e9a6330e4e5d9a0e2f8506e7469bbd641ec63.tar.gz |
feat grpc: update to grpc 1.53.1
update grpc to 1.53.1
update grpcio/py3 to 1.53.1
Added patches:
22-grpc-code-output.patch - allow translation of grpc code to internal string type.
23-max-thread-limitation.patch - to provide interface for settings of thread number limit, as
grpc::DynamicThreadPool doesn't provide interface to limit thread number anymore.
24-support_for-non-abort-grpc.patch - generate exception instead of application crash
25-forkable-destruction-order.patch - correct forkable logic for TimerManager
27-skip-child-post-fork-operations.patch - allow to skip child post fork operations to exclude UB (used for unified agent only)
pr33495_fox_nested_fork.patch - fix issues with nested forks
pr33582_fork_handler.patch - disable fork handler support if it is not requested intentionally
Diffstat (limited to 'contrib/libs/grpc/include/grpcpp/support')
23 files changed, 5128 insertions, 399 deletions
diff --git a/contrib/libs/grpc/include/grpcpp/support/async_stream.h b/contrib/libs/grpc/include/grpcpp/support/async_stream.h index 18e84dfcdb..1bf67d301d 100644 --- a/contrib/libs/grpc/include/grpcpp/support/async_stream.h +++ b/contrib/libs/grpc/include/grpcpp/support/async_stream.h @@ -1,28 +1,28 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// #ifndef GRPCPP_SUPPORT_ASYNC_STREAM_H #define GRPCPP_SUPPORT_ASYNC_STREAM_H #include <grpc/grpc.h> +#include <grpc/support/log.h> #include <grpcpp/impl/call.h> -#include <grpcpp/impl/codegen/channel_interface.h> -#include <grpcpp/impl/codegen/core_codegen_interface.h> +#include <grpcpp/impl/channel_interface.h> #include <grpcpp/impl/service_type.h> #include <grpcpp/server_context.h> #include <grpcpp/support/status.h> @@ -185,8 +185,8 @@ class ClientAsyncReaderFactory { grpc::ClientContext* context, const W& request, bool start, void* tag) { grpc::internal::Call call = channel->CreateCall(method, context, cq); - return new (grpc::g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncReader<R>))) + return new ( + grpc_call_arena_alloc(call.call(), sizeof(ClientAsyncReader<R>))) ClientAsyncReader<R>(call, context, request, start, tag); } }; @@ -200,7 +200,7 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { public: // always allocated against a call arena, no memory free required static void operator delete(void* /*ptr*/, std::size_t size) { - GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncReader)); + GPR_ASSERT(size == sizeof(ClientAsyncReader)); } // This operator should never be called as the memory should be freed as part @@ -208,10 +208,10 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { // delete to the operator new so that some compilers will not complain (see // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this // there are no tests catching the compiler warning. - static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } + static void operator delete(void*, void*) { GPR_ASSERT(false); } void StartCall(void* tag) override { - GPR_CODEGEN_ASSERT(!started_); + GPR_ASSERT(!started_); started_ = true; StartCallInternal(tag); } @@ -225,8 +225,8 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { /// calling code can access the received metadata through the /// \a ClientContext. void ReadInitialMetadata(void* tag) override { - GPR_CODEGEN_ASSERT(started_); - GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); + GPR_ASSERT(started_); + GPR_ASSERT(!context_->initial_metadata_received_); meta_ops_.set_output_tag(tag); meta_ops_.RecvInitialMetadata(context_); @@ -234,7 +234,7 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { } void Read(R* msg, void* tag) override { - GPR_CODEGEN_ASSERT(started_); + GPR_ASSERT(started_); read_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { read_ops_.RecvInitialMetadata(context_); @@ -249,7 +249,7 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { /// - the \a ClientContext associated with this call is updated with /// possible initial and trailing metadata received from the server. void Finish(grpc::Status* status, void* tag) override { - GPR_CODEGEN_ASSERT(started_); + GPR_ASSERT(started_); finish_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { finish_ops_.RecvInitialMetadata(context_); @@ -265,12 +265,12 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { const W& request, bool start, void* tag) : context_(context), call_(call), started_(start) { // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok()); + GPR_ASSERT(init_ops_.SendMessage(request).ok()); init_ops_.ClientSendClose(); if (start) { StartCallInternal(tag); } else { - GPR_CODEGEN_ASSERT(tag == nullptr); + GPR_ASSERT(tag == nullptr); } } @@ -318,7 +318,7 @@ class ClientAsyncWriterFactory { /// Create a stream object. /// Start the RPC if \a start is set /// \a tag will be notified on \a cq when the call has been started (i.e. - /// intitial metadata sent) and \a request has been written out. + /// initial metadata sent) and \a request has been written out. /// If \a start is not set, \a tag must be nullptr and the actual call /// must be initiated by StartCall /// Note that \a context will be used to fill in custom initial metadata @@ -333,8 +333,8 @@ class ClientAsyncWriterFactory { grpc::ClientContext* context, R* response, bool start, void* tag) { grpc::internal::Call call = channel->CreateCall(method, context, cq); - return new (grpc::g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncWriter<W>))) + return new ( + grpc_call_arena_alloc(call.call(), sizeof(ClientAsyncWriter<W>))) ClientAsyncWriter<W>(call, context, response, start, tag); } }; @@ -348,7 +348,7 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { public: // always allocated against a call arena, no memory free required static void operator delete(void* /*ptr*/, std::size_t size) { - GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncWriter)); + GPR_ASSERT(size == sizeof(ClientAsyncWriter)); } // This operator should never be called as the memory should be freed as part @@ -356,10 +356,10 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { // delete to the operator new so that some compilers will not complain (see // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this // there are no tests catching the compiler warning. - static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } + static void operator delete(void*, void*) { GPR_ASSERT(false); } void StartCall(void* tag) override { - GPR_CODEGEN_ASSERT(!started_); + GPR_ASSERT(!started_); started_ = true; StartCallInternal(tag); } @@ -372,8 +372,8 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { /// associated with this call is updated, and the calling code can access /// the received metadata through the \a ClientContext. void ReadInitialMetadata(void* tag) override { - GPR_CODEGEN_ASSERT(started_); - GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); + GPR_ASSERT(started_); + GPR_ASSERT(!context_->initial_metadata_received_); meta_ops_.set_output_tag(tag); meta_ops_.RecvInitialMetadata(context_); @@ -381,27 +381,27 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { } void Write(const W& msg, void* tag) override { - GPR_CODEGEN_ASSERT(started_); + GPR_ASSERT(started_); write_ops_.set_output_tag(tag); // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); + GPR_ASSERT(write_ops_.SendMessage(msg).ok()); call_.PerformOps(&write_ops_); } void Write(const W& msg, grpc::WriteOptions options, void* tag) override { - GPR_CODEGEN_ASSERT(started_); + GPR_ASSERT(started_); write_ops_.set_output_tag(tag); if (options.is_last_message()) { options.set_buffer_hint(); write_ops_.ClientSendClose(); } // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + GPR_ASSERT(write_ops_.SendMessage(msg, options).ok()); call_.PerformOps(&write_ops_); } void WritesDone(void* tag) override { - GPR_CODEGEN_ASSERT(started_); + GPR_ASSERT(started_); write_ops_.set_output_tag(tag); write_ops_.ClientSendClose(); call_.PerformOps(&write_ops_); @@ -415,7 +415,7 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { /// - attempts to fill in the \a response parameter passed to this class's /// constructor with the server's response message. void Finish(grpc::Status* status, void* tag) override { - GPR_CODEGEN_ASSERT(started_); + GPR_ASSERT(started_); finish_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { finish_ops_.RecvInitialMetadata(context_); @@ -435,7 +435,7 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { if (start) { StartCallInternal(tag); } else { - GPR_CODEGEN_ASSERT(tag == nullptr); + GPR_ASSERT(tag == nullptr); } } @@ -488,7 +488,7 @@ class ClientAsyncReaderWriterFactory { /// Create a stream object. /// Start the RPC request if \a start is set. /// \a tag will be notified on \a cq when the call has been started (i.e. - /// intitial metadata sent). If \a start is not set, \a tag must be + /// initial metadata sent). If \a start is not set, \a tag must be /// nullptr and the actual call must be initiated by StartCall /// Note that \a context will be used to fill in custom initial metadata /// used to send to the server when starting the call. @@ -498,8 +498,8 @@ class ClientAsyncReaderWriterFactory { bool start, void* tag) { grpc::internal::Call call = channel->CreateCall(method, context, cq); - return new (grpc::g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncReaderWriter<W, R>))) + return new (grpc_call_arena_alloc(call.call(), + sizeof(ClientAsyncReaderWriter<W, R>))) ClientAsyncReaderWriter<W, R>(call, context, start, tag); } }; @@ -515,7 +515,7 @@ class ClientAsyncReaderWriter final public: // always allocated against a call arena, no memory free required static void operator delete(void* /*ptr*/, std::size_t size) { - GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncReaderWriter)); + GPR_ASSERT(size == sizeof(ClientAsyncReaderWriter)); } // This operator should never be called as the memory should be freed as part @@ -523,10 +523,10 @@ class ClientAsyncReaderWriter final // delete to the operator new so that some compilers will not complain (see // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this // there are no tests catching the compiler warning. - static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } + static void operator delete(void*, void*) { GPR_ASSERT(false); } void StartCall(void* tag) override { - GPR_CODEGEN_ASSERT(!started_); + GPR_ASSERT(!started_); started_ = true; StartCallInternal(tag); } @@ -539,8 +539,8 @@ class ClientAsyncReaderWriter final /// is updated with it, and then the receiving initial metadata can /// be accessed through this \a ClientContext. void ReadInitialMetadata(void* tag) override { - GPR_CODEGEN_ASSERT(started_); - GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); + GPR_ASSERT(started_); + GPR_ASSERT(!context_->initial_metadata_received_); meta_ops_.set_output_tag(tag); meta_ops_.RecvInitialMetadata(context_); @@ -548,7 +548,7 @@ class ClientAsyncReaderWriter final } void Read(R* msg, void* tag) override { - GPR_CODEGEN_ASSERT(started_); + GPR_ASSERT(started_); read_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { read_ops_.RecvInitialMetadata(context_); @@ -558,27 +558,27 @@ class ClientAsyncReaderWriter final } void Write(const W& msg, void* tag) override { - GPR_CODEGEN_ASSERT(started_); + GPR_ASSERT(started_); write_ops_.set_output_tag(tag); // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); + GPR_ASSERT(write_ops_.SendMessage(msg).ok()); call_.PerformOps(&write_ops_); } void Write(const W& msg, grpc::WriteOptions options, void* tag) override { - GPR_CODEGEN_ASSERT(started_); + GPR_ASSERT(started_); write_ops_.set_output_tag(tag); if (options.is_last_message()) { options.set_buffer_hint(); write_ops_.ClientSendClose(); } // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + GPR_ASSERT(write_ops_.SendMessage(msg, options).ok()); call_.PerformOps(&write_ops_); } void WritesDone(void* tag) override { - GPR_CODEGEN_ASSERT(started_); + GPR_ASSERT(started_); write_ops_.set_output_tag(tag); write_ops_.ClientSendClose(); call_.PerformOps(&write_ops_); @@ -589,7 +589,7 @@ class ClientAsyncReaderWriter final /// - the \a ClientContext associated with this call is updated with /// possible initial and trailing metadata sent from the server. void Finish(grpc::Status* status, void* tag) override { - GPR_CODEGEN_ASSERT(started_); + GPR_ASSERT(started_); finish_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { finish_ops_.RecvInitialMetadata(context_); @@ -606,7 +606,7 @@ class ClientAsyncReaderWriter final if (start) { StartCallInternal(tag); } else { - GPR_CODEGEN_ASSERT(tag == nullptr); + GPR_ASSERT(tag == nullptr); } } @@ -651,7 +651,7 @@ class ServerAsyncReaderInterface /// /// It is appropriate to call this method when: /// * all messages from the client have been received (either known - /// implictly, or explicitly because a previous + /// implicitly, or explicitly because a previous /// \a AsyncReaderInterface::Read operation with a non-ok result, /// e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'). /// @@ -706,7 +706,7 @@ class ServerAsyncReader final : public ServerAsyncReaderInterface<W, R> { /// - The initial metadata that will be sent to the client from this op will /// be taken from the \a ServerContext associated with the call. void SendInitialMetadata(void* tag) override { - GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); + GPR_ASSERT(!ctx_->sent_initial_metadata_); meta_ops_.set_output_tag(tag); meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, @@ -727,7 +727,7 @@ class ServerAsyncReader final : public ServerAsyncReaderInterface<W, R> { /// See the \a ServerAsyncReaderInterface.Read method for semantics /// /// Side effect: - /// - also sends initial metadata if not alreay sent. + /// - also sends initial metadata if not already sent. /// - uses the \a ServerContext associated with this call to send possible /// initial and trailing metadata. /// @@ -758,14 +758,14 @@ class ServerAsyncReader final : public ServerAsyncReaderInterface<W, R> { /// See the \a ServerAsyncReaderInterface.Read method for semantics /// /// Side effect: - /// - also sends initial metadata if not alreay sent. + /// - also sends initial metadata if not already sent. /// - uses the \a ServerContext associated with this call to send possible /// initial and trailing metadata. /// /// gRPC doesn't take ownership or a reference to \a status, so it is safe to /// to deallocate once FinishWithError returns. void FinishWithError(const grpc::Status& status, void* tag) override { - GPR_CODEGEN_ASSERT(!status.ok()); + GPR_ASSERT(!status.ok()); finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, @@ -805,7 +805,7 @@ class ServerAsyncWriterInterface /// /// It is appropriate to call this method when either: /// * all messages from the client have been received (either known - /// implictly, or explicitly because a previous \a + /// implicitly, or explicitly because a previous \a /// AsyncReaderInterface::Read operation with a non-ok /// result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'. /// * it is desired to end the call early with some non-OK status code. @@ -855,7 +855,7 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { /// /// \param[in] tag Tag identifying this request. void SendInitialMetadata(void* tag) override { - GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); + GPR_ASSERT(!ctx_->sent_initial_metadata_); meta_ops_.set_output_tag(tag); meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, @@ -871,7 +871,7 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { write_ops_.set_output_tag(tag); EnsureInitialMetadataSent(&write_ops_); // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); + GPR_ASSERT(write_ops_.SendMessage(msg).ok()); call_.PerformOps(&write_ops_); } @@ -883,7 +883,7 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { EnsureInitialMetadataSent(&write_ops_); // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + GPR_ASSERT(write_ops_.SendMessage(msg, options).ok()); call_.PerformOps(&write_ops_); } @@ -902,7 +902,7 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { write_ops_.set_output_tag(tag); EnsureInitialMetadataSent(&write_ops_); options.set_buffer_hint(); - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + GPR_ASSERT(write_ops_.SendMessage(msg, options).ok()); write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); call_.PerformOps(&write_ops_); } @@ -967,7 +967,7 @@ class ServerAsyncReaderWriterInterface /// /// It is appropriate to call this method when either: /// * all messages from the client have been received (either known - /// implictly, or explicitly because a previous \a + /// implicitly, or explicitly because a previous \a /// AsyncReaderInterface::Read operation /// with a non-ok result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' /// with 'false'. @@ -1021,7 +1021,7 @@ class ServerAsyncReaderWriter final /// /// \param[in] tag Tag identifying this request. void SendInitialMetadata(void* tag) override { - GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); + GPR_ASSERT(!ctx_->sent_initial_metadata_); meta_ops_.set_output_tag(tag); meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, @@ -1043,7 +1043,7 @@ class ServerAsyncReaderWriter final write_ops_.set_output_tag(tag); EnsureInitialMetadataSent(&write_ops_); // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); + GPR_ASSERT(write_ops_.SendMessage(msg).ok()); call_.PerformOps(&write_ops_); } @@ -1053,7 +1053,7 @@ class ServerAsyncReaderWriter final options.set_buffer_hint(); } EnsureInitialMetadataSent(&write_ops_); - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + GPR_ASSERT(write_ops_.SendMessage(msg, options).ok()); call_.PerformOps(&write_ops_); } @@ -1073,7 +1073,7 @@ class ServerAsyncReaderWriter final write_ops_.set_output_tag(tag); EnsureInitialMetadataSent(&write_ops_); options.set_buffer_hint(); - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + GPR_ASSERT(write_ops_.SendMessage(msg, options).ok()); write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); call_.PerformOps(&write_ops_); } diff --git a/contrib/libs/grpc/include/grpcpp/support/async_unary_call.h b/contrib/libs/grpc/include/grpcpp/support/async_unary_call.h index a9886031f4..a831d73f72 100644 --- a/contrib/libs/grpc/include/grpcpp/support/async_unary_call.h +++ b/contrib/libs/grpc/include/grpcpp/support/async_unary_call.h @@ -1,31 +1,31 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// #ifndef GRPCPP_SUPPORT_ASYNC_UNARY_CALL_H #define GRPCPP_SUPPORT_ASYNC_UNARY_CALL_H #include <grpc/grpc.h> +#include <grpc/support/log.h> #include <grpcpp/client_context.h> #include <grpcpp/impl/call.h> +#include <grpcpp/impl/call_op_set.h> #include <grpcpp/impl/call_op_set_interface.h> -#include <grpcpp/impl/codegen/call_op_set.h> -#include <grpcpp/impl/codegen/channel_interface.h> -#include <grpcpp/impl/codegen/core_codegen_interface.h> +#include <grpcpp/impl/channel_interface.h> #include <grpcpp/impl/service_type.h> #include <grpcpp/server_context.h> #include <grpcpp/support/status.h> @@ -79,7 +79,7 @@ class ClientAsyncResponseReaderHelper { public: /// Start a call and write the request out if \a start is set. /// \a tag will be notified on \a cq when the call has been started (i.e. - /// intitial metadata sent) and \a request has been written out. + /// initial metadata sent) and \a request has been written out. /// If \a start is not set, the actual call must be initiated by StartCall /// Note that \a context will be used to fill in custom initial metadata /// used to send to the server when starting the call. @@ -95,10 +95,9 @@ class ClientAsyncResponseReaderHelper { const grpc::internal::RpcMethod& method, grpc::ClientContext* context, const W& request) /* __attribute__((noinline)) */ { grpc::internal::Call call = channel->CreateCall(method, context, cq); - ClientAsyncResponseReader<R>* result = - new (grpc::g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncResponseReader<R>))) - ClientAsyncResponseReader<R>(call, context); + ClientAsyncResponseReader<R>* result = new (grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncResponseReader<R>))) + ClientAsyncResponseReader<R>(call, context); SetupRequest<BaseR, BaseW>( call.call(), &result->single_buf_, &result->read_initial_metadata_, &result->finish_, static_cast<const BaseW&>(request)); @@ -128,11 +127,10 @@ class ClientAsyncResponseReaderHelper { grpc::internal::CallOpRecvMessage<R>, grpc::internal::CallOpClientRecvStatus>; SingleBufType* single_buf = - new (grpc::g_core_codegen_interface->grpc_call_arena_alloc( - call, sizeof(SingleBufType))) SingleBufType; + new (grpc_call_arena_alloc(call, sizeof(SingleBufType))) SingleBufType; *single_buf_ptr = single_buf; // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(single_buf->SendMessage(request).ok()); + GPR_ASSERT(single_buf->SendMessage(request).ok()); single_buf->ClientSendClose(); // The purpose of the following functions is to type-erase the actual @@ -154,7 +152,7 @@ class ClientAsyncResponseReaderHelper { // will be static-cast'ed back to the class specified here by hiding that // class information inside the function definition. Note that this feature // expects the class being specified here for R to be a base-class of the - // "real" R without any multiple-inheritance (as applies in protbuf wrt + // "real" R without any multiple-inheritance (as applies in protobuf wrt // MessageLite) *finish = [](ClientContext* context, internal::Call* call, bool initial_metadata_read, @@ -166,8 +164,8 @@ class ClientAsyncResponseReaderHelper { grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>, grpc::internal::CallOpClientRecvStatus>; FinishBufType* finish_buf = - new (grpc::g_core_codegen_interface->grpc_call_arena_alloc( - call->call(), sizeof(FinishBufType))) FinishBufType; + new (grpc_call_arena_alloc(call->call(), sizeof(FinishBufType))) + FinishBufType; *finish_buf_ptr = finish_buf; finish_buf->set_output_tag(tag); finish_buf->RecvMessage(static_cast<R*>(msg)); @@ -222,7 +220,7 @@ class ClientAsyncResponseReader final public: // always allocated against a call arena, no memory free required static void operator delete(void* /*ptr*/, std::size_t size) { - GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncResponseReader)); + GPR_ASSERT(size == sizeof(ClientAsyncResponseReader)); } // This operator should never be called as the memory should be freed as part @@ -230,10 +228,10 @@ class ClientAsyncResponseReader final // delete to the operator new so that some compilers will not complain (see // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this // there are no tests catching the compiler warning. - static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } + static void operator delete(void*, void*) { GPR_ASSERT(false); } void StartCall() override { - GPR_CODEGEN_DEBUG_ASSERT(!started_); + GPR_DEBUG_ASSERT(!started_); started_ = true; internal::ClientAsyncResponseReaderHelper::StartCall(context_, single_buf_); } @@ -245,8 +243,8 @@ class ClientAsyncResponseReader final /// - the \a ClientContext associated with this call is updated with /// possible initial and trailing metadata sent from the server. void ReadInitialMetadata(void* tag) override { - GPR_CODEGEN_DEBUG_ASSERT(started_); - GPR_CODEGEN_DEBUG_ASSERT(!context_->initial_metadata_received_); + GPR_DEBUG_ASSERT(started_); + GPR_DEBUG_ASSERT(!context_->initial_metadata_received_); read_initial_metadata_(context_, &call_, single_buf_, tag); initial_metadata_read_ = true; } @@ -257,7 +255,7 @@ class ClientAsyncResponseReader final /// - the \a ClientContext associated with this call is updated with /// possible initial and trailing metadata sent from the server. void Finish(R* msg, grpc::Status* status, void* tag) override { - GPR_CODEGEN_DEBUG_ASSERT(started_); + GPR_DEBUG_ASSERT(started_); finish_(context_, &call_, initial_metadata_read_, single_buf_, &finish_buf_, static_cast<void*>(msg), status, tag); } @@ -306,7 +304,7 @@ class ServerAsyncResponseWriter final /// /// \param[in] tag Tag identifying this request. void SendInitialMetadata(void* tag) override { - GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); + GPR_ASSERT(!ctx_->sent_initial_metadata_); meta_buf_.set_output_tag(tag); meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_, @@ -375,7 +373,7 @@ class ServerAsyncResponseWriter final /// deallocate them once the Finish operation is complete (i.e. a result /// arrives in the completion queue). void FinishWithError(const grpc::Status& status, void* tag) { - GPR_CODEGEN_ASSERT(!status.ok()); + GPR_ASSERT(!status.ok()); finish_buf_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_, diff --git a/contrib/libs/grpc/include/grpcpp/support/byte_buffer.h b/contrib/libs/grpc/include/grpcpp/support/byte_buffer.h index 7ca7c4b258..5bbfb66496 100644 --- a/contrib/libs/grpc/include/grpcpp/support/byte_buffer.h +++ b/contrib/libs/grpc/include/grpcpp/support/byte_buffer.h @@ -1,20 +1,20 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// #ifndef GRPCPP_SUPPORT_BYTE_BUFFER_H #define GRPCPP_SUPPORT_BYTE_BUFFER_H @@ -24,7 +24,6 @@ #include <grpc/byte_buffer.h> #include <grpc/grpc.h> #include <grpc/support/log.h> -#include <grpcpp/impl/codegen/core_codegen_interface.h> #include <grpcpp/impl/serialization_traits.h> #include <grpcpp/support/config.h> #include <grpcpp/support/slice.h> @@ -85,11 +84,11 @@ class ByteBuffer final { // than its advertised side effect of increasing the reference count of the // slices it processes, and such an increase does not affect the semantics // seen by the caller of this constructor. - buffer_ = g_core_codegen_interface->grpc_raw_byte_buffer_create( + buffer_ = grpc_raw_byte_buffer_create( reinterpret_cast<grpc_slice*>(const_cast<Slice*>(slices)), nslices); } - /// Constuct a byte buffer by referencing elements of existing buffer + /// Construct a byte buffer by referencing elements of existing buffer /// \a buf. Wrapper of core function grpc_byte_buffer_copy . This is not /// a deep copy; it is just a referencing. As a result, its performance is /// size-independent. @@ -97,7 +96,7 @@ class ByteBuffer final { ~ByteBuffer() { if (buffer_) { - g_core_codegen_interface->grpc_byte_buffer_destroy(buffer_); + grpc_byte_buffer_destroy(buffer_); } } @@ -110,7 +109,7 @@ class ByteBuffer final { } if (buf.buffer_) { // then copy - buffer_ = g_core_codegen_interface->grpc_byte_buffer_copy(buf.buffer_); + buffer_ = grpc_byte_buffer_copy(buf.buffer_); } return *this; } @@ -128,7 +127,7 @@ class ByteBuffer final { /// Remove all data. void Clear() { if (buffer_) { - g_core_codegen_interface->grpc_byte_buffer_destroy(buffer_); + grpc_byte_buffer_destroy(buffer_); buffer_ = nullptr; } } @@ -138,9 +137,7 @@ class ByteBuffer final { /// bbuf.Duplicate(); is equivalent to bbuf=bbuf; but is actually readable. /// This is not a deep copy; it is a referencing and its performance /// is size-independent. - void Duplicate() { - buffer_ = g_core_codegen_interface->grpc_byte_buffer_copy(buffer_); - } + void Duplicate() { buffer_ = grpc_byte_buffer_copy(buffer_); } /// Forget underlying byte buffer without destroying /// Use this only for un-owned byte buffers @@ -148,9 +145,7 @@ class ByteBuffer final { /// Buffer size in bytes. size_t Length() const { - return buffer_ == nullptr - ? 0 - : g_core_codegen_interface->grpc_byte_buffer_length(buffer_); + return buffer_ == nullptr ? 0 : grpc_byte_buffer_length(buffer_); } /// Swap the state of *this and *other. @@ -203,14 +198,14 @@ class ByteBuffer final { class ByteBufferPointer { public: - /* NOLINTNEXTLINE(google-explicit-constructor) */ + // NOLINTNEXTLINE(google-explicit-constructor) ByteBufferPointer(const ByteBuffer* b) : bbuf_(const_cast<ByteBuffer*>(b)) {} - /* NOLINTNEXTLINE(google-explicit-constructor) */ + // NOLINTNEXTLINE(google-explicit-constructor) operator ByteBuffer*() { return bbuf_; } - /* NOLINTNEXTLINE(google-explicit-constructor) */ + // NOLINTNEXTLINE(google-explicit-constructor) operator grpc_byte_buffer*() { return bbuf_->buffer_; } - /* NOLINTNEXTLINE(google-explicit-constructor) */ + // NOLINTNEXTLINE(google-explicit-constructor) operator grpc_byte_buffer**() { return &bbuf_->buffer_; } private: @@ -230,7 +225,7 @@ class SerializationTraits<ByteBuffer, void> { bool* own_buffer) { *buffer = source; *own_buffer = true; - return g_core_codegen_interface->ok(); + return grpc::Status::OK; } }; diff --git a/contrib/libs/grpc/include/grpcpp/support/callback_common.h b/contrib/libs/grpc/include/grpcpp/support/callback_common.h new file mode 100644 index 0000000000..a99a47dbf4 --- /dev/null +++ b/contrib/libs/grpc/include/grpcpp/support/callback_common.h @@ -0,0 +1,221 @@ +// +// +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#ifndef GRPCPP_SUPPORT_CALLBACK_COMMON_H +#define GRPCPP_SUPPORT_CALLBACK_COMMON_H + +#include <functional> + +#include <grpc/grpc.h> +#include <grpc/impl/grpc_types.h> +#include <grpc/support/log.h> +#include <grpcpp/impl/call.h> +#include <grpcpp/impl/codegen/channel_interface.h> +#include <grpcpp/impl/completion_queue_tag.h> +#include <grpcpp/support/config.h> +#include <grpcpp/support/status.h> + +namespace grpc { +namespace internal { + +/// An exception-safe way of invoking a user-specified callback function +// TODO(vjpai): decide whether it is better for this to take a const lvalue +// parameter or an rvalue parameter, or if it even matters +template <class Func, class... Args> +void CatchingCallback(Func&& func, Args&&... args) { +#if GRPC_ALLOW_EXCEPTIONS + try { + func(std::forward<Args>(args)...); + } catch (...) { + // nothing to return or change here, just don't crash the library + } +#else // GRPC_ALLOW_EXCEPTIONS + func(std::forward<Args>(args)...); +#endif // GRPC_ALLOW_EXCEPTIONS +} + +template <class Reactor, class Func, class... Args> +Reactor* CatchingReactorGetter(Func&& func, Args&&... args) { +#if GRPC_ALLOW_EXCEPTIONS + try { + return func(std::forward<Args>(args)...); + } catch (...) { + // fail the RPC, don't crash the library + return nullptr; + } +#else // GRPC_ALLOW_EXCEPTIONS + return func(std::forward<Args>(args)...); +#endif // GRPC_ALLOW_EXCEPTIONS +} + +// The contract on these tags is that they are single-shot. They must be +// constructed and then fired at exactly one point. There is no expectation +// that they can be reused without reconstruction. + +class CallbackWithStatusTag : public grpc_completion_queue_functor { + public: + // always allocated against a call arena, no memory free required + static void operator delete(void* /*ptr*/, std::size_t size) { + GPR_ASSERT(size == sizeof(CallbackWithStatusTag)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { GPR_ASSERT(false); } + + CallbackWithStatusTag(grpc_call* call, std::function<void(Status)> f, + CompletionQueueTag* ops) + : call_(call), func_(std::move(f)), ops_(ops) { + grpc_call_ref(call); + functor_run = &CallbackWithStatusTag::StaticRun; + // A client-side callback should never be run inline since they will always + // have work to do from the user application. So, set the parent's + // inlineable field to false + inlineable = false; + } + ~CallbackWithStatusTag() {} + Status* status_ptr() { return &status_; } + + // force_run can not be performed on a tag if operations using this tag + // have been sent to PerformOpsOnCall. It is intended for error conditions + // that are detected before the operations are internally processed. + void force_run(Status s) { + status_ = std::move(s); + Run(true); + } + + private: + grpc_call* call_; + std::function<void(Status)> func_; + CompletionQueueTag* ops_; + Status status_; + + static void StaticRun(grpc_completion_queue_functor* cb, int ok) { + static_cast<CallbackWithStatusTag*>(cb)->Run(static_cast<bool>(ok)); + } + void Run(bool ok) { + void* ignored = ops_; + + if (!ops_->FinalizeResult(&ignored, &ok)) { + // The tag was swallowed + return; + } + GPR_ASSERT(ignored == ops_); + + // Last use of func_ or status_, so ok to move them out + auto func = std::move(func_); + auto status = std::move(status_); + func_ = nullptr; // reset to clear this out for sure + status_ = Status(); // reset to clear this out for sure + CatchingCallback(std::move(func), std::move(status)); + grpc_call_unref(call_); + } +}; + +/// CallbackWithSuccessTag can be reused multiple times, and will be used in +/// this fashion for streaming operations. As a result, it shouldn't clear +/// anything up until its destructor +class CallbackWithSuccessTag : public grpc_completion_queue_functor { + public: + // always allocated against a call arena, no memory free required + static void operator delete(void* /*ptr*/, std::size_t size) { + GPR_ASSERT(size == sizeof(CallbackWithSuccessTag)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { GPR_ASSERT(false); } + + CallbackWithSuccessTag() : call_(nullptr) {} + + CallbackWithSuccessTag(const CallbackWithSuccessTag&) = delete; + CallbackWithSuccessTag& operator=(const CallbackWithSuccessTag&) = delete; + + ~CallbackWithSuccessTag() { Clear(); } + + // Set can only be called on a default-constructed or Clear'ed tag. + // It should never be called on a tag that was constructed with arguments + // or on a tag that has been Set before unless the tag has been cleared. + // can_inline indicates that this particular callback can be executed inline + // (without needing a thread hop) and is only used for library-provided server + // callbacks. + void Set(grpc_call* call, std::function<void(bool)> f, + CompletionQueueTag* ops, bool can_inline) { + GPR_ASSERT(call_ == nullptr); + grpc_call_ref(call); + call_ = call; + func_ = std::move(f); + ops_ = ops; + functor_run = &CallbackWithSuccessTag::StaticRun; + inlineable = can_inline; + } + + void Clear() { + if (call_ != nullptr) { + grpc_call* call = call_; + call_ = nullptr; + func_ = nullptr; + grpc_call_unref(call); + } + } + + CompletionQueueTag* ops() { return ops_; } + + // force_run can not be performed on a tag if operations using this tag + // have been sent to PerformOpsOnCall. It is intended for error conditions + // that are detected before the operations are internally processed. + void force_run(bool ok) { Run(ok); } + + /// check if this tag is currently set + // NOLINTNEXTLINE(google-explicit-constructor) + operator bool() const { return call_ != nullptr; } + + private: + grpc_call* call_; + std::function<void(bool)> func_; + CompletionQueueTag* ops_; + + static void StaticRun(grpc_completion_queue_functor* cb, int ok) { + static_cast<CallbackWithSuccessTag*>(cb)->Run(static_cast<bool>(ok)); + } + void Run(bool ok) { + void* ignored = ops_; + // Allow a "false" return value from FinalizeResult to silence the + // callback, just as it silences a CQ tag in the async cases +#ifndef NDEBUG + auto* ops = ops_; +#endif + bool do_callback = ops_->FinalizeResult(&ignored, &ok); + GPR_DEBUG_ASSERT(ignored == ops); + + if (do_callback) { + CatchingCallback(func_, ok); + } + } +}; + +} // namespace internal +} // namespace grpc + +#endif // GRPCPP_SUPPORT_CALLBACK_COMMON_H diff --git a/contrib/libs/grpc/include/grpcpp/support/channel_arguments.h b/contrib/libs/grpc/include/grpcpp/support/channel_arguments.h index a09180de45..a3845467f1 100644 --- a/contrib/libs/grpc/include/grpcpp/support/channel_arguments.h +++ b/contrib/libs/grpc/include/grpcpp/support/channel_arguments.h @@ -1,20 +1,20 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// #ifndef GRPCPP_SUPPORT_CHANNEL_ARGUMENTS_H #define GRPCPP_SUPPORT_CHANNEL_ARGUMENTS_H diff --git a/contrib/libs/grpc/include/grpcpp/support/client_callback.h b/contrib/libs/grpc/include/grpcpp/support/client_callback.h index c15bca0dbe..f096de3440 100644 --- a/contrib/libs/grpc/include/grpcpp/support/client_callback.h +++ b/contrib/libs/grpc/include/grpcpp/support/client_callback.h @@ -1,24 +1,1227 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// #ifndef GRPCPP_SUPPORT_CLIENT_CALLBACK_H #define GRPCPP_SUPPORT_CLIENT_CALLBACK_H -#include <grpcpp/impl/codegen/client_callback.h> // IWYU pragma: export +#include <atomic> +#include <functional> + +#include <grpc/grpc.h> +#include <grpc/support/log.h> +#include <grpcpp/impl/call.h> +#include <grpcpp/impl/call_op_set.h> +#include <grpcpp/impl/sync.h> +#include <grpcpp/support/callback_common.h> +#include <grpcpp/support/config.h> +#include <grpcpp/support/status.h> + +namespace grpc { +class Channel; +class ClientContext; + +namespace internal { +class RpcMethod; + +/// Perform a callback-based unary call. May optionally specify the base +/// class of the Request and Response so that the internal calls and structures +/// below this may be based on those base classes and thus achieve code reuse +/// across different RPCs (e.g., for protobuf, MessageLite would be a base +/// class). +/// TODO(vjpai): Combine as much as possible with the blocking unary call code +template <class InputMessage, class OutputMessage, + class BaseInputMessage = InputMessage, + class BaseOutputMessage = OutputMessage> +void CallbackUnaryCall(grpc::ChannelInterface* channel, + const grpc::internal::RpcMethod& method, + grpc::ClientContext* context, + const InputMessage* request, OutputMessage* result, + std::function<void(grpc::Status)> on_completion) { + static_assert(std::is_base_of<BaseInputMessage, InputMessage>::value, + "Invalid input message specification"); + static_assert(std::is_base_of<BaseOutputMessage, OutputMessage>::value, + "Invalid output message specification"); + CallbackUnaryCallImpl<BaseInputMessage, BaseOutputMessage> x( + channel, method, context, request, result, on_completion); +} + +template <class InputMessage, class OutputMessage> +class CallbackUnaryCallImpl { + public: + CallbackUnaryCallImpl(grpc::ChannelInterface* channel, + const grpc::internal::RpcMethod& method, + grpc::ClientContext* context, + const InputMessage* request, OutputMessage* result, + std::function<void(grpc::Status)> on_completion) { + grpc::CompletionQueue* cq = channel->CallbackCQ(); + GPR_ASSERT(cq != nullptr); + grpc::internal::Call call(channel->CreateCall(method, context, cq)); + + using FullCallOpSet = grpc::internal::CallOpSet< + grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpSendMessage, + grpc::internal::CallOpRecvInitialMetadata, + grpc::internal::CallOpRecvMessage<OutputMessage>, + grpc::internal::CallOpClientSendClose, + grpc::internal::CallOpClientRecvStatus>; + + struct OpSetAndTag { + FullCallOpSet opset; + grpc::internal::CallbackWithStatusTag tag; + }; + const size_t alloc_sz = sizeof(OpSetAndTag); + auto* const alloced = + static_cast<OpSetAndTag*>(grpc_call_arena_alloc(call.call(), alloc_sz)); + auto* ops = new (&alloced->opset) FullCallOpSet; + auto* tag = new (&alloced->tag) + grpc::internal::CallbackWithStatusTag(call.call(), on_completion, ops); + + // TODO(vjpai): Unify code with sync API as much as possible + grpc::Status s = ops->SendMessagePtr(request); + if (!s.ok()) { + tag->force_run(s); + return; + } + ops->SendInitialMetadata(&context->send_initial_metadata_, + context->initial_metadata_flags()); + ops->RecvInitialMetadata(context); + ops->RecvMessage(result); + ops->AllowNoMessage(); + ops->ClientSendClose(); + ops->ClientRecvStatus(context, tag->status_ptr()); + ops->set_core_cq_tag(tag); + call.PerformOps(ops); + } +}; + +// Base class for public API classes. +class ClientReactor { + public: + virtual ~ClientReactor() = default; + + /// Called by the library when all operations associated with this RPC have + /// completed and all Holds have been removed. OnDone provides the RPC status + /// outcome for both successful and failed RPCs. If it is never called on an + /// RPC, it indicates an application-level problem (like failure to remove a + /// hold). + /// + /// \param[in] s The status outcome of this RPC + virtual void OnDone(const grpc::Status& /*s*/) = 0; + + /// InternalScheduleOnDone is not part of the API and is not meant to be + /// overridden. It is virtual to allow successful builds for certain bazel + /// build users that only want to depend on gRPC codegen headers and not the + /// full library (although this is not a generally-supported option). Although + /// the virtual call is slower than a direct call, this function is + /// heavyweight and the cost of the virtual call is not much in comparison. + /// This function may be removed or devirtualized in the future. + virtual void InternalScheduleOnDone(grpc::Status s); + + /// InternalTrailersOnly is not part of the API and is not meant to be + /// overridden. It is virtual to allow successful builds for certain bazel + /// build users that only want to depend on gRPC codegen headers and not the + /// full library (although this is not a generally-supported option). Although + /// the virtual call is slower than a direct call, this function is + /// heavyweight and the cost of the virtual call is not much in comparison. + /// This function may be removed or devirtualized in the future. + virtual bool InternalTrailersOnly(const grpc_call* call) const; +}; + +} // namespace internal + +// Forward declarations +template <class Request, class Response> +class ClientBidiReactor; +template <class Response> +class ClientReadReactor; +template <class Request> +class ClientWriteReactor; +class ClientUnaryReactor; + +// NOTE: The streaming objects are not actually implemented in the public API. +// These interfaces are provided for mocking only. Typical applications +// will interact exclusively with the reactors that they define. +template <class Request, class Response> +class ClientCallbackReaderWriter { + public: + virtual ~ClientCallbackReaderWriter() {} + virtual void StartCall() = 0; + virtual void Write(const Request* req, grpc::WriteOptions options) = 0; + virtual void WritesDone() = 0; + virtual void Read(Response* resp) = 0; + virtual void AddHold(int holds) = 0; + virtual void RemoveHold() = 0; + + protected: + void BindReactor(ClientBidiReactor<Request, Response>* reactor) { + reactor->BindStream(this); + } +}; + +template <class Response> +class ClientCallbackReader { + public: + virtual ~ClientCallbackReader() {} + virtual void StartCall() = 0; + virtual void Read(Response* resp) = 0; + virtual void AddHold(int holds) = 0; + virtual void RemoveHold() = 0; + + protected: + void BindReactor(ClientReadReactor<Response>* reactor) { + reactor->BindReader(this); + } +}; + +template <class Request> +class ClientCallbackWriter { + public: + virtual ~ClientCallbackWriter() {} + virtual void StartCall() = 0; + void Write(const Request* req) { Write(req, grpc::WriteOptions()); } + virtual void Write(const Request* req, grpc::WriteOptions options) = 0; + void WriteLast(const Request* req, grpc::WriteOptions options) { + Write(req, options.set_last_message()); + } + virtual void WritesDone() = 0; + + virtual void AddHold(int holds) = 0; + virtual void RemoveHold() = 0; + + protected: + void BindReactor(ClientWriteReactor<Request>* reactor) { + reactor->BindWriter(this); + } +}; + +class ClientCallbackUnary { + public: + virtual ~ClientCallbackUnary() {} + virtual void StartCall() = 0; + + protected: + void BindReactor(ClientUnaryReactor* reactor); +}; + +// The following classes are the reactor interfaces that are to be implemented +// by the user. They are passed in to the library as an argument to a call on a +// stub (either a codegen-ed call or a generic call). The streaming RPC is +// activated by calling StartCall, possibly after initiating StartRead, +// StartWrite, or AddHold operations on the streaming object. Note that none of +// the classes are pure; all reactions have a default empty reaction so that the +// user class only needs to override those reactions that it cares about. +// The reactor must be passed to the stub invocation before any of the below +// operations can be called and its reactions will be invoked by the library in +// response to the completion of various operations. Reactions must not include +// blocking operations (such as blocking I/O, starting synchronous RPCs, or +// waiting on condition variables). Reactions may be invoked concurrently, +// except that OnDone is called after all others (assuming proper API usage). +// The reactor may not be deleted until OnDone is called. + +/// \a ClientBidiReactor is the interface for a bidirectional streaming RPC. +template <class Request, class Response> +class ClientBidiReactor : public internal::ClientReactor { + public: + /// Activate the RPC and initiate any reads or writes that have been Start'ed + /// before this call. All streaming RPCs issued by the client MUST have + /// StartCall invoked on them (even if they are canceled) as this call is the + /// activation of their lifecycle. + void StartCall() { stream_->StartCall(); } + + /// Initiate a read operation (or post it for later initiation if StartCall + /// has not yet been invoked). + /// + /// \param[out] resp Where to eventually store the read message. Valid when + /// the library calls OnReadDone + void StartRead(Response* resp) { stream_->Read(resp); } + + /// Initiate a write operation (or post it for later initiation if StartCall + /// has not yet been invoked). + /// + /// \param[in] req The message to be written. The library does not take + /// ownership but the caller must ensure that the message is + /// not deleted or modified until OnWriteDone is called. + void StartWrite(const Request* req) { StartWrite(req, grpc::WriteOptions()); } + + /// Initiate/post a write operation with specified options. + /// + /// \param[in] req The message to be written. The library does not take + /// ownership but the caller must ensure that the message is + /// not deleted or modified until OnWriteDone is called. + /// \param[in] options The WriteOptions to use for writing this message + void StartWrite(const Request* req, grpc::WriteOptions options) { + stream_->Write(req, options); + } + + /// Initiate/post a write operation with specified options and an indication + /// that this is the last write (like StartWrite and StartWritesDone, merged). + /// Note that calling this means that no more calls to StartWrite, + /// StartWriteLast, or StartWritesDone are allowed. + /// + /// \param[in] req The message to be written. The library does not take + /// ownership but the caller must ensure that the message is + /// not deleted or modified until OnWriteDone is called. + /// \param[in] options The WriteOptions to use for writing this message + void StartWriteLast(const Request* req, grpc::WriteOptions options) { + StartWrite(req, options.set_last_message()); + } + + /// Indicate that the RPC will have no more write operations. This can only be + /// issued once for a given RPC. This is not required or allowed if + /// StartWriteLast is used since that already has the same implication. + /// Note that calling this means that no more calls to StartWrite, + /// StartWriteLast, or StartWritesDone are allowed. + void StartWritesDone() { stream_->WritesDone(); } + + /// Holds are needed if (and only if) this stream has operations that take + /// place on it after StartCall but from outside one of the reactions + /// (OnReadDone, etc). This is _not_ a common use of the streaming API. + /// + /// Holds must be added before calling StartCall. If a stream still has a hold + /// in place, its resources will not be destroyed even if the status has + /// already come in from the wire and there are currently no active callbacks + /// outstanding. Similarly, the stream will not call OnDone if there are still + /// holds on it. + /// + /// For example, if a StartRead or StartWrite operation is going to be + /// initiated from elsewhere in the application, the application should call + /// AddHold or AddMultipleHolds before StartCall. If there is going to be, + /// for example, a read-flow and a write-flow taking place outside the + /// reactions, then call AddMultipleHolds(2) before StartCall. When the + /// application knows that it won't issue any more read operations (such as + /// when a read comes back as not ok), it should issue a RemoveHold(). It + /// should also call RemoveHold() again after it does StartWriteLast or + /// StartWritesDone that indicates that there will be no more write ops. + /// The number of RemoveHold calls must match the total number of AddHold + /// calls plus the number of holds added by AddMultipleHolds. + /// The argument to AddMultipleHolds must be positive. + void AddHold() { AddMultipleHolds(1); } + void AddMultipleHolds(int holds) { + GPR_DEBUG_ASSERT(holds > 0); + stream_->AddHold(holds); + } + void RemoveHold() { stream_->RemoveHold(); } + + /// Notifies the application that all operations associated with this RPC + /// have completed and all Holds have been removed. OnDone provides the RPC + /// status outcome for both successful and failed RPCs and will be called in + /// all cases. If it is not called, it indicates an application-level problem + /// (like failure to remove a hold). + /// + /// \param[in] s The status outcome of this RPC + void OnDone(const grpc::Status& /*s*/) override {} + + /// Notifies the application that a read of initial metadata from the + /// server is done. If the application chooses not to implement this method, + /// it can assume that the initial metadata has been read before the first + /// call of OnReadDone or OnDone. + /// + /// \param[in] ok Was the initial metadata read successfully? If false, no + /// new read/write operation will succeed, and any further + /// Start* operations should not be called. + virtual void OnReadInitialMetadataDone(bool /*ok*/) {} + + /// Notifies the application that a StartRead operation completed. + /// + /// \param[in] ok Was it successful? If false, no new read/write operation + /// will succeed, and any further Start* should not be called. + virtual void OnReadDone(bool /*ok*/) {} + + /// Notifies the application that a StartWrite or StartWriteLast operation + /// completed. + /// + /// \param[in] ok Was it successful? If false, no new read/write operation + /// will succeed, and any further Start* should not be called. + virtual void OnWriteDone(bool /*ok*/) {} + + /// Notifies the application that a StartWritesDone operation completed. Note + /// that this is only used on explicit StartWritesDone operations and not for + /// those that are implicitly invoked as part of a StartWriteLast. + /// + /// \param[in] ok Was it successful? If false, the application will later see + /// the failure reflected as a bad status in OnDone and no + /// further Start* should be called. + virtual void OnWritesDoneDone(bool /*ok*/) {} + + private: + friend class ClientCallbackReaderWriter<Request, Response>; + void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) { + stream_ = stream; + } + ClientCallbackReaderWriter<Request, Response>* stream_; +}; + +/// \a ClientReadReactor is the interface for a server-streaming RPC. +/// All public methods behave as in ClientBidiReactor. +template <class Response> +class ClientReadReactor : public internal::ClientReactor { + public: + void StartCall() { reader_->StartCall(); } + void StartRead(Response* resp) { reader_->Read(resp); } + + void AddHold() { AddMultipleHolds(1); } + void AddMultipleHolds(int holds) { + GPR_DEBUG_ASSERT(holds > 0); + reader_->AddHold(holds); + } + void RemoveHold() { reader_->RemoveHold(); } + + void OnDone(const grpc::Status& /*s*/) override {} + virtual void OnReadInitialMetadataDone(bool /*ok*/) {} + virtual void OnReadDone(bool /*ok*/) {} + + private: + friend class ClientCallbackReader<Response>; + void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; } + ClientCallbackReader<Response>* reader_; +}; + +/// \a ClientWriteReactor is the interface for a client-streaming RPC. +/// All public methods behave as in ClientBidiReactor. +template <class Request> +class ClientWriteReactor : public internal::ClientReactor { + public: + void StartCall() { writer_->StartCall(); } + void StartWrite(const Request* req) { StartWrite(req, grpc::WriteOptions()); } + void StartWrite(const Request* req, grpc::WriteOptions options) { + writer_->Write(req, options); + } + void StartWriteLast(const Request* req, grpc::WriteOptions options) { + StartWrite(req, options.set_last_message()); + } + void StartWritesDone() { writer_->WritesDone(); } + + void AddHold() { AddMultipleHolds(1); } + void AddMultipleHolds(int holds) { + GPR_DEBUG_ASSERT(holds > 0); + writer_->AddHold(holds); + } + void RemoveHold() { writer_->RemoveHold(); } + + void OnDone(const grpc::Status& /*s*/) override {} + virtual void OnReadInitialMetadataDone(bool /*ok*/) {} + virtual void OnWriteDone(bool /*ok*/) {} + virtual void OnWritesDoneDone(bool /*ok*/) {} + + private: + friend class ClientCallbackWriter<Request>; + void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; } + + ClientCallbackWriter<Request>* writer_; +}; + +/// \a ClientUnaryReactor is a reactor-style interface for a unary RPC. +/// This is _not_ a common way of invoking a unary RPC. In practice, this +/// option should be used only if the unary RPC wants to receive initial +/// metadata without waiting for the response to complete. Most deployments of +/// RPC systems do not use this option, but it is needed for generality. +/// All public methods behave as in ClientBidiReactor. +/// StartCall is included for consistency with the other reactor flavors: even +/// though there are no StartRead or StartWrite operations to queue before the +/// call (that is part of the unary call itself) and there is no reactor object +/// being created as a result of this call, we keep a consistent 2-phase +/// initiation API among all the reactor flavors. +class ClientUnaryReactor : public internal::ClientReactor { + public: + void StartCall() { call_->StartCall(); } + void OnDone(const grpc::Status& /*s*/) override {} + virtual void OnReadInitialMetadataDone(bool /*ok*/) {} + + private: + friend class ClientCallbackUnary; + void BindCall(ClientCallbackUnary* call) { call_ = call; } + ClientCallbackUnary* call_; +}; + +// Define function out-of-line from class to avoid forward declaration issue +inline void ClientCallbackUnary::BindReactor(ClientUnaryReactor* reactor) { + reactor->BindCall(this); +} + +namespace internal { + +// Forward declare factory classes for friendship +template <class Request, class Response> +class ClientCallbackReaderWriterFactory; +template <class Response> +class ClientCallbackReaderFactory; +template <class Request> +class ClientCallbackWriterFactory; + +template <class Request, class Response> +class ClientCallbackReaderWriterImpl + : public ClientCallbackReaderWriter<Request, Response> { + public: + // always allocated against a call arena, no memory free required + static void operator delete(void* /*ptr*/, std::size_t size) { + GPR_ASSERT(size == sizeof(ClientCallbackReaderWriterImpl)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { GPR_ASSERT(false); } + + void StartCall() Y_ABSL_LOCKS_EXCLUDED(start_mu_) override { + // This call initiates two batches, plus any backlog, each with a callback + // 1. Send initial metadata (unless corked) + recv initial metadata + // 2. Any read backlog + // 3. Any write backlog + // 4. Recv trailing metadata (unless corked) + if (!start_corked_) { + start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + } + + call_.PerformOps(&start_ops_); + + { + grpc::internal::MutexLock lock(&start_mu_); + + if (backlog_.read_ops) { + call_.PerformOps(&read_ops_); + } + if (backlog_.write_ops) { + call_.PerformOps(&write_ops_); + } + if (backlog_.writes_done_ops) { + call_.PerformOps(&writes_done_ops_); + } + call_.PerformOps(&finish_ops_); + // The last thing in this critical section is to set started_ so that it + // can be used lock-free as well. + started_.store(true, std::memory_order_release); + } + // MaybeFinish outside the lock to make sure that destruction of this object + // doesn't take place while holding the lock (which would cause the lock to + // be released after destruction) + this->MaybeFinish(/*from_reaction=*/false); + } + + void Read(Response* msg) override { + read_ops_.RecvMessage(msg); + callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); + if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { + grpc::internal::MutexLock lock(&start_mu_); + if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { + backlog_.read_ops = true; + return; + } + } + call_.PerformOps(&read_ops_); + } + + void Write(const Request* msg, grpc::WriteOptions options) + Y_ABSL_LOCKS_EXCLUDED(start_mu_) override { + if (options.is_last_message()) { + options.set_buffer_hint(); + write_ops_.ClientSendClose(); + } + // TODO(vjpai): don't assert + GPR_ASSERT(write_ops_.SendMessagePtr(msg, options).ok()); + callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); + if (GPR_UNLIKELY(corked_write_needed_)) { + write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + corked_write_needed_ = false; + } + + if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { + grpc::internal::MutexLock lock(&start_mu_); + if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { + backlog_.write_ops = true; + return; + } + } + call_.PerformOps(&write_ops_); + } + void WritesDone() Y_ABSL_LOCKS_EXCLUDED(start_mu_) override { + writes_done_ops_.ClientSendClose(); + writes_done_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnWritesDoneDone(ok); + MaybeFinish(/*from_reaction=*/true); + }, + &writes_done_ops_, /*can_inline=*/false); + writes_done_ops_.set_core_cq_tag(&writes_done_tag_); + callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); + if (GPR_UNLIKELY(corked_write_needed_)) { + writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + corked_write_needed_ = false; + } + if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { + grpc::internal::MutexLock lock(&start_mu_); + if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { + backlog_.writes_done_ops = true; + return; + } + } + call_.PerformOps(&writes_done_ops_); + } + + void AddHold(int holds) override { + callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed); + } + void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); } + + private: + friend class ClientCallbackReaderWriterFactory<Request, Response>; + + ClientCallbackReaderWriterImpl(grpc::internal::Call call, + grpc::ClientContext* context, + ClientBidiReactor<Request, Response>* reactor) + : context_(context), + call_(call), + reactor_(reactor), + start_corked_(context_->initial_metadata_corked_), + corked_write_needed_(start_corked_) { + this->BindReactor(reactor); + + // Set up the unchanging parts of the start, read, and write tags and ops. + start_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnReadInitialMetadataDone( + ok && !reactor_->InternalTrailersOnly(call_.call())); + MaybeFinish(/*from_reaction=*/true); + }, + &start_ops_, /*can_inline=*/false); + start_ops_.RecvInitialMetadata(context_); + start_ops_.set_core_cq_tag(&start_tag_); + + write_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnWriteDone(ok); + MaybeFinish(/*from_reaction=*/true); + }, + &write_ops_, /*can_inline=*/false); + write_ops_.set_core_cq_tag(&write_tag_); + + read_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnReadDone(ok); + MaybeFinish(/*from_reaction=*/true); + }, + &read_ops_, /*can_inline=*/false); + read_ops_.set_core_cq_tag(&read_tag_); + + // Also set up the Finish tag and op set. + finish_tag_.Set( + call_.call(), + [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); }, + &finish_ops_, + /*can_inline=*/false); + finish_ops_.ClientRecvStatus(context_, &finish_status_); + finish_ops_.set_core_cq_tag(&finish_tag_); + } + + // MaybeFinish can be called from reactions or from user-initiated operations + // like StartCall or RemoveHold. If this is the last operation or hold on this + // object, it will invoke the OnDone reaction. If MaybeFinish was called from + // a reaction, it can call OnDone directly. If not, it would need to schedule + // OnDone onto an executor thread to avoid the possibility of deadlocking with + // any locks in the user code that invoked it. + void MaybeFinish(bool from_reaction) { + if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( + 1, std::memory_order_acq_rel) == 1)) { + grpc::Status s = std::move(finish_status_); + auto* reactor = reactor_; + auto* call = call_.call(); + this->~ClientCallbackReaderWriterImpl(); + grpc_call_unref(call); + if (GPR_LIKELY(from_reaction)) { + reactor->OnDone(s); + } else { + reactor->InternalScheduleOnDone(std::move(s)); + } + } + } + + grpc::ClientContext* const context_; + grpc::internal::Call call_; + ClientBidiReactor<Request, Response>* const reactor_; + + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpRecvInitialMetadata> + start_ops_; + grpc::internal::CallbackWithSuccessTag start_tag_; + const bool start_corked_; + bool corked_write_needed_; // no lock needed since only accessed in + // Write/WritesDone which cannot be concurrent + + grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_; + grpc::internal::CallbackWithSuccessTag finish_tag_; + grpc::Status finish_status_; + + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpSendMessage, + grpc::internal::CallOpClientSendClose> + write_ops_; + grpc::internal::CallbackWithSuccessTag write_tag_; + + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpClientSendClose> + writes_done_ops_; + grpc::internal::CallbackWithSuccessTag writes_done_tag_; + + grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>> + read_ops_; + grpc::internal::CallbackWithSuccessTag read_tag_; + + struct StartCallBacklog { + bool write_ops = false; + bool writes_done_ops = false; + bool read_ops = false; + }; + StartCallBacklog backlog_ Y_ABSL_GUARDED_BY(start_mu_); + + // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish + std::atomic<intptr_t> callbacks_outstanding_{3}; + std::atomic_bool started_{false}; + grpc::internal::Mutex start_mu_; +}; + +template <class Request, class Response> +class ClientCallbackReaderWriterFactory { + public: + static void Create(grpc::ChannelInterface* channel, + const grpc::internal::RpcMethod& method, + grpc::ClientContext* context, + ClientBidiReactor<Request, Response>* reactor) { + grpc::internal::Call call = + channel->CreateCall(method, context, channel->CallbackCQ()); + + grpc_call_ref(call.call()); + new (grpc_call_arena_alloc( + call.call(), sizeof(ClientCallbackReaderWriterImpl<Request, Response>))) + ClientCallbackReaderWriterImpl<Request, Response>(call, context, + reactor); + } +}; + +template <class Response> +class ClientCallbackReaderImpl : public ClientCallbackReader<Response> { + public: + // always allocated against a call arena, no memory free required + static void operator delete(void* /*ptr*/, std::size_t size) { + GPR_ASSERT(size == sizeof(ClientCallbackReaderImpl)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { GPR_ASSERT(false); } + + void StartCall() override { + // This call initiates two batches, plus any backlog, each with a callback + // 1. Send initial metadata (unless corked) + recv initial metadata + // 2. Any backlog + // 3. Recv trailing metadata + + start_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnReadInitialMetadataDone( + ok && !reactor_->InternalTrailersOnly(call_.call())); + MaybeFinish(/*from_reaction=*/true); + }, + &start_ops_, /*can_inline=*/false); + start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + start_ops_.RecvInitialMetadata(context_); + start_ops_.set_core_cq_tag(&start_tag_); + call_.PerformOps(&start_ops_); + + // Also set up the read tag so it doesn't have to be set up each time + read_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnReadDone(ok); + MaybeFinish(/*from_reaction=*/true); + }, + &read_ops_, /*can_inline=*/false); + read_ops_.set_core_cq_tag(&read_tag_); + + { + grpc::internal::MutexLock lock(&start_mu_); + if (backlog_.read_ops) { + call_.PerformOps(&read_ops_); + } + started_.store(true, std::memory_order_release); + } + + finish_tag_.Set( + call_.call(), + [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); }, + &finish_ops_, /*can_inline=*/false); + finish_ops_.ClientRecvStatus(context_, &finish_status_); + finish_ops_.set_core_cq_tag(&finish_tag_); + call_.PerformOps(&finish_ops_); + } + + void Read(Response* msg) override { + read_ops_.RecvMessage(msg); + callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); + if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { + grpc::internal::MutexLock lock(&start_mu_); + if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { + backlog_.read_ops = true; + return; + } + } + call_.PerformOps(&read_ops_); + } + + void AddHold(int holds) override { + callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed); + } + void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); } + + private: + friend class ClientCallbackReaderFactory<Response>; + + template <class Request> + ClientCallbackReaderImpl(grpc::internal::Call call, + grpc::ClientContext* context, Request* request, + ClientReadReactor<Response>* reactor) + : context_(context), call_(call), reactor_(reactor) { + this->BindReactor(reactor); + // TODO(vjpai): don't assert + GPR_ASSERT(start_ops_.SendMessagePtr(request).ok()); + start_ops_.ClientSendClose(); + } + + // MaybeFinish behaves as in ClientCallbackReaderWriterImpl. + void MaybeFinish(bool from_reaction) { + if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( + 1, std::memory_order_acq_rel) == 1)) { + grpc::Status s = std::move(finish_status_); + auto* reactor = reactor_; + auto* call = call_.call(); + this->~ClientCallbackReaderImpl(); + grpc_call_unref(call); + if (GPR_LIKELY(from_reaction)) { + reactor->OnDone(s); + } else { + reactor->InternalScheduleOnDone(std::move(s)); + } + } + } + + grpc::ClientContext* const context_; + grpc::internal::Call call_; + ClientReadReactor<Response>* const reactor_; + + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpSendMessage, + grpc::internal::CallOpClientSendClose, + grpc::internal::CallOpRecvInitialMetadata> + start_ops_; + grpc::internal::CallbackWithSuccessTag start_tag_; + + grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_; + grpc::internal::CallbackWithSuccessTag finish_tag_; + grpc::Status finish_status_; + + grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>> + read_ops_; + grpc::internal::CallbackWithSuccessTag read_tag_; + + struct StartCallBacklog { + bool read_ops = false; + }; + StartCallBacklog backlog_ Y_ABSL_GUARDED_BY(start_mu_); + + // Minimum of 2 callbacks to pre-register for start and finish + std::atomic<intptr_t> callbacks_outstanding_{2}; + std::atomic_bool started_{false}; + grpc::internal::Mutex start_mu_; +}; + +template <class Response> +class ClientCallbackReaderFactory { + public: + template <class Request> + static void Create(grpc::ChannelInterface* channel, + const grpc::internal::RpcMethod& method, + grpc::ClientContext* context, const Request* request, + ClientReadReactor<Response>* reactor) { + grpc::internal::Call call = + channel->CreateCall(method, context, channel->CallbackCQ()); + + grpc_call_ref(call.call()); + new (grpc_call_arena_alloc(call.call(), + sizeof(ClientCallbackReaderImpl<Response>))) + ClientCallbackReaderImpl<Response>(call, context, request, reactor); + } +}; + +template <class Request> +class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> { + public: + // always allocated against a call arena, no memory free required + static void operator delete(void* /*ptr*/, std::size_t size) { + GPR_ASSERT(size == sizeof(ClientCallbackWriterImpl)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { GPR_ASSERT(false); } + + void StartCall() Y_ABSL_LOCKS_EXCLUDED(start_mu_) override { + // This call initiates two batches, plus any backlog, each with a callback + // 1. Send initial metadata (unless corked) + recv initial metadata + // 2. Any backlog + // 3. Recv trailing metadata + + if (!start_corked_) { + start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + } + call_.PerformOps(&start_ops_); + + { + grpc::internal::MutexLock lock(&start_mu_); + + if (backlog_.write_ops) { + call_.PerformOps(&write_ops_); + } + if (backlog_.writes_done_ops) { + call_.PerformOps(&writes_done_ops_); + } + call_.PerformOps(&finish_ops_); + // The last thing in this critical section is to set started_ so that it + // can be used lock-free as well. + started_.store(true, std::memory_order_release); + } + // MaybeFinish outside the lock to make sure that destruction of this object + // doesn't take place while holding the lock (which would cause the lock to + // be released after destruction) + this->MaybeFinish(/*from_reaction=*/false); + } + + void Write(const Request* msg, grpc::WriteOptions options) + Y_ABSL_LOCKS_EXCLUDED(start_mu_) override { + if (GPR_UNLIKELY(options.is_last_message())) { + options.set_buffer_hint(); + write_ops_.ClientSendClose(); + } + // TODO(vjpai): don't assert + GPR_ASSERT(write_ops_.SendMessagePtr(msg, options).ok()); + callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); + + if (GPR_UNLIKELY(corked_write_needed_)) { + write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + corked_write_needed_ = false; + } + + if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { + grpc::internal::MutexLock lock(&start_mu_); + if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { + backlog_.write_ops = true; + return; + } + } + call_.PerformOps(&write_ops_); + } + + void WritesDone() Y_ABSL_LOCKS_EXCLUDED(start_mu_) override { + writes_done_ops_.ClientSendClose(); + writes_done_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnWritesDoneDone(ok); + MaybeFinish(/*from_reaction=*/true); + }, + &writes_done_ops_, /*can_inline=*/false); + writes_done_ops_.set_core_cq_tag(&writes_done_tag_); + callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); + + if (GPR_UNLIKELY(corked_write_needed_)) { + writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + corked_write_needed_ = false; + } + + if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { + grpc::internal::MutexLock lock(&start_mu_); + if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { + backlog_.writes_done_ops = true; + return; + } + } + call_.PerformOps(&writes_done_ops_); + } + + void AddHold(int holds) override { + callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed); + } + void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); } + + private: + friend class ClientCallbackWriterFactory<Request>; + + template <class Response> + ClientCallbackWriterImpl(grpc::internal::Call call, + grpc::ClientContext* context, Response* response, + ClientWriteReactor<Request>* reactor) + : context_(context), + call_(call), + reactor_(reactor), + start_corked_(context_->initial_metadata_corked_), + corked_write_needed_(start_corked_) { + this->BindReactor(reactor); + + // Set up the unchanging parts of the start and write tags and ops. + start_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnReadInitialMetadataDone( + ok && !reactor_->InternalTrailersOnly(call_.call())); + MaybeFinish(/*from_reaction=*/true); + }, + &start_ops_, /*can_inline=*/false); + start_ops_.RecvInitialMetadata(context_); + start_ops_.set_core_cq_tag(&start_tag_); + + write_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnWriteDone(ok); + MaybeFinish(/*from_reaction=*/true); + }, + &write_ops_, /*can_inline=*/false); + write_ops_.set_core_cq_tag(&write_tag_); + + // Also set up the Finish tag and op set. + finish_ops_.RecvMessage(response); + finish_ops_.AllowNoMessage(); + finish_tag_.Set( + call_.call(), + [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); }, + &finish_ops_, + /*can_inline=*/false); + finish_ops_.ClientRecvStatus(context_, &finish_status_); + finish_ops_.set_core_cq_tag(&finish_tag_); + } + + // MaybeFinish behaves as in ClientCallbackReaderWriterImpl. + void MaybeFinish(bool from_reaction) { + if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( + 1, std::memory_order_acq_rel) == 1)) { + grpc::Status s = std::move(finish_status_); + auto* reactor = reactor_; + auto* call = call_.call(); + this->~ClientCallbackWriterImpl(); + grpc_call_unref(call); + if (GPR_LIKELY(from_reaction)) { + reactor->OnDone(s); + } else { + reactor->InternalScheduleOnDone(std::move(s)); + } + } + } + + grpc::ClientContext* const context_; + grpc::internal::Call call_; + ClientWriteReactor<Request>* const reactor_; + + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpRecvInitialMetadata> + start_ops_; + grpc::internal::CallbackWithSuccessTag start_tag_; + const bool start_corked_; + bool corked_write_needed_; // no lock needed since only accessed in + // Write/WritesDone which cannot be concurrent + + grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage, + grpc::internal::CallOpClientRecvStatus> + finish_ops_; + grpc::internal::CallbackWithSuccessTag finish_tag_; + grpc::Status finish_status_; + + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpSendMessage, + grpc::internal::CallOpClientSendClose> + write_ops_; + grpc::internal::CallbackWithSuccessTag write_tag_; + + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpClientSendClose> + writes_done_ops_; + grpc::internal::CallbackWithSuccessTag writes_done_tag_; + + struct StartCallBacklog { + bool write_ops = false; + bool writes_done_ops = false; + }; + StartCallBacklog backlog_ Y_ABSL_GUARDED_BY(start_mu_); + + // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish + std::atomic<intptr_t> callbacks_outstanding_{3}; + std::atomic_bool started_{false}; + grpc::internal::Mutex start_mu_; +}; + +template <class Request> +class ClientCallbackWriterFactory { + public: + template <class Response> + static void Create(grpc::ChannelInterface* channel, + const grpc::internal::RpcMethod& method, + grpc::ClientContext* context, Response* response, + ClientWriteReactor<Request>* reactor) { + grpc::internal::Call call = + channel->CreateCall(method, context, channel->CallbackCQ()); + + grpc_call_ref(call.call()); + new (grpc_call_arena_alloc(call.call(), + sizeof(ClientCallbackWriterImpl<Request>))) + ClientCallbackWriterImpl<Request>(call, context, response, reactor); + } +}; + +class ClientCallbackUnaryImpl final : public ClientCallbackUnary { + public: + // always allocated against a call arena, no memory free required + static void operator delete(void* /*ptr*/, std::size_t size) { + GPR_ASSERT(size == sizeof(ClientCallbackUnaryImpl)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { GPR_ASSERT(false); } + + void StartCall() override { + // This call initiates two batches, each with a callback + // 1. Send initial metadata + write + writes done + recv initial metadata + // 2. Read message, recv trailing metadata + + start_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnReadInitialMetadataDone( + ok && !reactor_->InternalTrailersOnly(call_.call())); + MaybeFinish(); + }, + &start_ops_, /*can_inline=*/false); + start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + start_ops_.RecvInitialMetadata(context_); + start_ops_.set_core_cq_tag(&start_tag_); + call_.PerformOps(&start_ops_); + + finish_tag_.Set( + call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, &finish_ops_, + /*can_inline=*/false); + finish_ops_.ClientRecvStatus(context_, &finish_status_); + finish_ops_.set_core_cq_tag(&finish_tag_); + call_.PerformOps(&finish_ops_); + } + + private: + friend class ClientCallbackUnaryFactory; + + template <class Request, class Response> + ClientCallbackUnaryImpl(grpc::internal::Call call, + grpc::ClientContext* context, Request* request, + Response* response, ClientUnaryReactor* reactor) + : context_(context), call_(call), reactor_(reactor) { + this->BindReactor(reactor); + // TODO(vjpai): don't assert + GPR_ASSERT(start_ops_.SendMessagePtr(request).ok()); + start_ops_.ClientSendClose(); + finish_ops_.RecvMessage(response); + finish_ops_.AllowNoMessage(); + } + + // In the unary case, MaybeFinish is only ever invoked from a + // library-initiated reaction, so it will just directly call OnDone if this is + // the last reaction for this RPC. + void MaybeFinish() { + if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( + 1, std::memory_order_acq_rel) == 1)) { + grpc::Status s = std::move(finish_status_); + auto* reactor = reactor_; + auto* call = call_.call(); + this->~ClientCallbackUnaryImpl(); + grpc_call_unref(call); + reactor->OnDone(s); + } + } + + grpc::ClientContext* const context_; + grpc::internal::Call call_; + ClientUnaryReactor* const reactor_; + + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpSendMessage, + grpc::internal::CallOpClientSendClose, + grpc::internal::CallOpRecvInitialMetadata> + start_ops_; + grpc::internal::CallbackWithSuccessTag start_tag_; + + grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage, + grpc::internal::CallOpClientRecvStatus> + finish_ops_; + grpc::internal::CallbackWithSuccessTag finish_tag_; + grpc::Status finish_status_; + + // This call will have 2 callbacks: start and finish + std::atomic<intptr_t> callbacks_outstanding_{2}; +}; + +class ClientCallbackUnaryFactory { + public: + template <class Request, class Response, class BaseRequest = Request, + class BaseResponse = Response> + static void Create(grpc::ChannelInterface* channel, + const grpc::internal::RpcMethod& method, + grpc::ClientContext* context, const Request* request, + Response* response, ClientUnaryReactor* reactor) { + grpc::internal::Call call = + channel->CreateCall(method, context, channel->CallbackCQ()); + + grpc_call_ref(call.call()); + + new (grpc_call_arena_alloc(call.call(), sizeof(ClientCallbackUnaryImpl))) + ClientCallbackUnaryImpl(call, context, + static_cast<const BaseRequest*>(request), + static_cast<BaseResponse*>(response), reactor); + } +}; + +} // namespace internal +} // namespace grpc #endif // GRPCPP_SUPPORT_CLIENT_CALLBACK_H diff --git a/contrib/libs/grpc/include/grpcpp/support/client_interceptor.h b/contrib/libs/grpc/include/grpcpp/support/client_interceptor.h index 8e5e1ce67b..ef84d18439 100644 --- a/contrib/libs/grpc/include/grpcpp/support/client_interceptor.h +++ b/contrib/libs/grpc/include/grpcpp/support/client_interceptor.h @@ -1,20 +1,20 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// #ifndef GRPCPP_SUPPORT_CLIENT_INTERCEPTOR_H #define GRPCPP_SUPPORT_CLIENT_INTERCEPTOR_H @@ -22,6 +22,7 @@ #include <memory> #include <vector> +#include <grpc/support/log.h> #include <grpcpp/impl/rpc_method.h> #include <grpcpp/support/interceptor.h> #include <grpcpp/support/string_ref.h> @@ -135,7 +136,7 @@ class ClientRpcInfo { // Runs interceptor at pos \a pos. void RunInterceptor( experimental::InterceptorBatchMethods* interceptor_methods, size_t pos) { - GPR_CODEGEN_ASSERT(pos < interceptors_.size()); + GPR_ASSERT(pos < interceptors_.size()); interceptors_[pos]->Intercept(interceptor_methods); } diff --git a/contrib/libs/grpc/include/grpcpp/support/config.h b/contrib/libs/grpc/include/grpcpp/support/config.h index f2632f2640..6e28fe73c8 100644 --- a/contrib/libs/grpc/include/grpcpp/support/config.h +++ b/contrib/libs/grpc/include/grpcpp/support/config.h @@ -1,24 +1,44 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// #ifndef GRPCPP_SUPPORT_CONFIG_H #define GRPCPP_SUPPORT_CONFIG_H -#include <grpcpp/impl/codegen/config.h> // IWYU pragma: export +#include <util/generic/string.h> +#include <util/string/cast.h> + +/// The following macros are deprecated and appear only for users +/// with PB files generated using gRPC 1.0.x plugins. They should +/// not be used in new code +#define GRPC_OVERRIDE override // deprecated +#define GRPC_FINAL final // deprecated + +#ifdef GRPC_CUSTOM_STRING +#warning GRPC_CUSTOM_STRING is no longer supported. Please use TString. +#endif + +namespace grpc { + +// Using grpc::string and grpc::to_string is discouraged in favor of +// TString and ::ToString. This is only for legacy code using +// them explictly. +typedef TString string; + +} // namespace grpc #endif // GRPCPP_SUPPORT_CONFIG_H diff --git a/contrib/libs/grpc/include/grpcpp/support/interceptor.h b/contrib/libs/grpc/include/grpcpp/support/interceptor.h index d4f2ea180e..7c5c95a664 100644 --- a/contrib/libs/grpc/include/grpcpp/support/interceptor.h +++ b/contrib/libs/grpc/include/grpcpp/support/interceptor.h @@ -1,24 +1,231 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// #ifndef GRPCPP_SUPPORT_INTERCEPTOR_H #define GRPCPP_SUPPORT_INTERCEPTOR_H -#include <grpcpp/impl/codegen/interceptor.h> // IWYU pragma: export +#include <map> +#include <memory> +#include <util/generic/string.h> +#include <util/string/cast.h> + +#include <grpc/impl/grpc_types.h> +#include <grpcpp/impl/metadata_map.h> +#include <grpcpp/support/byte_buffer.h> +#include <grpcpp/support/config.h> +#include <grpcpp/support/string_ref.h> + +namespace grpc { + +class ChannelInterface; +class Status; + +namespace experimental { + +/// An enumeration of different possible points at which the \a Intercept +/// method of the \a Interceptor interface may be called. Any given call +/// to \a Intercept will include one or more of these hook points, and +/// each hook point makes certain types of information available to the +/// interceptor. +/// In these enumeration names, PRE_SEND means that an interception has taken +/// place between the time the application provided a certain type of data +/// (e.g., initial metadata, status) and the time that that data goes to the +/// other side. POST_SEND means that the data has been committed for going to +/// the other side (even if it has not yet been received at the other side). +/// PRE_RECV means an interception between the time that a certain +/// operation has been requested and it is available. POST_RECV means that a +/// result is available but has not yet been passed back to the application. +/// A batch of interception points will only contain either PRE or POST hooks +/// but not both types. For example, a batch with PRE_SEND hook points will not +/// contain POST_RECV or POST_SEND ops. Likewise, a batch with POST_* ops can +/// not contain PRE_* ops. +enum class InterceptionHookPoints { + /// The first three in this list are for clients and servers + PRE_SEND_INITIAL_METADATA, + PRE_SEND_MESSAGE, + POST_SEND_MESSAGE, + PRE_SEND_STATUS, // server only + PRE_SEND_CLOSE, // client only: WritesDone for stream; after write in unary + /// The following three are for hijacked clients only. A batch with PRE_RECV_* + /// hook points will never contain hook points of other types. + PRE_RECV_INITIAL_METADATA, + PRE_RECV_MESSAGE, + PRE_RECV_STATUS, + /// The following two are for all clients and servers + POST_RECV_INITIAL_METADATA, + POST_RECV_MESSAGE, + POST_RECV_STATUS, // client only + POST_RECV_CLOSE, // server only + /// This is a special hook point available to both clients and servers when + /// TryCancel() is performed. + /// - No other hook points will be present along with this. + /// - It is illegal for an interceptor to block/delay this operation. + /// - ALL interceptors see this hook point irrespective of whether the + /// RPC was hijacked or not. + PRE_SEND_CANCEL, + NUM_INTERCEPTION_HOOKS +}; + +/// Class that is passed as an argument to the \a Intercept method +/// of the application's \a Interceptor interface implementation. It has five +/// purposes: +/// 1. Indicate which hook points are present at a specific interception +/// 2. Allow an interceptor to inform the library that an RPC should +/// continue to the next stage of its processing (which may be another +/// interceptor or the main path of the library) +/// 3. Allow an interceptor to hijack the processing of the RPC (only for +/// client-side RPCs with PRE_SEND_INITIAL_METADATA) so that it does not +/// proceed with normal processing beyond that stage +/// 4. Access the relevant fields of an RPC at each interception point +/// 5. Set some fields of an RPC at each interception point, when possible +class InterceptorBatchMethods { + public: + virtual ~InterceptorBatchMethods() {} + /// Determine whether the current batch has an interception hook point + /// of type \a type + virtual bool QueryInterceptionHookPoint(InterceptionHookPoints type) = 0; + /// Signal that the interceptor is done intercepting the current batch of the + /// RPC. Every interceptor must either call Proceed or Hijack on each + /// interception. In most cases, only Proceed will be used. Explicit use of + /// Proceed is what enables interceptors to delay the processing of RPCs + /// while they perform other work. + /// Proceed is a no-op if the batch contains PRE_SEND_CANCEL. Simply returning + /// from the Intercept method does the job of continuing the RPC in this case. + /// This is because PRE_SEND_CANCEL is always in a separate batch and is not + /// allowed to be delayed. + virtual void Proceed() = 0; + /// Indicate that the interceptor has hijacked the RPC (only valid if the + /// batch contains send_initial_metadata on the client side). Later + /// interceptors in the interceptor list will not be called. Later batches + /// on the same RPC will go through interception, but only up to the point + /// of the hijacking interceptor. + virtual void Hijack() = 0; + + /// Send Message Methods + /// GetSerializedSendMessage and GetSendMessage/ModifySendMessage are the + /// available methods to view and modify the request payload. An interceptor + /// can access the payload in either serialized form or non-serialized form + /// but not both at the same time. + /// gRPC performs serialization in a lazy manner, which means + /// that a call to GetSerializedSendMessage will result in a serialization + /// operation if the payload stored is not in the serialized form already; the + /// non-serialized form will be lost and GetSendMessage will no longer return + /// a valid pointer, and this will remain true for later interceptors too. + /// This can change however if ModifySendMessage is used to replace the + /// current payload. Note that ModifySendMessage requires a new payload + /// message in the non-serialized form. This will overwrite the existing + /// payload irrespective of whether it had been serialized earlier. Also note + /// that gRPC Async API requires early serialization of the payload which + /// means that the payload would be available in the serialized form only + /// unless an interceptor replaces the payload with ModifySendMessage. + + /// Returns a modifable ByteBuffer holding the serialized form of the message + /// that is going to be sent. Valid for PRE_SEND_MESSAGE interceptions. + /// A return value of nullptr indicates that this ByteBuffer is not valid. + virtual ByteBuffer* GetSerializedSendMessage() = 0; + + /// Returns a non-modifiable pointer to the non-serialized form of the message + /// to be sent. Valid for PRE_SEND_MESSAGE interceptions. A return value of + /// nullptr indicates that this field is not valid. + virtual const void* GetSendMessage() = 0; + + /// Overwrites the message to be sent with \a message. \a message should be in + /// the non-serialized form expected by the method. Valid for PRE_SEND_MESSAGE + /// interceptions. Note that the interceptor is responsible for maintaining + /// the life of the message till it is serialized or it receives the + /// POST_SEND_MESSAGE interception point, whichever happens earlier. The + /// modifying interceptor may itself force early serialization by calling + /// GetSerializedSendMessage. + virtual void ModifySendMessage(const void* message) = 0; + + /// Checks whether the SEND MESSAGE op succeeded. Valid for POST_SEND_MESSAGE + /// interceptions. + virtual bool GetSendMessageStatus() = 0; + + /// Returns a modifiable multimap of the initial metadata to be sent. Valid + /// for PRE_SEND_INITIAL_METADATA interceptions. A value of nullptr indicates + /// that this field is not valid. + virtual std::multimap<TString, TString>* GetSendInitialMetadata() = 0; + + /// Returns the status to be sent. Valid for PRE_SEND_STATUS interceptions. + virtual Status GetSendStatus() = 0; + + /// Overwrites the status with \a status. Valid for PRE_SEND_STATUS + /// interceptions. + virtual void ModifySendStatus(const Status& status) = 0; + + /// Returns a modifiable multimap of the trailing metadata to be sent. Valid + /// for PRE_SEND_STATUS interceptions. A value of nullptr indicates + /// that this field is not valid. + virtual std::multimap<TString, TString>* + GetSendTrailingMetadata() = 0; + + /// Returns a pointer to the modifiable received message. Note that the + /// message is already deserialized but the type is not set; the interceptor + /// should static_cast to the appropriate type before using it. This is valid + /// for PRE_RECV_MESSAGE and POST_RECV_MESSAGE interceptions; nullptr for not + /// valid + virtual void* GetRecvMessage() = 0; + + /// Returns a modifiable multimap of the received initial metadata. + /// Valid for PRE_RECV_INITIAL_METADATA and POST_RECV_INITIAL_METADATA + /// interceptions; nullptr if not valid + virtual std::multimap<grpc::string_ref, grpc::string_ref>* + GetRecvInitialMetadata() = 0; + + /// Returns a modifiable view of the received status on PRE_RECV_STATUS and + /// POST_RECV_STATUS interceptions; nullptr if not valid. + virtual Status* GetRecvStatus() = 0; + + /// Returns a modifiable multimap of the received trailing metadata on + /// PRE_RECV_STATUS and POST_RECV_STATUS interceptions; nullptr if not valid + virtual std::multimap<grpc::string_ref, grpc::string_ref>* + GetRecvTrailingMetadata() = 0; + + /// Gets an intercepted channel. When a call is started on this interceptor, + /// only interceptors after the current interceptor are created from the + /// factory objects registered with the channel. This allows calls to be + /// started from interceptors without infinite regress through the interceptor + /// list. + virtual std::unique_ptr<ChannelInterface> GetInterceptedChannel() = 0; + + /// On a hijacked RPC, an interceptor can decide to fail a PRE_RECV_MESSAGE + /// op. This would be a signal to the reader that there will be no more + /// messages, or the stream has failed or been cancelled. + virtual void FailHijackedRecvMessage() = 0; + + /// On a hijacked RPC/ to-be hijacked RPC, this can be called to fail a SEND + /// MESSAGE op + virtual void FailHijackedSendMessage() = 0; +}; + +/// Interface for an interceptor. Interceptor authors must create a class +/// that derives from this parent class. +class Interceptor { + public: + virtual ~Interceptor() {} + + /// The one public method of an Interceptor interface. Override this to + /// trigger the desired actions at the hook points described above. + virtual void Intercept(InterceptorBatchMethods* methods) = 0; +}; + +} // namespace experimental +} // namespace grpc #endif // GRPCPP_SUPPORT_INTERCEPTOR_H diff --git a/contrib/libs/grpc/include/grpcpp/support/message_allocator.h b/contrib/libs/grpc/include/grpcpp/support/message_allocator.h new file mode 100644 index 0000000000..8e627900e5 --- /dev/null +++ b/contrib/libs/grpc/include/grpcpp/support/message_allocator.h @@ -0,0 +1,71 @@ +// +// +// Copyright 2019 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#ifndef GRPCPP_SUPPORT_MESSAGE_ALLOCATOR_H +#define GRPCPP_SUPPORT_MESSAGE_ALLOCATOR_H + +namespace grpc { + +// NOTE: This is an API for advanced users who need custom allocators. +// Per rpc struct for the allocator. This is the interface to return to user. +class RpcAllocatorState { + public: + virtual ~RpcAllocatorState() = default; + // Optionally deallocate request early to reduce the size of working set. + // A custom MessageAllocator needs to be registered to make use of this. + // This is not abstract because implementing it is optional. + virtual void FreeRequest() {} +}; + +// This is the interface returned by the allocator. +// grpc library will call the methods to get request/response pointers and to +// release the object when it is done. +template <typename RequestT, typename ResponseT> +class MessageHolder : public RpcAllocatorState { + public: + // Release this object. For example, if the custom allocator's + // AllocateMessasge creates an instance of a subclass with new, the Release() + // should do a "delete this;". + virtual void Release() = 0; + RequestT* request() { return request_; } + ResponseT* response() { return response_; } + + protected: + void set_request(RequestT* request) { request_ = request; } + void set_response(ResponseT* response) { response_ = response; } + + private: + // NOTE: subclasses should set these pointers. + RequestT* request_; + ResponseT* response_; +}; + +// A custom allocator can be set via the generated code to a callback unary +// method, such as SetMessageAllocatorFor_Echo(custom_allocator). The allocator +// needs to be alive for the lifetime of the server. +// Implementations need to be thread-safe. +template <typename RequestT, typename ResponseT> +class MessageAllocator { + public: + virtual ~MessageAllocator() = default; + virtual MessageHolder<RequestT, ResponseT>* AllocateMessages() = 0; +}; + +} // namespace grpc + +#endif // GRPCPP_SUPPORT_MESSAGE_ALLOCATOR_H diff --git a/contrib/libs/grpc/include/grpcpp/support/method_handler.h b/contrib/libs/grpc/include/grpcpp/support/method_handler.h index 0b97a7af03..e4ec4a88d8 100644 --- a/contrib/libs/grpc/include/grpcpp/support/method_handler.h +++ b/contrib/libs/grpc/include/grpcpp/support/method_handler.h @@ -1,24 +1,405 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// #ifndef GRPCPP_SUPPORT_METHOD_HANDLER_H #define GRPCPP_SUPPORT_METHOD_HANDLER_H -#include <grpcpp/impl/codegen/method_handler.h> // IWYU pragma: export +#include <grpc/byte_buffer.h> +#include <grpc/support/log.h> +#include <grpcpp/impl/rpc_service_method.h> +#include <grpcpp/support/byte_buffer.h> +#include <grpcpp/support/sync_stream.h> + +namespace grpc { + +namespace internal { + +// Invoke the method handler, fill in the status, and +// return whether or not we finished safely (without an exception). +// Note that exception handling is 0-cost in most compiler/library +// implementations (except when an exception is actually thrown), +// so this process doesn't require additional overhead in the common case. +// Additionally, we don't need to return if we caught an exception or not; +// the handling is the same in either case. +template <class Callable> +::grpc::Status CatchingFunctionHandler(Callable&& handler) { +#if GRPC_ALLOW_EXCEPTIONS + try { + return handler(); + } catch (...) { + return grpc::Status(grpc::StatusCode::UNKNOWN, + "Unexpected error in RPC handling"); + } +#else // GRPC_ALLOW_EXCEPTIONS + return handler(); +#endif // GRPC_ALLOW_EXCEPTIONS +} + +/// A helper function with reduced templating to do the common work needed to +/// actually send the server response. Uses non-const parameter for Status since +/// this should only ever be called from the end of the RunHandler method. + +template <class ResponseType> +void UnaryRunHandlerHelper(const MethodHandler::HandlerParameter& param, + ResponseType* rsp, grpc::Status& status) { + GPR_ASSERT(!param.server_context->sent_initial_metadata_); + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpSendMessage, + grpc::internal::CallOpServerSendStatus> + ops; + ops.SendInitialMetadata(¶m.server_context->initial_metadata_, + param.server_context->initial_metadata_flags()); + if (param.server_context->compression_level_set()) { + ops.set_compression_level(param.server_context->compression_level()); + } + if (status.ok()) { + status = ops.SendMessagePtr(rsp); + } + ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); +} + +/// A helper function with reduced templating to do deserializing. + +template <class RequestType> +void* UnaryDeserializeHelper(grpc_byte_buffer* req, grpc::Status* status, + RequestType* request) { + grpc::ByteBuffer buf; + buf.set_buffer(req); + *status = grpc::SerializationTraits<RequestType>::Deserialize( + &buf, static_cast<RequestType*>(request)); + buf.Release(); + if (status->ok()) { + return request; + } + request->~RequestType(); + return nullptr; +} + +/// A wrapper class of an application provided rpc method handler. +template <class ServiceType, class RequestType, class ResponseType, + class BaseRequestType = RequestType, + class BaseResponseType = ResponseType> +class RpcMethodHandler : public grpc::internal::MethodHandler { + public: + RpcMethodHandler( + std::function<grpc::Status(ServiceType*, grpc::ServerContext*, + const RequestType*, ResponseType*)> + func, + ServiceType* service) + : func_(func), service_(service) {} + + void RunHandler(const HandlerParameter& param) final { + ResponseType rsp; + grpc::Status status = param.status; + if (status.ok()) { + status = CatchingFunctionHandler([this, ¶m, &rsp] { + return func_(service_, + static_cast<grpc::ServerContext*>(param.server_context), + static_cast<RequestType*>(param.request), &rsp); + }); + static_cast<RequestType*>(param.request)->~RequestType(); + } + UnaryRunHandlerHelper(param, static_cast<BaseResponseType*>(&rsp), status); + } + + void* Deserialize(grpc_call* call, grpc_byte_buffer* req, + grpc::Status* status, void** /*handler_data*/) final { + auto* request = + new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType; + return UnaryDeserializeHelper(req, status, + static_cast<BaseRequestType*>(request)); + } + + private: + /// Application provided rpc handler function. + std::function<grpc::Status(ServiceType*, grpc::ServerContext*, + const RequestType*, ResponseType*)> + func_; + // The class the above handler function lives in. + ServiceType* service_; +}; + +/// A wrapper class of an application provided client streaming handler. +template <class ServiceType, class RequestType, class ResponseType> +class ClientStreamingHandler : public grpc::internal::MethodHandler { + public: + ClientStreamingHandler( + std::function<grpc::Status(ServiceType*, grpc::ServerContext*, + ServerReader<RequestType>*, ResponseType*)> + func, + ServiceType* service) + : func_(func), service_(service) {} + + void RunHandler(const HandlerParameter& param) final { + ServerReader<RequestType> reader( + param.call, static_cast<grpc::ServerContext*>(param.server_context)); + ResponseType rsp; + grpc::Status status = + CatchingFunctionHandler([this, ¶m, &reader, &rsp] { + return func_(service_, + static_cast<grpc::ServerContext*>(param.server_context), + &reader, &rsp); + }); + + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpSendMessage, + grpc::internal::CallOpServerSendStatus> + ops; + if (!param.server_context->sent_initial_metadata_) { + ops.SendInitialMetadata(¶m.server_context->initial_metadata_, + param.server_context->initial_metadata_flags()); + if (param.server_context->compression_level_set()) { + ops.set_compression_level(param.server_context->compression_level()); + } + } + if (status.ok()) { + status = ops.SendMessagePtr(&rsp); + } + ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); + } + + private: + std::function<grpc::Status(ServiceType*, grpc::ServerContext*, + ServerReader<RequestType>*, ResponseType*)> + func_; + ServiceType* service_; +}; + +/// A wrapper class of an application provided server streaming handler. +template <class ServiceType, class RequestType, class ResponseType> +class ServerStreamingHandler : public grpc::internal::MethodHandler { + public: + ServerStreamingHandler(std::function<grpc::Status( + ServiceType*, grpc::ServerContext*, + const RequestType*, ServerWriter<ResponseType>*)> + func, + ServiceType* service) + : func_(func), service_(service) {} + + void RunHandler(const HandlerParameter& param) final { + grpc::Status status = param.status; + if (status.ok()) { + ServerWriter<ResponseType> writer( + param.call, static_cast<grpc::ServerContext*>(param.server_context)); + status = CatchingFunctionHandler([this, ¶m, &writer] { + return func_(service_, + static_cast<grpc::ServerContext*>(param.server_context), + static_cast<RequestType*>(param.request), &writer); + }); + static_cast<RequestType*>(param.request)->~RequestType(); + } + + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpServerSendStatus> + ops; + if (!param.server_context->sent_initial_metadata_) { + ops.SendInitialMetadata(¶m.server_context->initial_metadata_, + param.server_context->initial_metadata_flags()); + if (param.server_context->compression_level_set()) { + ops.set_compression_level(param.server_context->compression_level()); + } + } + ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + if (param.server_context->has_pending_ops_) { + param.call->cq()->Pluck(¶m.server_context->pending_ops_); + } + param.call->cq()->Pluck(&ops); + } + + void* Deserialize(grpc_call* call, grpc_byte_buffer* req, + grpc::Status* status, void** /*handler_data*/) final { + grpc::ByteBuffer buf; + buf.set_buffer(req); + auto* request = + new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType(); + *status = + grpc::SerializationTraits<RequestType>::Deserialize(&buf, request); + buf.Release(); + if (status->ok()) { + return request; + } + request->~RequestType(); + return nullptr; + } + + private: + std::function<grpc::Status(ServiceType*, grpc::ServerContext*, + const RequestType*, ServerWriter<ResponseType>*)> + func_; + ServiceType* service_; +}; + +/// A wrapper class of an application provided bidi-streaming handler. +/// This also applies to server-streamed implementation of a unary method +/// with the additional requirement that such methods must have done a +/// write for status to be ok +/// Since this is used by more than 1 class, the service is not passed in. +/// Instead, it is expected to be an implicitly-captured argument of func +/// (through bind or something along those lines) +template <class Streamer, bool WriteNeeded> +class TemplatedBidiStreamingHandler : public grpc::internal::MethodHandler { + public: + explicit TemplatedBidiStreamingHandler( + std::function<grpc::Status(grpc::ServerContext*, Streamer*)> func) + : func_(func), write_needed_(WriteNeeded) {} + + void RunHandler(const HandlerParameter& param) final { + Streamer stream(param.call, + static_cast<grpc::ServerContext*>(param.server_context)); + grpc::Status status = CatchingFunctionHandler([this, ¶m, &stream] { + return func_(static_cast<grpc::ServerContext*>(param.server_context), + &stream); + }); + + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpServerSendStatus> + ops; + if (!param.server_context->sent_initial_metadata_) { + ops.SendInitialMetadata(¶m.server_context->initial_metadata_, + param.server_context->initial_metadata_flags()); + if (param.server_context->compression_level_set()) { + ops.set_compression_level(param.server_context->compression_level()); + } + if (write_needed_ && status.ok()) { + // If we needed a write but never did one, we need to mark the + // status as a fail + status = grpc::Status(grpc::StatusCode::INTERNAL, + "Service did not provide response message"); + } + } + ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + if (param.server_context->has_pending_ops_) { + param.call->cq()->Pluck(¶m.server_context->pending_ops_); + } + param.call->cq()->Pluck(&ops); + } + + private: + std::function<grpc::Status(grpc::ServerContext*, Streamer*)> func_; + const bool write_needed_; +}; + +template <class ServiceType, class RequestType, class ResponseType> +class BidiStreamingHandler + : public TemplatedBidiStreamingHandler< + ServerReaderWriter<ResponseType, RequestType>, false> { + public: + BidiStreamingHandler(std::function<grpc::Status( + ServiceType*, grpc::ServerContext*, + ServerReaderWriter<ResponseType, RequestType>*)> + func, + ServiceType* service) + // TODO(vjpai): When gRPC supports C++14, move-capture func in the below + : TemplatedBidiStreamingHandler< + ServerReaderWriter<ResponseType, RequestType>, false>( + [func, service]( + grpc::ServerContext* ctx, + ServerReaderWriter<ResponseType, RequestType>* streamer) { + return func(service, ctx, streamer); + }) {} +}; + +template <class RequestType, class ResponseType> +class StreamedUnaryHandler + : public TemplatedBidiStreamingHandler< + ServerUnaryStreamer<RequestType, ResponseType>, true> { + public: + explicit StreamedUnaryHandler( + std::function< + grpc::Status(grpc::ServerContext*, + ServerUnaryStreamer<RequestType, ResponseType>*)> + func) + : TemplatedBidiStreamingHandler< + ServerUnaryStreamer<RequestType, ResponseType>, true>( + std::move(func)) {} +}; + +template <class RequestType, class ResponseType> +class SplitServerStreamingHandler + : public TemplatedBidiStreamingHandler< + ServerSplitStreamer<RequestType, ResponseType>, false> { + public: + explicit SplitServerStreamingHandler( + std::function< + grpc::Status(grpc::ServerContext*, + ServerSplitStreamer<RequestType, ResponseType>*)> + func) + : TemplatedBidiStreamingHandler< + ServerSplitStreamer<RequestType, ResponseType>, false>( + std::move(func)) {} +}; + +/// General method handler class for errors that prevent real method use +/// e.g., handle unknown method by returning UNIMPLEMENTED error. +template <grpc::StatusCode code> +class ErrorMethodHandler : public grpc::internal::MethodHandler { + public: + explicit ErrorMethodHandler(const TString& message) : message_(message) {} + + template <class T> + static void FillOps(grpc::ServerContextBase* context, + const TString& message, T* ops) { + grpc::Status status(code, message); + if (!context->sent_initial_metadata_) { + ops->SendInitialMetadata(&context->initial_metadata_, + context->initial_metadata_flags()); + if (context->compression_level_set()) { + ops->set_compression_level(context->compression_level()); + } + context->sent_initial_metadata_ = true; + } + ops->ServerSendStatus(&context->trailing_metadata_, status); + } + + void RunHandler(const HandlerParameter& param) final { + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpServerSendStatus> + ops; + FillOps(param.server_context, message_, &ops); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); + } + + void* Deserialize(grpc_call* /*call*/, grpc_byte_buffer* req, + grpc::Status* /*status*/, void** /*handler_data*/) final { + // We have to destroy any request payload + if (req != nullptr) { + grpc_byte_buffer_destroy(req); + } + return nullptr; + } + + private: + const TString message_; +}; + +typedef ErrorMethodHandler<grpc::StatusCode::UNIMPLEMENTED> + UnknownMethodHandler; +typedef ErrorMethodHandler<grpc::StatusCode::RESOURCE_EXHAUSTED> + ResourceExhaustedHandler; + +} // namespace internal +} // namespace grpc #endif // GRPCPP_SUPPORT_METHOD_HANDLER_H diff --git a/contrib/libs/grpc/include/grpcpp/support/proto_buffer_reader.h b/contrib/libs/grpc/include/grpcpp/support/proto_buffer_reader.h new file mode 100644 index 0000000000..e89818a3d6 --- /dev/null +++ b/contrib/libs/grpc/include/grpcpp/support/proto_buffer_reader.h @@ -0,0 +1,146 @@ +// +// +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#ifndef GRPCPP_SUPPORT_PROTO_BUFFER_READER_H +#define GRPCPP_SUPPORT_PROTO_BUFFER_READER_H + +#include <type_traits> + +#include <grpc/byte_buffer.h> +#include <grpc/byte_buffer_reader.h> +#include <grpc/impl/grpc_types.h> +#include <grpc/slice.h> +#include <grpc/support/log.h> +#include <grpcpp/impl/codegen/config_protobuf.h> +#include <grpcpp/impl/serialization_traits.h> +#include <grpcpp/support/byte_buffer.h> +#include <grpcpp/support/status.h> + +/// This header provides an object that reads bytes directly from a +/// grpc::ByteBuffer, via the ZeroCopyInputStream interface + +namespace grpc { + +/// This is a specialization of the protobuf class ZeroCopyInputStream +/// The principle is to get one chunk of data at a time from the proto layer, +/// with options to backup (re-see some bytes) or skip (forward past some bytes) +/// +/// Read more about ZeroCopyInputStream interface here: +/// https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.io.zero_copy_stream#ZeroCopyInputStream +class ProtoBufferReader : public grpc::protobuf::io::ZeroCopyInputStream { + public: + /// Constructs buffer reader from \a buffer. Will set \a status() to non ok + /// if \a buffer is invalid (the internal buffer has not been initialized). + explicit ProtoBufferReader(ByteBuffer* buffer) + : byte_count_(0), backup_count_(0), status_() { + /// Implemented through a grpc_byte_buffer_reader which iterates + /// over the slices that make up a byte buffer + if (!buffer->Valid() || + !grpc_byte_buffer_reader_init(&reader_, buffer->c_buffer())) { + status_ = Status(StatusCode::INTERNAL, + "Couldn't initialize byte buffer reader"); + } + } + + ~ProtoBufferReader() override { + if (status_.ok()) { + grpc_byte_buffer_reader_destroy(&reader_); + } + } + + /// Give the proto library a chunk of data from the stream. The caller + /// may safely read from data[0, size - 1]. + bool Next(const void** data, int* size) override { + if (!status_.ok()) { + return false; + } + /// If we have backed up previously, we need to return the backed-up slice + if (backup_count_ > 0) { + *data = GRPC_SLICE_START_PTR(*slice_) + GRPC_SLICE_LENGTH(*slice_) - + backup_count_; + GPR_ASSERT(backup_count_ <= INT_MAX); + *size = static_cast<int>(backup_count_); + backup_count_ = 0; + return true; + } + /// Otherwise get the next slice from the byte buffer reader + if (!grpc_byte_buffer_reader_peek(&reader_, &slice_)) { + return false; + } + *data = GRPC_SLICE_START_PTR(*slice_); + // On win x64, int is only 32bit + GPR_ASSERT(GRPC_SLICE_LENGTH(*slice_) <= INT_MAX); + byte_count_ += * size = static_cast<int>(GRPC_SLICE_LENGTH(*slice_)); + return true; + } + + /// Returns the status of the buffer reader. + Status status() const { return status_; } + + /// The proto library calls this to indicate that we should back up \a count + /// bytes that have already been returned by the last call of Next. + /// So do the backup and have that ready for a later Next. + void BackUp(int count) override { + GPR_ASSERT(count <= static_cast<int>(GRPC_SLICE_LENGTH(*slice_))); + backup_count_ = count; + } + + /// The proto library calls this to skip over \a count bytes. Implement this + /// using Next and BackUp combined. + bool Skip(int count) override { + const void* data; + int size; + while (Next(&data, &size)) { + if (size >= count) { + BackUp(size - count); + return true; + } + // size < count; + count -= size; + } + // error or we have too large count; + return false; + } + + /// Returns the total number of bytes read since this object was created. + int64_t ByteCount() const override { return byte_count_ - backup_count_; } + + // These protected members are needed to support internal optimizations. + // they expose internal bits of grpc core that are NOT stable. If you have + // a use case needs to use one of these functions, please send an email to + // https://groups.google.com/forum/#!forum/grpc-io. + protected: + void set_byte_count(int64_t byte_count) { byte_count_ = byte_count; } + int64_t backup_count() { return backup_count_; } + void set_backup_count(int64_t backup_count) { backup_count_ = backup_count; } + grpc_byte_buffer_reader* reader() { return &reader_; } + grpc_slice* slice() { return slice_; } + grpc_slice** mutable_slice_ptr() { return &slice_; } + + private: + int64_t byte_count_; ///< total bytes read since object creation + int64_t backup_count_; ///< how far backed up in the stream we are + grpc_byte_buffer_reader reader_; ///< internal object to read \a grpc_slice + ///< from the \a grpc_byte_buffer + grpc_slice* slice_; ///< current slice passed back to the caller + Status status_; ///< status of the entire object +}; + +} // namespace grpc + +#endif // GRPCPP_SUPPORT_PROTO_BUFFER_READER_H diff --git a/contrib/libs/grpc/include/grpcpp/support/proto_buffer_writer.h b/contrib/libs/grpc/include/grpcpp/support/proto_buffer_writer.h new file mode 100644 index 0000000000..eb7b2c40b9 --- /dev/null +++ b/contrib/libs/grpc/include/grpcpp/support/proto_buffer_writer.h @@ -0,0 +1,176 @@ +// +// +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#ifndef GRPCPP_SUPPORT_PROTO_BUFFER_WRITER_H +#define GRPCPP_SUPPORT_PROTO_BUFFER_WRITER_H + +#include <type_traits> + +#include <grpc/byte_buffer.h> +#include <grpc/impl/grpc_types.h> +#include <grpc/slice.h> +#include <grpc/slice_buffer.h> +#include <grpc/support/log.h> +#include <grpcpp/impl/codegen/config_protobuf.h> +#include <grpcpp/impl/serialization_traits.h> +#include <grpcpp/support/byte_buffer.h> +#include <grpcpp/support/status.h> + +/// This header provides an object that writes bytes directly into a +/// grpc::ByteBuffer, via the ZeroCopyOutputStream interface + +namespace grpc { + +// Forward declaration for testing use only +namespace internal { +class ProtoBufferWriterPeer; +} // namespace internal + +const int kProtoBufferWriterMaxBufferLength = 1024 * 1024; + +/// This is a specialization of the protobuf class ZeroCopyOutputStream. +/// The principle is to give the proto layer one buffer of bytes at a time +/// that it can use to serialize the next portion of the message, with the +/// option to "backup" if more buffer is given than required at the last buffer. +/// +/// Read more about ZeroCopyOutputStream interface here: +/// https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.io.zero_copy_stream#ZeroCopyOutputStream +class ProtoBufferWriter : public grpc::protobuf::io::ZeroCopyOutputStream { + public: + /// Constructor for this derived class + /// + /// \param[out] byte_buffer A pointer to the grpc::ByteBuffer created + /// \param block_size How big are the chunks to allocate at a time + /// \param total_size How many total bytes are required for this proto + ProtoBufferWriter(ByteBuffer* byte_buffer, int block_size, int total_size) + : block_size_(block_size), + total_size_(total_size), + byte_count_(0), + have_backup_(false) { + GPR_ASSERT(!byte_buffer->Valid()); + /// Create an empty raw byte buffer and look at its underlying slice buffer + grpc_byte_buffer* bp = grpc_raw_byte_buffer_create(nullptr, 0); + byte_buffer->set_buffer(bp); + slice_buffer_ = &bp->data.raw.slice_buffer; + } + + ~ProtoBufferWriter() override { + if (have_backup_) { + grpc_slice_unref(backup_slice_); + } + } + + /// Give the proto library the next buffer of bytes and its size. It is + /// safe for the caller to write from data[0, size - 1]. + bool Next(void** data, int* size) override { + // Protobuf should not ask for more memory than total_size_. + GPR_ASSERT(byte_count_ < total_size_); + // 1. Use the remaining backup slice if we have one + // 2. Otherwise allocate a slice, up to the remaining length needed + // or our maximum allocation size + // 3. Provide the slice start and size available + // 4. Add the slice being returned to the slice buffer + size_t remain = static_cast<size_t>(total_size_ - byte_count_); + if (have_backup_) { + /// If we have a backup slice, we should use it first + slice_ = backup_slice_; + have_backup_ = false; + if (GRPC_SLICE_LENGTH(slice_) > remain) { + GRPC_SLICE_SET_LENGTH(slice_, remain); + } + } else { + // When less than a whole block is needed, only allocate that much. + // But make sure the allocated slice is not inlined. + size_t allocate_length = + remain > static_cast<size_t>(block_size_) ? block_size_ : remain; + slice_ = grpc_slice_malloc(allocate_length > GRPC_SLICE_INLINED_SIZE + ? allocate_length + : GRPC_SLICE_INLINED_SIZE + 1); + } + *data = GRPC_SLICE_START_PTR(slice_); + // On win x64, int is only 32bit + GPR_ASSERT(GRPC_SLICE_LENGTH(slice_) <= INT_MAX); + byte_count_ += * size = static_cast<int>(GRPC_SLICE_LENGTH(slice_)); + // Using grpc_slice_buffer_add could modify slice_ and merge it with the + // previous slice. Therefore, use grpc_slice_buffer_add_indexed method to + // ensure the slice gets added at a separate index. It can then be kept + // around and popped later in the BackUp function. + grpc_slice_buffer_add_indexed(slice_buffer_, slice_); + return true; + } + + /// Backup by \a count bytes because Next returned more bytes than needed + /// (only used in the last buffer). \a count must be less than or equal too + /// the last buffer returned from next. + void BackUp(int count) override { + // count == 0 is invoked by ZeroCopyOutputStream users indicating that any + // potential buffer obtained through a previous call to Next() is final. + // ZeroCopyOutputStream implementations such as streaming output can use + // these calls to flush any temporary buffer and flush the output. The logic + // below is not robust against count == 0 invocations, so directly return. + if (count == 0) return; + + /// 1. Remove the partially-used last slice from the slice buffer + /// 2. Split it into the needed (if any) and unneeded part + /// 3. Add the needed part back to the slice buffer + /// 4. Mark that we still have the remaining part (for later use/unref) + GPR_ASSERT(count <= static_cast<int>(GRPC_SLICE_LENGTH(slice_))); + grpc_slice_buffer_pop(slice_buffer_); + if (static_cast<size_t>(count) == GRPC_SLICE_LENGTH(slice_)) { + backup_slice_ = slice_; + } else { + backup_slice_ = + grpc_slice_split_tail(&slice_, GRPC_SLICE_LENGTH(slice_) - count); + grpc_slice_buffer_add(slice_buffer_, slice_); + } + // It's dangerous to keep an inlined grpc_slice as the backup slice, since + // on a following Next() call, a reference will be returned to this slice + // via GRPC_SLICE_START_PTR, which will not be an address held by + // slice_buffer_. + have_backup_ = backup_slice_.refcount != nullptr; + byte_count_ -= count; + } + + /// Returns the total number of bytes written since this object was created. + int64_t ByteCount() const override { return byte_count_; } + + // These protected members are needed to support internal optimizations. + // they expose internal bits of grpc core that are NOT stable. If you have + // a use case needs to use one of these functions, please send an email to + // https://groups.google.com/forum/#!forum/grpc-io. + protected: + grpc_slice_buffer* slice_buffer() { return slice_buffer_; } + void set_byte_count(int64_t byte_count) { byte_count_ = byte_count; } + + private: + // friend for testing purposes only + friend class internal::ProtoBufferWriterPeer; + const int block_size_; ///< size to alloc for each new \a grpc_slice needed + const int total_size_; ///< byte size of proto being serialized + int64_t byte_count_; ///< bytes written since this object was created + grpc_slice_buffer* + slice_buffer_; ///< internal buffer of slices holding the serialized data + bool have_backup_; ///< if we are holding a backup slice or not + grpc_slice backup_slice_; ///< holds space we can still write to, if the + ///< caller has called BackUp + grpc_slice slice_; ///< current slice passed back to the caller +}; + +} // namespace grpc + +#endif // GRPCPP_SUPPORT_PROTO_BUFFER_WRITER_H diff --git a/contrib/libs/grpc/include/grpcpp/support/server_callback.h b/contrib/libs/grpc/include/grpcpp/support/server_callback.h index 1ffdce53d9..5cd4df28a3 100644 --- a/contrib/libs/grpc/include/grpcpp/support/server_callback.h +++ b/contrib/libs/grpc/include/grpcpp/support/server_callback.h @@ -1,24 +1,797 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// #ifndef GRPCPP_SUPPORT_SERVER_CALLBACK_H #define GRPCPP_SUPPORT_SERVER_CALLBACK_H -#include <grpcpp/impl/codegen/server_callback.h> // IWYU pragma: export +#include <atomic> +#include <functional> +#include <type_traits> + +#include <grpcpp/impl/call.h> +#include <grpcpp/impl/call_op_set.h> +#include <grpcpp/impl/sync.h> +#include <grpcpp/support/callback_common.h> +#include <grpcpp/support/config.h> +#include <grpcpp/support/message_allocator.h> +#include <grpcpp/support/status.h> + +namespace grpc { + +// Declare base class of all reactors as internal +namespace internal { + +// Forward declarations +template <class Request, class Response> +class CallbackUnaryHandler; +template <class Request, class Response> +class CallbackClientStreamingHandler; +template <class Request, class Response> +class CallbackServerStreamingHandler; +template <class Request, class Response> +class CallbackBidiHandler; + +class ServerReactor { + public: + virtual ~ServerReactor() = default; + virtual void OnDone() = 0; + virtual void OnCancel() = 0; + + // The following is not API. It is for internal use only and specifies whether + // all reactions of this Reactor can be run without an extra executor + // scheduling. This should only be used for internally-defined reactors with + // trivial reactions. + virtual bool InternalInlineable() { return false; } + + private: + template <class Request, class Response> + friend class CallbackUnaryHandler; + template <class Request, class Response> + friend class CallbackClientStreamingHandler; + template <class Request, class Response> + friend class CallbackServerStreamingHandler; + template <class Request, class Response> + friend class CallbackBidiHandler; +}; + +/// The base class of ServerCallbackUnary etc. +class ServerCallbackCall { + public: + virtual ~ServerCallbackCall() {} + + // This object is responsible for tracking when it is safe to call OnDone and + // OnCancel. OnDone should not be called until the method handler is complete, + // Finish has been called, the ServerContext CompletionOp (which tracks + // cancellation or successful completion) has completed, and all outstanding + // Read/Write actions have seen their reactions. OnCancel should not be called + // until after the method handler is done and the RPC has completed with a + // cancellation. This is tracked by counting how many of these conditions have + // been met and calling OnCancel when none remain unmet. + + // Public versions of MaybeDone: one where we don't know the reactor in + // advance (used for the ServerContext CompletionOp), and one for where we + // know the inlineability of the OnDone reaction. You should set the inline + // flag to true if either the Reactor is InternalInlineable() or if this + // callback is already being forced to run dispatched to an executor + // (typically because it contains additional work than just the MaybeDone). + + void MaybeDone() { + if (GPR_UNLIKELY(Unref() == 1)) { + ScheduleOnDone(reactor()->InternalInlineable()); + } + } + + void MaybeDone(bool inline_ondone) { + if (GPR_UNLIKELY(Unref() == 1)) { + ScheduleOnDone(inline_ondone); + } + } + + // Fast version called with known reactor passed in, used from derived + // classes, typically in non-cancel case + void MaybeCallOnCancel(ServerReactor* reactor) { + if (GPR_UNLIKELY(UnblockCancellation())) { + CallOnCancel(reactor); + } + } + + // Slower version called from object that doesn't know the reactor a priori + // (such as the ServerContext CompletionOp which is formed before the + // reactor). This is used in cancel cases only, so it's ok to be slower and + // invoke a virtual function. + void MaybeCallOnCancel() { + if (GPR_UNLIKELY(UnblockCancellation())) { + CallOnCancel(reactor()); + } + } + + protected: + /// Increases the reference count + void Ref() { callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); } + + private: + virtual ServerReactor* reactor() = 0; + + // CallOnDone performs the work required at completion of the RPC: invoking + // the OnDone function and doing all necessary cleanup. This function is only + // ever invoked on a fully-Unref'fed ServerCallbackCall. + virtual void CallOnDone() = 0; + + // If the OnDone reaction is inlineable, execute it inline. Otherwise send it + // to an executor. + void ScheduleOnDone(bool inline_ondone); + + // If the OnCancel reaction is inlineable, execute it inline. Otherwise send + // it to an executor. + void CallOnCancel(ServerReactor* reactor); + + // Implement the cancellation constraint counter. Return true if OnCancel + // should be called, false otherwise. + bool UnblockCancellation() { + return on_cancel_conditions_remaining_.fetch_sub( + 1, std::memory_order_acq_rel) == 1; + } + + /// Decreases the reference count and returns the previous value + int Unref() { + return callbacks_outstanding_.fetch_sub(1, std::memory_order_acq_rel); + } + + std::atomic_int on_cancel_conditions_remaining_{2}; + std::atomic_int callbacks_outstanding_{ + 3}; // reserve for start, Finish, and CompletionOp +}; + +template <class Request, class Response> +class DefaultMessageHolder : public MessageHolder<Request, Response> { + public: + DefaultMessageHolder() { + this->set_request(&request_obj_); + this->set_response(&response_obj_); + } + void Release() override { + // the object is allocated in the call arena. + this->~DefaultMessageHolder<Request, Response>(); + } + + private: + Request request_obj_; + Response response_obj_; +}; + +} // namespace internal + +// Forward declarations +class ServerUnaryReactor; +template <class Request> +class ServerReadReactor; +template <class Response> +class ServerWriteReactor; +template <class Request, class Response> +class ServerBidiReactor; + +// NOTE: The actual call/stream object classes are provided as API only to +// support mocking. There are no implementations of these class interfaces in +// the API. +class ServerCallbackUnary : public internal::ServerCallbackCall { + public: + ~ServerCallbackUnary() override {} + virtual void Finish(grpc::Status s) = 0; + virtual void SendInitialMetadata() = 0; + + protected: + // Use a template rather than explicitly specifying ServerUnaryReactor to + // delay binding and avoid a circular forward declaration issue + template <class Reactor> + void BindReactor(Reactor* reactor) { + reactor->InternalBindCall(this); + } +}; + +template <class Request> +class ServerCallbackReader : public internal::ServerCallbackCall { + public: + ~ServerCallbackReader() override {} + virtual void Finish(grpc::Status s) = 0; + virtual void SendInitialMetadata() = 0; + virtual void Read(Request* msg) = 0; + + protected: + void BindReactor(ServerReadReactor<Request>* reactor) { + reactor->InternalBindReader(this); + } +}; + +template <class Response> +class ServerCallbackWriter : public internal::ServerCallbackCall { + public: + ~ServerCallbackWriter() override {} + + virtual void Finish(grpc::Status s) = 0; + virtual void SendInitialMetadata() = 0; + virtual void Write(const Response* msg, grpc::WriteOptions options) = 0; + virtual void WriteAndFinish(const Response* msg, grpc::WriteOptions options, + grpc::Status s) = 0; + + protected: + void BindReactor(ServerWriteReactor<Response>* reactor) { + reactor->InternalBindWriter(this); + } +}; + +template <class Request, class Response> +class ServerCallbackReaderWriter : public internal::ServerCallbackCall { + public: + ~ServerCallbackReaderWriter() override {} + + virtual void Finish(grpc::Status s) = 0; + virtual void SendInitialMetadata() = 0; + virtual void Read(Request* msg) = 0; + virtual void Write(const Response* msg, grpc::WriteOptions options) = 0; + virtual void WriteAndFinish(const Response* msg, grpc::WriteOptions options, + grpc::Status s) = 0; + + protected: + void BindReactor(ServerBidiReactor<Request, Response>* reactor) { + reactor->InternalBindStream(this); + } +}; + +// The following classes are the reactor interfaces that are to be implemented +// by the user, returned as the output parameter of the method handler for a +// callback method. Note that none of the classes are pure; all reactions have a +// default empty reaction so that the user class only needs to override those +// reactions that it cares about. The reaction methods will be invoked by the +// library in response to the completion of various operations. Reactions must +// not include blocking operations (such as blocking I/O, starting synchronous +// RPCs, or waiting on condition variables). Reactions may be invoked +// concurrently, except that OnDone is called after all others (assuming proper +// API usage). The reactor may not be deleted until OnDone is called. + +/// \a ServerBidiReactor is the interface for a bidirectional streaming RPC. +template <class Request, class Response> +class ServerBidiReactor : public internal::ServerReactor { + public: + // NOTE: Initializing stream_ as a constructor initializer rather than a + // default initializer because gcc-4.x requires a copy constructor for + // default initializing a templated member, which isn't ok for atomic. + // TODO(vjpai): Switch to default constructor and default initializer when + // gcc-4.x is no longer supported + ServerBidiReactor() : stream_(nullptr) {} + ~ServerBidiReactor() override = default; + + /// Send any initial metadata stored in the RPC context. If not invoked, + /// any initial metadata will be passed along with the first Write or the + /// Finish (if there are no writes). + void StartSendInitialMetadata() Y_ABSL_LOCKS_EXCLUDED(stream_mu_) { + ServerCallbackReaderWriter<Request, Response>* stream = + stream_.load(std::memory_order_acquire); + if (stream == nullptr) { + grpc::internal::MutexLock l(&stream_mu_); + stream = stream_.load(std::memory_order_relaxed); + if (stream == nullptr) { + backlog_.send_initial_metadata_wanted = true; + return; + } + } + stream->SendInitialMetadata(); + } + + /// Initiate a read operation. + /// + /// \param[out] req Where to eventually store the read message. Valid when + /// the library calls OnReadDone + void StartRead(Request* req) Y_ABSL_LOCKS_EXCLUDED(stream_mu_) { + ServerCallbackReaderWriter<Request, Response>* stream = + stream_.load(std::memory_order_acquire); + if (stream == nullptr) { + grpc::internal::MutexLock l(&stream_mu_); + stream = stream_.load(std::memory_order_relaxed); + if (stream == nullptr) { + backlog_.read_wanted = req; + return; + } + } + stream->Read(req); + } + + /// Initiate a write operation. + /// + /// \param[in] resp The message to be written. The library does not take + /// ownership but the caller must ensure that the message is + /// not deleted or modified until OnWriteDone is called. + void StartWrite(const Response* resp) { + StartWrite(resp, grpc::WriteOptions()); + } + + /// Initiate a write operation with specified options. + /// + /// \param[in] resp The message to be written. The library does not take + /// ownership but the caller must ensure that the message is + /// not deleted or modified until OnWriteDone is called. + /// \param[in] options The WriteOptions to use for writing this message + void StartWrite(const Response* resp, grpc::WriteOptions options) + Y_ABSL_LOCKS_EXCLUDED(stream_mu_) { + ServerCallbackReaderWriter<Request, Response>* stream = + stream_.load(std::memory_order_acquire); + if (stream == nullptr) { + grpc::internal::MutexLock l(&stream_mu_); + stream = stream_.load(std::memory_order_relaxed); + if (stream == nullptr) { + backlog_.write_wanted = resp; + backlog_.write_options_wanted = options; + return; + } + } + stream->Write(resp, options); + } + + /// Initiate a write operation with specified options and final RPC Status, + /// which also causes any trailing metadata for this RPC to be sent out. + /// StartWriteAndFinish is like merging StartWriteLast and Finish into a + /// single step. A key difference, though, is that this operation doesn't have + /// an OnWriteDone reaction - it is considered complete only when OnDone is + /// available. An RPC can either have StartWriteAndFinish or Finish, but not + /// both. + /// + /// \param[in] resp The message to be written. The library does not take + /// ownership but the caller must ensure that the message is + /// not deleted or modified until OnDone is called. + /// \param[in] options The WriteOptions to use for writing this message + /// \param[in] s The status outcome of this RPC + void StartWriteAndFinish(const Response* resp, grpc::WriteOptions options, + grpc::Status s) Y_ABSL_LOCKS_EXCLUDED(stream_mu_) { + ServerCallbackReaderWriter<Request, Response>* stream = + stream_.load(std::memory_order_acquire); + if (stream == nullptr) { + grpc::internal::MutexLock l(&stream_mu_); + stream = stream_.load(std::memory_order_relaxed); + if (stream == nullptr) { + backlog_.write_and_finish_wanted = true; + backlog_.write_wanted = resp; + backlog_.write_options_wanted = options; + backlog_.status_wanted = std::move(s); + return; + } + } + stream->WriteAndFinish(resp, options, std::move(s)); + } + + /// Inform system of a planned write operation with specified options, but + /// allow the library to schedule the actual write coalesced with the writing + /// of trailing metadata (which takes place on a Finish call). + /// + /// \param[in] resp The message to be written. The library does not take + /// ownership but the caller must ensure that the message is + /// not deleted or modified until OnWriteDone is called. + /// \param[in] options The WriteOptions to use for writing this message + void StartWriteLast(const Response* resp, grpc::WriteOptions options) { + StartWrite(resp, options.set_last_message()); + } + + /// Indicate that the stream is to be finished and the trailing metadata and + /// RPC status are to be sent. Every RPC MUST be finished using either Finish + /// or StartWriteAndFinish (but not both), even if the RPC is already + /// cancelled. + /// + /// \param[in] s The status outcome of this RPC + void Finish(grpc::Status s) Y_ABSL_LOCKS_EXCLUDED(stream_mu_) { + ServerCallbackReaderWriter<Request, Response>* stream = + stream_.load(std::memory_order_acquire); + if (stream == nullptr) { + grpc::internal::MutexLock l(&stream_mu_); + stream = stream_.load(std::memory_order_relaxed); + if (stream == nullptr) { + backlog_.finish_wanted = true; + backlog_.status_wanted = std::move(s); + return; + } + } + stream->Finish(std::move(s)); + } + + /// Notifies the application that an explicit StartSendInitialMetadata + /// operation completed. Not used when the sending of initial metadata + /// piggybacks onto the first write. + /// + /// \param[in] ok Was it successful? If false, no further write-side operation + /// will succeed. + virtual void OnSendInitialMetadataDone(bool /*ok*/) {} + + /// Notifies the application that a StartRead operation completed. + /// + /// \param[in] ok Was it successful? If false, no further read-side operation + /// will succeed. + virtual void OnReadDone(bool /*ok*/) {} + + /// Notifies the application that a StartWrite (or StartWriteLast) operation + /// completed. + /// + /// \param[in] ok Was it successful? If false, no further write-side operation + /// will succeed. + virtual void OnWriteDone(bool /*ok*/) {} + + /// Notifies the application that all operations associated with this RPC + /// have completed. This is an override (from the internal base class) but + /// still abstract, so derived classes MUST override it to be instantiated. + void OnDone() override = 0; + + /// Notifies the application that this RPC has been cancelled. This is an + /// override (from the internal base class) but not final, so derived classes + /// should override it if they want to take action. + void OnCancel() override {} + + private: + friend class ServerCallbackReaderWriter<Request, Response>; + // May be overridden by internal implementation details. This is not a public + // customization point. + virtual void InternalBindStream( + ServerCallbackReaderWriter<Request, Response>* stream) { + grpc::internal::MutexLock l(&stream_mu_); + + if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { + stream->SendInitialMetadata(); + } + if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) { + stream->Read(backlog_.read_wanted); + } + if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) { + stream->WriteAndFinish(backlog_.write_wanted, + std::move(backlog_.write_options_wanted), + std::move(backlog_.status_wanted)); + } else { + if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) { + stream->Write(backlog_.write_wanted, + std::move(backlog_.write_options_wanted)); + } + if (GPR_UNLIKELY(backlog_.finish_wanted)) { + stream->Finish(std::move(backlog_.status_wanted)); + } + } + // Set stream_ last so that other functions can use it lock-free + stream_.store(stream, std::memory_order_release); + } + + grpc::internal::Mutex stream_mu_; + // TODO(vjpai): Make stream_or_backlog_ into a std::variant or y_absl::variant + // once C++17 or ABSL is supported since stream and backlog are + // mutually exclusive in this class. Do likewise with the + // remaining reactor classes and their backlogs as well. + std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_{nullptr}; + struct PreBindBacklog { + bool send_initial_metadata_wanted = false; + bool write_and_finish_wanted = false; + bool finish_wanted = false; + Request* read_wanted = nullptr; + const Response* write_wanted = nullptr; + grpc::WriteOptions write_options_wanted; + grpc::Status status_wanted; + }; + PreBindBacklog backlog_ Y_ABSL_GUARDED_BY(stream_mu_); +}; + +/// \a ServerReadReactor is the interface for a client-streaming RPC. +template <class Request> +class ServerReadReactor : public internal::ServerReactor { + public: + ServerReadReactor() : reader_(nullptr) {} + ~ServerReadReactor() override = default; + + /// The following operation initiations are exactly like ServerBidiReactor. + void StartSendInitialMetadata() Y_ABSL_LOCKS_EXCLUDED(reader_mu_) { + ServerCallbackReader<Request>* reader = + reader_.load(std::memory_order_acquire); + if (reader == nullptr) { + grpc::internal::MutexLock l(&reader_mu_); + reader = reader_.load(std::memory_order_relaxed); + if (reader == nullptr) { + backlog_.send_initial_metadata_wanted = true; + return; + } + } + reader->SendInitialMetadata(); + } + void StartRead(Request* req) Y_ABSL_LOCKS_EXCLUDED(reader_mu_) { + ServerCallbackReader<Request>* reader = + reader_.load(std::memory_order_acquire); + if (reader == nullptr) { + grpc::internal::MutexLock l(&reader_mu_); + reader = reader_.load(std::memory_order_relaxed); + if (reader == nullptr) { + backlog_.read_wanted = req; + return; + } + } + reader->Read(req); + } + void Finish(grpc::Status s) Y_ABSL_LOCKS_EXCLUDED(reader_mu_) { + ServerCallbackReader<Request>* reader = + reader_.load(std::memory_order_acquire); + if (reader == nullptr) { + grpc::internal::MutexLock l(&reader_mu_); + reader = reader_.load(std::memory_order_relaxed); + if (reader == nullptr) { + backlog_.finish_wanted = true; + backlog_.status_wanted = std::move(s); + return; + } + } + reader->Finish(std::move(s)); + } + + /// The following notifications are exactly like ServerBidiReactor. + virtual void OnSendInitialMetadataDone(bool /*ok*/) {} + virtual void OnReadDone(bool /*ok*/) {} + void OnDone() override = 0; + void OnCancel() override {} + + private: + friend class ServerCallbackReader<Request>; + + // May be overridden by internal implementation details. This is not a public + // customization point. + virtual void InternalBindReader(ServerCallbackReader<Request>* reader) + Y_ABSL_LOCKS_EXCLUDED(reader_mu_) { + grpc::internal::MutexLock l(&reader_mu_); + + if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { + reader->SendInitialMetadata(); + } + if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) { + reader->Read(backlog_.read_wanted); + } + if (GPR_UNLIKELY(backlog_.finish_wanted)) { + reader->Finish(std::move(backlog_.status_wanted)); + } + // Set reader_ last so that other functions can use it lock-free + reader_.store(reader, std::memory_order_release); + } + + grpc::internal::Mutex reader_mu_; + std::atomic<ServerCallbackReader<Request>*> reader_{nullptr}; + struct PreBindBacklog { + bool send_initial_metadata_wanted = false; + bool finish_wanted = false; + Request* read_wanted = nullptr; + grpc::Status status_wanted; + }; + PreBindBacklog backlog_ Y_ABSL_GUARDED_BY(reader_mu_); +}; + +/// \a ServerWriteReactor is the interface for a server-streaming RPC. +template <class Response> +class ServerWriteReactor : public internal::ServerReactor { + public: + ServerWriteReactor() : writer_(nullptr) {} + ~ServerWriteReactor() override = default; + + /// The following operation initiations are exactly like ServerBidiReactor. + void StartSendInitialMetadata() Y_ABSL_LOCKS_EXCLUDED(writer_mu_) { + ServerCallbackWriter<Response>* writer = + writer_.load(std::memory_order_acquire); + if (writer == nullptr) { + grpc::internal::MutexLock l(&writer_mu_); + writer = writer_.load(std::memory_order_relaxed); + if (writer == nullptr) { + backlog_.send_initial_metadata_wanted = true; + return; + } + } + writer->SendInitialMetadata(); + } + void StartWrite(const Response* resp) { + StartWrite(resp, grpc::WriteOptions()); + } + void StartWrite(const Response* resp, grpc::WriteOptions options) + Y_ABSL_LOCKS_EXCLUDED(writer_mu_) { + ServerCallbackWriter<Response>* writer = + writer_.load(std::memory_order_acquire); + if (writer == nullptr) { + grpc::internal::MutexLock l(&writer_mu_); + writer = writer_.load(std::memory_order_relaxed); + if (writer == nullptr) { + backlog_.write_wanted = resp; + backlog_.write_options_wanted = options; + return; + } + } + writer->Write(resp, options); + } + void StartWriteAndFinish(const Response* resp, grpc::WriteOptions options, + grpc::Status s) Y_ABSL_LOCKS_EXCLUDED(writer_mu_) { + ServerCallbackWriter<Response>* writer = + writer_.load(std::memory_order_acquire); + if (writer == nullptr) { + grpc::internal::MutexLock l(&writer_mu_); + writer = writer_.load(std::memory_order_relaxed); + if (writer == nullptr) { + backlog_.write_and_finish_wanted = true; + backlog_.write_wanted = resp; + backlog_.write_options_wanted = options; + backlog_.status_wanted = std::move(s); + return; + } + } + writer->WriteAndFinish(resp, options, std::move(s)); + } + void StartWriteLast(const Response* resp, grpc::WriteOptions options) { + StartWrite(resp, options.set_last_message()); + } + void Finish(grpc::Status s) Y_ABSL_LOCKS_EXCLUDED(writer_mu_) { + ServerCallbackWriter<Response>* writer = + writer_.load(std::memory_order_acquire); + if (writer == nullptr) { + grpc::internal::MutexLock l(&writer_mu_); + writer = writer_.load(std::memory_order_relaxed); + if (writer == nullptr) { + backlog_.finish_wanted = true; + backlog_.status_wanted = std::move(s); + return; + } + } + writer->Finish(std::move(s)); + } + + /// The following notifications are exactly like ServerBidiReactor. + virtual void OnSendInitialMetadataDone(bool /*ok*/) {} + virtual void OnWriteDone(bool /*ok*/) {} + void OnDone() override = 0; + void OnCancel() override {} + + private: + friend class ServerCallbackWriter<Response>; + // May be overridden by internal implementation details. This is not a public + // customization point. + virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer) + Y_ABSL_LOCKS_EXCLUDED(writer_mu_) { + grpc::internal::MutexLock l(&writer_mu_); + + if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { + writer->SendInitialMetadata(); + } + if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) { + writer->WriteAndFinish(backlog_.write_wanted, + std::move(backlog_.write_options_wanted), + std::move(backlog_.status_wanted)); + } else { + if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) { + writer->Write(backlog_.write_wanted, + std::move(backlog_.write_options_wanted)); + } + if (GPR_UNLIKELY(backlog_.finish_wanted)) { + writer->Finish(std::move(backlog_.status_wanted)); + } + } + // Set writer_ last so that other functions can use it lock-free + writer_.store(writer, std::memory_order_release); + } + + grpc::internal::Mutex writer_mu_; + std::atomic<ServerCallbackWriter<Response>*> writer_{nullptr}; + struct PreBindBacklog { + bool send_initial_metadata_wanted = false; + bool write_and_finish_wanted = false; + bool finish_wanted = false; + const Response* write_wanted = nullptr; + grpc::WriteOptions write_options_wanted; + grpc::Status status_wanted; + }; + PreBindBacklog backlog_ Y_ABSL_GUARDED_BY(writer_mu_); +}; + +class ServerUnaryReactor : public internal::ServerReactor { + public: + ServerUnaryReactor() : call_(nullptr) {} + ~ServerUnaryReactor() override = default; + + /// StartSendInitialMetadata is exactly like ServerBidiReactor. + void StartSendInitialMetadata() Y_ABSL_LOCKS_EXCLUDED(call_mu_) { + ServerCallbackUnary* call = call_.load(std::memory_order_acquire); + if (call == nullptr) { + grpc::internal::MutexLock l(&call_mu_); + call = call_.load(std::memory_order_relaxed); + if (call == nullptr) { + backlog_.send_initial_metadata_wanted = true; + return; + } + } + call->SendInitialMetadata(); + } + /// Finish is similar to ServerBidiReactor except for one detail. + /// If the status is non-OK, any message will not be sent. Instead, + /// the client will only receive the status and any trailing metadata. + void Finish(grpc::Status s) Y_ABSL_LOCKS_EXCLUDED(call_mu_) { + ServerCallbackUnary* call = call_.load(std::memory_order_acquire); + if (call == nullptr) { + grpc::internal::MutexLock l(&call_mu_); + call = call_.load(std::memory_order_relaxed); + if (call == nullptr) { + backlog_.finish_wanted = true; + backlog_.status_wanted = std::move(s); + return; + } + } + call->Finish(std::move(s)); + } + + /// The following notifications are exactly like ServerBidiReactor. + virtual void OnSendInitialMetadataDone(bool /*ok*/) {} + void OnDone() override = 0; + void OnCancel() override {} + + private: + friend class ServerCallbackUnary; + // May be overridden by internal implementation details. This is not a public + // customization point. + virtual void InternalBindCall(ServerCallbackUnary* call) + Y_ABSL_LOCKS_EXCLUDED(call_mu_) { + grpc::internal::MutexLock l(&call_mu_); + + if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { + call->SendInitialMetadata(); + } + if (GPR_UNLIKELY(backlog_.finish_wanted)) { + call->Finish(std::move(backlog_.status_wanted)); + } + // Set call_ last so that other functions can use it lock-free + call_.store(call, std::memory_order_release); + } + + grpc::internal::Mutex call_mu_; + std::atomic<ServerCallbackUnary*> call_{nullptr}; + struct PreBindBacklog { + bool send_initial_metadata_wanted = false; + bool finish_wanted = false; + grpc::Status status_wanted; + }; + PreBindBacklog backlog_ Y_ABSL_GUARDED_BY(call_mu_); +}; + +namespace internal { + +template <class Base> +class FinishOnlyReactor : public Base { + public: + explicit FinishOnlyReactor(grpc::Status s) { this->Finish(std::move(s)); } + void OnDone() override { this->~FinishOnlyReactor(); } +}; + +using UnimplementedUnaryReactor = FinishOnlyReactor<ServerUnaryReactor>; +template <class Request> +using UnimplementedReadReactor = FinishOnlyReactor<ServerReadReactor<Request>>; +template <class Response> +using UnimplementedWriteReactor = + FinishOnlyReactor<ServerWriteReactor<Response>>; +template <class Request, class Response> +using UnimplementedBidiReactor = + FinishOnlyReactor<ServerBidiReactor<Request, Response>>; + +} // namespace internal + +// TODO(vjpai): Remove namespace experimental when last known users are migrated +// off. +namespace experimental { + +template <class Request, class Response> +using ServerBidiReactor = ::grpc::ServerBidiReactor<Request, Response>; + +} // namespace experimental + +} // namespace grpc #endif // GRPCPP_SUPPORT_SERVER_CALLBACK_H diff --git a/contrib/libs/grpc/include/grpcpp/support/server_interceptor.h b/contrib/libs/grpc/include/grpcpp/support/server_interceptor.h index ad9c7a1869..ecb4e34259 100644 --- a/contrib/libs/grpc/include/grpcpp/support/server_interceptor.h +++ b/contrib/libs/grpc/include/grpcpp/support/server_interceptor.h @@ -1,24 +1,140 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// #ifndef GRPCPP_SUPPORT_SERVER_INTERCEPTOR_H #define GRPCPP_SUPPORT_SERVER_INTERCEPTOR_H -#include <grpcpp/impl/codegen/server_interceptor.h> // IWYU pragma: export +#include <atomic> +#include <vector> + +#include <grpc/support/log.h> +#include <grpcpp/impl/rpc_method.h> +#include <grpcpp/support/interceptor.h> +#include <grpcpp/support/string_ref.h> + +namespace grpc { +class ServerContextBase; +namespace internal { +class InterceptorBatchMethodsImpl; +} + +namespace experimental { +class ServerRpcInfo; + +// A factory interface for creation of server interceptors. A vector of +// factories can be provided to ServerBuilder which will be used to create a new +// vector of server interceptors per RPC. Server interceptor authors should +// create a subclass of ServerInterceptorFactorInterface which creates objects +// of their interceptors. +class ServerInterceptorFactoryInterface { + public: + virtual ~ServerInterceptorFactoryInterface() {} + // Returns a pointer to an Interceptor object on successful creation, nullptr + // otherwise. If nullptr is returned, this server interceptor factory is + // ignored for the purposes of that RPC. + virtual Interceptor* CreateServerInterceptor(ServerRpcInfo* info) = 0; +}; + +/// ServerRpcInfo represents the state of a particular RPC as it +/// appears to an interceptor. It is created and owned by the library and +/// passed to the CreateServerInterceptor method of the application's +/// ServerInterceptorFactoryInterface implementation +class ServerRpcInfo { + public: + /// Type categorizes RPCs by unary or streaming type + enum class Type { UNARY, CLIENT_STREAMING, SERVER_STREAMING, BIDI_STREAMING }; + + ~ServerRpcInfo() {} + + // Delete all copy and move constructors and assignments + ServerRpcInfo(const ServerRpcInfo&) = delete; + ServerRpcInfo& operator=(const ServerRpcInfo&) = delete; + ServerRpcInfo(ServerRpcInfo&&) = delete; + ServerRpcInfo& operator=(ServerRpcInfo&&) = delete; + + // Getter methods + + /// Return the fully-specified method name + const char* method() const { return method_; } + + /// Return the type of the RPC (unary or a streaming flavor) + Type type() const { return type_; } + + /// Return a pointer to the underlying ServerContext structure associated + /// with the RPC to support features that apply to it + ServerContextBase* server_context() { return ctx_; } + + private: + static_assert(Type::UNARY == + static_cast<Type>(internal::RpcMethod::NORMAL_RPC), + "violated expectation about Type enum"); + static_assert(Type::CLIENT_STREAMING == + static_cast<Type>(internal::RpcMethod::CLIENT_STREAMING), + "violated expectation about Type enum"); + static_assert(Type::SERVER_STREAMING == + static_cast<Type>(internal::RpcMethod::SERVER_STREAMING), + "violated expectation about Type enum"); + static_assert(Type::BIDI_STREAMING == + static_cast<Type>(internal::RpcMethod::BIDI_STREAMING), + "violated expectation about Type enum"); + + ServerRpcInfo(ServerContextBase* ctx, const char* method, + internal::RpcMethod::RpcType type) + : ctx_(ctx), method_(method), type_(static_cast<Type>(type)) {} + + // Runs interceptor at pos \a pos. + void RunInterceptor( + experimental::InterceptorBatchMethods* interceptor_methods, size_t pos) { + GPR_ASSERT(pos < interceptors_.size()); + interceptors_[pos]->Intercept(interceptor_methods); + } + + void RegisterInterceptors( + const std::vector< + std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>& + creators) { + for (const auto& creator : creators) { + auto* interceptor = creator->CreateServerInterceptor(this); + if (interceptor != nullptr) { + interceptors_.push_back( + std::unique_ptr<experimental::Interceptor>(interceptor)); + } + } + } + + void Ref() { ref_.fetch_add(1, std::memory_order_relaxed); } + void Unref() { + if (GPR_UNLIKELY(ref_.fetch_sub(1, std::memory_order_acq_rel) == 1)) { + delete this; + } + } + + ServerContextBase* ctx_ = nullptr; + const char* method_ = nullptr; + const Type type_; + std::atomic<intptr_t> ref_{1}; + std::vector<std::unique_ptr<experimental::Interceptor>> interceptors_; + + friend class internal::InterceptorBatchMethodsImpl; + friend class grpc::ServerContextBase; +}; + +} // namespace experimental +} // namespace grpc #endif // GRPCPP_SUPPORT_SERVER_INTERCEPTOR_H diff --git a/contrib/libs/grpc/include/grpcpp/support/slice.h b/contrib/libs/grpc/include/grpcpp/support/slice.h index 2434983f59..bafdaba9cd 100644 --- a/contrib/libs/grpc/include/grpcpp/support/slice.h +++ b/contrib/libs/grpc/include/grpcpp/support/slice.h @@ -1,26 +1,142 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// #ifndef GRPCPP_SUPPORT_SLICE_H #define GRPCPP_SUPPORT_SLICE_H #include <grpc/slice.h> -#include <grpcpp/impl/codegen/slice.h> // IWYU pragma: export #include <grpcpp/support/config.h> +#include <grpcpp/support/string_ref.h> + +namespace grpc { + +/// A wrapper around \a grpc_slice. +/// +/// A slice represents a contiguous reference counted array of bytes. +/// It is cheap to take references to a slice, and it is cheap to create a +/// slice pointing to a subset of another slice. +class Slice final { + public: + /// Construct an empty slice. + Slice() : slice_(grpc_empty_slice()) {} + /// Destructor - drops one reference. + ~Slice() { grpc_slice_unref(slice_); } + + enum AddRef { ADD_REF }; + /// Construct a slice from \a slice, adding a reference. + Slice(grpc_slice slice, AddRef) : slice_(grpc_slice_ref(slice)) {} + + enum StealRef { STEAL_REF }; + /// Construct a slice from \a slice, stealing a reference. + Slice(grpc_slice slice, StealRef) : slice_(slice) {} + + /// Allocate a slice of specified size + explicit Slice(size_t len) : slice_(grpc_slice_malloc(len)) {} + + /// Construct a slice from a copied buffer + Slice(const void* buf, size_t len) + : slice_(grpc_slice_from_copied_buffer(reinterpret_cast<const char*>(buf), + len)) {} + + /// Construct a slice from a copied string + // NOLINTNEXTLINE(google-explicit-constructor) + Slice(const TString& str) + : slice_(grpc_slice_from_copied_buffer(str.c_str(), str.length())) {} + + enum StaticSlice { STATIC_SLICE }; + + /// Construct a slice from a static buffer + Slice(const void* buf, size_t len, StaticSlice) + : slice_(grpc_slice_from_static_buffer(reinterpret_cast<const char*>(buf), + len)) {} + + /// Copy constructor, adds a reference. + Slice(const Slice& other) : slice_(grpc_slice_ref(other.slice_)) {} + + /// Move constructor, steals a reference. + Slice(Slice&& other) noexcept : slice_(other.slice_) { + other.slice_ = grpc_empty_slice(); + } + + /// Assignment, reference count is unchanged. + Slice& operator=(Slice other) { + std::swap(slice_, other.slice_); + return *this; + } + + /// Create a slice pointing at some data. Calls malloc to allocate a refcount + /// for the object, and arranges that destroy will be called with the + /// user data pointer passed in at destruction. Can be the same as buf or + /// different (e.g., if data is part of a larger structure that must be + /// destroyed when the data is no longer needed) + Slice(void* buf, size_t len, void (*destroy)(void*), void* user_data) + : slice_(grpc_slice_new_with_user_data(buf, len, destroy, user_data)) {} + + /// Specialization of above for common case where buf == user_data + Slice(void* buf, size_t len, void (*destroy)(void*)) + : Slice(buf, len, destroy, buf) {} + + /// Similar to the above but has a destroy that also takes slice length + Slice(void* buf, size_t len, void (*destroy)(void*, size_t)) + : slice_(grpc_slice_new_with_len(buf, len, destroy)) {} + + /// Byte size. + size_t size() const { return GRPC_SLICE_LENGTH(slice_); } + + /// Raw pointer to the beginning (first element) of the slice. + const uint8_t* begin() const { return GRPC_SLICE_START_PTR(slice_); } + + /// Raw pointer to the end (one byte \em past the last element) of the slice. + const uint8_t* end() const { return GRPC_SLICE_END_PTR(slice_); } + + /// Returns a substring of the `slice` as another slice. + Slice sub(size_t begin, size_t end) const { + return Slice(grpc_slice_sub(slice_, begin, end), STEAL_REF); + } + + /// Raw C slice. Caller needs to call grpc_slice_unref when done. + grpc_slice c_slice() const { return grpc_slice_ref(slice_); } + + private: + friend class ByteBuffer; + + grpc_slice slice_; +}; + +inline grpc::string_ref StringRefFromSlice(const grpc_slice* slice) { + return grpc::string_ref( + reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(*slice)), + GRPC_SLICE_LENGTH(*slice)); +} + +inline TString StringFromCopiedSlice(grpc_slice slice) { + return TString(reinterpret_cast<char*>(GRPC_SLICE_START_PTR(slice)), + GRPC_SLICE_LENGTH(slice)); +} + +inline grpc_slice SliceReferencingString(const TString& str) { + return grpc_slice_from_static_buffer(str.data(), str.length()); +} + +inline grpc_slice SliceFromCopiedString(const TString& str) { + return grpc_slice_from_copied_buffer(str.data(), str.length()); +} + +} // namespace grpc #endif // GRPCPP_SUPPORT_SLICE_H diff --git a/contrib/libs/grpc/include/grpcpp/support/status.h b/contrib/libs/grpc/include/grpcpp/support/status.h index e46b46d16f..0930e642b0 100644 --- a/contrib/libs/grpc/include/grpcpp/support/status.h +++ b/contrib/libs/grpc/include/grpcpp/support/status.h @@ -1,24 +1,24 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// #ifndef GRPCPP_SUPPORT_STATUS_H #define GRPCPP_SUPPORT_STATUS_H -#include <grpcpp/impl/codegen/status.h> // IWYU pragma: export +#include <grpcpp/impl/status.h> // IWYU pragma: export #endif // GRPCPP_SUPPORT_STATUS_H diff --git a/contrib/libs/grpc/include/grpcpp/support/status_code_enum.h b/contrib/libs/grpc/include/grpcpp/support/status_code_enum.h new file mode 100644 index 0000000000..7110071478 --- /dev/null +++ b/contrib/libs/grpc/include/grpcpp/support/status_code_enum.h @@ -0,0 +1,145 @@ +// +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#ifndef GRPCPP_SUPPORT_STATUS_CODE_ENUM_H +#define GRPCPP_SUPPORT_STATUS_CODE_ENUM_H + +// IWYU pragma: private, include <grpcpp/support/status.h> + +namespace grpc { + +enum StatusCode { + /// Not an error; returned on success. + OK = 0, + + /// The operation was cancelled (typically by the caller). + CANCELLED = 1, + + /// Unknown error. An example of where this error may be returned is if a + /// Status value received from another address space belongs to an error-space + /// that is not known in this address space. Also errors raised by APIs that + /// do not return enough error information may be converted to this error. + UNKNOWN = 2, + + /// Client specified an invalid argument. Note that this differs from + /// FAILED_PRECONDITION. INVALID_ARGUMENT indicates arguments that are + /// problematic regardless of the state of the system (e.g., a malformed file + /// name). + INVALID_ARGUMENT = 3, + + /// Deadline expired before operation could complete. For operations that + /// change the state of the system, this error may be returned even if the + /// operation has completed successfully. For example, a successful response + /// from a server could have been delayed long enough for the deadline to + /// expire. + DEADLINE_EXCEEDED = 4, + + /// Some requested entity (e.g., file or directory) was not found. + NOT_FOUND = 5, + + /// Some entity that we attempted to create (e.g., file or directory) already + /// exists. + ALREADY_EXISTS = 6, + + /// The caller does not have permission to execute the specified operation. + /// PERMISSION_DENIED must not be used for rejections caused by exhausting + /// some resource (use RESOURCE_EXHAUSTED instead for those errors). + /// PERMISSION_DENIED must not be used if the caller can not be identified + /// (use UNAUTHENTICATED instead for those errors). + PERMISSION_DENIED = 7, + + /// The request does not have valid authentication credentials for the + /// operation. + UNAUTHENTICATED = 16, + + /// Some resource has been exhausted, perhaps a per-user quota, or perhaps the + /// entire file system is out of space. + RESOURCE_EXHAUSTED = 8, + + /// Operation was rejected because the system is not in a state required for + /// the operation's execution. For example, directory to be deleted may be + /// non-empty, an rmdir operation is applied to a non-directory, etc. + /// + /// A litmus test that may help a service implementor in deciding + /// between FAILED_PRECONDITION, ABORTED, and UNAVAILABLE: + /// (a) Use UNAVAILABLE if the client can retry just the failing call. + /// (b) Use ABORTED if the client should retry at a higher-level + /// (e.g., restarting a read-modify-write sequence). + /// (c) Use FAILED_PRECONDITION if the client should not retry until + /// the system state has been explicitly fixed. E.g., if an "rmdir" + /// fails because the directory is non-empty, FAILED_PRECONDITION + /// should be returned since the client should not retry unless + /// they have first fixed up the directory by deleting files from it. + /// (d) Use FAILED_PRECONDITION if the client performs conditional + /// REST Get/Update/Delete on a resource and the resource on the + /// server does not match the condition. E.g., conflicting + /// read-modify-write on the same resource. + FAILED_PRECONDITION = 9, + + /// The operation was aborted, typically due to a concurrency issue like + /// sequencer check failures, transaction aborts, etc. + /// + /// See litmus test above for deciding between FAILED_PRECONDITION, ABORTED, + /// and UNAVAILABLE. + ABORTED = 10, + + /// Operation was attempted past the valid range. E.g., seeking or reading + /// past end of file. + /// + /// Unlike INVALID_ARGUMENT, this error indicates a problem that may be fixed + /// if the system state changes. For example, a 32-bit file system will + /// generate INVALID_ARGUMENT if asked to read at an offset that is not in the + /// range [0,2^32-1], but it will generate OUT_OF_RANGE if asked to read from + /// an offset past the current file size. + /// + /// There is a fair bit of overlap between FAILED_PRECONDITION and + /// OUT_OF_RANGE. We recommend using OUT_OF_RANGE (the more specific error) + /// when it applies so that callers who are iterating through a space can + /// easily look for an OUT_OF_RANGE error to detect when they are done. + OUT_OF_RANGE = 11, + + /// Operation is not implemented or not supported/enabled in this service. + UNIMPLEMENTED = 12, + + /// Internal errors. Means some invariants expected by underlying System has + /// been broken. If you see one of these errors, Something is very broken. + INTERNAL = 13, + + /// The service is currently unavailable. This is a most likely a transient + /// condition and may be corrected by retrying with a backoff. Note that it is + /// not always safe to retry non-idempotent operations. + /// + /// \warning Although data MIGHT not have been transmitted when this + /// status occurs, there is NOT A GUARANTEE that the server has not seen + /// anything. So in general it is unsafe to retry on this status code + /// if the call is non-idempotent. + /// + /// See litmus test above for deciding between FAILED_PRECONDITION, ABORTED, + /// and UNAVAILABLE. + UNAVAILABLE = 14, + + /// Unrecoverable data loss or corruption. + DATA_LOSS = 15, + + /// Force users to include a default branch: + DO_NOT_USE = -1 +}; + +} // namespace grpc + +#endif // GRPCPP_SUPPORT_STATUS_CODE_ENUM_H diff --git a/contrib/libs/grpc/include/grpcpp/support/string_ref.h b/contrib/libs/grpc/include/grpcpp/support/string_ref.h index 6cf82bedf6..9ecff4a6b3 100644 --- a/contrib/libs/grpc/include/grpcpp/support/string_ref.h +++ b/contrib/libs/grpc/include/grpcpp/support/string_ref.h @@ -1,24 +1,152 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// #ifndef GRPCPP_SUPPORT_STRING_REF_H #define GRPCPP_SUPPORT_STRING_REF_H -#include <grpcpp/impl/codegen/string_ref.h> // IWYU pragma: export +#include <string.h> + +#include <algorithm> +#include <iosfwd> +#include <iostream> +#include <iterator> + +#include <grpcpp/support/config.h> + +#include <util/stream/output.h> + +namespace grpc { + +/// This class is a non owning reference to a string. +/// +/// It should be a strict subset of the upcoming std::string_ref. +/// +/// \see http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2012/n3442.html +/// +/// The constexpr is dropped or replaced with const for legacy compiler +/// compatibility. +class string_ref { + public: + /// types + typedef const char* const_iterator; + typedef std::reverse_iterator<const_iterator> const_reverse_iterator; + + /// constants + const static size_t npos; + + /// construct/copy. + string_ref() : data_(nullptr), length_(0) {} + string_ref(const string_ref& other) + : data_(other.data_), length_(other.length_) {} + // NOLINTNEXTLINE(bugprone-unhandled-self-assignment) + string_ref& operator=(const string_ref& rhs) { + data_ = rhs.data_; + length_ = rhs.length_; + return *this; + } + + // NOLINTNEXTLINE(google-explicit-constructor) + string_ref(const char* s) : data_(s), length_(strlen(s)) {} + string_ref(const char* s, size_t l) : data_(s), length_(l) {} + // NOLINTNEXTLINE(google-explicit-constructor) + string_ref(const TString& s) : data_(s.data()), length_(s.length()) {} + + /// iterators + const_iterator begin() const { return data_; } + const_iterator end() const { return data_ + length_; } + const_iterator cbegin() const { return data_; } + const_iterator cend() const { return data_ + length_; } + const_reverse_iterator rbegin() const { + return const_reverse_iterator(end()); + } + const_reverse_iterator rend() const { + return const_reverse_iterator(begin()); + } + const_reverse_iterator crbegin() const { + return const_reverse_iterator(end()); + } + const_reverse_iterator crend() const { + return const_reverse_iterator(begin()); + } + + /// capacity + size_t size() const { return length_; } + size_t length() const { return length_; } + size_t max_size() const { return length_; } + bool empty() const { return length_ == 0; } + + /// element access + const char* data() const { return data_; } + + /// string operations + int compare(string_ref x) const { + size_t min_size = length_ < x.length_ ? length_ : x.length_; + int r = memcmp(data_, x.data_, min_size); + if (r < 0) return -1; + if (r > 0) return 1; + if (length_ < x.length_) return -1; + if (length_ > x.length_) return 1; + return 0; + } + + bool starts_with(string_ref x) const { + return length_ >= x.length_ && (memcmp(data_, x.data_, x.length_) == 0); + } + + bool ends_with(string_ref x) const { + return length_ >= x.length_ && + (memcmp(data_ + (length_ - x.length_), x.data_, x.length_) == 0); + } + + size_t find(string_ref s) const { + auto it = std::search(cbegin(), cend(), s.cbegin(), s.cend()); + return it == cend() ? npos : std::distance(cbegin(), it); + } + + size_t find(char c) const { + auto it = std::find(cbegin(), cend(), c); + return it == cend() ? npos : std::distance(cbegin(), it); + } + + string_ref substr(size_t pos, size_t n = npos) const { + if (pos > length_) pos = length_; + if (n > (length_ - pos)) n = length_ - pos; + return string_ref(data_ + pos, n); + } + + private: + const char* data_; + size_t length_; +}; + +/// Comparison operators +inline bool operator==(string_ref x, string_ref y) { return x.compare(y) == 0; } +inline bool operator!=(string_ref x, string_ref y) { return x.compare(y) != 0; } +inline bool operator<(string_ref x, string_ref y) { return x.compare(y) < 0; } +inline bool operator<=(string_ref x, string_ref y) { return x.compare(y) <= 0; } +inline bool operator>(string_ref x, string_ref y) { return x.compare(y) > 0; } +inline bool operator>=(string_ref x, string_ref y) { return x.compare(y) >= 0; } + +inline IOutputStream& operator<<(IOutputStream& out, const string_ref& string) { + TString t(string.begin(), string.end()); + return out << t; +} + +} // namespace grpc #endif // GRPCPP_SUPPORT_STRING_REF_H diff --git a/contrib/libs/grpc/include/grpcpp/support/stub_options.h b/contrib/libs/grpc/include/grpcpp/support/stub_options.h new file mode 100644 index 0000000000..4b62f6a9d8 --- /dev/null +++ b/contrib/libs/grpc/include/grpcpp/support/stub_options.h @@ -0,0 +1,42 @@ +// +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#ifndef GRPCPP_SUPPORT_STUB_OPTIONS_H +#define GRPCPP_SUPPORT_STUB_OPTIONS_H + +namespace grpc { + +/// Useful interface for generated stubs +class StubOptions { + public: + StubOptions() = default; + explicit StubOptions(const char* suffix_for_stats) + : suffix_for_stats_(suffix_for_stats) {} + + void set_suffix_for_stats(const char* suffix_for_stats) { + suffix_for_stats_ = suffix_for_stats; + } + const char* suffix_for_stats() const { return suffix_for_stats_; } + + private: + const char* suffix_for_stats_ = nullptr; +}; + +} // namespace grpc + +#endif // GRPCPP_SUPPORT_STUB_OPTIONS_H diff --git a/contrib/libs/grpc/include/grpcpp/support/sync_stream.h b/contrib/libs/grpc/include/grpcpp/support/sync_stream.h index 78a348de75..ce583f6ed7 100644 --- a/contrib/libs/grpc/include/grpcpp/support/sync_stream.h +++ b/contrib/libs/grpc/include/grpcpp/support/sync_stream.h @@ -1,24 +1,949 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// #ifndef GRPCPP_SUPPORT_SYNC_STREAM_H #define GRPCPP_SUPPORT_SYNC_STREAM_H -#include <grpcpp/impl/codegen/sync_stream.h> // IWYU pragma: export +#include <grpc/support/log.h> +#include <grpcpp/client_context.h> +#include <grpcpp/completion_queue.h> +#include <grpcpp/impl/call.h> +#include <grpcpp/impl/codegen/channel_interface.h> +#include <grpcpp/impl/service_type.h> +#include <grpcpp/server_context.h> +#include <grpcpp/support/status.h> + +namespace grpc { + +namespace internal { +/// Common interface for all synchronous client side streaming. +class ClientStreamingInterface { + public: + virtual ~ClientStreamingInterface() {} + + /// Block waiting until the stream finishes and a final status of the call is + /// available. + /// + /// It is appropriate to call this method exactly once when both: + /// * the calling code (client-side) has no more message to send + /// (this can be declared implicitly by calling this method, or + /// explicitly through an earlier call to <i>WritesDone</i> method of the + /// class in use, e.g. \a ClientWriterInterface::WritesDone or + /// \a ClientReaderWriterInterface::WritesDone). + /// * there are no more messages to be received from the server (which can + /// be known implicitly, or explicitly from an earlier call to \a + /// ReaderInterface::Read that returned "false"). + /// + /// This function will return either: + /// - when all incoming messages have been read and the server has + /// returned status. + /// - when the server has returned a non-OK status. + /// - OR when the call failed for some reason and the library generated a + /// status. + /// + /// Return values: + /// - \a Status contains the status code, message and details for the call + /// - the \a ClientContext associated with this call is updated with + /// possible trailing metadata sent from the server. + virtual grpc::Status Finish() = 0; +}; + +/// Common interface for all synchronous server side streaming. +class ServerStreamingInterface { + public: + virtual ~ServerStreamingInterface() {} + + /// Block to send initial metadata to client. + /// This call is optional, but if it is used, it cannot be used concurrently + /// with or after the \a Finish method. + /// + /// The initial metadata that will be sent to the client will be + /// taken from the \a ServerContext associated with the call. + virtual void SendInitialMetadata() = 0; +}; + +/// An interface that yields a sequence of messages of type \a R. +template <class R> +class ReaderInterface { + public: + virtual ~ReaderInterface() {} + + /// Get an upper bound on the next message size available for reading on this + /// stream. + virtual bool NextMessageSize(uint32_t* sz) = 0; + + /// Block to read a message and parse to \a msg. Returns \a true on success. + /// This is thread-safe with respect to \a Write or \WritesDone methods on + /// the same stream. It should not be called concurrently with another \a + /// Read on the same stream as the order of delivery will not be defined. + /// + /// \param[out] msg The read message. + /// + /// \return \a false when there will be no more incoming messages, either + /// because the other side has called \a WritesDone() or the stream has failed + /// (or been cancelled). + virtual bool Read(R* msg) = 0; +}; + +/// An interface that can be fed a sequence of messages of type \a W. +template <class W> +class WriterInterface { + public: + virtual ~WriterInterface() {} + + /// Block to write \a msg to the stream with WriteOptions \a options. + /// This is thread-safe with respect to \a ReaderInterface::Read + /// + /// \param msg The message to be written to the stream. + /// \param options The WriteOptions affecting the write operation. + /// + /// \return \a true on success, \a false when the stream has been closed. + virtual bool Write(const W& msg, grpc::WriteOptions options) = 0; + + /// Block to write \a msg to the stream with default write options. + /// This is thread-safe with respect to \a ReaderInterface::Read + /// + /// \param msg The message to be written to the stream. + /// + /// \return \a true on success, \a false when the stream has been closed. + inline bool Write(const W& msg) { return Write(msg, grpc::WriteOptions()); } + + /// Write \a msg and coalesce it with the writing of trailing metadata, using + /// WriteOptions \a options. + /// + /// For client, WriteLast is equivalent of performing Write and WritesDone in + /// a single step. \a msg and trailing metadata are coalesced and sent on wire + /// by calling this function. For server, WriteLast buffers the \a msg. + /// The writing of \a msg is held until the service handler returns, + /// where \a msg and trailing metadata are coalesced and sent on wire. + /// Note that WriteLast can only buffer \a msg up to the flow control window + /// size. If \a msg size is larger than the window size, it will be sent on + /// wire without buffering. + /// + /// \param[in] msg The message to be written to the stream. + /// \param[in] options The WriteOptions to be used to write this message. + void WriteLast(const W& msg, grpc::WriteOptions options) { + Write(msg, options.set_last_message()); + } +}; + +} // namespace internal + +/// Client-side interface for streaming reads of message of type \a R. +template <class R> +class ClientReaderInterface : public internal::ClientStreamingInterface, + public internal::ReaderInterface<R> { + public: + /// Block to wait for initial metadata from server. The received metadata + /// can only be accessed after this call returns. Should only be called before + /// the first read. Calling this method is optional, and if it is not called + /// the metadata will be available in ClientContext after the first read. + virtual void WaitForInitialMetadata() = 0; +}; + +namespace internal { +template <class R> +class ClientReaderFactory { + public: + template <class W> + static ClientReader<R>* Create(grpc::ChannelInterface* channel, + const grpc::internal::RpcMethod& method, + grpc::ClientContext* context, + const W& request) { + return new ClientReader<R>(channel, method, context, request); + } +}; +} // namespace internal + +/// Synchronous (blocking) client-side API for doing server-streaming RPCs, +/// where the stream of messages coming from the server has messages +/// of type \a R. +template <class R> +class ClientReader final : public ClientReaderInterface<R> { + public: + /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for + /// semantics. + /// + // Side effect: + /// Once complete, the initial metadata read from + /// the server will be accessible through the \a ClientContext used to + /// construct this object. + void WaitForInitialMetadata() override { + GPR_ASSERT(!context_->initial_metadata_received_); + + grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> ops; + ops.RecvInitialMetadata(context_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); /// status ignored + } + + bool NextMessageSize(uint32_t* sz) override { + int result = call_.max_receive_message_size(); + *sz = (result > 0) ? result : UINT32_MAX; + return true; + } + + /// See the \a ReaderInterface.Read method for semantics. + /// Side effect: + /// This also receives initial metadata from the server, if not + /// already received (if initial metadata is received, it can be then + /// accessed through the \a ClientContext associated with this call). + bool Read(R* msg) override { + grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, + grpc::internal::CallOpRecvMessage<R>> + ops; + if (!context_->initial_metadata_received_) { + ops.RecvInitialMetadata(context_); + } + ops.RecvMessage(msg); + call_.PerformOps(&ops); + return cq_.Pluck(&ops) && ops.got_message; + } + + /// See the \a ClientStreamingInterface.Finish method for semantics. + /// + /// Side effect: + /// The \a ClientContext associated with this call is updated with + /// possible metadata received from the server. + grpc::Status Finish() override { + grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, + grpc::internal::CallOpClientRecvStatus> + ops; + if (!context_->initial_metadata_received_) { + ops.RecvInitialMetadata(context_); + } + grpc::Status status; + ops.ClientRecvStatus(context_, &status); + call_.PerformOps(&ops); + GPR_ASSERT(cq_.Pluck(&ops)); + return status; + } + + private: + friend class internal::ClientReaderFactory<R>; + grpc::ClientContext* context_; + grpc::CompletionQueue cq_; + grpc::internal::Call call_; + + /// Block to create a stream and write the initial metadata and \a request + /// out. Note that \a context will be used to fill in custom initial + /// metadata used to send to the server when starting the call. + template <class W> + ClientReader(grpc::ChannelInterface* channel, + const grpc::internal::RpcMethod& method, + grpc::ClientContext* context, const W& request) + : context_(context), + cq_(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, + nullptr}), // Pluckable cq + call_(channel->CreateCall(method, context, &cq_)) { + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpSendMessage, + grpc::internal::CallOpClientSendClose> + ops; + ops.SendInitialMetadata(&context->send_initial_metadata_, + context->initial_metadata_flags()); + // TODO(ctiller): don't assert + GPR_ASSERT(ops.SendMessagePtr(&request).ok()); + ops.ClientSendClose(); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } +}; + +/// Client-side interface for streaming writes of message type \a W. +template <class W> +class ClientWriterInterface : public internal::ClientStreamingInterface, + public internal::WriterInterface<W> { + public: + /// Half close writing from the client. (signal that the stream of messages + /// coming from the client is complete). + /// Blocks until currently-pending writes are completed. + /// Thread safe with respect to \a ReaderInterface::Read operations only + /// + /// \return Whether the writes were successful. + virtual bool WritesDone() = 0; +}; + +namespace internal { +template <class W> +class ClientWriterFactory { + public: + template <class R> + static ClientWriter<W>* Create(grpc::ChannelInterface* channel, + const grpc::internal::RpcMethod& method, + grpc::ClientContext* context, R* response) { + return new ClientWriter<W>(channel, method, context, response); + } +}; +} // namespace internal + +/// Synchronous (blocking) client-side API for doing client-streaming RPCs, +/// where the outgoing message stream coming from the client has messages of +/// type \a W. +template <class W> +class ClientWriter : public ClientWriterInterface<W> { + public: + /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for + /// semantics. + /// + // Side effect: + /// Once complete, the initial metadata read from the server will be + /// accessible through the \a ClientContext used to construct this object. + void WaitForInitialMetadata() { + GPR_ASSERT(!context_->initial_metadata_received_); + + grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> ops; + ops.RecvInitialMetadata(context_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); // status ignored + } + + /// See the WriterInterface.Write(const W& msg, WriteOptions options) method + /// for semantics. + /// + /// Side effect: + /// Also sends initial metadata if not already sent (using the + /// \a ClientContext associated with this call). + using internal::WriterInterface<W>::Write; + bool Write(const W& msg, grpc::WriteOptions options) override { + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpSendMessage, + grpc::internal::CallOpClientSendClose> + ops; + + if (options.is_last_message()) { + options.set_buffer_hint(); + ops.ClientSendClose(); + } + if (context_->initial_metadata_corked_) { + ops.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + context_->set_initial_metadata_corked(false); + } + if (!ops.SendMessagePtr(&msg, options).ok()) { + return false; + } + + call_.PerformOps(&ops); + return cq_.Pluck(&ops); + } + + bool WritesDone() override { + grpc::internal::CallOpSet<grpc::internal::CallOpClientSendClose> ops; + ops.ClientSendClose(); + call_.PerformOps(&ops); + return cq_.Pluck(&ops); + } + + /// See the ClientStreamingInterface.Finish method for semantics. + /// Side effects: + /// - Also receives initial metadata if not already received. + /// - Attempts to fill in the \a response parameter passed + /// to the constructor of this instance with the response + /// message from the server. + grpc::Status Finish() override { + grpc::Status status; + if (!context_->initial_metadata_received_) { + finish_ops_.RecvInitialMetadata(context_); + } + finish_ops_.ClientRecvStatus(context_, &status); + call_.PerformOps(&finish_ops_); + GPR_ASSERT(cq_.Pluck(&finish_ops_)); + return status; + } + + private: + friend class internal::ClientWriterFactory<W>; + + /// Block to create a stream (i.e. send request headers and other initial + /// metadata to the server). Note that \a context will be used to fill + /// in custom initial metadata. \a response will be filled in with the + /// single expected response message from the server upon a successful + /// call to the \a Finish method of this instance. + template <class R> + ClientWriter(grpc::ChannelInterface* channel, + const grpc::internal::RpcMethod& method, + grpc::ClientContext* context, R* response) + : context_(context), + cq_(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, + nullptr}), // Pluckable cq + call_(channel->CreateCall(method, context, &cq_)) { + finish_ops_.RecvMessage(response); + finish_ops_.AllowNoMessage(); + + if (!context_->initial_metadata_corked_) { + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(&context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } + } + + grpc::ClientContext* context_; + grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, + grpc::internal::CallOpGenericRecvMessage, + grpc::internal::CallOpClientRecvStatus> + finish_ops_; + grpc::CompletionQueue cq_; + grpc::internal::Call call_; +}; + +/// Client-side interface for bi-directional streaming with +/// client-to-server stream messages of type \a W and +/// server-to-client stream messages of type \a R. +template <class W, class R> +class ClientReaderWriterInterface : public internal::ClientStreamingInterface, + public internal::WriterInterface<W>, + public internal::ReaderInterface<R> { + public: + /// Block to wait for initial metadata from server. The received metadata + /// can only be accessed after this call returns. Should only be called before + /// the first read. Calling this method is optional, and if it is not called + /// the metadata will be available in ClientContext after the first read. + virtual void WaitForInitialMetadata() = 0; + + /// Half close writing from the client. (signal that the stream of messages + /// coming from the client is complete). + /// Blocks until currently-pending writes are completed. + /// Thread-safe with respect to \a ReaderInterface::Read + /// + /// \return Whether the writes were successful. + virtual bool WritesDone() = 0; +}; + +namespace internal { +template <class W, class R> +class ClientReaderWriterFactory { + public: + static ClientReaderWriter<W, R>* Create( + grpc::ChannelInterface* channel, const grpc::internal::RpcMethod& method, + grpc::ClientContext* context) { + return new ClientReaderWriter<W, R>(channel, method, context); + } +}; +} // namespace internal + +/// Synchronous (blocking) client-side API for bi-directional streaming RPCs, +/// where the outgoing message stream coming from the client has messages of +/// type \a W, and the incoming messages stream coming from the server has +/// messages of type \a R. +template <class W, class R> +class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { + public: + /// Block waiting to read initial metadata from the server. + /// This call is optional, but if it is used, it cannot be used concurrently + /// with or after the \a Finish method. + /// + /// Once complete, the initial metadata read from the server will be + /// accessible through the \a ClientContext used to construct this object. + void WaitForInitialMetadata() override { + GPR_ASSERT(!context_->initial_metadata_received_); + + grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> ops; + ops.RecvInitialMetadata(context_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); // status ignored + } + + bool NextMessageSize(uint32_t* sz) override { + int result = call_.max_receive_message_size(); + *sz = (result > 0) ? result : UINT32_MAX; + return true; + } + + /// See the \a ReaderInterface.Read method for semantics. + /// Side effect: + /// Also receives initial metadata if not already received (updates the \a + /// ClientContext associated with this call in that case). + bool Read(R* msg) override { + grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, + grpc::internal::CallOpRecvMessage<R>> + ops; + if (!context_->initial_metadata_received_) { + ops.RecvInitialMetadata(context_); + } + ops.RecvMessage(msg); + call_.PerformOps(&ops); + return cq_.Pluck(&ops) && ops.got_message; + } + + /// See the \a WriterInterface.Write method for semantics. + /// + /// Side effect: + /// Also sends initial metadata if not already sent (using the + /// \a ClientContext associated with this call to fill in values). + using internal::WriterInterface<W>::Write; + bool Write(const W& msg, grpc::WriteOptions options) override { + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpSendMessage, + grpc::internal::CallOpClientSendClose> + ops; + + if (options.is_last_message()) { + options.set_buffer_hint(); + ops.ClientSendClose(); + } + if (context_->initial_metadata_corked_) { + ops.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + context_->set_initial_metadata_corked(false); + } + if (!ops.SendMessagePtr(&msg, options).ok()) { + return false; + } + + call_.PerformOps(&ops); + return cq_.Pluck(&ops); + } + + bool WritesDone() override { + grpc::internal::CallOpSet<grpc::internal::CallOpClientSendClose> ops; + ops.ClientSendClose(); + call_.PerformOps(&ops); + return cq_.Pluck(&ops); + } + + /// See the ClientStreamingInterface.Finish method for semantics. + /// + /// Side effect: + /// - the \a ClientContext associated with this call is updated with + /// possible trailing metadata sent from the server. + grpc::Status Finish() override { + grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, + grpc::internal::CallOpClientRecvStatus> + ops; + if (!context_->initial_metadata_received_) { + ops.RecvInitialMetadata(context_); + } + grpc::Status status; + ops.ClientRecvStatus(context_, &status); + call_.PerformOps(&ops); + GPR_ASSERT(cq_.Pluck(&ops)); + return status; + } + + private: + friend class internal::ClientReaderWriterFactory<W, R>; + + grpc::ClientContext* context_; + grpc::CompletionQueue cq_; + grpc::internal::Call call_; + + /// Block to create a stream and write the initial metadata and \a request + /// out. Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + ClientReaderWriter(grpc::ChannelInterface* channel, + const grpc::internal::RpcMethod& method, + grpc::ClientContext* context) + : context_(context), + cq_(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, + nullptr}), // Pluckable cq + call_(channel->CreateCall(method, context, &cq_)) { + if (!context_->initial_metadata_corked_) { + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(&context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } + } +}; + +/// Server-side interface for streaming reads of message of type \a R. +template <class R> +class ServerReaderInterface : public internal::ServerStreamingInterface, + public internal::ReaderInterface<R> {}; + +/// Synchronous (blocking) server-side API for doing client-streaming RPCs, +/// where the incoming message stream coming from the client has messages of +/// type \a R. +template <class R> +class ServerReader final : public ServerReaderInterface<R> { + public: + /// See the \a ServerStreamingInterface.SendInitialMetadata method + /// for semantics. Note that initial metadata will be affected by the + /// \a ServerContext associated with this call. + void SendInitialMetadata() override { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ops.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&ops); + call_->cq()->Pluck(&ops); + } + + bool NextMessageSize(uint32_t* sz) override { + int result = call_->max_receive_message_size(); + *sz = (result > 0) ? result : UINT32_MAX; + return true; + } + + bool Read(R* msg) override { + grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>> ops; + ops.RecvMessage(msg); + call_->PerformOps(&ops); + bool ok = call_->cq()->Pluck(&ops) && ops.got_message; + if (!ok) { + ctx_->MaybeMarkCancelledOnRead(); + } + return ok; + } + + private: + grpc::internal::Call* const call_; + ServerContext* const ctx_; + + template <class ServiceType, class RequestType, class ResponseType> + friend class internal::ClientStreamingHandler; + + ServerReader(grpc::internal::Call* call, grpc::ServerContext* ctx) + : call_(call), ctx_(ctx) {} +}; + +/// Server-side interface for streaming writes of message of type \a W. +template <class W> +class ServerWriterInterface : public internal::ServerStreamingInterface, + public internal::WriterInterface<W> {}; + +/// Synchronous (blocking) server-side API for doing for doing a +/// server-streaming RPCs, where the outgoing message stream coming from the +/// server has messages of type \a W. +template <class W> +class ServerWriter final : public ServerWriterInterface<W> { + public: + /// See the \a ServerStreamingInterface.SendInitialMetadata method + /// for semantics. + /// Note that initial metadata will be affected by the + /// \a ServerContext associated with this call. + void SendInitialMetadata() override { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ops.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&ops); + call_->cq()->Pluck(&ops); + } + + /// See the \a WriterInterface.Write method for semantics. + /// + /// Side effect: + /// Also sends initial metadata if not already sent (using the + /// \a ClientContext associated with this call to fill in values). + using internal::WriterInterface<W>::Write; + bool Write(const W& msg, grpc::WriteOptions options) override { + if (options.is_last_message()) { + options.set_buffer_hint(); + } + + if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) { + return false; + } + if (!ctx_->sent_initial_metadata_) { + ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ctx_->pending_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + call_->PerformOps(&ctx_->pending_ops_); + // if this is the last message we defer the pluck until AFTER we start + // the trailing md op. This prevents hangs. See + // https://github.com/grpc/grpc/issues/11546 + if (options.is_last_message()) { + ctx_->has_pending_ops_ = true; + return true; + } + ctx_->has_pending_ops_ = false; + return call_->cq()->Pluck(&ctx_->pending_ops_); + } + + private: + grpc::internal::Call* const call_; + grpc::ServerContext* const ctx_; + + template <class ServiceType, class RequestType, class ResponseType> + friend class internal::ServerStreamingHandler; + + ServerWriter(grpc::internal::Call* call, grpc::ServerContext* ctx) + : call_(call), ctx_(ctx) {} +}; + +/// Server-side interface for bi-directional streaming. +template <class W, class R> +class ServerReaderWriterInterface : public internal::ServerStreamingInterface, + public internal::WriterInterface<W>, + public internal::ReaderInterface<R> {}; + +/// Actual implementation of bi-directional streaming +namespace internal { +template <class W, class R> +class ServerReaderWriterBody final { + public: + ServerReaderWriterBody(grpc::internal::Call* call, grpc::ServerContext* ctx) + : call_(call), ctx_(ctx) {} + + void SendInitialMetadata() { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ops.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&ops); + call_->cq()->Pluck(&ops); + } + + bool NextMessageSize(uint32_t* sz) { + int result = call_->max_receive_message_size(); + *sz = (result > 0) ? result : UINT32_MAX; + return true; + } + + bool Read(R* msg) { + grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>> ops; + ops.RecvMessage(msg); + call_->PerformOps(&ops); + bool ok = call_->cq()->Pluck(&ops) && ops.got_message; + if (!ok) { + ctx_->MaybeMarkCancelledOnRead(); + } + return ok; + } + + bool Write(const W& msg, grpc::WriteOptions options) { + if (options.is_last_message()) { + options.set_buffer_hint(); + } + if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) { + return false; + } + if (!ctx_->sent_initial_metadata_) { + ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ctx_->pending_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + call_->PerformOps(&ctx_->pending_ops_); + // if this is the last message we defer the pluck until AFTER we start + // the trailing md op. This prevents hangs. See + // https://github.com/grpc/grpc/issues/11546 + if (options.is_last_message()) { + ctx_->has_pending_ops_ = true; + return true; + } + ctx_->has_pending_ops_ = false; + return call_->cq()->Pluck(&ctx_->pending_ops_); + } + + private: + grpc::internal::Call* const call_; + grpc::ServerContext* const ctx_; +}; + +} // namespace internal + +/// Synchronous (blocking) server-side API for a bidirectional +/// streaming call, where the incoming message stream coming from the client has +/// messages of type \a R, and the outgoing message streaming coming from +/// the server has messages of type \a W. +template <class W, class R> +class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> { + public: + /// See the \a ServerStreamingInterface.SendInitialMetadata method + /// for semantics. Note that initial metadata will be affected by the + /// \a ServerContext associated with this call. + void SendInitialMetadata() override { body_.SendInitialMetadata(); } + + bool NextMessageSize(uint32_t* sz) override { + return body_.NextMessageSize(sz); + } + + bool Read(R* msg) override { return body_.Read(msg); } + + /// See the \a WriterInterface.Write(const W& msg, WriteOptions options) + /// method for semantics. + /// Side effect: + /// Also sends initial metadata if not already sent (using the \a + /// ServerContext associated with this call). + using internal::WriterInterface<W>::Write; + bool Write(const W& msg, grpc::WriteOptions options) override { + return body_.Write(msg, options); + } + + private: + internal::ServerReaderWriterBody<W, R> body_; + + friend class internal::TemplatedBidiStreamingHandler<ServerReaderWriter<W, R>, + false>; + ServerReaderWriter(grpc::internal::Call* call, grpc::ServerContext* ctx) + : body_(call, ctx) {} +}; + +/// A class to represent a flow-controlled unary call. This is something +/// of a hybrid between conventional unary and streaming. This is invoked +/// through a unary call on the client side, but the server responds to it +/// as though it were a single-ping-pong streaming call. The server can use +/// the \a NextMessageSize method to determine an upper-bound on the size of +/// the message. A key difference relative to streaming: ServerUnaryStreamer +/// must have exactly 1 Read and exactly 1 Write, in that order, to function +/// correctly. Otherwise, the RPC is in error. +template <class RequestType, class ResponseType> +class ServerUnaryStreamer final + : public ServerReaderWriterInterface<ResponseType, RequestType> { + public: + /// Block to send initial metadata to client. + /// Implicit input parameter: + /// - the \a ServerContext associated with this call will be used for + /// sending initial metadata. + void SendInitialMetadata() override { body_.SendInitialMetadata(); } + + /// Get an upper bound on the request message size from the client. + bool NextMessageSize(uint32_t* sz) override { + return body_.NextMessageSize(sz); + } + + /// Read a message of type \a R into \a msg. Completion will be notified by \a + /// tag on the associated completion queue. + /// This is thread-safe with respect to \a Write or \a WritesDone methods. It + /// should not be called concurrently with other streaming APIs + /// on the same stream. It is not meaningful to call it concurrently + /// with another \a ReaderInterface::Read on the same stream since reads on + /// the same stream are delivered in order. + /// + /// \param[out] msg Where to eventually store the read message. + /// \param[in] tag The tag identifying the operation. + bool Read(RequestType* request) override { + if (read_done_) { + return false; + } + read_done_ = true; + return body_.Read(request); + } + + /// Block to write \a msg to the stream with WriteOptions \a options. + /// This is thread-safe with respect to \a ReaderInterface::Read + /// + /// \param msg The message to be written to the stream. + /// \param options The WriteOptions affecting the write operation. + /// + /// \return \a true on success, \a false when the stream has been closed. + using internal::WriterInterface<ResponseType>::Write; + bool Write(const ResponseType& response, + grpc::WriteOptions options) override { + if (write_done_ || !read_done_) { + return false; + } + write_done_ = true; + return body_.Write(response, options); + } + + private: + internal::ServerReaderWriterBody<ResponseType, RequestType> body_; + bool read_done_; + bool write_done_; + + friend class internal::TemplatedBidiStreamingHandler< + ServerUnaryStreamer<RequestType, ResponseType>, true>; + ServerUnaryStreamer(grpc::internal::Call* call, grpc::ServerContext* ctx) + : body_(call, ctx), read_done_(false), write_done_(false) {} +}; + +/// A class to represent a flow-controlled server-side streaming call. +/// This is something of a hybrid between server-side and bidi streaming. +/// This is invoked through a server-side streaming call on the client side, +/// but the server responds to it as though it were a bidi streaming call that +/// must first have exactly 1 Read and then any number of Writes. +template <class RequestType, class ResponseType> +class ServerSplitStreamer final + : public ServerReaderWriterInterface<ResponseType, RequestType> { + public: + /// Block to send initial metadata to client. + /// Implicit input parameter: + /// - the \a ServerContext associated with this call will be used for + /// sending initial metadata. + void SendInitialMetadata() override { body_.SendInitialMetadata(); } + + /// Get an upper bound on the request message size from the client. + bool NextMessageSize(uint32_t* sz) override { + return body_.NextMessageSize(sz); + } + + /// Read a message of type \a R into \a msg. Completion will be notified by \a + /// tag on the associated completion queue. + /// This is thread-safe with respect to \a Write or \a WritesDone methods. It + /// should not be called concurrently with other streaming APIs + /// on the same stream. It is not meaningful to call it concurrently + /// with another \a ReaderInterface::Read on the same stream since reads on + /// the same stream are delivered in order. + /// + /// \param[out] msg Where to eventually store the read message. + /// \param[in] tag The tag identifying the operation. + bool Read(RequestType* request) override { + if (read_done_) { + return false; + } + read_done_ = true; + return body_.Read(request); + } + + /// Block to write \a msg to the stream with WriteOptions \a options. + /// This is thread-safe with respect to \a ReaderInterface::Read + /// + /// \param msg The message to be written to the stream. + /// \param options The WriteOptions affecting the write operation. + /// + /// \return \a true on success, \a false when the stream has been closed. + using internal::WriterInterface<ResponseType>::Write; + bool Write(const ResponseType& response, + grpc::WriteOptions options) override { + return read_done_ && body_.Write(response, options); + } + + private: + internal::ServerReaderWriterBody<ResponseType, RequestType> body_; + bool read_done_; + + friend class internal::TemplatedBidiStreamingHandler< + ServerSplitStreamer<RequestType, ResponseType>, false>; + ServerSplitStreamer(grpc::internal::Call* call, grpc::ServerContext* ctx) + : body_(call, ctx), read_done_(false) {} +}; + +} // namespace grpc #endif // GRPCPP_SUPPORT_SYNC_STREAM_H diff --git a/contrib/libs/grpc/include/grpcpp/support/time.h b/contrib/libs/grpc/include/grpcpp/support/time.h index b5e07b68f2..5397eb6ba2 100644 --- a/contrib/libs/grpc/include/grpcpp/support/time.h +++ b/contrib/libs/grpc/include/grpcpp/support/time.h @@ -1,24 +1,89 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// #ifndef GRPCPP_SUPPORT_TIME_H #define GRPCPP_SUPPORT_TIME_H -#include <grpcpp/impl/codegen/time.h> // IWYU pragma: export +#include <chrono> + +#include <grpc/impl/grpc_types.h> +#include <grpcpp/support/config.h> + +namespace grpc { + +/// If you are trying to use CompletionQueue::AsyncNext with a time class that +/// isn't either gpr_timespec or std::chrono::system_clock::time_point, you +/// will most likely be looking at this comment as your compiler will have +/// fired an error below. In order to fix this issue, you have two potential +/// solutions: + +/// 1. Use gpr_timespec or std::chrono::system_clock::time_point instead +/// 2. Specialize the TimePoint class with whichever time class that you +/// want to use here. See below for two examples of how to do this. +/// +template <typename T> +class TimePoint { + public: + // If you see the error with methods below, you may need either + // i) using the existing types having a conversion class such as + // gpr_timespec and std::chrono::system_clock::time_point or + // ii) writing a new TimePoint<YourType> to address your case. + TimePoint(const T& /*time*/) = delete; + gpr_timespec raw_time() = delete; +}; + +template <> +class TimePoint<gpr_timespec> { + public: + // NOLINTNEXTLINE(google-explicit-constructor) + TimePoint(const gpr_timespec& time) : time_(time) {} + gpr_timespec raw_time() { return time_; } + + private: + gpr_timespec time_; +}; + +} // namespace grpc + +namespace grpc { + +// from and to should be absolute time. +void Timepoint2Timespec(const std::chrono::system_clock::time_point& from, + gpr_timespec* to); +void TimepointHR2Timespec( + const std::chrono::high_resolution_clock::time_point& from, + gpr_timespec* to); + +std::chrono::system_clock::time_point Timespec2Timepoint(gpr_timespec t); + +template <> +class TimePoint<std::chrono::system_clock::time_point> { + public: + // NOLINTNEXTLINE(google-explicit-constructor) + TimePoint(const std::chrono::system_clock::time_point& time) { + Timepoint2Timespec(time, &time_); + } + gpr_timespec raw_time() const { return time_; } + + private: + gpr_timespec time_; +}; + +} // namespace grpc #endif // GRPCPP_SUPPORT_TIME_H diff --git a/contrib/libs/grpc/include/grpcpp/support/validate_service_config.h b/contrib/libs/grpc/include/grpcpp/support/validate_service_config.h index f1368623b5..12ecf1c9a4 100644 --- a/contrib/libs/grpc/include/grpcpp/support/validate_service_config.h +++ b/contrib/libs/grpc/include/grpcpp/support/validate_service_config.h @@ -1,20 +1,20 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// +// Copyright 2019 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// #ifndef GRPCPP_SUPPORT_VALIDATE_SERVICE_CONFIG_H #define GRPCPP_SUPPORT_VALIDATE_SERVICE_CONFIG_H |