diff options
author | heretic <heretic@yandex-team.ru> | 2022-02-10 16:45:46 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:46 +0300 |
commit | 81eddc8c0b55990194e112b02d127b87d54164a9 (patch) | |
tree | 9142afc54d335ea52910662635b898e79e192e49 /contrib/libs/grpc/include/grpcpp/impl/codegen/sync_stream.h | |
parent | 397cbe258b9e064f49c4ca575279f02f39fef76e (diff) | |
download | ydb-81eddc8c0b55990194e112b02d127b87d54164a9.tar.gz |
Restoring authorship annotation for <heretic@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/libs/grpc/include/grpcpp/impl/codegen/sync_stream.h')
-rw-r--r-- | contrib/libs/grpc/include/grpcpp/impl/codegen/sync_stream.h | 1792 |
1 files changed, 896 insertions, 896 deletions
diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/sync_stream.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/sync_stream.h index be124aaeb2..408f42f280 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/sync_stream.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/sync_stream.h @@ -1,6 +1,6 @@ /* * - * Copyright 2019 gRPC authors. + * Copyright 2019 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,926 +18,926 @@ #ifndef GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H #define GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H -#include <grpcpp/impl/codegen/call.h> -#include <grpcpp/impl/codegen/channel_interface.h> -#include <grpcpp/impl/codegen/client_context.h> -#include <grpcpp/impl/codegen/completion_queue.h> -#include <grpcpp/impl/codegen/core_codegen_interface.h> -#include <grpcpp/impl/codegen/server_context.h> -#include <grpcpp/impl/codegen/service_type.h> -#include <grpcpp/impl/codegen/status.h> +#include <grpcpp/impl/codegen/call.h> +#include <grpcpp/impl/codegen/channel_interface.h> +#include <grpcpp/impl/codegen/client_context.h> +#include <grpcpp/impl/codegen/completion_queue.h> +#include <grpcpp/impl/codegen/core_codegen_interface.h> +#include <grpcpp/impl/codegen/server_context.h> +#include <grpcpp/impl/codegen/service_type.h> +#include <grpcpp/impl/codegen/status.h> namespace grpc { namespace internal { -/// Common interface for all synchronous client side streaming. -class ClientStreamingInterface { - public: - virtual ~ClientStreamingInterface() {} - - /// Block waiting until the stream finishes and a final status of the call is - /// available. - /// - /// It is appropriate to call this method exactly once when both: - /// * the calling code (client-side) has no more message to send - /// (this can be declared implicitly by calling this method, or - /// explicitly through an earlier call to <i>WritesDone</i> method of the - /// class in use, e.g. \a ClientWriterInterface::WritesDone or - /// \a ClientReaderWriterInterface::WritesDone). - /// * there are no more messages to be received from the server (which can - /// be known implicitly, or explicitly from an earlier call to \a - /// ReaderInterface::Read that returned "false"). - /// - /// This function will return either: - /// - when all incoming messages have been read and the server has - /// returned status. - /// - when the server has returned a non-OK status. - /// - OR when the call failed for some reason and the library generated a - /// status. - /// - /// Return values: - /// - \a Status contains the status code, message and details for the call - /// - the \a ClientContext associated with this call is updated with - /// possible trailing metadata sent from the server. - virtual ::grpc::Status Finish() = 0; -}; - -/// Common interface for all synchronous server side streaming. -class ServerStreamingInterface { - public: - virtual ~ServerStreamingInterface() {} - - /// Block to send initial metadata to client. - /// This call is optional, but if it is used, it cannot be used concurrently - /// with or after the \a Finish method. - /// - /// The initial metadata that will be sent to the client will be - /// taken from the \a ServerContext associated with the call. - virtual void SendInitialMetadata() = 0; -}; - -/// An interface that yields a sequence of messages of type \a R. +/// Common interface for all synchronous client side streaming. +class ClientStreamingInterface { + public: + virtual ~ClientStreamingInterface() {} + + /// Block waiting until the stream finishes and a final status of the call is + /// available. + /// + /// It is appropriate to call this method exactly once when both: + /// * the calling code (client-side) has no more message to send + /// (this can be declared implicitly by calling this method, or + /// explicitly through an earlier call to <i>WritesDone</i> method of the + /// class in use, e.g. \a ClientWriterInterface::WritesDone or + /// \a ClientReaderWriterInterface::WritesDone). + /// * there are no more messages to be received from the server (which can + /// be known implicitly, or explicitly from an earlier call to \a + /// ReaderInterface::Read that returned "false"). + /// + /// This function will return either: + /// - when all incoming messages have been read and the server has + /// returned status. + /// - when the server has returned a non-OK status. + /// - OR when the call failed for some reason and the library generated a + /// status. + /// + /// Return values: + /// - \a Status contains the status code, message and details for the call + /// - the \a ClientContext associated with this call is updated with + /// possible trailing metadata sent from the server. + virtual ::grpc::Status Finish() = 0; +}; + +/// Common interface for all synchronous server side streaming. +class ServerStreamingInterface { + public: + virtual ~ServerStreamingInterface() {} + + /// Block to send initial metadata to client. + /// This call is optional, but if it is used, it cannot be used concurrently + /// with or after the \a Finish method. + /// + /// The initial metadata that will be sent to the client will be + /// taken from the \a ServerContext associated with the call. + virtual void SendInitialMetadata() = 0; +}; + +/// An interface that yields a sequence of messages of type \a R. template <class R> -class ReaderInterface { - public: - virtual ~ReaderInterface() {} - - /// Get an upper bound on the next message size available for reading on this - /// stream. - virtual bool NextMessageSize(uint32_t* sz) = 0; - - /// Block to read a message and parse to \a msg. Returns \a true on success. - /// This is thread-safe with respect to \a Write or \WritesDone methods on - /// the same stream. It should not be called concurrently with another \a - /// Read on the same stream as the order of delivery will not be defined. - /// - /// \param[out] msg The read message. - /// - /// \return \a false when there will be no more incoming messages, either - /// because the other side has called \a WritesDone() or the stream has failed - /// (or been cancelled). - virtual bool Read(R* msg) = 0; -}; - -/// An interface that can be fed a sequence of messages of type \a W. +class ReaderInterface { + public: + virtual ~ReaderInterface() {} + + /// Get an upper bound on the next message size available for reading on this + /// stream. + virtual bool NextMessageSize(uint32_t* sz) = 0; + + /// Block to read a message and parse to \a msg. Returns \a true on success. + /// This is thread-safe with respect to \a Write or \WritesDone methods on + /// the same stream. It should not be called concurrently with another \a + /// Read on the same stream as the order of delivery will not be defined. + /// + /// \param[out] msg The read message. + /// + /// \return \a false when there will be no more incoming messages, either + /// because the other side has called \a WritesDone() or the stream has failed + /// (or been cancelled). + virtual bool Read(R* msg) = 0; +}; + +/// An interface that can be fed a sequence of messages of type \a W. template <class W> -class WriterInterface { - public: - virtual ~WriterInterface() {} - - /// Block to write \a msg to the stream with WriteOptions \a options. - /// This is thread-safe with respect to \a ReaderInterface::Read - /// - /// \param msg The message to be written to the stream. - /// \param options The WriteOptions affecting the write operation. - /// - /// \return \a true on success, \a false when the stream has been closed. - virtual bool Write(const W& msg, ::grpc::WriteOptions options) = 0; - - /// Block to write \a msg to the stream with default write options. - /// This is thread-safe with respect to \a ReaderInterface::Read - /// - /// \param msg The message to be written to the stream. - /// - /// \return \a true on success, \a false when the stream has been closed. - inline bool Write(const W& msg) { return Write(msg, ::grpc::WriteOptions()); } - - /// Write \a msg and coalesce it with the writing of trailing metadata, using - /// WriteOptions \a options. - /// - /// For client, WriteLast is equivalent of performing Write and WritesDone in - /// a single step. \a msg and trailing metadata are coalesced and sent on wire - /// by calling this function. For server, WriteLast buffers the \a msg. - /// The writing of \a msg is held until the service handler returns, - /// where \a msg and trailing metadata are coalesced and sent on wire. - /// Note that WriteLast can only buffer \a msg up to the flow control window - /// size. If \a msg size is larger than the window size, it will be sent on - /// wire without buffering. - /// - /// \param[in] msg The message to be written to the stream. - /// \param[in] options The WriteOptions to be used to write this message. - void WriteLast(const W& msg, ::grpc::WriteOptions options) { - Write(msg, options.set_last_message()); - } -}; - +class WriterInterface { + public: + virtual ~WriterInterface() {} + + /// Block to write \a msg to the stream with WriteOptions \a options. + /// This is thread-safe with respect to \a ReaderInterface::Read + /// + /// \param msg The message to be written to the stream. + /// \param options The WriteOptions affecting the write operation. + /// + /// \return \a true on success, \a false when the stream has been closed. + virtual bool Write(const W& msg, ::grpc::WriteOptions options) = 0; + + /// Block to write \a msg to the stream with default write options. + /// This is thread-safe with respect to \a ReaderInterface::Read + /// + /// \param msg The message to be written to the stream. + /// + /// \return \a true on success, \a false when the stream has been closed. + inline bool Write(const W& msg) { return Write(msg, ::grpc::WriteOptions()); } + + /// Write \a msg and coalesce it with the writing of trailing metadata, using + /// WriteOptions \a options. + /// + /// For client, WriteLast is equivalent of performing Write and WritesDone in + /// a single step. \a msg and trailing metadata are coalesced and sent on wire + /// by calling this function. For server, WriteLast buffers the \a msg. + /// The writing of \a msg is held until the service handler returns, + /// where \a msg and trailing metadata are coalesced and sent on wire. + /// Note that WriteLast can only buffer \a msg up to the flow control window + /// size. If \a msg size is larger than the window size, it will be sent on + /// wire without buffering. + /// + /// \param[in] msg The message to be written to the stream. + /// \param[in] options The WriteOptions to be used to write this message. + void WriteLast(const W& msg, ::grpc::WriteOptions options) { + Write(msg, options.set_last_message()); + } +}; + } // namespace internal -/// Client-side interface for streaming reads of message of type \a R. +/// Client-side interface for streaming reads of message of type \a R. template <class R> -class ClientReaderInterface : public internal::ClientStreamingInterface, - public internal::ReaderInterface<R> { - public: - /// Block to wait for initial metadata from server. The received metadata - /// can only be accessed after this call returns. Should only be called before - /// the first read. Calling this method is optional, and if it is not called - /// the metadata will be available in ClientContext after the first read. - virtual void WaitForInitialMetadata() = 0; -}; - -namespace internal { +class ClientReaderInterface : public internal::ClientStreamingInterface, + public internal::ReaderInterface<R> { + public: + /// Block to wait for initial metadata from server. The received metadata + /// can only be accessed after this call returns. Should only be called before + /// the first read. Calling this method is optional, and if it is not called + /// the metadata will be available in ClientContext after the first read. + virtual void WaitForInitialMetadata() = 0; +}; + +namespace internal { template <class R> -class ClientReaderFactory { - public: - template <class W> - static ClientReader<R>* Create(::grpc::ChannelInterface* channel, - const ::grpc::internal::RpcMethod& method, - ::grpc::ClientContext* context, - const W& request) { - return new ClientReader<R>(channel, method, context, request); - } -}; -} // namespace internal - -/// Synchronous (blocking) client-side API for doing server-streaming RPCs, -/// where the stream of messages coming from the server has messages -/// of type \a R. -template <class R> -class ClientReader final : public ClientReaderInterface<R> { - public: - /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for - /// semantics. - /// - // Side effect: - /// Once complete, the initial metadata read from - /// the server will be accessible through the \a ClientContext used to - /// construct this object. - void WaitForInitialMetadata() override { - GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> - ops; - ops.RecvInitialMetadata(context_); - call_.PerformOps(&ops); - cq_.Pluck(&ops); /// status ignored - } - - bool NextMessageSize(uint32_t* sz) override { - int result = call_.max_receive_message_size(); - *sz = (result > 0) ? result : UINT32_MAX; - return true; - } - - /// See the \a ReaderInterface.Read method for semantics. - /// Side effect: - /// This also receives initial metadata from the server, if not - /// already received (if initial metadata is received, it can be then - /// accessed through the \a ClientContext associated with this call). - bool Read(R* msg) override { - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, - ::grpc::internal::CallOpRecvMessage<R>> - ops; - if (!context_->initial_metadata_received_) { - ops.RecvInitialMetadata(context_); - } - ops.RecvMessage(msg); - call_.PerformOps(&ops); - return cq_.Pluck(&ops) && ops.got_message; - } - - /// See the \a ClientStreamingInterface.Finish method for semantics. - /// - /// Side effect: - /// The \a ClientContext associated with this call is updated with - /// possible metadata received from the server. - ::grpc::Status Finish() override { - ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientRecvStatus> ops; - ::grpc::Status status; - ops.ClientRecvStatus(context_, &status); - call_.PerformOps(&ops); - GPR_CODEGEN_ASSERT(cq_.Pluck(&ops)); - return status; - } - - private: - friend class internal::ClientReaderFactory<R>; - ::grpc::ClientContext* context_; - ::grpc::CompletionQueue cq_; - ::grpc::internal::Call call_; - - /// Block to create a stream and write the initial metadata and \a request - /// out. Note that \a context will be used to fill in custom initial - /// metadata used to send to the server when starting the call. - template <class W> - ClientReader(::grpc::ChannelInterface* channel, - const ::grpc::internal::RpcMethod& method, - ::grpc::ClientContext* context, const W& request) - : context_(context), - cq_(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, - nullptr}), // Pluckable cq - call_(channel->CreateCall(method, context, &cq_)) { - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, - ::grpc::internal::CallOpSendMessage, - ::grpc::internal::CallOpClientSendClose> - ops; - ops.SendInitialMetadata(&context->send_initial_metadata_, - context->initial_metadata_flags()); - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(ops.SendMessagePtr(&request).ok()); - ops.ClientSendClose(); - call_.PerformOps(&ops); - cq_.Pluck(&ops); - } -}; - -/// Client-side interface for streaming writes of message type \a W. +class ClientReaderFactory { + public: + template <class W> + static ClientReader<R>* Create(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ::grpc::ClientContext* context, + const W& request) { + return new ClientReader<R>(channel, method, context, request); + } +}; +} // namespace internal + +/// Synchronous (blocking) client-side API for doing server-streaming RPCs, +/// where the stream of messages coming from the server has messages +/// of type \a R. +template <class R> +class ClientReader final : public ClientReaderInterface<R> { + public: + /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for + /// semantics. + /// + // Side effect: + /// Once complete, the initial metadata read from + /// the server will be accessible through the \a ClientContext used to + /// construct this object. + void WaitForInitialMetadata() override { + GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); + + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + ops; + ops.RecvInitialMetadata(context_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); /// status ignored + } + + bool NextMessageSize(uint32_t* sz) override { + int result = call_.max_receive_message_size(); + *sz = (result > 0) ? result : UINT32_MAX; + return true; + } + + /// See the \a ReaderInterface.Read method for semantics. + /// Side effect: + /// This also receives initial metadata from the server, if not + /// already received (if initial metadata is received, it can be then + /// accessed through the \a ClientContext associated with this call). + bool Read(R* msg) override { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpRecvMessage<R>> + ops; + if (!context_->initial_metadata_received_) { + ops.RecvInitialMetadata(context_); + } + ops.RecvMessage(msg); + call_.PerformOps(&ops); + return cq_.Pluck(&ops) && ops.got_message; + } + + /// See the \a ClientStreamingInterface.Finish method for semantics. + /// + /// Side effect: + /// The \a ClientContext associated with this call is updated with + /// possible metadata received from the server. + ::grpc::Status Finish() override { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientRecvStatus> ops; + ::grpc::Status status; + ops.ClientRecvStatus(context_, &status); + call_.PerformOps(&ops); + GPR_CODEGEN_ASSERT(cq_.Pluck(&ops)); + return status; + } + + private: + friend class internal::ClientReaderFactory<R>; + ::grpc::ClientContext* context_; + ::grpc::CompletionQueue cq_; + ::grpc::internal::Call call_; + + /// Block to create a stream and write the initial metadata and \a request + /// out. Note that \a context will be used to fill in custom initial + /// metadata used to send to the server when starting the call. + template <class W> + ClientReader(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ::grpc::ClientContext* context, const W& request) + : context_(context), + cq_(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, + nullptr}), // Pluckable cq + call_(channel->CreateCall(method, context, &cq_)) { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> + ops; + ops.SendInitialMetadata(&context->send_initial_metadata_, + context->initial_metadata_flags()); + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(ops.SendMessagePtr(&request).ok()); + ops.ClientSendClose(); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } +}; + +/// Client-side interface for streaming writes of message type \a W. +template <class W> +class ClientWriterInterface : public internal::ClientStreamingInterface, + public internal::WriterInterface<W> { + public: + /// Half close writing from the client. (signal that the stream of messages + /// coming from the client is complete). + /// Blocks until currently-pending writes are completed. + /// Thread safe with respect to \a ReaderInterface::Read operations only + /// + /// \return Whether the writes were successful. + virtual bool WritesDone() = 0; +}; + +namespace internal { template <class W> -class ClientWriterInterface : public internal::ClientStreamingInterface, - public internal::WriterInterface<W> { - public: - /// Half close writing from the client. (signal that the stream of messages - /// coming from the client is complete). - /// Blocks until currently-pending writes are completed. - /// Thread safe with respect to \a ReaderInterface::Read operations only - /// - /// \return Whether the writes were successful. - virtual bool WritesDone() = 0; -}; - -namespace internal { +class ClientWriterFactory { + public: + template <class R> + static ClientWriter<W>* Create(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ::grpc::ClientContext* context, R* response) { + return new ClientWriter<W>(channel, method, context, response); + } +}; +} // namespace internal + +/// Synchronous (blocking) client-side API for doing client-streaming RPCs, +/// where the outgoing message stream coming from the client has messages of +/// type \a W. template <class W> -class ClientWriterFactory { - public: - template <class R> - static ClientWriter<W>* Create(::grpc::ChannelInterface* channel, - const ::grpc::internal::RpcMethod& method, - ::grpc::ClientContext* context, R* response) { - return new ClientWriter<W>(channel, method, context, response); - } -}; -} // namespace internal - -/// Synchronous (blocking) client-side API for doing client-streaming RPCs, -/// where the outgoing message stream coming from the client has messages of -/// type \a W. -template <class W> -class ClientWriter : public ClientWriterInterface<W> { - public: - /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for - /// semantics. - /// - // Side effect: - /// Once complete, the initial metadata read from the server will be - /// accessible through the \a ClientContext used to construct this object. - void WaitForInitialMetadata() { - GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> - ops; - ops.RecvInitialMetadata(context_); - call_.PerformOps(&ops); - cq_.Pluck(&ops); // status ignored - } - - /// See the WriterInterface.Write(const W& msg, WriteOptions options) method - /// for semantics. - /// - /// Side effect: - /// Also sends initial metadata if not already sent (using the - /// \a ClientContext associated with this call). - using internal::WriterInterface<W>::Write; - bool Write(const W& msg, ::grpc::WriteOptions options) override { - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, - ::grpc::internal::CallOpSendMessage, - ::grpc::internal::CallOpClientSendClose> - ops; - - if (options.is_last_message()) { - options.set_buffer_hint(); - ops.ClientSendClose(); - } - if (context_->initial_metadata_corked_) { - ops.SendInitialMetadata(&context_->send_initial_metadata_, - context_->initial_metadata_flags()); - context_->set_initial_metadata_corked(false); - } - if (!ops.SendMessagePtr(&msg, options).ok()) { - return false; - } - - call_.PerformOps(&ops); - return cq_.Pluck(&ops); - } - - bool WritesDone() override { - ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops; - ops.ClientSendClose(); - call_.PerformOps(&ops); - return cq_.Pluck(&ops); - } - - /// See the ClientStreamingInterface.Finish method for semantics. - /// Side effects: - /// - Also receives initial metadata if not already received. - /// - Attempts to fill in the \a response parameter passed - /// to the constructor of this instance with the response - /// message from the server. - ::grpc::Status Finish() override { - ::grpc::Status status; - if (!context_->initial_metadata_received_) { - finish_ops_.RecvInitialMetadata(context_); - } - finish_ops_.ClientRecvStatus(context_, &status); - call_.PerformOps(&finish_ops_); - GPR_CODEGEN_ASSERT(cq_.Pluck(&finish_ops_)); - return status; - } - - private: - friend class internal::ClientWriterFactory<W>; - - /// Block to create a stream (i.e. send request headers and other initial - /// metadata to the server). Note that \a context will be used to fill - /// in custom initial metadata. \a response will be filled in with the - /// single expected response message from the server upon a successful - /// call to the \a Finish method of this instance. - template <class R> - ClientWriter(::grpc::ChannelInterface* channel, - const ::grpc::internal::RpcMethod& method, - ::grpc::ClientContext* context, R* response) - : context_(context), - cq_(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, - nullptr}), // Pluckable cq - call_(channel->CreateCall(method, context, &cq_)) { - finish_ops_.RecvMessage(response); - finish_ops_.AllowNoMessage(); - - if (!context_->initial_metadata_corked_) { - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> - ops; - ops.SendInitialMetadata(&context->send_initial_metadata_, - context->initial_metadata_flags()); - call_.PerformOps(&ops); - cq_.Pluck(&ops); - } - } - - ::grpc::ClientContext* context_; - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, - ::grpc::internal::CallOpGenericRecvMessage, - ::grpc::internal::CallOpClientRecvStatus> - finish_ops_; - ::grpc::CompletionQueue cq_; - ::grpc::internal::Call call_; -}; - -/// Client-side interface for bi-directional streaming with -/// client-to-server stream messages of type \a W and -/// server-to-client stream messages of type \a R. +class ClientWriter : public ClientWriterInterface<W> { + public: + /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for + /// semantics. + /// + // Side effect: + /// Once complete, the initial metadata read from the server will be + /// accessible through the \a ClientContext used to construct this object. + void WaitForInitialMetadata() { + GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); + + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + ops; + ops.RecvInitialMetadata(context_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); // status ignored + } + + /// See the WriterInterface.Write(const W& msg, WriteOptions options) method + /// for semantics. + /// + /// Side effect: + /// Also sends initial metadata if not already sent (using the + /// \a ClientContext associated with this call). + using internal::WriterInterface<W>::Write; + bool Write(const W& msg, ::grpc::WriteOptions options) override { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> + ops; + + if (options.is_last_message()) { + options.set_buffer_hint(); + ops.ClientSendClose(); + } + if (context_->initial_metadata_corked_) { + ops.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + context_->set_initial_metadata_corked(false); + } + if (!ops.SendMessagePtr(&msg, options).ok()) { + return false; + } + + call_.PerformOps(&ops); + return cq_.Pluck(&ops); + } + + bool WritesDone() override { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops; + ops.ClientSendClose(); + call_.PerformOps(&ops); + return cq_.Pluck(&ops); + } + + /// See the ClientStreamingInterface.Finish method for semantics. + /// Side effects: + /// - Also receives initial metadata if not already received. + /// - Attempts to fill in the \a response parameter passed + /// to the constructor of this instance with the response + /// message from the server. + ::grpc::Status Finish() override { + ::grpc::Status status; + if (!context_->initial_metadata_received_) { + finish_ops_.RecvInitialMetadata(context_); + } + finish_ops_.ClientRecvStatus(context_, &status); + call_.PerformOps(&finish_ops_); + GPR_CODEGEN_ASSERT(cq_.Pluck(&finish_ops_)); + return status; + } + + private: + friend class internal::ClientWriterFactory<W>; + + /// Block to create a stream (i.e. send request headers and other initial + /// metadata to the server). Note that \a context will be used to fill + /// in custom initial metadata. \a response will be filled in with the + /// single expected response message from the server upon a successful + /// call to the \a Finish method of this instance. + template <class R> + ClientWriter(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ::grpc::ClientContext* context, R* response) + : context_(context), + cq_(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, + nullptr}), // Pluckable cq + call_(channel->CreateCall(method, context, &cq_)) { + finish_ops_.RecvMessage(response); + finish_ops_.AllowNoMessage(); + + if (!context_->initial_metadata_corked_) { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + ops; + ops.SendInitialMetadata(&context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } + } + + ::grpc::ClientContext* context_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpGenericRecvMessage, + ::grpc::internal::CallOpClientRecvStatus> + finish_ops_; + ::grpc::CompletionQueue cq_; + ::grpc::internal::Call call_; +}; + +/// Client-side interface for bi-directional streaming with +/// client-to-server stream messages of type \a W and +/// server-to-client stream messages of type \a R. +template <class W, class R> +class ClientReaderWriterInterface : public internal::ClientStreamingInterface, + public internal::WriterInterface<W>, + public internal::ReaderInterface<R> { + public: + /// Block to wait for initial metadata from server. The received metadata + /// can only be accessed after this call returns. Should only be called before + /// the first read. Calling this method is optional, and if it is not called + /// the metadata will be available in ClientContext after the first read. + virtual void WaitForInitialMetadata() = 0; + + /// Half close writing from the client. (signal that the stream of messages + /// coming from the client is complete). + /// Blocks until currently-pending writes are completed. + /// Thread-safe with respect to \a ReaderInterface::Read + /// + /// \return Whether the writes were successful. + virtual bool WritesDone() = 0; +}; + +namespace internal { template <class W, class R> -class ClientReaderWriterInterface : public internal::ClientStreamingInterface, - public internal::WriterInterface<W>, - public internal::ReaderInterface<R> { - public: - /// Block to wait for initial metadata from server. The received metadata - /// can only be accessed after this call returns. Should only be called before - /// the first read. Calling this method is optional, and if it is not called - /// the metadata will be available in ClientContext after the first read. - virtual void WaitForInitialMetadata() = 0; - - /// Half close writing from the client. (signal that the stream of messages - /// coming from the client is complete). - /// Blocks until currently-pending writes are completed. - /// Thread-safe with respect to \a ReaderInterface::Read - /// - /// \return Whether the writes were successful. - virtual bool WritesDone() = 0; -}; - -namespace internal { +class ClientReaderWriterFactory { + public: + static ClientReaderWriter<W, R>* Create( + ::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ::grpc::ClientContext* context) { + return new ClientReaderWriter<W, R>(channel, method, context); + } +}; +} // namespace internal + +/// Synchronous (blocking) client-side API for bi-directional streaming RPCs, +/// where the outgoing message stream coming from the client has messages of +/// type \a W, and the incoming messages stream coming from the server has +/// messages of type \a R. template <class W, class R> -class ClientReaderWriterFactory { - public: - static ClientReaderWriter<W, R>* Create( - ::grpc::ChannelInterface* channel, - const ::grpc::internal::RpcMethod& method, - ::grpc::ClientContext* context) { - return new ClientReaderWriter<W, R>(channel, method, context); - } -}; -} // namespace internal - -/// Synchronous (blocking) client-side API for bi-directional streaming RPCs, -/// where the outgoing message stream coming from the client has messages of -/// type \a W, and the incoming messages stream coming from the server has -/// messages of type \a R. -template <class W, class R> -class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { - public: - /// Block waiting to read initial metadata from the server. - /// This call is optional, but if it is used, it cannot be used concurrently - /// with or after the \a Finish method. - /// - /// Once complete, the initial metadata read from the server will be - /// accessible through the \a ClientContext used to construct this object. - void WaitForInitialMetadata() override { - GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> - ops; - ops.RecvInitialMetadata(context_); - call_.PerformOps(&ops); - cq_.Pluck(&ops); // status ignored - } - - bool NextMessageSize(uint32_t* sz) override { - int result = call_.max_receive_message_size(); - *sz = (result > 0) ? result : UINT32_MAX; - return true; - } - - /// See the \a ReaderInterface.Read method for semantics. - /// Side effect: - /// Also receives initial metadata if not already received (updates the \a - /// ClientContext associated with this call in that case). - bool Read(R* msg) override { - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, - ::grpc::internal::CallOpRecvMessage<R>> - ops; - if (!context_->initial_metadata_received_) { - ops.RecvInitialMetadata(context_); - } - ops.RecvMessage(msg); - call_.PerformOps(&ops); - return cq_.Pluck(&ops) && ops.got_message; - } - - /// See the \a WriterInterface.Write method for semantics. - /// - /// Side effect: - /// Also sends initial metadata if not already sent (using the - /// \a ClientContext associated with this call to fill in values). - using internal::WriterInterface<W>::Write; - bool Write(const W& msg, ::grpc::WriteOptions options) override { - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, - ::grpc::internal::CallOpSendMessage, - ::grpc::internal::CallOpClientSendClose> - ops; - - if (options.is_last_message()) { - options.set_buffer_hint(); - ops.ClientSendClose(); - } - if (context_->initial_metadata_corked_) { - ops.SendInitialMetadata(&context_->send_initial_metadata_, - context_->initial_metadata_flags()); - context_->set_initial_metadata_corked(false); - } - if (!ops.SendMessagePtr(&msg, options).ok()) { - return false; - } - - call_.PerformOps(&ops); - return cq_.Pluck(&ops); - } - - bool WritesDone() override { - ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops; - ops.ClientSendClose(); - call_.PerformOps(&ops); - return cq_.Pluck(&ops); - } - - /// See the ClientStreamingInterface.Finish method for semantics. - /// - /// Side effect: - /// - the \a ClientContext associated with this call is updated with - /// possible trailing metadata sent from the server. - ::grpc::Status Finish() override { - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, - ::grpc::internal::CallOpClientRecvStatus> - ops; - if (!context_->initial_metadata_received_) { - ops.RecvInitialMetadata(context_); - } - ::grpc::Status status; - ops.ClientRecvStatus(context_, &status); - call_.PerformOps(&ops); - GPR_CODEGEN_ASSERT(cq_.Pluck(&ops)); - return status; - } - - private: - friend class internal::ClientReaderWriterFactory<W, R>; - - ::grpc::ClientContext* context_; - ::grpc::CompletionQueue cq_; - ::grpc::internal::Call call_; - - /// Block to create a stream and write the initial metadata and \a request - /// out. Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - ClientReaderWriter(::grpc::ChannelInterface* channel, - const ::grpc::internal::RpcMethod& method, - ::grpc::ClientContext* context) - : context_(context), - cq_(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, - nullptr}), // Pluckable cq - call_(channel->CreateCall(method, context, &cq_)) { - if (!context_->initial_metadata_corked_) { - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> - ops; - ops.SendInitialMetadata(&context->send_initial_metadata_, - context->initial_metadata_flags()); - call_.PerformOps(&ops); - cq_.Pluck(&ops); - } - } -}; - -/// Server-side interface for streaming reads of message of type \a R. +class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { + public: + /// Block waiting to read initial metadata from the server. + /// This call is optional, but if it is used, it cannot be used concurrently + /// with or after the \a Finish method. + /// + /// Once complete, the initial metadata read from the server will be + /// accessible through the \a ClientContext used to construct this object. + void WaitForInitialMetadata() override { + GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); + + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + ops; + ops.RecvInitialMetadata(context_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); // status ignored + } + + bool NextMessageSize(uint32_t* sz) override { + int result = call_.max_receive_message_size(); + *sz = (result > 0) ? result : UINT32_MAX; + return true; + } + + /// See the \a ReaderInterface.Read method for semantics. + /// Side effect: + /// Also receives initial metadata if not already received (updates the \a + /// ClientContext associated with this call in that case). + bool Read(R* msg) override { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpRecvMessage<R>> + ops; + if (!context_->initial_metadata_received_) { + ops.RecvInitialMetadata(context_); + } + ops.RecvMessage(msg); + call_.PerformOps(&ops); + return cq_.Pluck(&ops) && ops.got_message; + } + + /// See the \a WriterInterface.Write method for semantics. + /// + /// Side effect: + /// Also sends initial metadata if not already sent (using the + /// \a ClientContext associated with this call to fill in values). + using internal::WriterInterface<W>::Write; + bool Write(const W& msg, ::grpc::WriteOptions options) override { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> + ops; + + if (options.is_last_message()) { + options.set_buffer_hint(); + ops.ClientSendClose(); + } + if (context_->initial_metadata_corked_) { + ops.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + context_->set_initial_metadata_corked(false); + } + if (!ops.SendMessagePtr(&msg, options).ok()) { + return false; + } + + call_.PerformOps(&ops); + return cq_.Pluck(&ops); + } + + bool WritesDone() override { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops; + ops.ClientSendClose(); + call_.PerformOps(&ops); + return cq_.Pluck(&ops); + } + + /// See the ClientStreamingInterface.Finish method for semantics. + /// + /// Side effect: + /// - the \a ClientContext associated with this call is updated with + /// possible trailing metadata sent from the server. + ::grpc::Status Finish() override { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpClientRecvStatus> + ops; + if (!context_->initial_metadata_received_) { + ops.RecvInitialMetadata(context_); + } + ::grpc::Status status; + ops.ClientRecvStatus(context_, &status); + call_.PerformOps(&ops); + GPR_CODEGEN_ASSERT(cq_.Pluck(&ops)); + return status; + } + + private: + friend class internal::ClientReaderWriterFactory<W, R>; + + ::grpc::ClientContext* context_; + ::grpc::CompletionQueue cq_; + ::grpc::internal::Call call_; + + /// Block to create a stream and write the initial metadata and \a request + /// out. Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + ClientReaderWriter(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ::grpc::ClientContext* context) + : context_(context), + cq_(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, + nullptr}), // Pluckable cq + call_(channel->CreateCall(method, context, &cq_)) { + if (!context_->initial_metadata_corked_) { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + ops; + ops.SendInitialMetadata(&context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } + } +}; + +/// Server-side interface for streaming reads of message of type \a R. template <class R> -class ServerReaderInterface : public internal::ServerStreamingInterface, - public internal::ReaderInterface<R> {}; +class ServerReaderInterface : public internal::ServerStreamingInterface, + public internal::ReaderInterface<R> {}; -/// Synchronous (blocking) server-side API for doing client-streaming RPCs, -/// where the incoming message stream coming from the client has messages of -/// type \a R. +/// Synchronous (blocking) server-side API for doing client-streaming RPCs, +/// where the incoming message stream coming from the client has messages of +/// type \a R. template <class R> -class ServerReader final : public ServerReaderInterface<R> { - public: - /// See the \a ServerStreamingInterface.SendInitialMetadata method - /// for semantics. Note that initial metadata will be affected by the - /// \a ServerContext associated with this call. - void SendInitialMetadata() override { - GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> - ops; - ops.SendInitialMetadata(&ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - ops.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&ops); - call_->cq()->Pluck(&ops); - } - - bool NextMessageSize(uint32_t* sz) override { - int result = call_->max_receive_message_size(); - *sz = (result > 0) ? result : UINT32_MAX; - return true; - } - - bool Read(R* msg) override { - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> ops; - ops.RecvMessage(msg); - call_->PerformOps(&ops); - return call_->cq()->Pluck(&ops) && ops.got_message; - } - - private: - ::grpc::internal::Call* const call_; - ServerContext* const ctx_; - - template <class ServiceType, class RequestType, class ResponseType> - friend class internal::ClientStreamingHandler; - - ServerReader(::grpc::internal::Call* call, ::grpc::ServerContext* ctx) - : call_(call), ctx_(ctx) {} -}; - -/// Server-side interface for streaming writes of message of type \a W. +class ServerReader final : public ServerReaderInterface<R> { + public: + /// See the \a ServerStreamingInterface.SendInitialMetadata method + /// for semantics. Note that initial metadata will be affected by the + /// \a ServerContext associated with this call. + void SendInitialMetadata() override { + GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); + + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + ops; + ops.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ops.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&ops); + call_->cq()->Pluck(&ops); + } + + bool NextMessageSize(uint32_t* sz) override { + int result = call_->max_receive_message_size(); + *sz = (result > 0) ? result : UINT32_MAX; + return true; + } + + bool Read(R* msg) override { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> ops; + ops.RecvMessage(msg); + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops) && ops.got_message; + } + + private: + ::grpc::internal::Call* const call_; + ServerContext* const ctx_; + + template <class ServiceType, class RequestType, class ResponseType> + friend class internal::ClientStreamingHandler; + + ServerReader(::grpc::internal::Call* call, ::grpc::ServerContext* ctx) + : call_(call), ctx_(ctx) {} +}; + +/// Server-side interface for streaming writes of message of type \a W. template <class W> -class ServerWriterInterface : public internal::ServerStreamingInterface, - public internal::WriterInterface<W> {}; +class ServerWriterInterface : public internal::ServerStreamingInterface, + public internal::WriterInterface<W> {}; -/// Synchronous (blocking) server-side API for doing for doing a -/// server-streaming RPCs, where the outgoing message stream coming from the -/// server has messages of type \a W. +/// Synchronous (blocking) server-side API for doing for doing a +/// server-streaming RPCs, where the outgoing message stream coming from the +/// server has messages of type \a W. template <class W> -class ServerWriter final : public ServerWriterInterface<W> { - public: - /// See the \a ServerStreamingInterface.SendInitialMetadata method - /// for semantics. - /// Note that initial metadata will be affected by the - /// \a ServerContext associated with this call. - void SendInitialMetadata() override { - GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - - ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> - ops; - ops.SendInitialMetadata(&ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - ops.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&ops); - call_->cq()->Pluck(&ops); - } - - /// See the \a WriterInterface.Write method for semantics. - /// - /// Side effect: - /// Also sends initial metadata if not already sent (using the - /// \a ClientContext associated with this call to fill in values). - using internal::WriterInterface<W>::Write; - bool Write(const W& msg, ::grpc::WriteOptions options) override { - if (options.is_last_message()) { - options.set_buffer_hint(); - } - - if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) { - return false; - } - if (!ctx_->sent_initial_metadata_) { - ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - ctx_->pending_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - } - call_->PerformOps(&ctx_->pending_ops_); - // if this is the last message we defer the pluck until AFTER we start - // the trailing md op. This prevents hangs. See - // https://github.com/grpc/grpc/issues/11546 - if (options.is_last_message()) { - ctx_->has_pending_ops_ = true; - return true; - } - ctx_->has_pending_ops_ = false; - return call_->cq()->Pluck(&ctx_->pending_ops_); - } - - private: - ::grpc::internal::Call* const call_; - ::grpc::ServerContext* const ctx_; - - template <class ServiceType, class RequestType, class ResponseType> - friend class internal::ServerStreamingHandler; - - ServerWriter(::grpc::internal::Call* call, ::grpc::ServerContext* ctx) - : call_(call), ctx_(ctx) {} -}; - -/// Server-side interface for bi-directional streaming. +class ServerWriter final : public ServerWriterInterface<W> { + public: + /// See the \a ServerStreamingInterface.SendInitialMetadata method + /// for semantics. + /// Note that initial metadata will be affected by the + /// \a ServerContext associated with this call. + void SendInitialMetadata() override { + GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); + + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + ops; + ops.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ops.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&ops); + call_->cq()->Pluck(&ops); + } + + /// See the \a WriterInterface.Write method for semantics. + /// + /// Side effect: + /// Also sends initial metadata if not already sent (using the + /// \a ClientContext associated with this call to fill in values). + using internal::WriterInterface<W>::Write; + bool Write(const W& msg, ::grpc::WriteOptions options) override { + if (options.is_last_message()) { + options.set_buffer_hint(); + } + + if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) { + return false; + } + if (!ctx_->sent_initial_metadata_) { + ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ctx_->pending_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + call_->PerformOps(&ctx_->pending_ops_); + // if this is the last message we defer the pluck until AFTER we start + // the trailing md op. This prevents hangs. See + // https://github.com/grpc/grpc/issues/11546 + if (options.is_last_message()) { + ctx_->has_pending_ops_ = true; + return true; + } + ctx_->has_pending_ops_ = false; + return call_->cq()->Pluck(&ctx_->pending_ops_); + } + + private: + ::grpc::internal::Call* const call_; + ::grpc::ServerContext* const ctx_; + + template <class ServiceType, class RequestType, class ResponseType> + friend class internal::ServerStreamingHandler; + + ServerWriter(::grpc::internal::Call* call, ::grpc::ServerContext* ctx) + : call_(call), ctx_(ctx) {} +}; + +/// Server-side interface for bi-directional streaming. +template <class W, class R> +class ServerReaderWriterInterface : public internal::ServerStreamingInterface, + public internal::WriterInterface<W>, + public internal::ReaderInterface<R> {}; + +/// Actual implementation of bi-directional streaming +namespace internal { template <class W, class R> -class ServerReaderWriterInterface : public internal::ServerStreamingInterface, - public internal::WriterInterface<W>, - public internal::ReaderInterface<R> {}; +class ServerReaderWriterBody final { + public: + ServerReaderWriterBody(grpc::internal::Call* call, ::grpc::ServerContext* ctx) + : call_(call), ctx_(ctx) {} + + void SendInitialMetadata() { + GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); -/// Actual implementation of bi-directional streaming -namespace internal { + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ops.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&ops); + call_->cq()->Pluck(&ops); + } + + bool NextMessageSize(uint32_t* sz) { + int result = call_->max_receive_message_size(); + *sz = (result > 0) ? result : UINT32_MAX; + return true; + } + + bool Read(R* msg) { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> ops; + ops.RecvMessage(msg); + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops) && ops.got_message; + } + + bool Write(const W& msg, ::grpc::WriteOptions options) { + if (options.is_last_message()) { + options.set_buffer_hint(); + } + if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) { + return false; + } + if (!ctx_->sent_initial_metadata_) { + ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ctx_->pending_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + call_->PerformOps(&ctx_->pending_ops_); + // if this is the last message we defer the pluck until AFTER we start + // the trailing md op. This prevents hangs. See + // https://github.com/grpc/grpc/issues/11546 + if (options.is_last_message()) { + ctx_->has_pending_ops_ = true; + return true; + } + ctx_->has_pending_ops_ = false; + return call_->cq()->Pluck(&ctx_->pending_ops_); + } + + private: + grpc::internal::Call* const call_; + ::grpc::ServerContext* const ctx_; +}; + +} // namespace internal + +/// Synchronous (blocking) server-side API for a bidirectional +/// streaming call, where the incoming message stream coming from the client has +/// messages of type \a R, and the outgoing message streaming coming from +/// the server has messages of type \a W. template <class W, class R> -class ServerReaderWriterBody final { - public: - ServerReaderWriterBody(grpc::internal::Call* call, ::grpc::ServerContext* ctx) - : call_(call), ctx_(ctx) {} - - void SendInitialMetadata() { - GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - - grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops; - ops.SendInitialMetadata(&ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - ops.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&ops); - call_->cq()->Pluck(&ops); - } - - bool NextMessageSize(uint32_t* sz) { - int result = call_->max_receive_message_size(); - *sz = (result > 0) ? result : UINT32_MAX; - return true; - } - - bool Read(R* msg) { - ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> ops; - ops.RecvMessage(msg); - call_->PerformOps(&ops); - return call_->cq()->Pluck(&ops) && ops.got_message; - } - - bool Write(const W& msg, ::grpc::WriteOptions options) { - if (options.is_last_message()) { - options.set_buffer_hint(); - } - if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) { - return false; - } - if (!ctx_->sent_initial_metadata_) { - ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - ctx_->pending_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - } - call_->PerformOps(&ctx_->pending_ops_); - // if this is the last message we defer the pluck until AFTER we start - // the trailing md op. This prevents hangs. See - // https://github.com/grpc/grpc/issues/11546 - if (options.is_last_message()) { - ctx_->has_pending_ops_ = true; - return true; - } - ctx_->has_pending_ops_ = false; - return call_->cq()->Pluck(&ctx_->pending_ops_); - } - - private: - grpc::internal::Call* const call_; - ::grpc::ServerContext* const ctx_; -}; - -} // namespace internal - -/// Synchronous (blocking) server-side API for a bidirectional -/// streaming call, where the incoming message stream coming from the client has -/// messages of type \a R, and the outgoing message streaming coming from -/// the server has messages of type \a W. -template <class W, class R> -class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> { - public: - /// See the \a ServerStreamingInterface.SendInitialMetadata method - /// for semantics. Note that initial metadata will be affected by the - /// \a ServerContext associated with this call. - void SendInitialMetadata() override { body_.SendInitialMetadata(); } - - bool NextMessageSize(uint32_t* sz) override { - return body_.NextMessageSize(sz); - } - - bool Read(R* msg) override { return body_.Read(msg); } - - /// See the \a WriterInterface.Write(const W& msg, WriteOptions options) - /// method for semantics. - /// Side effect: - /// Also sends initial metadata if not already sent (using the \a - /// ServerContext associated with this call). - using internal::WriterInterface<W>::Write; - bool Write(const W& msg, ::grpc::WriteOptions options) override { - return body_.Write(msg, options); - } - - private: - internal::ServerReaderWriterBody<W, R> body_; - - friend class internal::TemplatedBidiStreamingHandler<ServerReaderWriter<W, R>, - false>; - ServerReaderWriter(::grpc::internal::Call* call, ::grpc::ServerContext* ctx) - : body_(call, ctx) {} -}; - -/// A class to represent a flow-controlled unary call. This is something -/// of a hybrid between conventional unary and streaming. This is invoked -/// through a unary call on the client side, but the server responds to it -/// as though it were a single-ping-pong streaming call. The server can use -/// the \a NextMessageSize method to determine an upper-bound on the size of -/// the message. A key difference relative to streaming: ServerUnaryStreamer -/// must have exactly 1 Read and exactly 1 Write, in that order, to function -/// correctly. Otherwise, the RPC is in error. +class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> { + public: + /// See the \a ServerStreamingInterface.SendInitialMetadata method + /// for semantics. Note that initial metadata will be affected by the + /// \a ServerContext associated with this call. + void SendInitialMetadata() override { body_.SendInitialMetadata(); } + + bool NextMessageSize(uint32_t* sz) override { + return body_.NextMessageSize(sz); + } + + bool Read(R* msg) override { return body_.Read(msg); } + + /// See the \a WriterInterface.Write(const W& msg, WriteOptions options) + /// method for semantics. + /// Side effect: + /// Also sends initial metadata if not already sent (using the \a + /// ServerContext associated with this call). + using internal::WriterInterface<W>::Write; + bool Write(const W& msg, ::grpc::WriteOptions options) override { + return body_.Write(msg, options); + } + + private: + internal::ServerReaderWriterBody<W, R> body_; + + friend class internal::TemplatedBidiStreamingHandler<ServerReaderWriter<W, R>, + false>; + ServerReaderWriter(::grpc::internal::Call* call, ::grpc::ServerContext* ctx) + : body_(call, ctx) {} +}; + +/// A class to represent a flow-controlled unary call. This is something +/// of a hybrid between conventional unary and streaming. This is invoked +/// through a unary call on the client side, but the server responds to it +/// as though it were a single-ping-pong streaming call. The server can use +/// the \a NextMessageSize method to determine an upper-bound on the size of +/// the message. A key difference relative to streaming: ServerUnaryStreamer +/// must have exactly 1 Read and exactly 1 Write, in that order, to function +/// correctly. Otherwise, the RPC is in error. template <class RequestType, class ResponseType> -class ServerUnaryStreamer final - : public ServerReaderWriterInterface<ResponseType, RequestType> { - public: - /// Block to send initial metadata to client. - /// Implicit input parameter: - /// - the \a ServerContext associated with this call will be used for - /// sending initial metadata. - void SendInitialMetadata() override { body_.SendInitialMetadata(); } - - /// Get an upper bound on the request message size from the client. - bool NextMessageSize(uint32_t* sz) override { - return body_.NextMessageSize(sz); - } - - /// Read a message of type \a R into \a msg. Completion will be notified by \a - /// tag on the associated completion queue. - /// This is thread-safe with respect to \a Write or \a WritesDone methods. It - /// should not be called concurrently with other streaming APIs - /// on the same stream. It is not meaningful to call it concurrently - /// with another \a ReaderInterface::Read on the same stream since reads on - /// the same stream are delivered in order. - /// - /// \param[out] msg Where to eventually store the read message. - /// \param[in] tag The tag identifying the operation. - bool Read(RequestType* request) override { - if (read_done_) { - return false; - } - read_done_ = true; - return body_.Read(request); - } - - /// Block to write \a msg to the stream with WriteOptions \a options. - /// This is thread-safe with respect to \a ReaderInterface::Read - /// - /// \param msg The message to be written to the stream. - /// \param options The WriteOptions affecting the write operation. - /// - /// \return \a true on success, \a false when the stream has been closed. - using internal::WriterInterface<ResponseType>::Write; - bool Write(const ResponseType& response, - ::grpc::WriteOptions options) override { - if (write_done_ || !read_done_) { - return false; - } - write_done_ = true; - return body_.Write(response, options); - } - - private: - internal::ServerReaderWriterBody<ResponseType, RequestType> body_; - bool read_done_; - bool write_done_; - - friend class internal::TemplatedBidiStreamingHandler< - ServerUnaryStreamer<RequestType, ResponseType>, true>; - ServerUnaryStreamer(::grpc::internal::Call* call, ::grpc::ServerContext* ctx) - : body_(call, ctx), read_done_(false), write_done_(false) {} -}; - -/// A class to represent a flow-controlled server-side streaming call. -/// This is something of a hybrid between server-side and bidi streaming. -/// This is invoked through a server-side streaming call on the client side, -/// but the server responds to it as though it were a bidi streaming call that -/// must first have exactly 1 Read and then any number of Writes. +class ServerUnaryStreamer final + : public ServerReaderWriterInterface<ResponseType, RequestType> { + public: + /// Block to send initial metadata to client. + /// Implicit input parameter: + /// - the \a ServerContext associated with this call will be used for + /// sending initial metadata. + void SendInitialMetadata() override { body_.SendInitialMetadata(); } + + /// Get an upper bound on the request message size from the client. + bool NextMessageSize(uint32_t* sz) override { + return body_.NextMessageSize(sz); + } + + /// Read a message of type \a R into \a msg. Completion will be notified by \a + /// tag on the associated completion queue. + /// This is thread-safe with respect to \a Write or \a WritesDone methods. It + /// should not be called concurrently with other streaming APIs + /// on the same stream. It is not meaningful to call it concurrently + /// with another \a ReaderInterface::Read on the same stream since reads on + /// the same stream are delivered in order. + /// + /// \param[out] msg Where to eventually store the read message. + /// \param[in] tag The tag identifying the operation. + bool Read(RequestType* request) override { + if (read_done_) { + return false; + } + read_done_ = true; + return body_.Read(request); + } + + /// Block to write \a msg to the stream with WriteOptions \a options. + /// This is thread-safe with respect to \a ReaderInterface::Read + /// + /// \param msg The message to be written to the stream. + /// \param options The WriteOptions affecting the write operation. + /// + /// \return \a true on success, \a false when the stream has been closed. + using internal::WriterInterface<ResponseType>::Write; + bool Write(const ResponseType& response, + ::grpc::WriteOptions options) override { + if (write_done_ || !read_done_) { + return false; + } + write_done_ = true; + return body_.Write(response, options); + } + + private: + internal::ServerReaderWriterBody<ResponseType, RequestType> body_; + bool read_done_; + bool write_done_; + + friend class internal::TemplatedBidiStreamingHandler< + ServerUnaryStreamer<RequestType, ResponseType>, true>; + ServerUnaryStreamer(::grpc::internal::Call* call, ::grpc::ServerContext* ctx) + : body_(call, ctx), read_done_(false), write_done_(false) {} +}; + +/// A class to represent a flow-controlled server-side streaming call. +/// This is something of a hybrid between server-side and bidi streaming. +/// This is invoked through a server-side streaming call on the client side, +/// but the server responds to it as though it were a bidi streaming call that +/// must first have exactly 1 Read and then any number of Writes. template <class RequestType, class ResponseType> -class ServerSplitStreamer final - : public ServerReaderWriterInterface<ResponseType, RequestType> { - public: - /// Block to send initial metadata to client. - /// Implicit input parameter: - /// - the \a ServerContext associated with this call will be used for - /// sending initial metadata. - void SendInitialMetadata() override { body_.SendInitialMetadata(); } - - /// Get an upper bound on the request message size from the client. - bool NextMessageSize(uint32_t* sz) override { - return body_.NextMessageSize(sz); - } - - /// Read a message of type \a R into \a msg. Completion will be notified by \a - /// tag on the associated completion queue. - /// This is thread-safe with respect to \a Write or \a WritesDone methods. It - /// should not be called concurrently with other streaming APIs - /// on the same stream. It is not meaningful to call it concurrently - /// with another \a ReaderInterface::Read on the same stream since reads on - /// the same stream are delivered in order. - /// - /// \param[out] msg Where to eventually store the read message. - /// \param[in] tag The tag identifying the operation. - bool Read(RequestType* request) override { - if (read_done_) { - return false; - } - read_done_ = true; - return body_.Read(request); - } - - /// Block to write \a msg to the stream with WriteOptions \a options. - /// This is thread-safe with respect to \a ReaderInterface::Read - /// - /// \param msg The message to be written to the stream. - /// \param options The WriteOptions affecting the write operation. - /// - /// \return \a true on success, \a false when the stream has been closed. - using internal::WriterInterface<ResponseType>::Write; - bool Write(const ResponseType& response, - ::grpc::WriteOptions options) override { - return read_done_ && body_.Write(response, options); - } - - private: - internal::ServerReaderWriterBody<ResponseType, RequestType> body_; - bool read_done_; - - friend class internal::TemplatedBidiStreamingHandler< - ServerSplitStreamer<RequestType, ResponseType>, false>; - ServerSplitStreamer(::grpc::internal::Call* call, ::grpc::ServerContext* ctx) - : body_(call, ctx), read_done_(false) {} -}; - +class ServerSplitStreamer final + : public ServerReaderWriterInterface<ResponseType, RequestType> { + public: + /// Block to send initial metadata to client. + /// Implicit input parameter: + /// - the \a ServerContext associated with this call will be used for + /// sending initial metadata. + void SendInitialMetadata() override { body_.SendInitialMetadata(); } + + /// Get an upper bound on the request message size from the client. + bool NextMessageSize(uint32_t* sz) override { + return body_.NextMessageSize(sz); + } + + /// Read a message of type \a R into \a msg. Completion will be notified by \a + /// tag on the associated completion queue. + /// This is thread-safe with respect to \a Write or \a WritesDone methods. It + /// should not be called concurrently with other streaming APIs + /// on the same stream. It is not meaningful to call it concurrently + /// with another \a ReaderInterface::Read on the same stream since reads on + /// the same stream are delivered in order. + /// + /// \param[out] msg Where to eventually store the read message. + /// \param[in] tag The tag identifying the operation. + bool Read(RequestType* request) override { + if (read_done_) { + return false; + } + read_done_ = true; + return body_.Read(request); + } + + /// Block to write \a msg to the stream with WriteOptions \a options. + /// This is thread-safe with respect to \a ReaderInterface::Read + /// + /// \param msg The message to be written to the stream. + /// \param options The WriteOptions affecting the write operation. + /// + /// \return \a true on success, \a false when the stream has been closed. + using internal::WriterInterface<ResponseType>::Write; + bool Write(const ResponseType& response, + ::grpc::WriteOptions options) override { + return read_done_ && body_.Write(response, options); + } + + private: + internal::ServerReaderWriterBody<ResponseType, RequestType> body_; + bool read_done_; + + friend class internal::TemplatedBidiStreamingHandler< + ServerSplitStreamer<RequestType, ResponseType>, false>; + ServerSplitStreamer(::grpc::internal::Call* call, ::grpc::ServerContext* ctx) + : body_(call, ctx), read_done_(false) {} +}; + } // namespace grpc #endif // GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H |