diff options
author | arcadia-devtools <arcadia-devtools@yandex-team.ru> | 2022-03-15 21:33:41 +0300 |
---|---|---|
committer | arcadia-devtools <arcadia-devtools@yandex-team.ru> | 2022-03-15 21:33:41 +0300 |
commit | 3dd665b514943f69657b593eb51af90b99b1206b (patch) | |
tree | 0eb633e628bb1fe6c639574b1184d43def7c0a73 /contrib/libs/grpc/include/grpcpp/impl | |
parent | a68afc731202027f105bc5723ee11788017c29e2 (diff) | |
download | ydb-3dd665b514943f69657b593eb51af90b99b1206b.tar.gz |
intermediate changes
ref:953ca886ec160075b38c0f3614de029b423f0a9e
Diffstat (limited to 'contrib/libs/grpc/include/grpcpp/impl')
33 files changed, 763 insertions, 469 deletions
diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/async_unary_call.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/async_unary_call.h index 3deeda8c7f..7cb7cc6f16 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/async_unary_call.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/async_unary_call.h @@ -20,6 +20,8 @@ #define GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_H #include <grpcpp/impl/codegen/call.h> +#include <grpcpp/impl/codegen/call_op_set.h> +#include <grpcpp/impl/codegen/call_op_set_interface.h> #include <grpcpp/impl/codegen/channel_interface.h> #include <grpcpp/impl/codegen/client_context.h> #include <grpcpp/impl/codegen/server_context.h> @@ -28,6 +30,10 @@ namespace grpc { +// Forward declaration for use in Helper class +template <class R> +class ClientAsyncResponseReader; + /// An interface relevant for async client side unary RPCs (which send /// one request message to a server and receive one response message). template <class R> @@ -66,8 +72,8 @@ class ClientAsyncResponseReaderInterface { }; namespace internal { -template <class R> -class ClientAsyncResponseReaderFactory { + +class ClientAsyncResponseReaderHelper { public: /// Start a call and write the request out if \a start is set. /// \a tag will be notified on \a cq when the call has been started (i.e. @@ -75,17 +81,136 @@ class ClientAsyncResponseReaderFactory { /// If \a start is not set, the actual call must be initiated by StartCall /// Note that \a context will be used to fill in custom initial metadata /// used to send to the server when starting the call. + /// + /// Optionally pass in a base class for request and response types so that the + /// internal functions and structs can be templated based on that, allowing + /// reuse across RPCs (e.g., MessageLite for protobuf). Since constructors + /// can't have an explicit template parameter, the last argument is an + /// extraneous parameter just to provide the needed type information. + template <class R, class W, class BaseR = R, class BaseW = W> + static ClientAsyncResponseReader<R>* Create( + ::grpc::ChannelInterface* channel, ::grpc::CompletionQueue* cq, + const ::grpc::internal::RpcMethod& method, ::grpc::ClientContext* context, + const W& request) /* __attribute__((noinline)) */ { + ::grpc::internal::Call call = channel->CreateCall(method, context, cq); + ClientAsyncResponseReader<R>* result = + new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncResponseReader<R>))) + ClientAsyncResponseReader<R>(call, context); + SetupRequest<BaseR, BaseW>( + call.call(), &result->single_buf_, &result->read_initial_metadata_, + &result->finish_, static_cast<const BaseW&>(request)); + + return result; + } + + // Various helper functions to reduce templating use + + template <class R, class W> + static void SetupRequest( + grpc_call* call, + ::grpc::internal::CallOpSendInitialMetadata** single_buf_ptr, + std::function<void(ClientContext*, internal::Call*, + internal::CallOpSendInitialMetadata*, void*)>* + read_initial_metadata, + std::function< + void(ClientContext*, internal::Call*, bool initial_metadata_read, + internal::CallOpSendInitialMetadata*, + internal::CallOpSetInterface**, void*, Status*, void*)>* finish, + const W& request) { + using SingleBufType = + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose, + ::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpRecvMessage<R>, + ::grpc::internal::CallOpClientRecvStatus>; + SingleBufType* single_buf = + new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( + call, sizeof(SingleBufType))) SingleBufType; + *single_buf_ptr = single_buf; + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(single_buf->SendMessage(request).ok()); + single_buf->ClientSendClose(); + + // The purpose of the following functions is to type-erase the actual + // templated type of the CallOpSet being used by hiding that type inside the + // function definition rather than specifying it as an argument of the + // function or a member of the class. The type-erased CallOpSet will get + // static_cast'ed back to the real type so that it can be used properly. + *read_initial_metadata = + [](ClientContext* context, internal::Call* call, + internal::CallOpSendInitialMetadata* single_buf_view, void* tag) { + auto* single_buf = static_cast<SingleBufType*>(single_buf_view); + single_buf->set_output_tag(tag); + single_buf->RecvInitialMetadata(context); + call->PerformOps(single_buf); + }; + + // Note that this function goes one step further than the previous one + // because it type-erases the message being written down to a void*. This + // will be static-cast'ed back to the class specified here by hiding that + // class information inside the function definition. Note that this feature + // expects the class being specified here for R to be a base-class of the + // "real" R without any multiple-inheritance (as applies in protbuf wrt + // MessageLite) + *finish = [](ClientContext* context, internal::Call* call, + bool initial_metadata_read, + internal::CallOpSendInitialMetadata* single_buf_view, + internal::CallOpSetInterface** finish_buf_ptr, void* msg, + Status* status, void* tag) { + if (initial_metadata_read) { + using FinishBufType = ::grpc::internal::CallOpSet< + ::grpc::internal::CallOpRecvMessage<R>, + ::grpc::internal::CallOpClientRecvStatus>; + FinishBufType* finish_buf = + new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( + call->call(), sizeof(FinishBufType))) FinishBufType; + *finish_buf_ptr = finish_buf; + finish_buf->set_output_tag(tag); + finish_buf->RecvMessage(static_cast<R*>(msg)); + finish_buf->AllowNoMessage(); + finish_buf->ClientRecvStatus(context, status); + call->PerformOps(finish_buf); + } else { + auto* single_buf = static_cast<SingleBufType*>(single_buf_view); + single_buf->set_output_tag(tag); + single_buf->RecvInitialMetadata(context); + single_buf->RecvMessage(static_cast<R*>(msg)); + single_buf->AllowNoMessage(); + single_buf->ClientRecvStatus(context, status); + call->PerformOps(single_buf); + } + }; + } + + static void StartCall( + ::grpc::ClientContext* context, + ::grpc::internal::CallOpSendInitialMetadata* single_buf) { + single_buf->SendInitialMetadata(&context->send_initial_metadata_, + context->initial_metadata_flags()); + } +}; + +// TODO(vjpai): This templated factory is deprecated and will be replaced by +//. the non-templated helper as soon as possible. +template <class R> +class ClientAsyncResponseReaderFactory { + public: template <class W> static ClientAsyncResponseReader<R>* Create( ::grpc::ChannelInterface* channel, ::grpc::CompletionQueue* cq, const ::grpc::internal::RpcMethod& method, ::grpc::ClientContext* context, const W& request, bool start) { - ::grpc::internal::Call call = channel->CreateCall(method, context, cq); - return new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncResponseReader<R>))) - ClientAsyncResponseReader<R>(call, context, request, start); + auto* result = ClientAsyncResponseReaderHelper::Create<R>( + channel, cq, method, context, request); + if (start) { + result->StartCall(); + } + return result; } }; + } // namespace internal /// Async API for client-side unary RPCs, where the message response @@ -107,9 +232,9 @@ class ClientAsyncResponseReader final static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } void StartCall() override { - GPR_CODEGEN_ASSERT(!started_); + GPR_CODEGEN_DEBUG_ASSERT(!started_); started_ = true; - StartCallInternal(); + internal::ClientAsyncResponseReaderHelper::StartCall(context_, single_buf_); } /// See \a ClientAsyncResponseReaderInterface::ReadInitialMetadata for @@ -119,76 +244,48 @@ class ClientAsyncResponseReader final /// - the \a ClientContext associated with this call is updated with /// possible initial and trailing metadata sent from the server. void ReadInitialMetadata(void* tag) override { - GPR_CODEGEN_ASSERT(started_); - GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - - single_buf.set_output_tag(tag); - single_buf.RecvInitialMetadata(context_); - call_.PerformOps(&single_buf); + GPR_CODEGEN_DEBUG_ASSERT(started_); + GPR_CODEGEN_DEBUG_ASSERT(!context_->initial_metadata_received_); + read_initial_metadata_(context_, &call_, single_buf_, tag); initial_metadata_read_ = true; } - /// See \a ClientAysncResponseReaderInterface::Finish for semantics. + /// See \a ClientAsyncResponseReaderInterface::Finish for semantics. /// /// Side effect: /// - the \a ClientContext associated with this call is updated with /// possible initial and trailing metadata sent from the server. void Finish(R* msg, ::grpc::Status* status, void* tag) override { - GPR_CODEGEN_ASSERT(started_); - if (initial_metadata_read_) { - finish_buf.set_output_tag(tag); - finish_buf.RecvMessage(msg); - finish_buf.AllowNoMessage(); - finish_buf.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_buf); - } else { - single_buf.set_output_tag(tag); - single_buf.RecvInitialMetadata(context_); - single_buf.RecvMessage(msg); - single_buf.AllowNoMessage(); - single_buf.ClientRecvStatus(context_, status); - call_.PerformOps(&single_buf); - } + GPR_CODEGEN_DEBUG_ASSERT(started_); + finish_(context_, &call_, initial_metadata_read_, single_buf_, &finish_buf_, + static_cast<void*>(msg), status, tag); } private: - friend class internal::ClientAsyncResponseReaderFactory<R>; + friend class internal::ClientAsyncResponseReaderHelper; ::grpc::ClientContext* const context_; ::grpc::internal::Call call_; - bool started_; + bool started_ = false; bool initial_metadata_read_ = false; - template <class W> ClientAsyncResponseReader(::grpc::internal::Call call, - ::grpc::ClientContext* context, const W& request, - bool start) - : context_(context), call_(call), started_(start) { - // Bind the metadata at time of StartCallInternal but set up the rest here - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(single_buf.SendMessage(request).ok()); - single_buf.ClientSendClose(); - if (start) StartCallInternal(); - } - - void StartCallInternal() { - single_buf.SendInitialMetadata(&context_->send_initial_metadata_, - context_->initial_metadata_flags()); - } + ::grpc::ClientContext* context) + : context_(context), call_(call) {} // disable operator new static void* operator new(std::size_t size); static void* operator new(std::size_t /*size*/, void* p) { return p; } - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, - ::grpc::internal::CallOpSendMessage, - ::grpc::internal::CallOpClientSendClose, - ::grpc::internal::CallOpRecvInitialMetadata, - ::grpc::internal::CallOpRecvMessage<R>, - ::grpc::internal::CallOpClientRecvStatus> - single_buf; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>, - ::grpc::internal::CallOpClientRecvStatus> - finish_buf; + internal::CallOpSendInitialMetadata* single_buf_; + internal::CallOpSetInterface* finish_buf_ = nullptr; + std::function<void(ClientContext*, internal::Call*, + internal::CallOpSendInitialMetadata*, void*)> + read_initial_metadata_; + std::function<void(ClientContext*, internal::Call*, + bool initial_metadata_read, + internal::CallOpSendInitialMetadata*, + internal::CallOpSetInterface**, void*, Status*, void*)> + finish_; }; /// Async server-side API for handling unary calls, where the single diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/byte_buffer.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/byte_buffer.h index 6e64ec9981..2c015f2266 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/byte_buffer.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/byte_buffer.h @@ -40,8 +40,8 @@ template <class RequestType, class ResponseType> class CallbackUnaryHandler; template <class RequestType, class ResponseType> class CallbackServerStreamingHandler; -template <class ServiceType, class RequestType, class ResponseType> -class RpcMethodHandler; +template <class RequestType> +void* UnaryDeserializeHelper(grpc_byte_buffer*, ::grpc::Status*, RequestType*); template <class ServiceType, class RequestType, class ResponseType> class ServerStreamingHandler; template <::grpc::StatusCode code> @@ -162,8 +162,9 @@ class ByteBuffer final { template <class R> friend class internal::CallOpRecvMessage; friend class internal::CallOpGenericRecvMessage; - template <class ServiceType, class RequestType, class ResponseType> - friend class internal::RpcMethodHandler; + template <class RequestType> + friend void* internal::UnaryDeserializeHelper(grpc_byte_buffer*, + ::grpc::Status*, RequestType*); template <class ServiceType, class RequestType, class ResponseType> friend class internal::ServerStreamingHandler; template <class RequestType, class ResponseType> @@ -194,10 +195,14 @@ class ByteBuffer final { class ByteBufferPointer { public: + /* NOLINTNEXTLINE(google-explicit-constructor) */ ByteBufferPointer(const ByteBuffer* b) : bbuf_(const_cast<ByteBuffer*>(b)) {} + /* NOLINTNEXTLINE(google-explicit-constructor) */ operator ByteBuffer*() { return bbuf_; } + /* NOLINTNEXTLINE(google-explicit-constructor) */ operator grpc_byte_buffer*() { return bbuf_->buffer_; } + /* NOLINTNEXTLINE(google-explicit-constructor) */ operator grpc_byte_buffer**() { return &bbuf_->buffer_; } private: diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/call_op_set.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/call_op_set.h index 379333164a..6df16a1125 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/call_op_set.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/call_op_set.h @@ -62,7 +62,7 @@ inline grpc_metadata* FillMetadataArray( return nullptr; } grpc_metadata* metadata_array = - (grpc_metadata*)(g_core_codegen_interface->gpr_malloc( + static_cast<grpc_metadata*>(g_core_codegen_interface->gpr_malloc( (*metadata_count) * sizeof(grpc_metadata))); size_t i = 0; for (auto iter = metadata.cbegin(); iter != metadata.cend(); ++iter, ++i) { @@ -83,11 +83,6 @@ inline grpc_metadata* FillMetadataArray( class WriteOptions { public: WriteOptions() : flags_(0), last_message_(false) {} - WriteOptions(const WriteOptions& other) - : flags_(other.flags_), last_message_(other.last_message_) {} - - /// Default assignment operator - WriteOptions& operator=(const WriteOptions& other) = default; /// Clear all flags. inline void Clear() { flags_ = 0; } @@ -243,7 +238,7 @@ class CallOpSendInitialMetadata { grpc_op* op = &ops[(*nops)++]; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->flags = flags_; - op->reserved = NULL; + op->reserved = nullptr; initial_metadata_ = FillMetadataArray(*metadata_map_, &initial_metadata_count_, ""); op->data.send_initial_metadata.count = initial_metadata_count_; @@ -327,13 +322,14 @@ class CallOpSendMessage { grpc_op* op = &ops[(*nops)++]; op->op = GRPC_OP_SEND_MESSAGE; op->flags = write_options_.flags(); - op->reserved = NULL; + op->reserved = nullptr; op->data.send_message.send_message = send_buf_.c_buffer(); // Flags are per-message: clear them after use. write_options_.Clear(); } void FinishOp(bool* status) { if (msg_ == nullptr && !send_buf_.Valid()) return; + send_buf_.Clear(); if (hijacked_ && failed_send_) { // Hijacking interceptor failed this Op *status = false; @@ -382,9 +378,33 @@ class CallOpSendMessage { template <class M> Status CallOpSendMessage::SendMessage(const M& message, WriteOptions options) { write_options_ = options; + // Serialize immediately since we do not have access to the message pointer + bool own_buf; + // TODO(vjpai): Remove the void below when possible + // The void in the template parameter below should not be needed + // (since it should be implicit) but is needed due to an observed + // difference in behavior between clang and gcc for certain internal users + Status result = SerializationTraits<M, void>::Serialize( + message, send_buf_.bbuf_ptr(), &own_buf); + if (!own_buf) { + send_buf_.Duplicate(); + } + return result; +} + +template <class M> +Status CallOpSendMessage::SendMessage(const M& message) { + return SendMessage(message, WriteOptions()); +} + +template <class M> +Status CallOpSendMessage::SendMessagePtr(const M* message, + WriteOptions options) { + msg_ = message; + write_options_ = options; + // Store the serializer for later since we have access to the message serializer_ = [this](const void* message) { bool own_buf; - send_buf_.Clear(); // TODO(vjpai): Remove the void below when possible // The void in the template parameter below should not be needed // (since it should be implicit) but is needed due to an observed @@ -396,31 +416,12 @@ Status CallOpSendMessage::SendMessage(const M& message, WriteOptions options) { } return result; }; - // Serialize immediately only if we do not have access to the message pointer - if (msg_ == nullptr) { - Status result = serializer_(&message); - serializer_ = nullptr; - return result; - } return Status(); } template <class M> -Status CallOpSendMessage::SendMessage(const M& message) { - return SendMessage(message, WriteOptions()); -} - -template <class M> -Status CallOpSendMessage::SendMessagePtr(const M* message, - WriteOptions options) { - msg_ = message; - return SendMessage(*message, options); -} - -template <class M> Status CallOpSendMessage::SendMessagePtr(const M* message) { - msg_ = message; - return SendMessage(*message, WriteOptions()); + return SendMessagePtr(message, WriteOptions()); } template <class R> @@ -439,7 +440,7 @@ class CallOpRecvMessage { grpc_op* op = &ops[(*nops)++]; op->op = GRPC_OP_RECV_MESSAGE; op->flags = 0; - op->reserved = NULL; + op->reserved = nullptr; op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr(); } @@ -515,7 +516,7 @@ class DeserializeFunc { template <class R> class DeserializeFuncType final : public DeserializeFunc { public: - DeserializeFuncType(R* message) : message_(message) {} + explicit DeserializeFuncType(R* message) : message_(message) {} Status Deserialize(ByteBuffer* buf) override { return SerializationTraits<R>::Deserialize(buf->bbuf_ptr(), message_); } @@ -548,7 +549,7 @@ class CallOpGenericRecvMessage { grpc_op* op = &ops[(*nops)++]; op->op = GRPC_OP_RECV_MESSAGE; op->flags = 0; - op->reserved = NULL; + op->reserved = nullptr; op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr(); } @@ -631,7 +632,7 @@ class CallOpClientSendClose { grpc_op* op = &ops[(*nops)++]; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; op->flags = 0; - op->reserved = NULL; + op->reserved = nullptr; } void FinishOp(bool* /*status*/) { send_ = false; } @@ -683,7 +684,7 @@ class CallOpServerSendStatus { op->data.send_status_from_server.status_details = send_error_message_.empty() ? nullptr : &error_message_slice_; op->flags = 0; - op->reserved = NULL; + op->reserved = nullptr; } void FinishOp(bool* /*status*/) { @@ -737,7 +738,7 @@ class CallOpRecvInitialMetadata { op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata.recv_initial_metadata = metadata_map_->arr(); op->flags = 0; - op->reserved = NULL; + op->reserved = nullptr; } void FinishOp(bool* /*status*/) { @@ -791,7 +792,7 @@ class CallOpClientRecvStatus { op->data.recv_status_on_client.status_details = &error_message_; op->data.recv_status_on_client.error_string = &debug_error_string_; op->flags = 0; - op->reserved = NULL; + op->reserved = nullptr; } void FinishOp(bool* /*status*/) { @@ -809,7 +810,8 @@ class CallOpClientRecvStatus { metadata_map_->GetBinaryErrorDetails()); if (debug_error_string_ != nullptr) { client_context_->set_debug_error_string(debug_error_string_); - g_core_codegen_interface->gpr_free((void*)debug_error_string_); + g_core_codegen_interface->gpr_free( + const_cast<char*>(debug_error_string_)); } } // TODO(soheil): Find callers that set debug string even for status OK, @@ -880,6 +882,9 @@ class CallOpSet : public CallOpSetInterface, interceptor_methods_(InterceptorBatchMethodsImpl()) {} CallOpSet& operator=(const CallOpSet& other) { + if (&other == this) { + return *this; + } core_cq_tag_ = this; return_tag_ = this; call_ = other.call_; diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/callback_common.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/callback_common.h index 3c3bfd7e76..612b820066 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/callback_common.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/callback_common.h @@ -28,6 +28,7 @@ #include <grpc/impl/codegen/grpc_types.h> #include <grpcpp/impl/codegen/call.h> #include <grpcpp/impl/codegen/channel_interface.h> +#include <grpcpp/impl/codegen/completion_queue_tag.h> #include <grpcpp/impl/codegen/config.h> #include <grpcpp/impl/codegen/core_codegen_interface.h> #include <grpcpp/impl/codegen/status.h> @@ -193,6 +194,7 @@ class CallbackWithSuccessTag void force_run(bool ok) { Run(ok); } /// check if this tag is currently set + /* NOLINTNEXTLINE(google-explicit-constructor) */ operator bool() const { return call_ != nullptr; } private: diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/channel_interface.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/channel_interface.h index ea0752d90e..7f03fb0345 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/channel_interface.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/channel_interface.h @@ -44,8 +44,7 @@ template <class W> class ClientAsyncWriterFactory; template <class W, class R> class ClientAsyncReaderWriterFactory; -template <class R> -class ClientAsyncResponseReaderFactory; +class ClientAsyncResponseReaderHelper; template <class W, class R> class ClientCallbackReaderWriterFactory; template <class R> @@ -120,8 +119,7 @@ class ChannelInterface { friend class ::grpc::internal::ClientAsyncWriterFactory; template <class W, class R> friend class ::grpc::internal::ClientAsyncReaderWriterFactory; - template <class R> - friend class ::grpc::internal::ClientAsyncResponseReaderFactory; + friend class ::grpc::internal::ClientAsyncResponseReaderHelper; template <class W, class R> friend class ::grpc::internal::ClientCallbackReaderWriterFactory; template <class R> diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/client_callback.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/client_callback.h index 90c817ceaa..a73d525003 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/client_callback.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/client_callback.h @@ -27,6 +27,7 @@ #include <grpcpp/impl/codegen/config.h> #include <grpcpp/impl/codegen/core_codegen_interface.h> #include <grpcpp/impl/codegen/status.h> +#include <grpcpp/impl/codegen/sync.h> namespace grpc { class Channel; @@ -35,15 +36,25 @@ class ClientContext; namespace internal { class RpcMethod; -/// Perform a callback-based unary call +/// Perform a callback-based unary call. May optionally specify the base +/// class of the Request and Response so that the internal calls and structures +/// below this may be based on those base classes and thus achieve code reuse +/// across different RPCs (e.g., for protobuf, MessageLite would be a base +/// class). /// TODO(vjpai): Combine as much as possible with the blocking unary call code -template <class InputMessage, class OutputMessage> +template <class InputMessage, class OutputMessage, + class BaseInputMessage = InputMessage, + class BaseOutputMessage = OutputMessage> void CallbackUnaryCall(::grpc::ChannelInterface* channel, const ::grpc::internal::RpcMethod& method, ::grpc::ClientContext* context, const InputMessage* request, OutputMessage* result, std::function<void(::grpc::Status)> on_completion) { - CallbackUnaryCallImpl<InputMessage, OutputMessage> x( + static_assert(std::is_base_of<BaseInputMessage, InputMessage>::value, + "Invalid input message specification"); + static_assert(std::is_base_of<BaseOutputMessage, OutputMessage>::value, + "Invalid output message specification"); + CallbackUnaryCallImpl<BaseInputMessage, BaseOutputMessage> x( channel, method, context, request, result, on_completion); } @@ -100,6 +111,8 @@ class CallbackUnaryCallImpl { // Base class for public API classes. class ClientReactor { public: + virtual ~ClientReactor() = default; + /// Called by the library when all operations associated with this RPC have /// completed and all Holds have been removed. OnDone provides the RPC status /// outcome for both successful and failed RPCs. If it is never called on an @@ -201,16 +214,19 @@ class ClientCallbackUnary { // activated by calling StartCall, possibly after initiating StartRead, // StartWrite, or AddHold operations on the streaming object. Note that none of // the classes are pure; all reactions have a default empty reaction so that the -// user class only needs to override those classes that it cares about. +// user class only needs to override those reactions that it cares about. // The reactor must be passed to the stub invocation before any of the below -// operations can be called. +// operations can be called and its reactions will be invoked by the library in +// response to the completion of various operations. Reactions must not include +// blocking operations (such as blocking I/O, starting synchronous RPCs, or +// waiting on condition variables). Reactions may be invoked concurrently, +// except that OnDone is called after all others (assuming proper API usage). +// The reactor may not be deleted until OnDone is called. /// \a ClientBidiReactor is the interface for a bidirectional streaming RPC. template <class Request, class Response> class ClientBidiReactor : public internal::ClientReactor { public: - virtual ~ClientBidiReactor() {} - /// Activate the RPC and initiate any reads or writes that have been Start'ed /// before this call. All streaming RPCs issued by the client MUST have /// StartCall invoked on them (even if they are canceled) as this call is the @@ -241,7 +257,7 @@ class ClientBidiReactor : public internal::ClientReactor { /// not deleted or modified until OnWriteDone is called. /// \param[in] options The WriteOptions to use for writing this message void StartWrite(const Request* req, ::grpc::WriteOptions options) { - stream_->Write(req, std::move(options)); + stream_->Write(req, options); } /// Initiate/post a write operation with specified options and an indication @@ -254,7 +270,7 @@ class ClientBidiReactor : public internal::ClientReactor { /// not deleted or modified until OnWriteDone is called. /// \param[in] options The WriteOptions to use for writing this message void StartWriteLast(const Request* req, ::grpc::WriteOptions options) { - StartWrite(req, std::move(options.set_last_message())); + StartWrite(req, options.set_last_message()); } /// Indicate that the RPC will have no more write operations. This can only be @@ -347,8 +363,6 @@ class ClientBidiReactor : public internal::ClientReactor { template <class Response> class ClientReadReactor : public internal::ClientReactor { public: - virtual ~ClientReadReactor() {} - void StartCall() { reader_->StartCall(); } void StartRead(Response* resp) { reader_->Read(resp); } @@ -374,17 +388,15 @@ class ClientReadReactor : public internal::ClientReactor { template <class Request> class ClientWriteReactor : public internal::ClientReactor { public: - virtual ~ClientWriteReactor() {} - void StartCall() { writer_->StartCall(); } void StartWrite(const Request* req) { StartWrite(req, ::grpc::WriteOptions()); } void StartWrite(const Request* req, ::grpc::WriteOptions options) { - writer_->Write(req, std::move(options)); + writer_->Write(req, options); } void StartWriteLast(const Request* req, ::grpc::WriteOptions options) { - StartWrite(req, std::move(options.set_last_message())); + StartWrite(req, options.set_last_message()); } void StartWritesDone() { writer_->WritesDone(); } @@ -420,8 +432,6 @@ class ClientWriteReactor : public internal::ClientReactor { /// initiation API among all the reactor flavors. class ClientUnaryReactor : public internal::ClientReactor { public: - virtual ~ClientUnaryReactor() {} - void StartCall() { call_->StartCall(); } void OnDone(const ::grpc::Status& /*s*/) override {} virtual void OnReadInitialMetadataDone(bool /*ok*/) {} @@ -463,7 +473,7 @@ class ClientCallbackReaderWriterImpl // there are no tests catching the compiler warning. static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } - void StartCall() override { + void StartCall() Y_ABSL_LOCKS_EXCLUDED(start_mu_) override { // This call initiates two batches, plus any backlog, each with a callback // 1. Send initial metadata (unless corked) + recv initial metadata // 2. Any read backlog @@ -512,7 +522,8 @@ class ClientCallbackReaderWriterImpl call_.PerformOps(&read_ops_); } - void Write(const Request* msg, ::grpc::WriteOptions options) override { + void Write(const Request* msg, ::grpc::WriteOptions options) + Y_ABSL_LOCKS_EXCLUDED(start_mu_) override { if (options.is_last_message()) { options.set_buffer_hint(); write_ops_.ClientSendClose(); @@ -535,14 +546,15 @@ class ClientCallbackReaderWriterImpl } call_.PerformOps(&write_ops_); } - void WritesDone() override { + void WritesDone() Y_ABSL_LOCKS_EXCLUDED(start_mu_) override { writes_done_ops_.ClientSendClose(); - writes_done_tag_.Set(call_.call(), - [this](bool ok) { - reactor_->OnWritesDoneDone(ok); - MaybeFinish(/*from_reaction=*/true); - }, - &writes_done_ops_, /*can_inline=*/false); + writes_done_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnWritesDoneDone(ok); + MaybeFinish(/*from_reaction=*/true); + }, + &writes_done_ops_, /*can_inline=*/false); writes_done_ops_.set_core_cq_tag(&writes_done_tag_); callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); if (GPR_UNLIKELY(corked_write_needed_)) { @@ -579,29 +591,32 @@ class ClientCallbackReaderWriterImpl this->BindReactor(reactor); // Set up the unchanging parts of the start, read, and write tags and ops. - start_tag_.Set(call_.call(), - [this](bool ok) { - reactor_->OnReadInitialMetadataDone(ok); - MaybeFinish(/*from_reaction=*/true); - }, - &start_ops_, /*can_inline=*/false); + start_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnReadInitialMetadataDone(ok); + MaybeFinish(/*from_reaction=*/true); + }, + &start_ops_, /*can_inline=*/false); start_ops_.RecvInitialMetadata(context_); start_ops_.set_core_cq_tag(&start_tag_); - write_tag_.Set(call_.call(), - [this](bool ok) { - reactor_->OnWriteDone(ok); - MaybeFinish(/*from_reaction=*/true); - }, - &write_ops_, /*can_inline=*/false); + write_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnWriteDone(ok); + MaybeFinish(/*from_reaction=*/true); + }, + &write_ops_, /*can_inline=*/false); write_ops_.set_core_cq_tag(&write_tag_); - read_tag_.Set(call_.call(), - [this](bool ok) { - reactor_->OnReadDone(ok); - MaybeFinish(/*from_reaction=*/true); - }, - &read_ops_, /*can_inline=*/false); + read_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnReadDone(ok); + MaybeFinish(/*from_reaction=*/true); + }, + &read_ops_, /*can_inline=*/false); read_ops_.set_core_cq_tag(&read_tag_); // Also set up the Finish tag and op set. @@ -672,7 +687,7 @@ class ClientCallbackReaderWriterImpl bool writes_done_ops = false; bool read_ops = false; }; - StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */; + StartCallBacklog backlog_ Y_ABSL_GUARDED_BY(start_mu_); // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish std::atomic<intptr_t> callbacks_outstanding_{3}; @@ -719,12 +734,13 @@ class ClientCallbackReaderImpl : public ClientCallbackReader<Response> { // 2. Any backlog // 3. Recv trailing metadata - start_tag_.Set(call_.call(), - [this](bool ok) { - reactor_->OnReadInitialMetadataDone(ok); - MaybeFinish(/*from_reaction=*/true); - }, - &start_ops_, /*can_inline=*/false); + start_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnReadInitialMetadataDone(ok); + MaybeFinish(/*from_reaction=*/true); + }, + &start_ops_, /*can_inline=*/false); start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, context_->initial_metadata_flags()); start_ops_.RecvInitialMetadata(context_); @@ -732,12 +748,13 @@ class ClientCallbackReaderImpl : public ClientCallbackReader<Response> { call_.PerformOps(&start_ops_); // Also set up the read tag so it doesn't have to be set up each time - read_tag_.Set(call_.call(), - [this](bool ok) { - reactor_->OnReadDone(ok); - MaybeFinish(/*from_reaction=*/true); - }, - &read_ops_, /*can_inline=*/false); + read_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnReadDone(ok); + MaybeFinish(/*from_reaction=*/true); + }, + &read_ops_, /*can_inline=*/false); read_ops_.set_core_cq_tag(&read_tag_); { @@ -828,7 +845,7 @@ class ClientCallbackReaderImpl : public ClientCallbackReader<Response> { struct StartCallBacklog { bool read_ops = false; }; - StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */; + StartCallBacklog backlog_ Y_ABSL_GUARDED_BY(start_mu_); // Minimum of 2 callbacks to pre-register for start and finish std::atomic<intptr_t> callbacks_outstanding_{2}; @@ -869,7 +886,7 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> { // there are no tests catching the compiler warning. static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } - void StartCall() override { + void StartCall() Y_ABSL_LOCKS_EXCLUDED(start_mu_) override { // This call initiates two batches, plus any backlog, each with a callback // 1. Send initial metadata (unless corked) + recv initial metadata // 2. Any backlog @@ -901,7 +918,8 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> { this->MaybeFinish(/*from_reaction=*/false); } - void Write(const Request* msg, ::grpc::WriteOptions options) override { + void Write(const Request* msg, ::grpc::WriteOptions options) + Y_ABSL_LOCKS_EXCLUDED(start_mu_) override { if (GPR_UNLIKELY(options.is_last_message())) { options.set_buffer_hint(); write_ops_.ClientSendClose(); @@ -926,14 +944,15 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> { call_.PerformOps(&write_ops_); } - void WritesDone() override { + void WritesDone() Y_ABSL_LOCKS_EXCLUDED(start_mu_) override { writes_done_ops_.ClientSendClose(); - writes_done_tag_.Set(call_.call(), - [this](bool ok) { - reactor_->OnWritesDoneDone(ok); - MaybeFinish(/*from_reaction=*/true); - }, - &writes_done_ops_, /*can_inline=*/false); + writes_done_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnWritesDoneDone(ok); + MaybeFinish(/*from_reaction=*/true); + }, + &writes_done_ops_, /*can_inline=*/false); writes_done_ops_.set_core_cq_tag(&writes_done_tag_); callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); @@ -973,21 +992,23 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> { this->BindReactor(reactor); // Set up the unchanging parts of the start and write tags and ops. - start_tag_.Set(call_.call(), - [this](bool ok) { - reactor_->OnReadInitialMetadataDone(ok); - MaybeFinish(/*from_reaction=*/true); - }, - &start_ops_, /*can_inline=*/false); + start_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnReadInitialMetadataDone(ok); + MaybeFinish(/*from_reaction=*/true); + }, + &start_ops_, /*can_inline=*/false); start_ops_.RecvInitialMetadata(context_); start_ops_.set_core_cq_tag(&start_tag_); - write_tag_.Set(call_.call(), - [this](bool ok) { - reactor_->OnWriteDone(ok); - MaybeFinish(/*from_reaction=*/true); - }, - &write_ops_, /*can_inline=*/false); + write_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnWriteDone(ok); + MaybeFinish(/*from_reaction=*/true); + }, + &write_ops_, /*can_inline=*/false); write_ops_.set_core_cq_tag(&write_tag_); // Also set up the Finish tag and op set. @@ -1052,7 +1073,7 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> { bool write_ops = false; bool writes_done_ops = false; }; - StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */; + StartCallBacklog backlog_ Y_ABSL_GUARDED_BY(start_mu_); // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish std::atomic<intptr_t> callbacks_outstanding_{3}; @@ -1097,21 +1118,22 @@ class ClientCallbackUnaryImpl final : public ClientCallbackUnary { // 1. Send initial metadata + write + writes done + recv initial metadata // 2. Read message, recv trailing metadata - start_tag_.Set(call_.call(), - [this](bool ok) { - reactor_->OnReadInitialMetadataDone(ok); - MaybeFinish(); - }, - &start_ops_, /*can_inline=*/false); + start_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnReadInitialMetadataDone(ok); + MaybeFinish(); + }, + &start_ops_, /*can_inline=*/false); start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, context_->initial_metadata_flags()); start_ops_.RecvInitialMetadata(context_); start_ops_.set_core_cq_tag(&start_tag_); call_.PerformOps(&start_ops_); - finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, - &finish_ops_, - /*can_inline=*/false); + finish_tag_.Set( + call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, &finish_ops_, + /*can_inline=*/false); finish_ops_.ClientRecvStatus(context_, &finish_status_); finish_ops_.set_core_cq_tag(&finish_tag_); call_.PerformOps(&finish_ops_); @@ -1171,7 +1193,8 @@ class ClientCallbackUnaryImpl final : public ClientCallbackUnary { class ClientCallbackUnaryFactory { public: - template <class Request, class Response> + template <class Request, class Response, class BaseRequest = Request, + class BaseResponse = Response> static void Create(::grpc::ChannelInterface* channel, const ::grpc::internal::RpcMethod& method, ::grpc::ClientContext* context, const Request* request, @@ -1183,7 +1206,9 @@ class ClientCallbackUnaryFactory { new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( call.call(), sizeof(ClientCallbackUnaryImpl))) - ClientCallbackUnaryImpl(call, context, request, response, reactor); + ClientCallbackUnaryImpl(call, context, + static_cast<const BaseRequest*>(request), + static_cast<BaseResponse*>(response), reactor); } }; diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/client_context.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/client_context.h index a4e58f34c5..82b93587a8 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/client_context.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/client_context.h @@ -72,6 +72,7 @@ template <class Request> class ClientCallbackWriterImpl; class ClientCallbackUnaryImpl; class ClientContextAccessor; +class ClientAsyncResponseReaderHelper; } // namespace internal template <class R> @@ -310,7 +311,7 @@ class ClientContext { /// /// \see grpc::AuthContext. std::shared_ptr<const grpc::AuthContext> auth_context() const { - if (auth_context_.get() == nullptr) { + if (auth_context_ == nullptr) { auth_context_ = grpc::CreateAuthContext(call_); } return auth_context_; @@ -439,6 +440,7 @@ class ClientContext { friend class ::grpc::ClientAsyncReaderWriter; template <class R> friend class ::grpc::ClientAsyncResponseReader; + friend class ::grpc::internal::ClientAsyncResponseReaderHelper; template <class InputMessage, class OutputMessage> friend class ::grpc::internal::BlockingUnaryCallImpl; template <class InputMessage, class OutputMessage> diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/client_unary_call.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/client_unary_call.h index 098bb50ee2..d41ea1ad2d 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/client_unary_call.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/client_unary_call.h @@ -20,6 +20,7 @@ #define GRPCPP_IMPL_CODEGEN_CLIENT_UNARY_CALL_H #include <grpcpp/impl/codegen/call.h> +#include <grpcpp/impl/codegen/call_op_set.h> #include <grpcpp/impl/codegen/channel_interface.h> #include <grpcpp/impl/codegen/config.h> #include <grpcpp/impl/codegen/core_codegen_interface.h> @@ -30,12 +31,23 @@ namespace grpc { class ClientContext; namespace internal { class RpcMethod; -/// Wrapper that performs a blocking unary call -template <class InputMessage, class OutputMessage> + +/// Wrapper that performs a blocking unary call. May optionally specify the base +/// class of the Request and Response so that the internal calls and structures +/// below this may be based on those base classes and thus achieve code reuse +/// across different RPCs (e.g., for protobuf, MessageLite would be a base +/// class). +template <class InputMessage, class OutputMessage, + class BaseInputMessage = InputMessage, + class BaseOutputMessage = OutputMessage> Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, grpc::ClientContext* context, const InputMessage& request, OutputMessage* result) { - return BlockingUnaryCallImpl<InputMessage, OutputMessage>( + static_assert(std::is_base_of<BaseInputMessage, InputMessage>::value, + "Invalid input message specification"); + static_assert(std::is_base_of<BaseOutputMessage, OutputMessage>::value, + "Invalid output message specification"); + return BlockingUnaryCallImpl<BaseInputMessage, BaseOutputMessage>( channel, method, context, request, result) .status(); } diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/completion_queue.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/completion_queue.h index ca0c77276a..d23e0e2bb3 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/completion_queue.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/completion_queue.h @@ -38,6 +38,7 @@ #include <grpcpp/impl/codegen/completion_queue_tag.h> #include <grpcpp/impl/codegen/core_codegen_interface.h> #include <grpcpp/impl/codegen/grpc_library.h> +#include <grpcpp/impl/codegen/rpc_service_method.h> #include <grpcpp/impl/codegen/status.h> #include <grpcpp/impl/codegen/sync.h> #include <grpcpp/impl/codegen/time.h> @@ -59,7 +60,12 @@ namespace internal { template <class W, class R> class ServerReaderWriterBody; -template <class ServiceType, class RequestType, class ResponseType> +template <class ResponseType> +void UnaryRunHandlerHelper( + const ::grpc::internal::MethodHandler::HandlerParameter&, ResponseType*, + ::grpc::Status&); +template <class ServiceType, class RequestType, class ResponseType, + class BaseRequestType, class BaseResponseType> class RpcMethodHandler; template <class ServiceType, class RequestType, class ResponseType> class ClientStreamingHandler; @@ -108,7 +114,7 @@ class CompletionQueue : private ::grpc::GrpcLibraryCodegen { explicit CompletionQueue(grpc_completion_queue* take); /// Destructor. Destroys the owned wrapped completion queue / instance. - ~CompletionQueue() { + ~CompletionQueue() override { ::grpc::g_core_codegen_interface->grpc_completion_queue_destroy(cq_); } @@ -123,8 +129,8 @@ class CompletionQueue : private ::grpc::GrpcLibraryCodegen { /// Read from the queue, blocking until an event is available or the queue is /// shutting down. /// - /// \param tag [out] Updated to point to the read event's tag. - /// \param ok [out] true if read a successful event, false otherwise. + /// \param[out] tag Updated to point to the read event's tag. + /// \param[out] ok true if read a successful event, false otherwise. /// /// Note that each tag sent to the completion queue (through RPC operations /// or alarms) will be delivered out of the completion queue by a call to @@ -179,10 +185,10 @@ class CompletionQueue : private ::grpc::GrpcLibraryCodegen { /// within the \a deadline). A \a tag points to an arbitrary location usually /// employed to uniquely identify an event. /// - /// \param tag [out] Upon success, updated to point to the event's tag. - /// \param ok [out] Upon success, true if a successful event, false otherwise + /// \param[out] tag Upon success, updated to point to the event's tag. + /// \param[out] ok Upon success, true if a successful event, false otherwise /// See documentation for CompletionQueue::Next for explanation of ok - /// \param deadline [in] How long to block in wait for an event. + /// \param[in] deadline How long to block in wait for an event. /// /// \return The type of event read. template <typename T> @@ -198,11 +204,11 @@ class CompletionQueue : private ::grpc::GrpcLibraryCodegen { /// within the \a deadline). A \a tag points to an arbitrary location usually /// employed to uniquely identify an event. /// - /// \param f [in] Function to execute before calling AsyncNext on this queue. - /// \param tag [out] Upon success, updated to point to the event's tag. - /// \param ok [out] Upon success, true if read a regular event, false + /// \param[in] f Function to execute before calling AsyncNext on this queue. + /// \param[out] tag Upon success, updated to point to the event's tag. + /// \param[out] ok Upon success, true if read a regular event, false /// otherwise. - /// \param deadline [in] How long to block in wait for an event. + /// \param[in] deadline How long to block in wait for an event. /// /// \return The type of event read. template <typename T, typename F> @@ -237,11 +243,11 @@ class CompletionQueue : private ::grpc::GrpcLibraryCodegen { protected: /// Private constructor of CompletionQueue only visible to friend classes - CompletionQueue(const grpc_completion_queue_attributes& attributes) { + explicit CompletionQueue(const grpc_completion_queue_attributes& attributes) { cq_ = ::grpc::g_core_codegen_interface->grpc_completion_queue_create( ::grpc::g_core_codegen_interface->grpc_completion_queue_factory_lookup( &attributes), - &attributes, NULL); + &attributes, nullptr); InitialAvalanching(); // reserve this for the future shutdown } @@ -265,8 +271,10 @@ class CompletionQueue : private ::grpc::GrpcLibraryCodegen { friend class ::grpc::ServerWriter; template <class W, class R> friend class ::grpc::internal::ServerReaderWriterBody; - template <class ServiceType, class RequestType, class ResponseType> - friend class ::grpc::internal::RpcMethodHandler; + template <class ResponseType> + friend void ::grpc::internal::UnaryRunHandlerHelper( + const ::grpc::internal::MethodHandler::HandlerParameter&, ResponseType*, + ::grpc::Status&); template <class ServiceType, class RequestType, class ResponseType> friend class ::grpc::internal::ClientStreamingHandler; template <class ServiceType, class RequestType, class ResponseType> @@ -293,7 +301,7 @@ class CompletionQueue : private ::grpc::GrpcLibraryCodegen { /// initialized, it must be flushed on the same thread. class CompletionQueueTLSCache { public: - CompletionQueueTLSCache(CompletionQueue* cq); + explicit CompletionQueueTLSCache(CompletionQueue* cq); ~CompletionQueueTLSCache(); bool Flush(void** tag, bool* ok); @@ -401,6 +409,9 @@ class CompletionQueue : private ::grpc::GrpcLibraryCodegen { return true; } + static CompletionQueue* CallbackAlternativeCQ(); + static void ReleaseCallbackAlternativeCQ(CompletionQueue* cq); + grpc_completion_queue* cq_; // owned gpr_atm avalanches_in_flight_; diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/config.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/config.h index 87f9914273..d177668551 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/config.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/config.h @@ -36,7 +36,7 @@ namespace grpc { // Using grpc::string and grpc::to_string is discouraged in favor of // TString and ToString. This is only for legacy code using // them explictly. -typedef TString string; // deprecated +typedef TString string; } // namespace grpc diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/config_protobuf.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/config_protobuf.h index c4012fb00c..49db4c5dc3 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/config_protobuf.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/config_protobuf.h @@ -65,6 +65,7 @@ #ifndef GRPC_CUSTOM_JSONUTIL #include <google/protobuf/util/json_util.h> +#include <google/protobuf/util/type_resolver_util.h> #define GRPC_CUSTOM_JSONUTIL ::google::protobuf::util #define GRPC_CUSTOM_UTIL_STATUS ::google::protobuf::util::Status #endif @@ -90,6 +91,7 @@ namespace util { typedef GRPC_CUSTOM_UTIL_STATUS Status; } // namespace util +// NOLINTNEXTLINE(misc-unused-alias-decls) namespace json = GRPC_CUSTOM_JSONUTIL; namespace io { diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/core_codegen.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/core_codegen.h index 50c8da4ffe..df2a03cd25 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/core_codegen.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/core_codegen.h @@ -31,10 +31,9 @@ namespace grpc { /// Implementation of the core codegen interface. class CoreCodegen final : public CoreCodegenInterface { private: - virtual const grpc_completion_queue_factory* - grpc_completion_queue_factory_lookup( + const grpc_completion_queue_factory* grpc_completion_queue_factory_lookup( const grpc_completion_queue_attributes* attributes) override; - virtual grpc_completion_queue* grpc_completion_queue_create( + grpc_completion_queue* grpc_completion_queue_create( const grpc_completion_queue_factory* factory, const grpc_completion_queue_attributes* attributes, void* reserved) override; @@ -115,8 +114,8 @@ class CoreCodegen final : public CoreCodegenInterface { gpr_timespec gpr_inf_future(gpr_clock_type type) override; gpr_timespec gpr_time_0(gpr_clock_type type) override; - virtual const Status& ok() override; - virtual const Status& cancelled() override; + const Status& ok() override; + const Status& cancelled() override; void assert_fail(const char* failed_assertion, const char* file, int line) override; diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/delegating_channel.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/delegating_channel.h index 1a3bbd3349..0479567b86 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/delegating_channel.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/delegating_channel.h @@ -19,14 +19,19 @@ #ifndef GRPCPP_IMPL_CODEGEN_DELEGATING_CHANNEL_H #define GRPCPP_IMPL_CODEGEN_DELEGATING_CHANNEL_H +#include <memory> + +#include <grpcpp/impl/codegen/channel_interface.h> + namespace grpc { namespace experimental { class DelegatingChannel : public ::grpc::ChannelInterface { public: - virtual ~DelegatingChannel() {} + ~DelegatingChannel() override {} - DelegatingChannel(std::shared_ptr<::grpc::ChannelInterface> delegate_channel) + explicit DelegatingChannel( + std::shared_ptr<::grpc::ChannelInterface> delegate_channel) : delegate_channel_(delegate_channel) {} grpc_connectivity_state GetState(bool try_to_connect) override { diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/grpc_library.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/grpc_library.h index 17c904d71a..660d6d0abd 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/grpc_library.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/grpc_library.h @@ -37,7 +37,8 @@ extern GrpcLibraryInterface* g_glip; /// Classes that require gRPC to be initialized should inherit from this class. class GrpcLibraryCodegen { public: - GrpcLibraryCodegen(bool call_grpc_init = true) : grpc_init_called_(false) { + explicit GrpcLibraryCodegen(bool call_grpc_init = true) + : grpc_init_called_(false) { if (call_grpc_init) { GPR_CODEGEN_ASSERT(g_glip && "gRPC library not initialized. See " diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/intercepted_channel.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/intercepted_channel.h index c729970ca8..e3a4c8e768 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/intercepted_channel.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/intercepted_channel.h @@ -34,7 +34,7 @@ class InterceptorBatchMethodsImpl; /// see the RPC. class InterceptedChannel : public ChannelInterface { public: - virtual ~InterceptedChannel() { channel_ = nullptr; } + ~InterceptedChannel() override { channel_ = nullptr; } /// Get the current channel state. If the channel is in IDLE and /// \a try_to_connect is set to true, try to connect. diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/interceptor_common.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/interceptor_common.h index 714351f543..72d8009ded 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/interceptor_common.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/interceptor_common.h @@ -49,7 +49,7 @@ class InterceptorBatchMethodsImpl } } - ~InterceptorBatchMethodsImpl() {} + ~InterceptorBatchMethodsImpl() override {} bool QueryInterceptionHookPoint( experimental::InterceptionHookPoints type) override { @@ -227,19 +227,11 @@ class InterceptorBatchMethodsImpl bool InterceptorsListEmpty() { auto* client_rpc_info = call_->client_rpc_info(); if (client_rpc_info != nullptr) { - if (client_rpc_info->interceptors_.size() == 0) { - return true; - } else { - return false; - } + return client_rpc_info->interceptors_.empty(); } auto* server_rpc_info = call_->server_rpc_info(); - if (server_rpc_info == nullptr || - server_rpc_info->interceptors_.size() == 0) { - return true; - } - return false; + return server_rpc_info == nullptr || server_rpc_info->interceptors_.empty(); } // This should be used only by subclasses of CallOpSetInterface. SetCall and @@ -251,7 +243,7 @@ class InterceptorBatchMethodsImpl GPR_CODEGEN_ASSERT(ops_); auto* client_rpc_info = call_->client_rpc_info(); if (client_rpc_info != nullptr) { - if (client_rpc_info->interceptors_.size() == 0) { + if (client_rpc_info->interceptors_.empty()) { return true; } else { RunClientInterceptors(); @@ -260,8 +252,7 @@ class InterceptorBatchMethodsImpl } auto* server_rpc_info = call_->server_rpc_info(); - if (server_rpc_info == nullptr || - server_rpc_info->interceptors_.size() == 0) { + if (server_rpc_info == nullptr || server_rpc_info->interceptors_.empty()) { return true; } RunServerInterceptors(); @@ -277,8 +268,7 @@ class InterceptorBatchMethodsImpl GPR_CODEGEN_ASSERT(reverse_ == true); GPR_CODEGEN_ASSERT(call_->client_rpc_info() == nullptr); auto* server_rpc_info = call_->server_rpc_info(); - if (server_rpc_info == nullptr || - server_rpc_info->interceptors_.size() == 0) { + if (server_rpc_info == nullptr || server_rpc_info->interceptors_.empty()) { return true; } callback_ = std::move(f); @@ -426,11 +416,7 @@ class CancelInterceptorBatchMethods public: bool QueryInterceptionHookPoint( experimental::InterceptionHookPoints type) override { - if (type == experimental::InterceptionHookPoints::PRE_SEND_CANCEL) { - return true; - } else { - return false; - } + return type == experimental::InterceptionHookPoints::PRE_SEND_CANCEL; } void Proceed() override { @@ -493,7 +479,6 @@ class CancelInterceptorBatchMethods GPR_CODEGEN_ASSERT(false && "It is illegal to call ModifySendStatus on a method " "which has a Cancel notification"); - return; } std::multimap<TString, TString>* GetSendTrailingMetadata() override { diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/method_handler.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/method_handler.h index 0033936b04..756f4aae75 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/method_handler.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/method_handler.h @@ -49,8 +49,52 @@ template <class Callable> #endif // GRPC_ALLOW_EXCEPTIONS } +/// A helper function with reduced templating to do the common work needed to +/// actually send the server response. Uses non-const parameter for Status since +/// this should only ever be called from the end of the RunHandler method. + +template <class ResponseType> +void UnaryRunHandlerHelper(const MethodHandler::HandlerParameter& param, + ResponseType* rsp, ::grpc::Status& status) { + GPR_CODEGEN_ASSERT(!param.server_context->sent_initial_metadata_); + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpServerSendStatus> + ops; + ops.SendInitialMetadata(¶m.server_context->initial_metadata_, + param.server_context->initial_metadata_flags()); + if (param.server_context->compression_level_set()) { + ops.set_compression_level(param.server_context->compression_level()); + } + if (status.ok()) { + status = ops.SendMessagePtr(rsp); + } + ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); +} + +/// A helper function with reduced templating to do deserializing. + +template <class RequestType> +void* UnaryDeserializeHelper(grpc_byte_buffer* req, ::grpc::Status* status, + RequestType* request) { + ::grpc::ByteBuffer buf; + buf.set_buffer(req); + *status = ::grpc::SerializationTraits<RequestType>::Deserialize( + &buf, static_cast<RequestType*>(request)); + buf.Release(); + if (status->ok()) { + return request; + } + request->~RequestType(); + return nullptr; +} + /// A wrapper class of an application provided rpc method handler. -template <class ServiceType, class RequestType, class ResponseType> +template <class ServiceType, class RequestType, class ResponseType, + class BaseRequestType = RequestType, + class BaseResponseType = ResponseType> class RpcMethodHandler : public ::grpc::internal::MethodHandler { public: RpcMethodHandler( @@ -71,40 +115,16 @@ class RpcMethodHandler : public ::grpc::internal::MethodHandler { }); static_cast<RequestType*>(param.request)->~RequestType(); } - - GPR_CODEGEN_ASSERT(!param.server_context->sent_initial_metadata_); - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, - ::grpc::internal::CallOpSendMessage, - ::grpc::internal::CallOpServerSendStatus> - ops; - ops.SendInitialMetadata(¶m.server_context->initial_metadata_, - param.server_context->initial_metadata_flags()); - if (param.server_context->compression_level_set()) { - ops.set_compression_level(param.server_context->compression_level()); - } - if (status.ok()) { - status = ops.SendMessagePtr(&rsp); - } - ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status); - param.call->PerformOps(&ops); - param.call->cq()->Pluck(&ops); + UnaryRunHandlerHelper(param, static_cast<BaseResponseType*>(&rsp), status); } void* Deserialize(grpc_call* call, grpc_byte_buffer* req, ::grpc::Status* status, void** /*handler_data*/) final { - ::grpc::ByteBuffer buf; - buf.set_buffer(req); auto* request = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( - call, sizeof(RequestType))) RequestType(); - *status = - ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request); - buf.Release(); - if (status->ok()) { - return request; - } - request->~RequestType(); - return nullptr; + call, sizeof(RequestType))) RequestType; + return UnaryDeserializeHelper(req, status, + static_cast<BaseRequestType*>(request)); } private: @@ -241,7 +261,7 @@ class ServerStreamingHandler : public ::grpc::internal::MethodHandler { template <class Streamer, bool WriteNeeded> class TemplatedBidiStreamingHandler : public ::grpc::internal::MethodHandler { public: - TemplatedBidiStreamingHandler( + explicit TemplatedBidiStreamingHandler( std::function<::grpc::Status(::grpc::ServerContext*, Streamer*)> func) : func_(func), write_needed_(WriteNeeded) {} diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/method_handler_impl.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/method_handler_impl.h new file mode 100644 index 0000000000..cc88a135ca --- /dev/null +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/method_handler_impl.h @@ -0,0 +1,22 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H +#define GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H + +#endif // GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/proto_buffer_reader.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/proto_buffer_reader.h index 487471290d..d25b79a402 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/proto_buffer_reader.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/proto_buffer_reader.h @@ -59,7 +59,7 @@ class ProtoBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream { } } - ~ProtoBufferReader() { + ~ProtoBufferReader() override { if (status_.ok()) { g_core_codegen_interface->grpc_byte_buffer_reader_destroy(&reader_); } @@ -76,7 +76,7 @@ class ProtoBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream { *data = GRPC_SLICE_START_PTR(*slice_) + GRPC_SLICE_LENGTH(*slice_) - backup_count_; GPR_CODEGEN_ASSERT(backup_count_ <= INT_MAX); - *size = (int)backup_count_; + *size = static_cast<int>(backup_count_); backup_count_ = 0; return true; } @@ -88,7 +88,7 @@ class ProtoBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream { *data = GRPC_SLICE_START_PTR(*slice_); // On win x64, int is only 32bit GPR_CODEGEN_ASSERT(GRPC_SLICE_LENGTH(*slice_) <= INT_MAX); - byte_count_ += * size = (int)GRPC_SLICE_LENGTH(*slice_); + byte_count_ += * size = static_cast<int>(GRPC_SLICE_LENGTH(*slice_)); return true; } diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/proto_buffer_writer.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/proto_buffer_writer.h index 0af4616e50..cd9d70c5a5 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/proto_buffer_writer.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/proto_buffer_writer.h @@ -65,12 +65,12 @@ class ProtoBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream { GPR_CODEGEN_ASSERT(!byte_buffer->Valid()); /// Create an empty raw byte buffer and look at its underlying slice buffer grpc_byte_buffer* bp = - g_core_codegen_interface->grpc_raw_byte_buffer_create(NULL, 0); + g_core_codegen_interface->grpc_raw_byte_buffer_create(nullptr, 0); byte_buffer->set_buffer(bp); slice_buffer_ = &bp->data.raw.slice_buffer; } - ~ProtoBufferWriter() { + ~ProtoBufferWriter() override { if (have_backup_) { g_core_codegen_interface->grpc_slice_unref(backup_slice_); } @@ -107,7 +107,7 @@ class ProtoBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream { *data = GRPC_SLICE_START_PTR(slice_); // On win x64, int is only 32bit GPR_CODEGEN_ASSERT(GRPC_SLICE_LENGTH(slice_) <= INT_MAX); - byte_count_ += * size = (int)GRPC_SLICE_LENGTH(slice_); + byte_count_ += * size = static_cast<int>(GRPC_SLICE_LENGTH(slice_)); g_core_codegen_interface->grpc_slice_buffer_add(slice_buffer_, slice_); return true; } @@ -122,7 +122,7 @@ class ProtoBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream { /// 4. Mark that we still have the remaining part (for later use/unref) GPR_CODEGEN_ASSERT(count <= static_cast<int>(GRPC_SLICE_LENGTH(slice_))); g_core_codegen_interface->grpc_slice_buffer_pop(slice_buffer_); - if ((size_t)count == GRPC_SLICE_LENGTH(slice_)) { + if (static_cast<size_t>(count) == GRPC_SLICE_LENGTH(slice_)) { backup_slice_ = slice_; } else { backup_slice_ = g_core_codegen_interface->grpc_slice_split_tail( @@ -133,7 +133,7 @@ class ProtoBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream { // on a following Next() call, a reference will be returned to this slice // via GRPC_SLICE_START_PTR, which will not be an address held by // slice_buffer_. - have_backup_ = backup_slice_.refcount != NULL; + have_backup_ = backup_slice_.refcount != nullptr; byte_count_ -= count; } diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/proto_utils.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/proto_utils.h index 2e102135a3..7f5c6e9a99 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/proto_utils.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/proto_utils.h @@ -49,8 +49,8 @@ Status GenericSerialize(const grpc::protobuf::MessageLite& msg, ByteBuffer* bb, "ProtoBufferWriter must be a subclass of " "::protobuf::io::ZeroCopyOutputStream"); *own_buffer = true; - int byte_size = msg.ByteSizeLong(); - if ((size_t)byte_size <= GRPC_SLICE_INLINED_SIZE) { + int byte_size = static_cast<int>(msg.ByteSizeLong()); + if (static_cast<size_t>(byte_size) <= GRPC_SLICE_INLINED_SIZE) { Slice slice(byte_size); // We serialize directly into the allocated slices memory GPR_CODEGEN_ASSERT(slice.end() == msg.SerializeWithCachedSizesToArray( diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/rpc_method.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/rpc_method.h index 9dcde954f1..394a29b837 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/rpc_method.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/rpc_method.h @@ -36,7 +36,7 @@ class RpcMethod { }; RpcMethod(const char* name, RpcType type) - : name_(name), method_type_(type), channel_tag_(NULL) {} + : name_(name), method_type_(type), channel_tag_(nullptr) {} RpcMethod(const char* name, RpcType type, const std::shared_ptr<ChannelInterface>& channel) diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/security/auth_context.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/security/auth_context.h index 220b78f2eb..e2b0435652 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/security/auth_context.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/security/auth_context.h @@ -42,7 +42,7 @@ class AuthPropertyIterator AuthPropertyIterator operator++(int); bool operator==(const AuthPropertyIterator& rhs) const; bool operator!=(const AuthPropertyIterator& rhs) const; - const AuthProperty operator*(); + AuthProperty operator*(); protected: AuthPropertyIterator(); @@ -86,7 +86,7 @@ class AuthContext { /// Mutation functions: should only be used by an AuthMetadataProcessor. virtual void AddProperty(const TString& key, const string_ref& value) = 0; - virtual bool SetPeerIdentityPropertyName(const string& name) = 0; + virtual bool SetPeerIdentityPropertyName(const TString& name) = 0; }; } // namespace grpc diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/server_callback.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/server_callback.h index 3794a9ffa7..701ef561a3 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/server_callback.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/server_callback.h @@ -29,6 +29,7 @@ #include <grpcpp/impl/codegen/core_codegen_interface.h> #include <grpcpp/impl/codegen/message_allocator.h> #include <grpcpp/impl/codegen/status.h> +#include <grpcpp/impl/codegen/sync.h> namespace grpc { @@ -190,7 +191,7 @@ class ServerBidiReactor; // the API. class ServerCallbackUnary : public internal::ServerCallbackCall { public: - virtual ~ServerCallbackUnary() {} + ~ServerCallbackUnary() override {} virtual void Finish(::grpc::Status s) = 0; virtual void SendInitialMetadata() = 0; @@ -206,7 +207,7 @@ class ServerCallbackUnary : public internal::ServerCallbackCall { template <class Request> class ServerCallbackReader : public internal::ServerCallbackCall { public: - virtual ~ServerCallbackReader() {} + ~ServerCallbackReader() override {} virtual void Finish(::grpc::Status s) = 0; virtual void SendInitialMetadata() = 0; virtual void Read(Request* msg) = 0; @@ -220,7 +221,7 @@ class ServerCallbackReader : public internal::ServerCallbackCall { template <class Response> class ServerCallbackWriter : public internal::ServerCallbackCall { public: - virtual ~ServerCallbackWriter() {} + ~ServerCallbackWriter() override {} virtual void Finish(::grpc::Status s) = 0; virtual void SendInitialMetadata() = 0; @@ -237,7 +238,7 @@ class ServerCallbackWriter : public internal::ServerCallbackCall { template <class Request, class Response> class ServerCallbackReaderWriter : public internal::ServerCallbackCall { public: - virtual ~ServerCallbackReaderWriter() {} + ~ServerCallbackReaderWriter() override {} virtual void Finish(::grpc::Status s) = 0; virtual void SendInitialMetadata() = 0; @@ -256,7 +257,12 @@ class ServerCallbackReaderWriter : public internal::ServerCallbackCall { // by the user, returned as the output parameter of the method handler for a // callback method. Note that none of the classes are pure; all reactions have a // default empty reaction so that the user class only needs to override those -// classes that it cares about. +// reactions that it cares about. The reaction methods will be invoked by the +// library in response to the completion of various operations. Reactions must +// not include blocking operations (such as blocking I/O, starting synchronous +// RPCs, or waiting on condition variables). Reactions may be invoked +// concurrently, except that OnDone is called after all others (assuming proper +// API usage). The reactor may not be deleted until OnDone is called. /// \a ServerBidiReactor is the interface for a bidirectional streaming RPC. template <class Request, class Response> @@ -268,12 +274,12 @@ class ServerBidiReactor : public internal::ServerReactor { // TODO(vjpai): Switch to default constructor and default initializer when // gcc-4.x is no longer supported ServerBidiReactor() : stream_(nullptr) {} - ~ServerBidiReactor() = default; + ~ServerBidiReactor() override = default; /// Send any initial metadata stored in the RPC context. If not invoked, /// any initial metadata will be passed along with the first Write or the /// Finish (if there are no writes). - void StartSendInitialMetadata() { + void StartSendInitialMetadata() Y_ABSL_LOCKS_EXCLUDED(stream_mu_) { ServerCallbackReaderWriter<Request, Response>* stream = stream_.load(std::memory_order_acquire); if (stream == nullptr) { @@ -291,7 +297,7 @@ class ServerBidiReactor : public internal::ServerReactor { /// /// \param[out] req Where to eventually store the read message. Valid when /// the library calls OnReadDone - void StartRead(Request* req) { + void StartRead(Request* req) Y_ABSL_LOCKS_EXCLUDED(stream_mu_) { ServerCallbackReaderWriter<Request, Response>* stream = stream_.load(std::memory_order_acquire); if (stream == nullptr) { @@ -320,7 +326,8 @@ class ServerBidiReactor : public internal::ServerReactor { /// ownership but the caller must ensure that the message is /// not deleted or modified until OnWriteDone is called. /// \param[in] options The WriteOptions to use for writing this message - void StartWrite(const Response* resp, ::grpc::WriteOptions options) { + void StartWrite(const Response* resp, ::grpc::WriteOptions options) + Y_ABSL_LOCKS_EXCLUDED(stream_mu_) { ServerCallbackReaderWriter<Request, Response>* stream = stream_.load(std::memory_order_acquire); if (stream == nullptr) { @@ -328,11 +335,11 @@ class ServerBidiReactor : public internal::ServerReactor { stream = stream_.load(std::memory_order_relaxed); if (stream == nullptr) { backlog_.write_wanted = resp; - backlog_.write_options_wanted = std::move(options); + backlog_.write_options_wanted = options; return; } } - stream->Write(resp, std::move(options)); + stream->Write(resp, options); } /// Initiate a write operation with specified options and final RPC Status, @@ -349,7 +356,7 @@ class ServerBidiReactor : public internal::ServerReactor { /// \param[in] options The WriteOptions to use for writing this message /// \param[in] s The status outcome of this RPC void StartWriteAndFinish(const Response* resp, ::grpc::WriteOptions options, - ::grpc::Status s) { + ::grpc::Status s) Y_ABSL_LOCKS_EXCLUDED(stream_mu_) { ServerCallbackReaderWriter<Request, Response>* stream = stream_.load(std::memory_order_acquire); if (stream == nullptr) { @@ -358,12 +365,12 @@ class ServerBidiReactor : public internal::ServerReactor { if (stream == nullptr) { backlog_.write_and_finish_wanted = true; backlog_.write_wanted = resp; - backlog_.write_options_wanted = std::move(options); + backlog_.write_options_wanted = options; backlog_.status_wanted = std::move(s); return; } } - stream->WriteAndFinish(resp, std::move(options), std::move(s)); + stream->WriteAndFinish(resp, options, std::move(s)); } /// Inform system of a planned write operation with specified options, but @@ -375,7 +382,7 @@ class ServerBidiReactor : public internal::ServerReactor { /// not deleted or modified until OnWriteDone is called. /// \param[in] options The WriteOptions to use for writing this message void StartWriteLast(const Response* resp, ::grpc::WriteOptions options) { - StartWrite(resp, std::move(options.set_last_message())); + StartWrite(resp, options.set_last_message()); } /// Indicate that the stream is to be finished and the trailing metadata and @@ -384,7 +391,7 @@ class ServerBidiReactor : public internal::ServerReactor { /// cancelled. /// /// \param[in] s The status outcome of this RPC - void Finish(::grpc::Status s) { + void Finish(::grpc::Status s) Y_ABSL_LOCKS_EXCLUDED(stream_mu_) { ServerCallbackReaderWriter<Request, Response>* stream = stream_.load(std::memory_order_acquire); if (stream == nullptr) { @@ -476,7 +483,7 @@ class ServerBidiReactor : public internal::ServerReactor { ::grpc::WriteOptions write_options_wanted; ::grpc::Status status_wanted; }; - PreBindBacklog backlog_ /* GUARDED_BY(stream_mu_) */; + PreBindBacklog backlog_ Y_ABSL_GUARDED_BY(stream_mu_); }; /// \a ServerReadReactor is the interface for a client-streaming RPC. @@ -484,10 +491,10 @@ template <class Request> class ServerReadReactor : public internal::ServerReactor { public: ServerReadReactor() : reader_(nullptr) {} - ~ServerReadReactor() = default; + ~ServerReadReactor() override = default; /// The following operation initiations are exactly like ServerBidiReactor. - void StartSendInitialMetadata() { + void StartSendInitialMetadata() Y_ABSL_LOCKS_EXCLUDED(reader_mu_) { ServerCallbackReader<Request>* reader = reader_.load(std::memory_order_acquire); if (reader == nullptr) { @@ -500,7 +507,7 @@ class ServerReadReactor : public internal::ServerReactor { } reader->SendInitialMetadata(); } - void StartRead(Request* req) { + void StartRead(Request* req) Y_ABSL_LOCKS_EXCLUDED(reader_mu_) { ServerCallbackReader<Request>* reader = reader_.load(std::memory_order_acquire); if (reader == nullptr) { @@ -513,7 +520,7 @@ class ServerReadReactor : public internal::ServerReactor { } reader->Read(req); } - void Finish(::grpc::Status s) { + void Finish(::grpc::Status s) Y_ABSL_LOCKS_EXCLUDED(reader_mu_) { ServerCallbackReader<Request>* reader = reader_.load(std::memory_order_acquire); if (reader == nullptr) { @@ -539,7 +546,8 @@ class ServerReadReactor : public internal::ServerReactor { // May be overridden by internal implementation details. This is not a public // customization point. - virtual void InternalBindReader(ServerCallbackReader<Request>* reader) { + virtual void InternalBindReader(ServerCallbackReader<Request>* reader) + Y_ABSL_LOCKS_EXCLUDED(reader_mu_) { grpc::internal::MutexLock l(&reader_mu_); if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { @@ -563,7 +571,7 @@ class ServerReadReactor : public internal::ServerReactor { Request* read_wanted = nullptr; ::grpc::Status status_wanted; }; - PreBindBacklog backlog_ /* GUARDED_BY(reader_mu_) */; + PreBindBacklog backlog_ Y_ABSL_GUARDED_BY(reader_mu_); }; /// \a ServerWriteReactor is the interface for a server-streaming RPC. @@ -571,10 +579,10 @@ template <class Response> class ServerWriteReactor : public internal::ServerReactor { public: ServerWriteReactor() : writer_(nullptr) {} - ~ServerWriteReactor() = default; + ~ServerWriteReactor() override = default; /// The following operation initiations are exactly like ServerBidiReactor. - void StartSendInitialMetadata() { + void StartSendInitialMetadata() Y_ABSL_LOCKS_EXCLUDED(writer_mu_) { ServerCallbackWriter<Response>* writer = writer_.load(std::memory_order_acquire); if (writer == nullptr) { @@ -590,7 +598,8 @@ class ServerWriteReactor : public internal::ServerReactor { void StartWrite(const Response* resp) { StartWrite(resp, ::grpc::WriteOptions()); } - void StartWrite(const Response* resp, ::grpc::WriteOptions options) { + void StartWrite(const Response* resp, ::grpc::WriteOptions options) + Y_ABSL_LOCKS_EXCLUDED(writer_mu_) { ServerCallbackWriter<Response>* writer = writer_.load(std::memory_order_acquire); if (writer == nullptr) { @@ -598,14 +607,14 @@ class ServerWriteReactor : public internal::ServerReactor { writer = writer_.load(std::memory_order_relaxed); if (writer == nullptr) { backlog_.write_wanted = resp; - backlog_.write_options_wanted = std::move(options); + backlog_.write_options_wanted = options; return; } } - writer->Write(resp, std::move(options)); + writer->Write(resp, options); } void StartWriteAndFinish(const Response* resp, ::grpc::WriteOptions options, - ::grpc::Status s) { + ::grpc::Status s) Y_ABSL_LOCKS_EXCLUDED(writer_mu_) { ServerCallbackWriter<Response>* writer = writer_.load(std::memory_order_acquire); if (writer == nullptr) { @@ -614,17 +623,17 @@ class ServerWriteReactor : public internal::ServerReactor { if (writer == nullptr) { backlog_.write_and_finish_wanted = true; backlog_.write_wanted = resp; - backlog_.write_options_wanted = std::move(options); + backlog_.write_options_wanted = options; backlog_.status_wanted = std::move(s); return; } } - writer->WriteAndFinish(resp, std::move(options), std::move(s)); + writer->WriteAndFinish(resp, options, std::move(s)); } void StartWriteLast(const Response* resp, ::grpc::WriteOptions options) { - StartWrite(resp, std::move(options.set_last_message())); + StartWrite(resp, options.set_last_message()); } - void Finish(::grpc::Status s) { + void Finish(::grpc::Status s) Y_ABSL_LOCKS_EXCLUDED(writer_mu_) { ServerCallbackWriter<Response>* writer = writer_.load(std::memory_order_acquire); if (writer == nullptr) { @@ -649,7 +658,8 @@ class ServerWriteReactor : public internal::ServerReactor { friend class ServerCallbackWriter<Response>; // May be overridden by internal implementation details. This is not a public // customization point. - virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer) { + virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer) + Y_ABSL_LOCKS_EXCLUDED(writer_mu_) { grpc::internal::MutexLock l(&writer_mu_); if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { @@ -682,16 +692,16 @@ class ServerWriteReactor : public internal::ServerReactor { ::grpc::WriteOptions write_options_wanted; ::grpc::Status status_wanted; }; - PreBindBacklog backlog_ /* GUARDED_BY(writer_mu_) */; + PreBindBacklog backlog_ Y_ABSL_GUARDED_BY(writer_mu_); }; class ServerUnaryReactor : public internal::ServerReactor { public: ServerUnaryReactor() : call_(nullptr) {} - ~ServerUnaryReactor() = default; + ~ServerUnaryReactor() override = default; /// StartSendInitialMetadata is exactly like ServerBidiReactor. - void StartSendInitialMetadata() { + void StartSendInitialMetadata() Y_ABSL_LOCKS_EXCLUDED(call_mu_) { ServerCallbackUnary* call = call_.load(std::memory_order_acquire); if (call == nullptr) { grpc::internal::MutexLock l(&call_mu_); @@ -706,7 +716,7 @@ class ServerUnaryReactor : public internal::ServerReactor { /// Finish is similar to ServerBidiReactor except for one detail. /// If the status is non-OK, any message will not be sent. Instead, /// the client will only receive the status and any trailing metadata. - void Finish(::grpc::Status s) { + void Finish(::grpc::Status s) Y_ABSL_LOCKS_EXCLUDED(call_mu_) { ServerCallbackUnary* call = call_.load(std::memory_order_acquire); if (call == nullptr) { grpc::internal::MutexLock l(&call_mu_); @@ -729,7 +739,8 @@ class ServerUnaryReactor : public internal::ServerReactor { friend class ServerCallbackUnary; // May be overridden by internal implementation details. This is not a public // customization point. - virtual void InternalBindCall(ServerCallbackUnary* call) { + virtual void InternalBindCall(ServerCallbackUnary* call) + Y_ABSL_LOCKS_EXCLUDED(call_mu_) { grpc::internal::MutexLock l(&call_mu_); if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { @@ -749,7 +760,7 @@ class ServerUnaryReactor : public internal::ServerReactor { bool finish_wanted = false; ::grpc::Status status_wanted; }; - PreBindBacklog backlog_ /* GUARDED_BY(call_mu_) */; + PreBindBacklog backlog_ Y_ABSL_GUARDED_BY(call_mu_); }; namespace internal { diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/server_callback_handlers.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/server_callback_handlers.h index 8120fcaf85..76e655a9b3 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/server_callback_handlers.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/server_callback_handlers.h @@ -53,7 +53,7 @@ class CallbackUnaryHandler : public ::grpc::internal::MethodHandler { param.call->call(), sizeof(ServerCallbackUnaryImpl))) ServerCallbackUnaryImpl( static_cast<::grpc::CallbackServerContext*>(param.server_context), - param.call, allocator_state, std::move(param.call_requester)); + param.call, allocator_state, param.call_requester); param.server_context->BeginCompletionOp( param.call, [call](bool) { call->MaybeDone(); }, call); @@ -157,14 +157,15 @@ class CallbackUnaryHandler : public ::grpc::internal::MethodHandler { // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor // thread. However, any OnDone needed after that can be inlined because it // is already running on an executor thread. - meta_tag_.Set(call_.call(), - [this](bool ok) { - ServerUnaryReactor* reactor = - reactor_.load(std::memory_order_relaxed); - reactor->OnSendInitialMetadataDone(ok); - this->MaybeDone(/*inlineable_ondone=*/true); - }, - &meta_ops_, /*can_inline=*/false); + meta_tag_.Set( + call_.call(), + [this](bool ok) { + ServerUnaryReactor* reactor = + reactor_.load(std::memory_order_relaxed); + reactor->OnSendInitialMetadataDone(ok); + this->MaybeDone(/*inlineable_ondone=*/true); + }, + &meta_ops_, /*can_inline=*/false); meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { @@ -209,6 +210,9 @@ class CallbackUnaryHandler : public ::grpc::internal::MethodHandler { grpc_call* call = call_.call(); auto call_requester = std::move(call_requester_); allocator_state_->Release(); + if (ctx_->context_allocator() != nullptr) { + ctx_->context_allocator()->Release(ctx_); + } this->~ServerCallbackUnaryImpl(); // explicitly call destructor ::grpc::g_core_codegen_interface->grpc_call_unref(call); call_requester(); @@ -265,7 +269,7 @@ class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler { param.call->call(), sizeof(ServerCallbackReaderImpl))) ServerCallbackReaderImpl( static_cast<::grpc::CallbackServerContext*>(param.server_context), - param.call, std::move(param.call_requester)); + param.call, param.call_requester); // Inlineable OnDone can be false in the CompletionOp callback because there // is no read reactor that has an inlineable OnDone; this only applies to // the DefaultReactor (which is unary). @@ -305,14 +309,15 @@ class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler { // A finish tag with only MaybeDone can have its callback inlined // regardless even if OnDone is not inlineable because this callback just // checks a ref and then decides whether or not to dispatch OnDone. - finish_tag_.Set(call_.call(), - [this](bool) { - // Inlineable OnDone can be false here because there is - // no read reactor that has an inlineable OnDone; this - // only applies to the DefaultReactor (which is unary). - this->MaybeDone(/*inlineable_ondone=*/false); - }, - &finish_ops_, /*can_inline=*/true); + finish_tag_.Set( + call_.call(), + [this](bool) { + // Inlineable OnDone can be false here because there is + // no read reactor that has an inlineable OnDone; this + // only applies to the DefaultReactor (which is unary). + this->MaybeDone(/*inlineable_ondone=*/false); + }, + &finish_ops_, /*can_inline=*/true); if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); @@ -338,14 +343,15 @@ class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler { // The callback for this function should not be inlined because it invokes // a user-controlled reaction, but any resulting OnDone can be inlined in // the executor to which this callback is dispatched. - meta_tag_.Set(call_.call(), - [this](bool ok) { - ServerReadReactor<RequestType>* reactor = - reactor_.load(std::memory_order_relaxed); - reactor->OnSendInitialMetadataDone(ok); - this->MaybeDone(/*inlineable_ondone=*/true); - }, - &meta_ops_, /*can_inline=*/false); + meta_tag_.Set( + call_.call(), + [this](bool ok) { + ServerReadReactor<RequestType>* reactor = + reactor_.load(std::memory_order_relaxed); + reactor->OnSendInitialMetadataDone(ok); + this->MaybeDone(/*inlineable_ondone=*/true); + }, + &meta_ops_, /*can_inline=*/false); meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { @@ -375,12 +381,13 @@ class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler { // The callback for this function should not be inlined because it invokes // a user-controlled reaction, but any resulting OnDone can be inlined in // the executor to which this callback is dispatched. - read_tag_.Set(call_.call(), - [this, reactor](bool ok) { - reactor->OnReadDone(ok); - this->MaybeDone(/*inlineable_ondone=*/true); - }, - &read_ops_, /*can_inline=*/false); + read_tag_.Set( + call_.call(), + [this, reactor](bool ok) { + reactor->OnReadDone(ok); + this->MaybeDone(/*inlineable_ondone=*/true); + }, + &read_ops_, /*can_inline=*/false); read_ops_.set_core_cq_tag(&read_tag_); this->BindReactor(reactor); this->MaybeCallOnCancel(reactor); @@ -398,6 +405,9 @@ class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler { reactor_.load(std::memory_order_relaxed)->OnDone(); grpc_call* call = call_.call(); auto call_requester = std::move(call_requester_); + if (ctx_->context_allocator() != nullptr) { + ctx_->context_allocator()->Release(ctx_); + } this->~ServerCallbackReaderImpl(); // explicitly call destructor ::grpc::g_core_codegen_interface->grpc_call_unref(call); call_requester(); @@ -449,7 +459,7 @@ class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler { ServerCallbackWriterImpl( static_cast<::grpc::CallbackServerContext*>(param.server_context), param.call, static_cast<RequestType*>(param.request), - std::move(param.call_requester)); + param.call_requester); // Inlineable OnDone can be false in the CompletionOp callback because there // is no write reactor that has an inlineable OnDone; this only applies to // the DefaultReactor (which is unary). @@ -505,14 +515,15 @@ class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler { // A finish tag with only MaybeDone can have its callback inlined // regardless even if OnDone is not inlineable because this callback just // checks a ref and then decides whether or not to dispatch OnDone. - finish_tag_.Set(call_.call(), - [this](bool) { - // Inlineable OnDone can be false here because there is - // no write reactor that has an inlineable OnDone; this - // only applies to the DefaultReactor (which is unary). - this->MaybeDone(/*inlineable_ondone=*/false); - }, - &finish_ops_, /*can_inline=*/true); + finish_tag_.Set( + call_.call(), + [this](bool) { + // Inlineable OnDone can be false here because there is + // no write reactor that has an inlineable OnDone; this + // only applies to the DefaultReactor (which is unary). + this->MaybeDone(/*inlineable_ondone=*/false); + }, + &finish_ops_, /*can_inline=*/true); finish_ops_.set_core_cq_tag(&finish_tag_); if (!ctx_->sent_initial_metadata_) { @@ -533,14 +544,15 @@ class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler { // The callback for this function should not be inlined because it invokes // a user-controlled reaction, but any resulting OnDone can be inlined in // the executor to which this callback is dispatched. - meta_tag_.Set(call_.call(), - [this](bool ok) { - ServerWriteReactor<ResponseType>* reactor = - reactor_.load(std::memory_order_relaxed); - reactor->OnSendInitialMetadataDone(ok); - this->MaybeDone(/*inlineable_ondone=*/true); - }, - &meta_ops_, /*can_inline=*/false); + meta_tag_.Set( + call_.call(), + [this](bool ok) { + ServerWriteReactor<ResponseType>* reactor = + reactor_.load(std::memory_order_relaxed); + reactor->OnSendInitialMetadataDone(ok); + this->MaybeDone(/*inlineable_ondone=*/true); + }, + &meta_ops_, /*can_inline=*/false); meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { @@ -595,12 +607,13 @@ class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler { // The callback for this function should not be inlined because it invokes // a user-controlled reaction, but any resulting OnDone can be inlined in // the executor to which this callback is dispatched. - write_tag_.Set(call_.call(), - [this, reactor](bool ok) { - reactor->OnWriteDone(ok); - this->MaybeDone(/*inlineable_ondone=*/true); - }, - &write_ops_, /*can_inline=*/false); + write_tag_.Set( + call_.call(), + [this, reactor](bool ok) { + reactor->OnWriteDone(ok); + this->MaybeDone(/*inlineable_ondone=*/true); + }, + &write_ops_, /*can_inline=*/false); write_ops_.set_core_cq_tag(&write_tag_); this->BindReactor(reactor); this->MaybeCallOnCancel(reactor); @@ -609,7 +622,11 @@ class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler { // DefaultReactor (which is unary). this->MaybeDone(/*inlineable_ondone=*/false); } - ~ServerCallbackWriterImpl() { req_->~RequestType(); } + ~ServerCallbackWriterImpl() { + if (req_ != nullptr) { + req_->~RequestType(); + } + } const RequestType* request() { return req_; } @@ -617,6 +634,9 @@ class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler { reactor_.load(std::memory_order_relaxed)->OnDone(); grpc_call* call = call_.call(); auto call_requester = std::move(call_requester_); + if (ctx_->context_allocator() != nullptr) { + ctx_->context_allocator()->Release(ctx_); + } this->~ServerCallbackWriterImpl(); // explicitly call destructor ::grpc::g_core_codegen_interface->grpc_call_unref(call); call_requester(); @@ -666,7 +686,7 @@ class CallbackBidiHandler : public ::grpc::internal::MethodHandler { param.call->call(), sizeof(ServerCallbackReaderWriterImpl))) ServerCallbackReaderWriterImpl( static_cast<::grpc::CallbackServerContext*>(param.server_context), - param.call, std::move(param.call_requester)); + param.call, param.call_requester); // Inlineable OnDone can be false in the CompletionOp callback because there // is no bidi reactor that has an inlineable OnDone; this only applies to // the DefaultReactor (which is unary). @@ -707,14 +727,15 @@ class CallbackBidiHandler : public ::grpc::internal::MethodHandler { // A finish tag with only MaybeDone can have its callback inlined // regardless even if OnDone is not inlineable because this callback just // checks a ref and then decides whether or not to dispatch OnDone. - finish_tag_.Set(call_.call(), - [this](bool) { - // Inlineable OnDone can be false here because there is - // no bidi reactor that has an inlineable OnDone; this - // only applies to the DefaultReactor (which is unary). - this->MaybeDone(/*inlineable_ondone=*/false); - }, - &finish_ops_, /*can_inline=*/true); + finish_tag_.Set( + call_.call(), + [this](bool) { + // Inlineable OnDone can be false here because there is + // no bidi reactor that has an inlineable OnDone; this + // only applies to the DefaultReactor (which is unary). + this->MaybeDone(/*inlineable_ondone=*/false); + }, + &finish_ops_, /*can_inline=*/true); finish_ops_.set_core_cq_tag(&finish_tag_); if (!ctx_->sent_initial_metadata_) { @@ -735,14 +756,15 @@ class CallbackBidiHandler : public ::grpc::internal::MethodHandler { // The callback for this function should not be inlined because it invokes // a user-controlled reaction, but any resulting OnDone can be inlined in // the executor to which this callback is dispatched. - meta_tag_.Set(call_.call(), - [this](bool ok) { - ServerBidiReactor<RequestType, ResponseType>* reactor = - reactor_.load(std::memory_order_relaxed); - reactor->OnSendInitialMetadataDone(ok); - this->MaybeDone(/*inlineable_ondone=*/true); - }, - &meta_ops_, /*can_inline=*/false); + meta_tag_.Set( + call_.call(), + [this](bool ok) { + ServerBidiReactor<RequestType, ResponseType>* reactor = + reactor_.load(std::memory_order_relaxed); + reactor->OnSendInitialMetadataDone(ok); + this->MaybeDone(/*inlineable_ondone=*/true); + }, + &meta_ops_, /*can_inline=*/false); meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { @@ -798,19 +820,21 @@ class CallbackBidiHandler : public ::grpc::internal::MethodHandler { // The callbacks for these functions should not be inlined because they // invoke user-controlled reactions, but any resulting OnDones can be // inlined in the executor to which a callback is dispatched. - write_tag_.Set(call_.call(), - [this, reactor](bool ok) { - reactor->OnWriteDone(ok); - this->MaybeDone(/*inlineable_ondone=*/true); - }, - &write_ops_, /*can_inline=*/false); + write_tag_.Set( + call_.call(), + [this, reactor](bool ok) { + reactor->OnWriteDone(ok); + this->MaybeDone(/*inlineable_ondone=*/true); + }, + &write_ops_, /*can_inline=*/false); write_ops_.set_core_cq_tag(&write_tag_); - read_tag_.Set(call_.call(), - [this, reactor](bool ok) { - reactor->OnReadDone(ok); - this->MaybeDone(/*inlineable_ondone=*/true); - }, - &read_ops_, /*can_inline=*/false); + read_tag_.Set( + call_.call(), + [this, reactor](bool ok) { + reactor->OnReadDone(ok); + this->MaybeDone(/*inlineable_ondone=*/true); + }, + &read_ops_, /*can_inline=*/false); read_ops_.set_core_cq_tag(&read_tag_); this->BindReactor(reactor); this->MaybeCallOnCancel(reactor); @@ -824,6 +848,9 @@ class CallbackBidiHandler : public ::grpc::internal::MethodHandler { reactor_.load(std::memory_order_relaxed)->OnDone(); grpc_call* call = call_.call(); auto call_requester = std::move(call_requester_); + if (ctx_->context_allocator() != nullptr) { + ctx_->context_allocator()->Release(ctx_); + } this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor ::grpc::g_core_codegen_interface->grpc_call_unref(call); call_requester(); diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/server_context.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/server_context.h index 685f006cda..a73155e59f 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/server_context.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/server_context.h @@ -37,6 +37,7 @@ #include <grpcpp/impl/codegen/create_auth_context.h> #include <grpcpp/impl/codegen/message_allocator.h> #include <grpcpp/impl/codegen/metadata_map.h> +#include <grpcpp/impl/codegen/rpc_service_method.h> #include <grpcpp/impl/codegen/security/auth_context.h> #include <grpcpp/impl/codegen/server_callback.h> #include <grpcpp/impl/codegen/server_interceptor.h> @@ -75,7 +76,11 @@ template <class RequestType, class ResponseType> class CallbackBidiHandler; template <class ServiceType, class RequestType, class ResponseType> class ClientStreamingHandler; -template <class ServiceType, class RequestType, class ResponseType> +template <class ResponseType> +void UnaryRunHandlerHelper(const MethodHandler::HandlerParameter&, + ResponseType*, Status&); +template <class ServiceType, class RequestType, class ResponseType, + class BaseRequestType, class BaseResponseType> class RpcMethodHandler; template <class Base> class FinishOnlyReactor; @@ -95,6 +100,7 @@ class CompletionQueue; class GenericServerContext; class Server; class ServerInterface; +class ContextAllocator; // TODO(vjpai): Remove namespace experimental when de-experimentalized fully. namespace experimental { @@ -260,7 +266,7 @@ class ServerContextBase { /// /// \see grpc::AuthContext. std::shared_ptr<const ::grpc::AuthContext> auth_context() const { - if (auth_context_.get() == nullptr) { + if (auth_context_ == nullptr) { auth_context_ = ::grpc::CreateAuthContext(call_.call); } return auth_context_; @@ -335,6 +341,12 @@ class ServerContextBase { ServerContextBase(); ServerContextBase(gpr_timespec deadline, grpc_metadata_array* arr); + void set_context_allocator(ContextAllocator* context_allocator) { + context_allocator_ = context_allocator; + } + + ContextAllocator* context_allocator() const { return context_allocator_; } + private: friend class ::grpc::testing::InteropServerContextInspector; friend class ::grpc::testing::ServerContextTestSpouse; @@ -355,7 +367,12 @@ class ServerContextBase { friend class ::grpc::ServerWriter; template <class W, class R> friend class ::grpc::internal::ServerReaderWriterBody; - template <class ServiceType, class RequestType, class ResponseType> + template <class ResponseType> + friend void ::grpc::internal::UnaryRunHandlerHelper( + const internal::MethodHandler::HandlerParameter& param, ResponseType* rsp, + Status& status); + template <class ServiceType, class RequestType, class ResponseType, + class BaseRequestType, class BaseResponseType> friend class ::grpc::internal::RpcMethodHandler; template <class ServiceType, class RequestType, class ResponseType> friend class ::grpc::internal::ClientStreamingHandler; @@ -405,7 +422,7 @@ class ServerContextBase { const char* method, ::grpc::internal::RpcMethod::RpcType type, const std::vector<std::unique_ptr< ::grpc::experimental::ServerInterceptorFactoryInterface>>& creators) { - if (creators.size() != 0) { + if (!creators.empty()) { rpc_info_ = new ::grpc::experimental::ServerRpcInfo(this, method, type); rpc_info_->RegisterInterceptors(creators); } @@ -453,6 +470,7 @@ class ServerContextBase { ::grpc::experimental::ServerRpcInfo* rpc_info_ = nullptr; ::grpc::experimental::RpcAllocatorState* message_allocator_state_ = nullptr; + ContextAllocator* context_allocator_ = nullptr; class Reactor : public ::grpc::ServerUnaryReactor { public: @@ -466,6 +484,7 @@ class ServerContextBase { }; void SetupTestDefaultReactor(std::function<void(::grpc::Status)> func) { + // NOLINTNEXTLINE(modernize-make-unique) test_unary_.reset(new TestServerCallbackUnary(this, std::move(func))); } bool test_status_set() const { @@ -579,12 +598,14 @@ class CallbackServerContext : public ServerContextBase { using ServerContextBase::compression_algorithm; using ServerContextBase::compression_level; using ServerContextBase::compression_level_set; + using ServerContextBase::context_allocator; using ServerContextBase::deadline; using ServerContextBase::IsCancelled; using ServerContextBase::peer; using ServerContextBase::raw_deadline; using ServerContextBase::set_compression_algorithm; using ServerContextBase::set_compression_level; + using ServerContextBase::set_context_allocator; using ServerContextBase::SetLoadReportingCosts; using ServerContextBase::TryCancel; @@ -601,6 +622,37 @@ class CallbackServerContext : public ServerContextBase { CallbackServerContext& operator=(const CallbackServerContext&) = delete; }; +/// A CallbackServerContext allows users to use the contents of the +/// CallbackServerContext or GenericCallbackServerContext structure for the +/// callback API. +/// The library will invoke the allocator any time a new call is initiated. +/// and call the Release method after the server OnDone. +class ContextAllocator { + public: + virtual ~ContextAllocator() {} + + virtual CallbackServerContext* NewCallbackServerContext() { return nullptr; } + +#ifndef GRPC_CALLBACK_API_NONEXPERIMENTAL + virtual experimental::GenericCallbackServerContext* + NewGenericCallbackServerContext() { + return nullptr; + } +#else + virtual GenericCallbackServerContext* NewGenericCallbackServerContext() { + return nullptr; + } +#endif + + virtual void Release(CallbackServerContext*) {} + +#ifndef GRPC_CALLBACK_API_NONEXPERIMENTAL + virtual void Release(experimental::GenericCallbackServerContext*) {} +#else + virtual void Release(GenericCallbackServerContext*) {} +#endif +}; + } // namespace grpc static_assert( diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/server_interface.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/server_interface.h index d97b725025..7fb5038826 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/server_interface.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/server_interface.h @@ -64,7 +64,7 @@ class ServerInterceptorFactoryInterface; class ServerInterface : public internal::CallHook { public: - virtual ~ServerInterface() {} + ~ServerInterface() override {} /// \a Shutdown does the following things: /// @@ -147,6 +147,8 @@ class ServerInterface : public internal::CallHook { /// May not be abstract since this is a post-1.0 API addition virtual void RegisterCallbackGenericService( experimental::CallbackGenericService* /*service*/) {} + virtual void RegisterContextAllocator( + std::unique_ptr<ContextAllocator> /*context_allocator*/) {} }; /// NOTE: The function experimental_registration() is not stable public API. @@ -186,8 +188,8 @@ class ServerInterface : public internal::CallHook { virtual grpc_server* server() = 0; - virtual void PerformOpsOnCall(internal::CallOpSetInterface* ops, - internal::Call* call) = 0; + void PerformOpsOnCall(internal::CallOpSetInterface* ops, + internal::Call* call) override = 0; class BaseAsyncRequest : public internal::CompletionQueueTag { public: @@ -196,7 +198,7 @@ class ServerInterface : public internal::CallHook { ::grpc::CompletionQueue* call_cq, ::grpc::ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize); - virtual ~BaseAsyncRequest(); + ~BaseAsyncRequest() override; bool FinalizeResult(void** tag, bool* status) override; @@ -228,7 +230,7 @@ class ServerInterface : public internal::CallHook { void* tag, const char* name, internal::RpcMethod::RpcType type); - virtual bool FinalizeResult(void** tag, bool* status) override { + bool FinalizeResult(void** tag, bool* status) override { /* If we are done intercepting, then there is nothing more for us to do */ if (done_intercepting_) { return BaseAsyncRequest::FinalizeResult(tag, status); @@ -283,7 +285,7 @@ class ServerInterface : public internal::CallHook { notification_cq); } - ~PayloadAsyncRequest() { + ~PayloadAsyncRequest() override { payload_.Release(); // We do not own the payload_ } diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/service_type.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/service_type.h index 30be904a3c..57ca9f0944 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/service_type.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/service_type.h @@ -91,7 +91,7 @@ class Service { bool has_generic_methods() const { for (const auto& method : methods_) { - if (method.get() == nullptr) { + if (method == nullptr) { return true; } } diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/slice.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/slice.h index b1a24dcef8..0b9752ccff 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/slice.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/slice.h @@ -49,7 +49,7 @@ class Slice final { Slice(grpc_slice slice, StealRef) : slice_(slice) {} /// Allocate a slice of specified size - Slice(size_t len) + explicit Slice(size_t len) : slice_(g_core_codegen_interface->grpc_slice_malloc(len)) {} /// Construct a slice from a copied buffer @@ -58,6 +58,7 @@ class Slice final { reinterpret_cast<const char*>(buf), len)) {} /// Construct a slice from a copied string + /* NOLINTNEXTLINE(google-explicit-constructor) */ Slice(const TString& str) : slice_(g_core_codegen_interface->grpc_slice_from_copied_buffer( str.c_str(), str.length())) {} diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/string_ref.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/string_ref.h index c5dcd31c1d..a13533eca5 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/string_ref.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/string_ref.h @@ -53,14 +53,17 @@ class string_ref { string_ref() : data_(nullptr), length_(0) {} string_ref(const string_ref& other) : data_(other.data_), length_(other.length_) {} + // NOLINTNEXTLINE(bugprone-unhandled-self-assignment) string_ref& operator=(const string_ref& rhs) { data_ = rhs.data_; length_ = rhs.length_; return *this; } + /* NOLINTNEXTLINE(google-explicit-constructor) */ string_ref(const char* s) : data_(s), length_(strlen(s)) {} string_ref(const char* s, size_t l) : data_(s), length_(l) {} + /* NOLINTNEXTLINE(google-explicit-constructor) */ string_ref(const TString& s) : data_(s.data()), length_(s.length()) {} /// iterators diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/sync.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/sync.h index 146f182e57..bf89a0e39a 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/sync.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/sync.h @@ -32,6 +32,8 @@ #include <grpcpp/impl/codegen/core_codegen_interface.h> +#include "y_absl/synchronization/mutex.h" + // The core library is not accessible in C++ codegen headers, and vice versa. // Thus, we need to have duplicate headers with similar functionality. // Make sure any change to this file is also reflected in @@ -44,7 +46,16 @@ namespace grpc { namespace internal { -class Mutex { +#ifdef GRPCPP_ABSEIL_SYNC + +using Mutex = y_absl::Mutex; +using MutexLock = y_absl::MutexLock; +using ReleasableMutexLock = y_absl::ReleasableMutexLock; +using CondVar = y_absl::CondVar; + +#else + +class Y_ABSL_LOCKABLE Mutex { public: Mutex() { g_core_codegen_interface->gpr_mu_init(&mu_); } ~Mutex() { g_core_codegen_interface->gpr_mu_destroy(&mu_); } @@ -52,8 +63,12 @@ class Mutex { Mutex(const Mutex&) = delete; Mutex& operator=(const Mutex&) = delete; - gpr_mu* get() { return &mu_; } - const gpr_mu* get() const { return &mu_; } + void Lock() Y_ABSL_EXCLUSIVE_LOCK_FUNCTION() { + g_core_codegen_interface->gpr_mu_lock(&mu_); + } + void Unlock() Y_ABSL_UNLOCK_FUNCTION() { + g_core_codegen_interface->gpr_mu_unlock(&mu_); + } private: union { @@ -63,55 +78,45 @@ class Mutex { pthread_mutex_t do_not_use_pth_; #endif }; + + friend class CondVar; }; -// MutexLock is a std:: -class MutexLock { +class Y_ABSL_SCOPED_LOCKABLE MutexLock { public: - explicit MutexLock(Mutex* mu) : mu_(mu->get()) { - g_core_codegen_interface->gpr_mu_lock(mu_); + explicit MutexLock(Mutex* mu) Y_ABSL_EXCLUSIVE_LOCK_FUNCTION(mu) : mu_(mu) { + mu_->Lock(); } - explicit MutexLock(gpr_mu* mu) : mu_(mu) { - g_core_codegen_interface->gpr_mu_lock(mu_); - } - ~MutexLock() { g_core_codegen_interface->gpr_mu_unlock(mu_); } + ~MutexLock() Y_ABSL_UNLOCK_FUNCTION() { mu_->Unlock(); } MutexLock(const MutexLock&) = delete; MutexLock& operator=(const MutexLock&) = delete; private: - gpr_mu* const mu_; + Mutex* const mu_; }; -class ReleasableMutexLock { +class Y_ABSL_SCOPED_LOCKABLE ReleasableMutexLock { public: - explicit ReleasableMutexLock(Mutex* mu) : mu_(mu->get()) { - g_core_codegen_interface->gpr_mu_lock(mu_); - } - explicit ReleasableMutexLock(gpr_mu* mu) : mu_(mu) { - g_core_codegen_interface->gpr_mu_lock(mu_); + explicit ReleasableMutexLock(Mutex* mu) Y_ABSL_EXCLUSIVE_LOCK_FUNCTION(mu) + : mu_(mu) { + mu_->Lock(); } - ~ReleasableMutexLock() { - if (!released_) g_core_codegen_interface->gpr_mu_unlock(mu_); + ~ReleasableMutexLock() Y_ABSL_UNLOCK_FUNCTION() { + if (!released_) mu_->Unlock(); } ReleasableMutexLock(const ReleasableMutexLock&) = delete; ReleasableMutexLock& operator=(const ReleasableMutexLock&) = delete; - void Lock() { - GPR_DEBUG_ASSERT(released_); - g_core_codegen_interface->gpr_mu_lock(mu_); - released_ = false; - } - - void Unlock() { + void Release() Y_ABSL_UNLOCK_FUNCTION() { GPR_DEBUG_ASSERT(!released_); released_ = true; - g_core_codegen_interface->gpr_mu_unlock(mu_); + mu_->Unlock(); } private: - gpr_mu* const mu_; + Mutex* const mu_; bool released_ = false; }; @@ -124,27 +129,27 @@ class CondVar { CondVar& operator=(const CondVar&) = delete; void Signal() { g_core_codegen_interface->gpr_cv_signal(&cv_); } - void Broadcast() { g_core_codegen_interface->gpr_cv_broadcast(&cv_); } - - int Wait(Mutex* mu) { - return Wait(mu, - g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME)); - } - int Wait(Mutex* mu, const gpr_timespec& deadline) { - return g_core_codegen_interface->gpr_cv_wait(&cv_, mu->get(), deadline); - } + void SignalAll() { g_core_codegen_interface->gpr_cv_broadcast(&cv_); } - template <typename Predicate> - void WaitUntil(Mutex* mu, Predicate pred) { - while (!pred()) { - Wait(mu, g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME)); - } + void Wait(Mutex* mu) { + g_core_codegen_interface->gpr_cv_wait( + &cv_, &mu->mu_, + g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME)); } private: gpr_cv cv_; }; +#endif // GRPCPP_ABSEIL_SYNC + +template <typename Predicate> +static void WaitUntil(CondVar* cv, Mutex* mu, Predicate pred) { + while (!pred()) { + cv->Wait(mu); + } +} + } // namespace internal } // namespace grpc diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/time.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/time.h index 3a54db45bf..88cf12fd82 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/time.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/time.h @@ -54,6 +54,7 @@ class TimePoint { template <> class TimePoint<gpr_timespec> { public: + /* NOLINTNEXTLINE(google-explicit-constructor) */ TimePoint(const gpr_timespec& time) : time_(time) {} gpr_timespec raw_time() { return time_; } @@ -77,6 +78,7 @@ std::chrono::system_clock::time_point Timespec2Timepoint(gpr_timespec t); template <> class TimePoint<std::chrono::system_clock::time_point> { public: + /* NOLINTNEXTLINE(google-explicit-constructor) */ TimePoint(const std::chrono::system_clock::time_point& time) { Timepoint2Timespec(time, &time_); } diff --git a/contrib/libs/grpc/include/grpcpp/impl/server_initializer.h b/contrib/libs/grpc/include/grpcpp/impl/server_initializer.h index 38b17edacd..17928e9a4a 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/server_initializer.h +++ b/contrib/libs/grpc/include/grpcpp/impl/server_initializer.h @@ -30,7 +30,7 @@ class Service; class ServerInitializer { public: - ServerInitializer(grpc::Server* server) : server_(server) {} + explicit ServerInitializer(grpc::Server* server) : server_(server) {} bool RegisterService(std::shared_ptr<grpc::Service> service) { if (!server_->RegisterService(nullptr, service.get())) { |