diff options
author | leonidlazarev <leonidlazarev@yandex-team.com> | 2023-06-02 15:07:38 +0300 |
---|---|---|
committer | leonidlazarev <leonidlazarev@yandex-team.com> | 2023-06-02 15:07:38 +0300 |
commit | 59e0045a61e61c2ac38878f2adc7ec91ca914cc1 (patch) | |
tree | b6b64c51025630a7d40923d4aa7b7b882e1a8848 /contrib/libs/grpc/include | |
parent | 7506c5c295065b4360d617393af34203f037946d (diff) | |
download | ydb-59e0045a61e61c2ac38878f2adc7ec91ca914cc1.tar.gz |
feat grpc: update to grpc 1.50.2
update grpc to 1.50.2
update grpcio to 1.50.0
Удаленные патчи:
06-flow_control.patch - логика в upstream удалена
10-fix-crash-on-fork.patch - логика в upstream удалена
12-coverity-fix.patch - логика в upstream удалена
20-P2166-string-nullptr.patch - в upstream временный объект вместо nullptr
PR29209-fix-heap-use-after-free.patch - решение есть в upstream
Добавленные патчи:
pr33085_fix_epoll1_engine_reinit.patch
21-windows_build.patch
Diffstat (limited to 'contrib/libs/grpc/include')
66 files changed, 3214 insertions, 2341 deletions
diff --git a/contrib/libs/grpc/include/grpc/compression.h b/contrib/libs/grpc/include/grpc/compression.h index 176956642a..2c39c6b3af 100644 --- a/contrib/libs/grpc/include/grpc/compression.h +++ b/contrib/libs/grpc/include/grpc/compression.h @@ -23,7 +23,7 @@ #include <stdlib.h> -#include <grpc/impl/codegen/compression_types.h> +#include <grpc/impl/codegen/compression_types.h> // IWYU pragma: export #include <grpc/slice.h> #ifdef __cplusplus diff --git a/contrib/libs/grpc/include/grpc/event_engine/endpoint_config.h b/contrib/libs/grpc/include/grpc/event_engine/endpoint_config.h index f11f5fd6c3..89dd7fc56f 100644 --- a/contrib/libs/grpc/include/grpc/event_engine/endpoint_config.h +++ b/contrib/libs/grpc/include/grpc/event_engine/endpoint_config.h @@ -20,7 +20,7 @@ #include <util/string/cast.h> #include "y_absl/strings/string_view.h" -#include "y_absl/types/variant.h" +#include "y_absl/types/optional.h" namespace grpc_event_engine { namespace experimental { @@ -32,10 +32,16 @@ namespace experimental { class EndpointConfig { public: virtual ~EndpointConfig() = default; - using Setting = y_absl::variant<y_absl::monostate, int, y_absl::string_view, void*>; - /// Returns the Setting for a specified key, or \a y_absl::monostate if there is - /// no such entry. Caller does not take ownership of the resulting value. - virtual Setting Get(y_absl::string_view key) const = 0; + // If the key points to an integer config, an integer value gets returned. + // Otherwise it returns an y_absl::nullopt_t + virtual y_absl::optional<int> GetInt(y_absl::string_view key) const = 0; + // If the key points to an string config, an string value gets returned. + // Otherwise it returns an y_absl::nullopt_t + virtual y_absl::optional<y_absl::string_view> GetString( + y_absl::string_view key) const = 0; + // If the key points to an void* config, a void* pointer value gets returned. + // Otherwise it returns nullptr + virtual void* GetVoidPointer(y_absl::string_view key) const = 0; }; } // namespace experimental diff --git a/contrib/libs/grpc/include/grpc/event_engine/event_engine.h b/contrib/libs/grpc/include/grpc/event_engine/event_engine.h index d38ec7290a..c30a2c01dc 100644 --- a/contrib/libs/grpc/include/grpc/event_engine/event_engine.h +++ b/contrib/libs/grpc/include/grpc/event_engine/event_engine.h @@ -19,15 +19,16 @@ #include <functional> #include <vector> +#include "y_absl/functional/any_invocable.h" #include "y_absl/status/status.h" #include "y_absl/status/statusor.h" -#include "y_absl/time/time.h" #include <grpc/event_engine/endpoint_config.h> #include <grpc/event_engine/memory_allocator.h> #include <grpc/event_engine/port.h> +#include <grpc/event_engine/slice_buffer.h> -// TODO(hork): Define the Endpoint::Write metrics collection system +// TODO(vigneshbabu): Define the Endpoint::Write metrics collection system namespace grpc_event_engine { namespace experimental { @@ -71,8 +72,13 @@ namespace experimental { /// server->Wait(); /// //////////////////////////////////////////////////////////////////////////////// -class EventEngine { +class EventEngine : public std::enable_shared_from_this<EventEngine> { public: + /// A duration between two events. + /// + /// Throughout the EventEngine API durations are used to express how long + /// until an action should be performed. + using Duration = std::chrono::duration<int64_t, std::nano>; /// A custom closure type for EventEngine task execution. /// /// Throughout the EventEngine API, \a Closure ownership is retained by the @@ -138,6 +144,18 @@ class EventEngine { /// Shuts down all connections and invokes all pending read or write /// callbacks with an error status. virtual ~Endpoint() = default; + /// A struct representing optional arguments that may be provided to an + /// EventEngine Endpoint Read API call. + /// + /// Passed as argument to an Endpoint \a Read + struct ReadArgs { + // A suggestion to the endpoint implementation to read at-least the + // specified number of bytes over the network connection before marking + // the endpoint read operation as complete. gRPC may use this argument + // to minimize the number of endpoint read API calls over the lifetime + // of a connection. + int64_t read_hint_bytes; + }; /// Reads data from the Endpoint. /// /// When data is available on the connection, that data is moved into the @@ -155,8 +173,22 @@ class EventEngine { /// For failed read operations, implementations should pass the appropriate /// statuses to \a on_read. For example, callbacks might expect to receive /// CANCELLED on endpoint shutdown. - virtual void Read(std::function<void(y_absl::Status)> on_read, - SliceBuffer* buffer) = 0; + virtual void Read(y_absl::AnyInvocable<void(y_absl::Status)> on_read, + SliceBuffer* buffer, const ReadArgs* args) = 0; + /// A struct representing optional arguments that may be provided to an + /// EventEngine Endpoint Write API call. + /// + /// Passed as argument to an Endpoint \a Write + struct WriteArgs { + // Represents private information that may be passed by gRPC for + // select endpoints expected to be used only within google. + void* google_specific = nullptr; + // A suggestion to the endpoint implementation to group data to be written + // into frames of the specified max_frame_size. gRPC may use this + // argument to dynamically control the max sizes of frames sent to a + // receiver in response to high receiver memory pressure. + int64_t max_frame_size; + }; /// Writes data out on the connection. /// /// \a on_writable is called when the connection is ready for more data. The @@ -175,8 +207,8 @@ class EventEngine { /// For failed write operations, implementations should pass the appropriate /// statuses to \a on_writable. For example, callbacks might expect to /// receive CANCELLED on endpoint shutdown. - virtual void Write(std::function<void(y_absl::Status)> on_writable, - SliceBuffer* data) = 0; + virtual void Write(y_absl::AnyInvocable<void(y_absl::Status)> on_writable, + SliceBuffer* data, const WriteArgs* args) = 0; /// Returns an address in the format described in DNSResolver. The returned /// values are expected to remain valid for the life of the Endpoint. virtual const ResolvedAddress& GetPeerAddress() const = 0; @@ -190,14 +222,14 @@ class EventEngine { /// expect to receive DEADLINE_EXCEEDED statuses when appropriate, or /// CANCELLED statuses on EventEngine shutdown. using OnConnectCallback = - std::function<void(y_absl::StatusOr<std::unique_ptr<Endpoint>>)>; + y_absl::AnyInvocable<void(y_absl::StatusOr<std::unique_ptr<Endpoint>>)>; /// Listens for incoming connection requests from gRPC clients and initiates /// request processing once connections are established. class Listener { public: /// Called when the listener has accepted a new client connection. - using AcceptCallback = std::function<void( + using AcceptCallback = y_absl::AnyInvocable<void( std::unique_ptr<Endpoint>, MemoryAllocator memory_allocator)>; virtual ~Listener() = default; /// Bind an address/port to this Listener. @@ -225,7 +257,7 @@ class EventEngine { /// MemoryAllocators for Endpoint construction. virtual y_absl::StatusOr<std::unique_ptr<Listener>> CreateListener( Listener::AcceptCallback on_accept, - std::function<void(y_absl::Status)> on_shutdown, + y_absl::AnyInvocable<void(y_absl::Status)> on_shutdown, const EndpointConfig& config, std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) = 0; /// Creates a client network connection to a remote network listener. @@ -243,7 +275,7 @@ class EventEngine { const ResolvedAddress& addr, const EndpointConfig& args, MemoryAllocator memory_allocator, - y_absl::Time deadline) = 0; + Duration timeout) = 0; /// Request cancellation of a connection attempt. /// @@ -259,7 +291,13 @@ class EventEngine { public: /// Task handle for DNS Resolution requests. struct LookupTaskHandle { - intptr_t key[2]; + intptr_t keys[2]; + }; + /// Optional configuration for DNSResolvers. + struct ResolverOptions { + /// If empty, default DNS servers will be used. + /// Must be in the "IP:port" format as described in naming.md. + TString dns_server; }; /// DNS SRV record type. struct SRVRecord { @@ -271,12 +309,13 @@ class EventEngine { /// Called with the collection of sockaddrs that were resolved from a given /// target address. using LookupHostnameCallback = - std::function<void(y_absl::StatusOr<std::vector<ResolvedAddress>>)>; + y_absl::AnyInvocable<void(y_absl::StatusOr<std::vector<ResolvedAddress>>)>; /// Called with a collection of SRV records. using LookupSRVCallback = - std::function<void(y_absl::StatusOr<std::vector<SRVRecord>>)>; + y_absl::AnyInvocable<void(y_absl::StatusOr<std::vector<SRVRecord>>)>; /// Called with the result of a TXT record lookup - using LookupTXTCallback = std::function<void(y_absl::StatusOr<TString>)>; + using LookupTXTCallback = + y_absl::AnyInvocable<void(y_absl::StatusOr<TString>)>; virtual ~DNSResolver() = default; @@ -293,23 +332,23 @@ class EventEngine { /// /// If cancelled, \a on_resolve will not be executed. virtual LookupTaskHandle LookupHostname(LookupHostnameCallback on_resolve, - y_absl::string_view address, + y_absl::string_view name, y_absl::string_view default_port, - y_absl::Time deadline) = 0; + Duration timeout) = 0; /// Asynchronously perform an SRV record lookup. /// /// \a on_resolve has the same meaning and expectations as \a /// LookupHostname's \a on_resolve callback. virtual LookupTaskHandle LookupSRV(LookupSRVCallback on_resolve, y_absl::string_view name, - y_absl::Time deadline) = 0; + Duration timeout) = 0; /// Asynchronously perform a TXT record lookup. /// /// \a on_resolve has the same meaning and expectations as \a /// LookupHostname's \a on_resolve callback. virtual LookupTaskHandle LookupTXT(LookupTXTCallback on_resolve, y_absl::string_view name, - y_absl::Time deadline) = 0; + Duration timeout) = 0; /// Cancel an asynchronous lookup operation. /// /// This shares the same semantics with \a EventEngine::Cancel: successfully @@ -331,8 +370,10 @@ class EventEngine { // de-experimentalize this API. virtual bool IsWorkerThread() = 0; - /// Creates and returns an instance of a DNSResolver. - virtual std::unique_ptr<DNSResolver> GetDNSResolver() = 0; + /// Creates and returns an instance of a DNSResolver, optionally configured by + /// the \a options struct. + virtual std::unique_ptr<DNSResolver> GetDNSResolver( + const DNSResolver::ResolverOptions& options) = 0; /// Asynchronously executes a task as soon as possible. /// @@ -342,39 +383,45 @@ class EventEngine { /// Asynchronously executes a task as soon as possible. /// /// \a Closures scheduled with \a Run cannot be cancelled. Unlike the - /// overloaded \a Closure alternative, the std::function version's \a closure - /// will be deleted by the EventEngine after the closure has been run. + /// overloaded \a Closure alternative, the y_absl::AnyInvocable version's \a + /// closure will be deleted by the EventEngine after the closure has been run. /// /// This version of \a Run may be less performant than the \a Closure version /// in some scenarios. This overload is useful in situations where performance /// is not a critical concern. - virtual void Run(std::function<void()> closure) = 0; - /// Synonymous with scheduling an alarm to run at time \a when. + virtual void Run(y_absl::AnyInvocable<void()> closure) = 0; + /// Synonymous with scheduling an alarm to run after duration \a when. /// /// The \a closure will execute when time \a when arrives unless it has been /// cancelled via the \a Cancel method. If cancelled, the closure will not be /// run, nor will it be deleted. Ownership remains with the caller. - virtual TaskHandle RunAt(y_absl::Time when, Closure* closure) = 0; - /// Synonymous with scheduling an alarm to run at time \a when. + virtual TaskHandle RunAfter(Duration when, Closure* closure) = 0; + /// Synonymous with scheduling an alarm to run after duration \a when. /// /// The \a closure will execute when time \a when arrives unless it has been /// cancelled via the \a Cancel method. If cancelled, the closure will not be - /// run. Unilke the overloaded \a Closure alternative, the std::function + /// run. Unilke the overloaded \a Closure alternative, the y_absl::AnyInvocable /// version's \a closure will be deleted by the EventEngine after the closure /// has been run, or upon cancellation. /// - /// This version of \a RunAt may be less performant than the \a Closure + /// This version of \a RunAfter may be less performant than the \a Closure /// version in some scenarios. This overload is useful in situations where /// performance is not a critical concern. - virtual TaskHandle RunAt(y_absl::Time when, std::function<void()> closure) = 0; + virtual TaskHandle RunAfter(Duration when, + y_absl::AnyInvocable<void()> closure) = 0; /// Request cancellation of a task. /// /// If the associated closure has already been scheduled to run, it will not /// be cancelled, and this function will return false. /// /// If the associated callback has not been scheduled to run, it will be - /// cancelled, and the associated std::function or \a Closure* will not be - /// executed. In this case, Cancel will return true. + /// cancelled, and the associated y_absl::AnyInvocable or \a Closure* will not + /// be executed. In this case, Cancel will return true. + /// + /// Implementation note: closures should be destroyed in a timely manner after + /// execution or cancelliation (milliseconds), since any state bound to the + /// closure may need to be destroyed for things to progress (e.g., if a + /// closure holds a ref to some ref-counted object). virtual bool Cancel(TaskHandle handle) = 0; }; @@ -388,7 +435,7 @@ class EventEngine { /// created, applications must set a custom EventEngine factory method *before* /// grpc is initialized. void SetDefaultEventEngineFactory( - const std::function<std::unique_ptr<EventEngine>()>* factory); + y_absl::AnyInvocable<std::unique_ptr<EventEngine>()> factory); /// Create an EventEngine using the default factory. std::unique_ptr<EventEngine> CreateEventEngine(); diff --git a/contrib/libs/grpc/include/grpc/event_engine/memory_allocator.h b/contrib/libs/grpc/include/grpc/event_engine/memory_allocator.h index 0f6c68d7ae..42fb9c040c 100644 --- a/contrib/libs/grpc/include/grpc/event_engine/memory_allocator.h +++ b/contrib/libs/grpc/include/grpc/event_engine/memory_allocator.h @@ -26,24 +26,9 @@ #include <grpc/event_engine/internal/memory_allocator_impl.h> #include <grpc/slice.h> -// forward-declaring an internal struct, not used publicly. -struct grpc_slice_buffer; - namespace grpc_event_engine { namespace experimental { -// TODO(nnoble): needs implementation -class SliceBuffer { - public: - SliceBuffer() { abort(); } - explicit SliceBuffer(grpc_slice_buffer*) { abort(); } - - grpc_slice_buffer* RawSliceBuffer() { return slice_buffer_; } - - private: - grpc_slice_buffer* slice_buffer_; -}; - // Tracks memory allocated by one system. // Is effectively a thin wrapper/smart pointer for a MemoryAllocatorImpl, // providing a convenient and stable API. diff --git a/contrib/libs/grpc/include/grpc/event_engine/port.h b/contrib/libs/grpc/include/grpc/event_engine/port.h index 942458646a..ea2d68e1e2 100644 --- a/contrib/libs/grpc/include/grpc/event_engine/port.h +++ b/contrib/libs/grpc/include/grpc/event_engine/port.h @@ -20,7 +20,7 @@ #if defined(GPR_ANDROID) || defined(GPR_LINUX) || defined(GPR_APPLE) || \ defined(GPR_FREEBSD) || defined(GPR_OPENBSD) || defined(GPR_SOLARIS) || \ defined(GPR_AIX) || defined(GPR_NACL) || defined(GPR_FUCHSIA) || \ - defined(GRPC_POSIX_SOCKET) + defined(GRPC_POSIX_SOCKET) || defined(GPR_NETBSD) #define GRPC_EVENT_ENGINE_POSIX #include <arpa/inet.h> #include <netdb.h> @@ -31,6 +31,13 @@ #include <winsock2.h> #include <ws2tcpip.h> // must be included after the above + +// For some reason OPTIONAL is not defined and build for window is not OK +// Let, define it here as empty +#ifndef OPTIONAL +#define OPTIONAL +#endif + #include <mswsock.h> #else #error UNKNOWN PLATFORM diff --git a/contrib/libs/grpc/include/grpc/event_engine/slice.h b/contrib/libs/grpc/include/grpc/event_engine/slice.h new file mode 100644 index 0000000000..39531d907b --- /dev/null +++ b/contrib/libs/grpc/include/grpc/event_engine/slice.h @@ -0,0 +1,287 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_EVENT_ENGINE_SLICE_H +#define GRPC_EVENT_ENGINE_SLICE_H + +#include <grpc/support/port_platform.h> + +#include <string.h> + +#include <cstdint> +#include <util/generic/string.h> +#include <util/string/cast.h> +#include <utility> + +#include "y_absl/strings/string_view.h" + +#include <grpc/slice.h> +#include <grpc/support/log.h> + +// This public slice definition largely based of the internal grpc_core::Slice +// implementation. Changes to this implementation might warrant changes to the +// internal grpc_core::Slice type as well. + +namespace grpc_event_engine { +namespace experimental { + +// Forward declarations +class Slice; +class MutableSlice; + +namespace slice_detail { + +// Returns an empty slice. +static constexpr grpc_slice EmptySlice() { return {nullptr, {}}; } + +// BaseSlice holds the grpc_slice object, but does not apply refcounting policy. +// It does export immutable access into the slice, such that this can be shared +// by all storage policies. +class BaseSlice { + public: + BaseSlice(const BaseSlice&) = delete; + BaseSlice& operator=(const BaseSlice&) = delete; + BaseSlice(BaseSlice&& other) = delete; + BaseSlice& operator=(BaseSlice&& other) = delete; + + // Iterator access to the underlying bytes + const uint8_t* begin() const { return GRPC_SLICE_START_PTR(c_slice()); } + const uint8_t* end() const { return GRPC_SLICE_END_PTR(c_slice()); } + const uint8_t* cbegin() const { return GRPC_SLICE_START_PTR(c_slice()); } + const uint8_t* cend() const { return GRPC_SLICE_END_PTR(c_slice()); } + + // Retrieve a borrowed reference to the underlying grpc_slice. + const grpc_slice& c_slice() const { return slice_; } + + // Retrieve the underlying grpc_slice, and replace the one in this object with + // EmptySlice(). + grpc_slice TakeCSlice() { + grpc_slice out = slice_; + slice_ = EmptySlice(); + return out; + } + + // As other things... borrowed references. + y_absl::string_view as_string_view() const { + return y_absl::string_view(reinterpret_cast<const char*>(data()), size()); + } + + // Array access + uint8_t operator[](size_t i) const { + return GRPC_SLICE_START_PTR(c_slice())[i]; + } + + // Access underlying data + const uint8_t* data() const { return GRPC_SLICE_START_PTR(c_slice()); } + + // Size of the slice + size_t size() const { return GRPC_SLICE_LENGTH(c_slice()); } + size_t length() const { return size(); } + bool empty() const { return size() == 0; } + + // For inlined slices - are these two slices equal? + // For non-inlined slices - do these two slices refer to the same block of + // memory? + bool is_equivalent(const BaseSlice& other) const { + return grpc_slice_is_equivalent(slice_, other.slice_); + } + + uint32_t Hash() const; + + protected: + BaseSlice() : slice_(EmptySlice()) {} + explicit BaseSlice(const grpc_slice& slice) : slice_(slice) {} + ~BaseSlice() = default; + + void Swap(BaseSlice* other) { std::swap(slice_, other->slice_); } + void SetCSlice(const grpc_slice& slice) { slice_ = slice; } + + uint8_t* mutable_data() { return GRPC_SLICE_START_PTR(slice_); } + + grpc_slice* c_slice_ptr() { return &slice_; } + + private: + grpc_slice slice_; +}; + +inline bool operator==(const BaseSlice& a, const BaseSlice& b) { + return grpc_slice_eq(a.c_slice(), b.c_slice()) != 0; +} + +inline bool operator!=(const BaseSlice& a, const BaseSlice& b) { + return grpc_slice_eq(a.c_slice(), b.c_slice()) == 0; +} + +inline bool operator==(const BaseSlice& a, y_absl::string_view b) { + return a.as_string_view() == b; +} + +inline bool operator!=(const BaseSlice& a, y_absl::string_view b) { + return a.as_string_view() != b; +} + +inline bool operator==(y_absl::string_view a, const BaseSlice& b) { + return a == b.as_string_view(); +} + +inline bool operator!=(y_absl::string_view a, const BaseSlice& b) { + return a != b.as_string_view(); +} + +inline bool operator==(const BaseSlice& a, const grpc_slice& b) { + return grpc_slice_eq(a.c_slice(), b) != 0; +} + +inline bool operator!=(const BaseSlice& a, const grpc_slice& b) { + return grpc_slice_eq(a.c_slice(), b) == 0; +} + +inline bool operator==(const grpc_slice& a, const BaseSlice& b) { + return grpc_slice_eq(a, b.c_slice()) != 0; +} + +inline bool operator!=(const grpc_slice& a, const BaseSlice& b) { + return grpc_slice_eq(a, b.c_slice()) == 0; +} + +template <typename Out> +struct CopyConstructors { + static Out FromCopiedString(const char* s) { + return FromCopiedBuffer(s, strlen(s)); + } + static Out FromCopiedString(y_absl::string_view s) { + return FromCopiedBuffer(s.data(), s.size()); + } + static Out FromCopiedString(TString s); + + static Out FromCopiedBuffer(const char* p, size_t len) { + return Out(grpc_slice_from_copied_buffer(p, len)); + } + + template <typename Buffer> + static Out FromCopiedBuffer(const Buffer& buffer) { + return FromCopiedBuffer(reinterpret_cast<const char*>(buffer.data()), + buffer.size()); + } +}; + +} // namespace slice_detail + +class MutableSlice : public slice_detail::BaseSlice, + public slice_detail::CopyConstructors<MutableSlice> { + public: + MutableSlice() = default; + explicit MutableSlice(const grpc_slice& slice); + ~MutableSlice(); + + MutableSlice(const MutableSlice&) = delete; + MutableSlice& operator=(const MutableSlice&) = delete; + MutableSlice(MutableSlice&& other) noexcept + : slice_detail::BaseSlice(other.TakeCSlice()) {} + MutableSlice& operator=(MutableSlice&& other) noexcept { + Swap(&other); + return *this; + } + + static MutableSlice CreateUninitialized(size_t length) { + return MutableSlice(grpc_slice_malloc(length)); + } + + // Return a sub slice of this one. Leaves this slice in an indeterminate but + // valid state. + MutableSlice TakeSubSlice(size_t pos, size_t n) { + return MutableSlice(grpc_slice_sub_no_ref(TakeCSlice(), pos, pos + n)); + } + + // Iterator access to the underlying bytes + uint8_t* begin() { return mutable_data(); } + uint8_t* end() { return mutable_data() + size(); } + uint8_t* data() { return mutable_data(); } + + // Array access + uint8_t& operator[](size_t i) { return mutable_data()[i]; } +}; + +class Slice : public slice_detail::BaseSlice, + public slice_detail::CopyConstructors<Slice> { + public: + Slice() = default; + ~Slice(); + explicit Slice(const grpc_slice& slice) : slice_detail::BaseSlice(slice) {} + explicit Slice(slice_detail::BaseSlice&& other) + : slice_detail::BaseSlice(other.TakeCSlice()) {} + + Slice(const Slice&) = delete; + Slice& operator=(const Slice&) = delete; + Slice(Slice&& other) noexcept : slice_detail::BaseSlice(other.TakeCSlice()) {} + Slice& operator=(Slice&& other) noexcept { + Swap(&other); + return *this; + } + + // A slice might refer to some memory that we keep a refcount to (this is + // owned), or some memory that's inlined into the slice (also owned), or some + // other block of memory that we know will be available for the lifetime of + // some operation in the common case (not owned). In the *less common* case + // that we need to keep that slice text for longer than our API's guarantee us + // access, we need to take a copy and turn this into something that we do own. + + // TakeOwned returns an owned slice regardless of current ownership, and + // leaves the current slice in a valid but externally unpredictable state - in + // doing so it can avoid adding a ref to the underlying slice. + Slice TakeOwned(); + + // AsOwned returns an owned slice but does not mutate the current slice, + // meaning that it may add a reference to the underlying slice. + Slice AsOwned() const; + + // TakeMutable returns a MutableSlice, and leaves the current slice in an + // indeterminate but valid state. + // A mutable slice requires only one reference to the bytes of the slice - + // this can be achieved either with inlined storage or with a single + // reference. + // If the current slice is refcounted and there are more than one references + // to that slice, then the slice is copied in order to achieve a mutable + // version. + MutableSlice TakeMutable(); + + // Return a sub slice of this one. Leaves this slice in an indeterminate but + // valid state. + Slice TakeSubSlice(size_t pos, size_t n) { + return Slice(grpc_slice_sub_no_ref(TakeCSlice(), pos, pos + n)); + } + + // Return a sub slice of this one. Adds a reference to the underlying slice. + Slice RefSubSlice(size_t pos, size_t n) const { + return Slice(grpc_slice_sub(c_slice(), pos, pos + n)); + } + + // Split this slice, returning a new slice containing (split:end] and + // leaving this slice with [begin:split). + Slice Split(size_t split) { + return Slice(grpc_slice_split_tail(c_slice_ptr(), split)); + } + + Slice Ref() const; + + Slice Copy() const { return Slice(grpc_slice_copy(c_slice())); } + + static Slice FromRefcountAndBytes(grpc_slice_refcount* r, + const uint8_t* begin, const uint8_t* end); +}; + +} // namespace experimental +} // namespace grpc_event_engine + +#endif // GRPC_EVENT_ENGINE_SLICE_H diff --git a/contrib/libs/grpc/include/grpc/event_engine/slice_buffer.h b/contrib/libs/grpc/include/grpc/event_engine/slice_buffer.h new file mode 100644 index 0000000000..b1430f6733 --- /dev/null +++ b/contrib/libs/grpc/include/grpc/event_engine/slice_buffer.h @@ -0,0 +1,119 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_EVENT_ENGINE_SLICE_BUFFER_H +#define GRPC_EVENT_ENGINE_SLICE_BUFFER_H + +#include <grpc/support/port_platform.h> + +#include <string.h> + +#include <cstdint> +#include <util/generic/string.h> +#include <util/string/cast.h> + +#include "y_absl/strings/string_view.h" +#include "y_absl/utility/utility.h" + +#include <grpc/event_engine/slice.h> +#include <grpc/slice.h> +#include <grpc/slice_buffer.h> +#include <grpc/support/log.h> + +namespace grpc_event_engine { +namespace experimental { + +/// A Wrapper around \a grpc_slice_buffer pointer. +/// +/// A slice buffer holds the memory for a collection of slices. +/// The SliceBuffer object itself is meant to only hide the C-style API, +/// and won't hold the data itself. In terms of lifespan, the +/// grpc_slice_buffer ought to be kept somewhere inside the caller's objects, +/// like a transport or an endpoint. +/// +/// This lifespan rule is likely to change in the future, as we may +/// collapse the grpc_slice_buffer structure straight into this class. +/// +/// The SliceBuffer API is basically a replica of the grpc_slice_buffer's, +/// and its documentation will move here once we remove the C structure, +/// which should happen before the Event Engine's API is no longer +/// an experimental API. +class SliceBuffer { + public: + explicit SliceBuffer() { grpc_slice_buffer_init(&slice_buffer_); } + SliceBuffer(const SliceBuffer& other) = delete; + SliceBuffer(SliceBuffer&& other) noexcept + : slice_buffer_(other.slice_buffer_) { + grpc_slice_buffer_init(&slice_buffer_); + grpc_slice_buffer_swap(&slice_buffer_, &other.slice_buffer_); + } + /// Upon destruction, the underlying raw slice buffer is cleaned out and all + /// slices are unreffed. + ~SliceBuffer() { grpc_slice_buffer_destroy(&slice_buffer_); } + + SliceBuffer& operator=(const SliceBuffer&) = delete; + SliceBuffer& operator=(SliceBuffer&& other) noexcept { + grpc_slice_buffer_swap(&slice_buffer_, &other.slice_buffer_); + return *this; + } + + /// Appends a new slice into the SliceBuffer and makes an attempt to merge + /// this slice with the last slice in the SliceBuffer. + void Append(Slice slice); + + /// Adds a new slice into the SliceBuffer at the next available index. + /// Returns the index at which the new slice is added. + size_t AppendIndexed(Slice slice); + + /// Returns the number of slices held by the SliceBuffer. + size_t Count() { return slice_buffer_.count; } + + /// Removes/deletes the last n bytes in the SliceBuffer. + void RemoveLastNBytes(size_t n) { + grpc_slice_buffer_trim_end(&slice_buffer_, n, nullptr); + } + + /// Move the first n bytes of the SliceBuffer into a memory pointed to by dst. + void MoveFirstNBytesIntoBuffer(size_t n, void* dst) { + grpc_slice_buffer_move_first_into_buffer(&slice_buffer_, n, dst); + } + + /// Removes and unrefs all slices in the SliceBuffer. + void Clear() { grpc_slice_buffer_reset_and_unref(&slice_buffer_); } + + /// Removes the first slice in the SliceBuffer and returns it. + Slice TakeFirst(); + + /// Prepends the slice to the the front of the SliceBuffer. + void Prepend(Slice slice); + + /// Increased the ref-count of slice at the specified index and returns the + /// associated slice. + Slice RefSlice(size_t index); + + /// The total number of bytes held by the SliceBuffer + size_t Length() { return slice_buffer_.length; } + + /// Return a pointer to the back raw grpc_slice_buffer + grpc_slice_buffer* c_slice_buffer() { return &slice_buffer_; } + + private: + /// The backing raw slice buffer. + grpc_slice_buffer slice_buffer_; +}; + +} // namespace experimental +} // namespace grpc_event_engine + +#endif // GRPC_EVENT_ENGINE_SLICE_BUFFER_H diff --git a/contrib/libs/grpc/include/grpc/grpc.h b/contrib/libs/grpc/include/grpc/grpc.h index 9a00711794..56b1552e63 100644 --- a/contrib/libs/grpc/include/grpc/grpc.h +++ b/contrib/libs/grpc/include/grpc/grpc.h @@ -24,8 +24,8 @@ #include <stddef.h> #include <grpc/byte_buffer.h> -#include <grpc/impl/codegen/connectivity_state.h> -#include <grpc/impl/codegen/grpc_types.h> +#include <grpc/impl/codegen/connectivity_state.h> // IWYU pragma: export +#include <grpc/impl/codegen/grpc_types.h> // IWYU pragma: export #include <grpc/impl/codegen/propagation_bits.h> #include <grpc/slice.h> #include <grpc/status.h> @@ -545,7 +545,7 @@ GRPCAPI void grpc_resource_quota_set_max_threads( /** EXPERIMENTAL. Dumps xDS configs as a serialized ClientConfig proto. The full name of the proto is envoy.service.status.v3.ClientConfig. */ -GRPCAPI grpc_slice grpc_dump_xds_configs(); +GRPCAPI grpc_slice grpc_dump_xds_configs(void); /** Fetch a vtable for a grpc_channel_arg that points to a grpc_resource_quota */ diff --git a/contrib/libs/grpc/include/grpc/grpc_security.h b/contrib/libs/grpc/include/grpc/grpc_security.h index a90904e7d6..63810f6fa9 100644 --- a/contrib/libs/grpc/include/grpc/grpc_security.h +++ b/contrib/libs/grpc/include/grpc/grpc_security.h @@ -1050,6 +1050,17 @@ grpc_tls_certificate_verifier* grpc_tls_certificate_verifier_external_create( /** * EXPERIMENTAL API - Subject to change * + * Factory function for an internal verifier that won't perform any + * post-handshake verification. Note: using this solely without any other + * authentication mechanisms on the peer identity will leave your applications + * to the MITM(Man-In-The-Middle) attacks. Users should avoid doing so in + * production environments. + */ +grpc_tls_certificate_verifier* grpc_tls_certificate_verifier_no_op_create(); + +/** + * EXPERIMENTAL API - Subject to change + * * Factory function for an internal verifier that will do the default hostname * check. */ diff --git a/contrib/libs/grpc/include/grpc/impl/codegen/atm_gcc_atomic.h b/contrib/libs/grpc/include/grpc/impl/codegen/atm_gcc_atomic.h index 05d6e42cec..ad3f91f0b1 100644 --- a/contrib/libs/grpc/include/grpc/impl/codegen/atm_gcc_atomic.h +++ b/contrib/libs/grpc/include/grpc/impl/codegen/atm_gcc_atomic.h @@ -33,20 +33,6 @@ typedef intptr_t gpr_atm; #define GPR_ATM_MAX INTPTR_MAX #define GPR_ATM_MIN INTPTR_MIN -#ifdef GPR_LOW_LEVEL_COUNTERS -extern gpr_atm gpr_counter_atm_cas; -extern gpr_atm gpr_counter_atm_add; -#define GPR_ATM_INC_COUNTER(counter) \ - __atomic_fetch_add(&counter, 1, __ATOMIC_RELAXED) -#define GPR_ATM_INC_CAS_THEN(blah) \ - (GPR_ATM_INC_COUNTER(gpr_counter_atm_cas), blah) -#define GPR_ATM_INC_ADD_THEN(blah) \ - (GPR_ATM_INC_COUNTER(gpr_counter_atm_add), blah) -#else -#define GPR_ATM_INC_CAS_THEN(blah) blah -#define GPR_ATM_INC_ADD_THEN(blah) blah -#endif - #define gpr_atm_full_barrier() (__atomic_thread_fence(__ATOMIC_SEQ_CST)) #define gpr_atm_acq_load(p) (__atomic_load_n((p), __ATOMIC_ACQUIRE)) @@ -57,34 +43,39 @@ extern gpr_atm gpr_counter_atm_add; (__atomic_store_n((p), (intptr_t)(value), __ATOMIC_RELAXED)) #define gpr_atm_no_barrier_fetch_add(p, delta) \ - GPR_ATM_INC_ADD_THEN( \ - __atomic_fetch_add((p), (intptr_t)(delta), __ATOMIC_RELAXED)) + __atomic_fetch_add((p), (intptr_t)(delta), __ATOMIC_RELAXED) #define gpr_atm_full_fetch_add(p, delta) \ - GPR_ATM_INC_ADD_THEN( \ - __atomic_fetch_add((p), (intptr_t)(delta), __ATOMIC_ACQ_REL)) + __atomic_fetch_add((p), (intptr_t)(delta), __ATOMIC_ACQ_REL) static __inline int gpr_atm_no_barrier_cas(gpr_atm* p, gpr_atm o, gpr_atm n) { - return GPR_ATM_INC_CAS_THEN(__atomic_compare_exchange_n( - p, &o, n, 0, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); + // Need to be c89 compatible, so we can't use false for the fourth argument. + // NOLINTNEXTLINE(modernize-use-bool-literals) + return __atomic_compare_exchange_n(p, &o, n, 0, __ATOMIC_RELAXED, + __ATOMIC_RELAXED); } static __inline int gpr_atm_acq_cas(gpr_atm* p, gpr_atm o, gpr_atm n) { - return GPR_ATM_INC_CAS_THEN(__atomic_compare_exchange_n( - p, &o, n, 0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)); + // Need to be c89 compatible, so we can't use false for the fourth argument. + // NOLINTNEXTLINE(modernize-use-bool-literals) + return __atomic_compare_exchange_n(p, &o, n, 0, __ATOMIC_ACQUIRE, + __ATOMIC_RELAXED); } static __inline int gpr_atm_rel_cas(gpr_atm* p, gpr_atm o, gpr_atm n) { - return GPR_ATM_INC_CAS_THEN(__atomic_compare_exchange_n( - p, &o, n, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED)); + // Need to be c89 compatible, so we can't use false for the fourth argument. + // NOLINTNEXTLINE(modernize-use-bool-literals) + return __atomic_compare_exchange_n(p, &o, n, 0, __ATOMIC_RELEASE, + __ATOMIC_RELAXED); } static __inline int gpr_atm_full_cas(gpr_atm* p, gpr_atm o, gpr_atm n) { - return GPR_ATM_INC_CAS_THEN(__atomic_compare_exchange_n( - p, &o, n, 0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)); + // Need to be c89 compatible, so we can't use false for the fourth argument. + // NOLINTNEXTLINE(modernize-use-bool-literals) + return __atomic_compare_exchange_n(p, &o, n, 0, __ATOMIC_ACQ_REL, + __ATOMIC_RELAXED); } -#define gpr_atm_full_xchg(p, n) \ - GPR_ATM_INC_CAS_THEN(__atomic_exchange_n((p), (n), __ATOMIC_ACQ_REL)) +#define gpr_atm_full_xchg(p, n) __atomic_exchange_n((p), (n), __ATOMIC_ACQ_REL) #ifdef __cplusplus } diff --git a/contrib/libs/grpc/include/grpc/impl/codegen/atm_gcc_sync.h b/contrib/libs/grpc/include/grpc/impl/codegen/atm_gcc_sync.h index bdc7a172bc..cf946748d6 100644 --- a/contrib/libs/grpc/include/grpc/impl/codegen/atm_gcc_sync.h +++ b/contrib/libs/grpc/include/grpc/impl/codegen/atm_gcc_sync.h @@ -28,8 +28,6 @@ typedef intptr_t gpr_atm; #define GPR_ATM_MAX INTPTR_MAX #define GPR_ATM_MIN INTPTR_MIN -#define GPR_ATM_INC_CAS_THEN(blah) blah -#define GPR_ATM_INC_ADD_THEN(blah) blah #define GPR_ATM_COMPILE_BARRIER_() __asm__ __volatile__("" : : : "memory") diff --git a/contrib/libs/grpc/include/grpc/impl/codegen/atm_windows.h b/contrib/libs/grpc/include/grpc/impl/codegen/atm_windows.h index 816c9a9c64..39cd839979 100644 --- a/contrib/libs/grpc/include/grpc/impl/codegen/atm_windows.h +++ b/contrib/libs/grpc/include/grpc/impl/codegen/atm_windows.h @@ -29,8 +29,6 @@ typedef intptr_t gpr_atm; #define GPR_ATM_MAX INTPTR_MAX #define GPR_ATM_MIN INTPTR_MIN -#define GPR_ATM_INC_CAS_THEN(blah) blah -#define GPR_ATM_INC_ADD_THEN(blah) blah #define gpr_atm_full_barrier MemoryBarrier diff --git a/contrib/libs/grpc/include/grpc/impl/codegen/compression_types.h b/contrib/libs/grpc/include/grpc/impl/codegen/compression_types.h index 35f49075d0..c187030791 100644 --- a/contrib/libs/grpc/include/grpc/impl/codegen/compression_types.h +++ b/contrib/libs/grpc/include/grpc/impl/codegen/compression_types.h @@ -19,7 +19,8 @@ #ifndef GRPC_IMPL_CODEGEN_COMPRESSION_TYPES_H #define GRPC_IMPL_CODEGEN_COMPRESSION_TYPES_H -// IWYU pragma: private +// IWYU pragma: private, include <grpc/compression.h> +// IWYU pragma: friend "src/.*" #include <grpc/impl/codegen/port_platform.h> diff --git a/contrib/libs/grpc/include/grpc/impl/codegen/connectivity_state.h b/contrib/libs/grpc/include/grpc/impl/codegen/connectivity_state.h index 52084da4cd..218afd778d 100644 --- a/contrib/libs/grpc/include/grpc/impl/codegen/connectivity_state.h +++ b/contrib/libs/grpc/include/grpc/impl/codegen/connectivity_state.h @@ -19,7 +19,8 @@ #ifndef GRPC_IMPL_CODEGEN_CONNECTIVITY_STATE_H #define GRPC_IMPL_CODEGEN_CONNECTIVITY_STATE_H -// IWYU pragma: private +// IWYU pragma: private, include <grpc/grpc.h> +// IWYU pragma: friend "src/.*" #ifdef __cplusplus extern "C" { diff --git a/contrib/libs/grpc/include/grpc/impl/codegen/gpr_types.h b/contrib/libs/grpc/include/grpc/impl/codegen/gpr_types.h index 415bf7a9da..253ff3ae96 100644 --- a/contrib/libs/grpc/include/grpc/impl/codegen/gpr_types.h +++ b/contrib/libs/grpc/include/grpc/impl/codegen/gpr_types.h @@ -19,7 +19,8 @@ #ifndef GRPC_IMPL_CODEGEN_GPR_TYPES_H #define GRPC_IMPL_CODEGEN_GPR_TYPES_H -// IWYU pragma: private +// IWYU pragma: private, include <grpc/grpc.h> +// IWYU pragma: friend "src/.*" #include <grpc/impl/codegen/port_platform.h> diff --git a/contrib/libs/grpc/include/grpc/impl/codegen/grpc_types.h b/contrib/libs/grpc/include/grpc/impl/codegen/grpc_types.h index 40f3075c23..3bbef00b4f 100644 --- a/contrib/libs/grpc/include/grpc/impl/codegen/grpc_types.h +++ b/contrib/libs/grpc/include/grpc/impl/codegen/grpc_types.h @@ -19,7 +19,8 @@ #ifndef GRPC_IMPL_CODEGEN_GRPC_TYPES_H #define GRPC_IMPL_CODEGEN_GRPC_TYPES_H -// IWYU pragma: private +// IWYU pragma: private, include <grpc/grpc.h> +// IWYU pragma: friend "src/.*" #include <grpc/impl/codegen/port_platform.h> @@ -219,7 +220,7 @@ typedef struct { "grpc.http2.min_ping_interval_without_data_ms" /** Channel arg to override the http2 :scheme header */ #define GRPC_ARG_HTTP2_SCHEME "grpc.http2_scheme" -/** How many pings can we send before needing to send a +/** How many pings can the client send before needing to send a data/header frame? (0 indicates that an infinite number of pings can be sent without sending a data frame or header frame) */ #define GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA \ @@ -241,8 +242,8 @@ typedef struct { not receive the ping ack, it will close the transport. Int valued, milliseconds. */ #define GRPC_ARG_KEEPALIVE_TIMEOUT_MS "grpc.keepalive_timeout_ms" -/** Is it permissible to send keepalive pings without any outstanding streams. - Int valued, 0(false)/1(true). */ +/** Is it permissible to send keepalive pings from the client without any + outstanding streams. Int valued, 0(false)/1(true). */ #define GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS \ "grpc.keepalive_permit_without_calls" /** Default authority to pass if none specified on call construction. A string. @@ -367,6 +368,12 @@ typedef struct { balancer before using fallback backend addresses from the resolver. If 0, enter fallback mode immediately. Default value is 10000. */ #define GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS "grpc.grpclb_fallback_timeout_ms" +/* Experimental Arg. Channel args to be used for the control-plane channel + * created to the grpclb load balancers. This is a pointer arg whose value is a + * grpc_channel_args object. If unset, most channel args from the parent channel + * will be propagated to the grpclb channel. */ +#define GRPC_ARG_EXPERIMENTAL_GRPCLB_CHANNEL_ARGS \ + "grpc.experimental.grpclb_channel_args" /* Timeout in milliseconds to wait for the child of a specific priority to complete its initial connection attempt before the priority LB policy fails over to the next priority. Default value is 10 seconds. */ @@ -518,25 +525,16 @@ typedef enum grpc_call_error { /** Initial metadata flags */ /** These flags are to be passed to the `grpc_op::flags` field */ -/** Signal that the call is idempotent */ -#define GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST (0x00000010u) /** Signal that the call should not return UNAVAILABLE before it has started */ #define GRPC_INITIAL_METADATA_WAIT_FOR_READY (0x00000020u) -/** Signal that the call is cacheable. GRPC is free to use GET verb */ -#define GRPC_INITIAL_METADATA_CACHEABLE_REQUEST (0x00000040u) /** Signal that GRPC_INITIAL_METADATA_WAIT_FOR_READY was explicitly set by the calling application. */ #define GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET (0x00000080u) -/** Signal that the initial metadata should be corked */ -#define GRPC_INITIAL_METADATA_CORKED (0x00000100u) /** Mask of all valid flags */ #define GRPC_INITIAL_METADATA_USED_MASK \ - (GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST | \ - GRPC_INITIAL_METADATA_WAIT_FOR_READY | \ - GRPC_INITIAL_METADATA_CACHEABLE_REQUEST | \ - GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET | \ - GRPC_INITIAL_METADATA_CORKED | GRPC_WRITE_THROUGH) + (GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET | \ + GRPC_INITIAL_METADATA_WAIT_FOR_READY | GRPC_WRITE_THROUGH) /** A single metadata element */ typedef struct grpc_metadata { @@ -591,8 +589,6 @@ typedef struct { grpc_slice method; grpc_slice host; gpr_timespec deadline; - uint32_t flags; - void* reserved; } grpc_call_details; typedef enum { diff --git a/contrib/libs/grpc/include/grpc/impl/codegen/port_platform.h b/contrib/libs/grpc/include/grpc/impl/codegen/port_platform.h index 34320cc938..ec8217a345 100644 --- a/contrib/libs/grpc/include/grpc/impl/codegen/port_platform.h +++ b/contrib/libs/grpc/include/grpc/impl/codegen/port_platform.h @@ -48,13 +48,6 @@ #endif #endif // GPR_ABSEIL_SYNC -/* - * Defines GRPC_ERROR_IS_ABSEIL_STATUS to use y_absl::Status for grpc_error_handle - */ -#ifndef GRPC_ERROR_IS_ABSEIL_STATUS -// #define GRPC_ERROR_IS_ABSEIL_STATUS 1 -#endif - /* Get windows.h included everywhere (we need it) */ #if defined(_WIN64) || defined(WIN64) || defined(_WIN32) || defined(WIN32) #ifndef WIN32_LEAN_AND_MEAN @@ -207,6 +200,7 @@ #define GPR_CPU_POSIX 1 #define GPR_PLATFORM_STRING "asylo" #define GPR_GCC_SYNC 1 +#define GPR_POSIX_STAT 1 #define GPR_POSIX_SYNC 1 #define GPR_POSIX_STRING 1 #define GPR_POSIX_LOG 1 @@ -219,6 +213,8 @@ #define GRPC_TIMER_USE_GENERIC 1 #define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 #define GRPC_POSIX_WAKEUP_FD 1 +#define GRPC_HAVE_MSG_NOSIGNAL 1 +#define GRPC_HAVE_UNIX_SOCKET 1 #define GRPC_ARES 0 #define GPR_NO_AUTODETECT_PLATFORM 1 #elif defined(__APPLE__) @@ -352,6 +348,32 @@ #else /* _LP64 */ #define GPR_ARCH_32 1 #endif /* _LP64 */ +#elif defined(__NetBSD__) +// NetBSD is a community-supported platform. +// Please contact Thomas Klausner <wiz@NetBSD.org> for support. +#define GPR_PLATFORM_STRING "netbsd" +#ifndef _BSD_SOURCE +#define _BSD_SOURCE +#endif +#define GPR_NETBSD 1 +#define GPR_CPU_POSIX 1 +#define GPR_GCC_ATOMIC 1 +#define GPR_GCC_TLS 1 +#define GPR_POSIX_LOG 1 +#define GPR_POSIX_ENV 1 +#define GPR_POSIX_TMPFILE 1 +#define GPR_POSIX_STAT 1 +#define GPR_POSIX_STRING 1 +#define GPR_POSIX_SUBPROCESS 1 +#define GPR_POSIX_SYNC 1 +#define GPR_POSIX_TIME 1 +#define GPR_GETPID_IN_UNISTD_H 1 +#define GPR_SUPPORT_CHANNELS_FROM_FD 1 +#ifdef _LP64 +#define GPR_ARCH_64 1 +#else /* _LP64 */ +#define GPR_ARCH_32 1 +#endif /* _LP64 */ #elif defined(__native_client__) #define GPR_PLATFORM_STRING "nacl" #ifndef _BSD_SOURCE @@ -404,6 +426,32 @@ #define GPR_HAS_PTHREAD_H 1 #define GPR_GETPID_IN_UNISTD_H 1 #define GRPC_ROOT_PEM_PATH "/config/ssl/cert.pem" +#elif defined(__HAIKU__) +#define GPR_PLATFORM_STRING "haiku" +// Haiku is a community-supported platform. +// Please contact Jerome Duval <jerome.duval@gmail.com> for support. +#ifndef _BSD_SOURCE +#define _BSD_SOURCE +#endif +#define GPR_HAIKU 1 +#define GPR_CPU_POSIX 1 +#define GPR_GCC_ATOMIC 1 +#define GPR_POSIX_LOG 1 +#define GPR_POSIX_ENV 1 +#define GPR_POSIX_TMPFILE 1 +#define GPR_POSIX_STAT 1 +#define GPR_POSIX_STRING 1 +#define GPR_POSIX_SUBPROCESS 1 +#define GPR_POSIX_SYNC 1 +#define GPR_POSIX_TIME 1 +#define GPR_HAS_PTHREAD_H 1 +#define GPR_GETPID_IN_UNISTD_H 1 +#define GPR_SUPPORT_CHANNELS_FROM_FD 1 +#ifdef _LP64 +#define GPR_ARCH_64 1 +#else /* _LP64 */ +#define GPR_ARCH_32 1 +#endif /* _LP64 */ #else #error "Could not auto-detect platform" #endif @@ -528,6 +576,19 @@ typedef unsigned __int64 uint64_t; #define GRPC_MUST_USE_RESULT #define GPR_ALIGN_STRUCT(n) #endif +#ifdef USE_STRICT_WARNING +/* When building with USE_STRICT_WARNING (which -Werror), types with this + attribute will be treated as annotated with warn_unused_result, enforcing + returned values of this type should be used. + This is added in grpc::Status in mind to address the issue where it always + has this annotation internally but OSS doesn't, sometimes causing internal + build failure. To prevent this, this is added while not introducing + a breaking change to existing user code which may not use returned values + of grpc::Status. */ +#define GRPC_MUST_USE_RESULT_WHEN_USE_STRICT_WARNING GRPC_MUST_USE_RESULT +#else +#define GRPC_MUST_USE_RESULT_WHEN_USE_STRICT_WARNING +#endif #endif #ifndef GRPC_UNUSED @@ -547,14 +608,43 @@ typedef unsigned __int64 uint64_t; #endif #endif /* GPR_PRINT_FORMAT_CHECK */ +#ifndef GPR_HAS_CPP_ATTRIBUTE +#ifdef __has_cpp_attribute +#define GPR_HAS_CPP_ATTRIBUTE(a) __has_cpp_attribute(a) +#else +#define GPR_HAS_CPP_ATTRIBUTE(a) 0 +#endif +#endif /* GPR_HAS_CPP_ATTRIBUTE */ + +#ifndef GPR_HAS_ATTRIBUTE +#ifdef __has_attribute +#define GPR_HAS_ATTRIBUTE(a) __has_attribute(a) +#else +#define GPR_HAS_ATTRIBUTE(a) 0 +#endif +#endif /* GPR_HAS_ATTRIBUTE */ + +#if GPR_HAS_ATTRIBUTE(noreturn) +#define GPR_ATTRIBUTE_NORETURN __attribute__((noreturn)) +#else +#define GPR_ATTRIBUTE_NORETURN +#endif + #if GPR_FORBID_UNREACHABLE_CODE #define GPR_UNREACHABLE_CODE(STATEMENT) #else -#define GPR_UNREACHABLE_CODE(STATEMENT) \ - do { \ - gpr_log(GPR_ERROR, "Should never reach here."); \ - abort(); \ - STATEMENT; \ +#ifdef __cplusplus +extern "C" { +#endif +extern void gpr_unreachable_code(const char* reason, const char* file, + int line) GPR_ATTRIBUTE_NORETURN; +#ifdef __cplusplus +} +#endif +#define GPR_UNREACHABLE_CODE(STATEMENT) \ + do { \ + gpr_unreachable_code(#STATEMENT, __FILE__, __LINE__); \ + STATEMENT; \ } while (0) #endif /* GPR_FORBID_UNREACHABLE_CODE */ @@ -570,22 +660,6 @@ typedef unsigned __int64 uint64_t; #define CENSUSAPI GRPCAPI #endif -#ifndef GPR_HAS_CPP_ATTRIBUTE -#ifdef __has_cpp_attribute -#define GPR_HAS_CPP_ATTRIBUTE(a) __has_cpp_attribute(a) -#else -#define GPR_HAS_CPP_ATTRIBUTE(a) 0 -#endif -#endif /* GPR_HAS_CPP_ATTRIBUTE */ - -#ifndef GPR_HAS_ATTRIBUTE -#ifdef __has_attribute -#define GPR_HAS_ATTRIBUTE(a) __has_attribute(a) -#else -#define GPR_HAS_ATTRIBUTE(a) 0 -#endif -#endif /* GPR_HAS_ATTRIBUTE */ - #ifndef GPR_HAS_FEATURE #ifdef __has_feature #define GPR_HAS_FEATURE(a) __has_feature(a) @@ -692,21 +766,6 @@ typedef unsigned __int64 uint64_t; #define __STDC_FORMAT_MACROS #endif -/* Selectively enable EventEngine on specific platforms. This default can be - * overridden using the GRPC_USE_EVENT_ENGINE compiler flag. - */ -#ifndef GRPC_USE_EVENT_ENGINE -/* Not enabled by default on any platforms yet. (2021.06) */ -#elif GRPC_USE_EVENT_ENGINE == 0 -/* Building with `-DGRPC_USE_EVENT_ENGINE=0` will override the default. */ -#undef GRPC_USE_EVENT_ENGINE -#endif /* GRPC_USE_EVENT_ENGINE */ - -#ifdef GRPC_USE_EVENT_ENGINE -#undef GPR_SUPPORT_CHANNELS_FROM_FD -#define GRPC_ARES 0 -#endif /* GRPC_USE_EVENT_ENGINE */ - #define GRPC_CALLBACK_API_NONEXPERIMENTAL /* clang 11 with msan miscompiles destruction of [[no_unique_address]] members diff --git a/contrib/libs/grpc/include/grpc/impl/codegen/slice.h b/contrib/libs/grpc/include/grpc/impl/codegen/slice.h index 130e5efffd..33c1f1b37b 100644 --- a/contrib/libs/grpc/include/grpc/impl/codegen/slice.h +++ b/contrib/libs/grpc/include/grpc/impl/codegen/slice.h @@ -19,7 +19,7 @@ #ifndef GRPC_IMPL_CODEGEN_SLICE_H #define GRPC_IMPL_CODEGEN_SLICE_H -// IWYU pragma: private, include <grpc/slice.h> +// IWYU pragma: private #include <grpc/impl/codegen/port_platform.h> diff --git a/contrib/libs/grpc/include/grpcpp/channel.h b/contrib/libs/grpc/include/grpcpp/channel.h index 25a4c4d5c6..cf7ec3d4bb 100644 --- a/contrib/libs/grpc/include/grpcpp/channel.h +++ b/contrib/libs/grpc/include/grpcpp/channel.h @@ -24,11 +24,11 @@ #include <grpc/grpc.h> #include <grpcpp/impl/call.h> #include <grpcpp/impl/codegen/channel_interface.h> -#include <grpcpp/impl/codegen/client_interceptor.h> #include <grpcpp/impl/codegen/completion_queue.h> #include <grpcpp/impl/codegen/config.h> #include <grpcpp/impl/codegen/grpc_library.h> #include <grpcpp/impl/codegen/sync.h> +#include <grpcpp/support/client_interceptor.h> struct grpc_channel; diff --git a/contrib/libs/grpc/include/grpcpp/create_channel.h b/contrib/libs/grpc/include/grpcpp/create_channel.h index 4b94a08e45..df015b9869 100644 --- a/contrib/libs/grpc/include/grpcpp/create_channel.h +++ b/contrib/libs/grpc/include/grpcpp/create_channel.h @@ -22,9 +22,9 @@ #include <memory> #include <grpcpp/channel.h> -#include <grpcpp/impl/codegen/client_interceptor.h> #include <grpcpp/security/credentials.h> #include <grpcpp/support/channel_arguments.h> +#include <grpcpp/support/client_interceptor.h> #include <grpcpp/support/config.h> namespace grpc { diff --git a/contrib/libs/grpc/include/grpcpp/create_channel_binder.h b/contrib/libs/grpc/include/grpcpp/create_channel_binder.h index 9b68e3a8e8..377e4d4cd6 100644 --- a/contrib/libs/grpc/include/grpcpp/create_channel_binder.h +++ b/contrib/libs/grpc/include/grpcpp/create_channel_binder.h @@ -69,6 +69,37 @@ std::shared_ptr<grpc::Channel> CreateCustomBinderChannel( std::shared_ptr<grpc::experimental::binder::SecurityPolicy> security_policy, const ChannelArguments& args); +/// EXPERIMENTAL Create a new \a Channel based on binder transport. +/// +/// \param jni_env Pointer to a JNIEnv structure +/// \param context The context that we will use to invoke \a bindService See +/// https://developer.android.com/reference/android/content/Context#bindService(android.content.Intent,%20android.content.ServiceConnection,%20int) +/// for detail. +/// \param uri An URI that can be parsed as an `Intent` with +/// https://developer.android.com/reference/android/content/Intent#parseUri(java.lang.String,%20int) +/// \param security_policy Used for checking if remote component is allowed to +/// connect +std::shared_ptr<grpc::Channel> CreateBinderChannel( + void* jni_env, jobject context, y_absl::string_view uri, + std::shared_ptr<grpc::experimental::binder::SecurityPolicy> + security_policy); + +/// EXPERIMENTAL Create a new \a Channel based on binder transport. +/// +/// \param jni_env Pointer to a JNIEnv structure +/// \param context The context that we will use to invoke \a bindService See +/// https://developer.android.com/reference/android/content/Context#bindService(android.content.Intent,%20android.content.ServiceConnection,%20int) +/// for detail. +/// \param uri An URI that can be parsed as an `Intent` with +/// https://developer.android.com/reference/android/content/Intent#parseUri(java.lang.String,%20int) +/// \param security_policy Used for checking if remote component is allowed to +/// connect +/// \param args Options for channel creation. +std::shared_ptr<grpc::Channel> CreateCustomBinderChannel( + void* jni_env, jobject context, y_absl::string_view uri, + std::shared_ptr<grpc::experimental::binder::SecurityPolicy> security_policy, + const ChannelArguments& args); + /// EXPERIMENTAL Finds internal binder transport Java code. To create channels /// in threads created in native code, it is required to call this function /// once beforehand in a thread that is not created in native code. diff --git a/contrib/libs/grpc/include/grpcpp/ext/call_metric_recorder.h b/contrib/libs/grpc/include/grpcpp/ext/call_metric_recorder.h new file mode 100644 index 0000000000..91e25760f3 --- /dev/null +++ b/contrib/libs/grpc/include/grpcpp/ext/call_metric_recorder.h @@ -0,0 +1,95 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_EXT_CALL_METRIC_RECORDER_H +#define GRPCPP_EXT_CALL_METRIC_RECORDER_H + +#include <memory> +#include <util/generic/string.h> +#include <util/string/cast.h> + +#include "y_absl/strings/string_view.h" +#include "y_absl/types/optional.h" + +#include <grpcpp/impl/codegen/slice.h> +#include <grpcpp/impl/codegen/sync.h> + +namespace grpc_core { +class Arena; +struct BackendMetricData; +} // namespace grpc_core + +namespace grpc { +class ServerBuilder; + +namespace experimental { +class OrcaServerInterceptor; + +// Registers the per-rpc orca load reporter into the \a ServerBuilder. +// Once this is done, the server will automatically send the load metrics +// after each RPC as they were reported. In order to report load metrics, +// call the \a ServerContext::ExperimentalGetCallMetricRecorder() method to +// retrieve the recorder for the current call. +void EnableCallMetricRecording(ServerBuilder*); + +/// Records call metrics for the purpose of load balancing. +/// During an RPC, call \a ServerContext::ExperimentalGetCallMetricRecorder() +/// method to retrive the recorder for the current call. +class CallMetricRecorder { + public: + explicit CallMetricRecorder(grpc_core::Arena* arena); + ~CallMetricRecorder(); + + /// Records a call metric measurement for CPU utilization. + /// Multiple calls to this method will override the stored value. + CallMetricRecorder& RecordCpuUtilizationMetric(double value); + + /// Records a call metric measurement for memory utilization. + /// Multiple calls to this method will override the stored value. + CallMetricRecorder& RecordMemoryUtilizationMetric(double value); + + /// Records a call metric measurement for utilization. + /// Multiple calls to this method with the same name will + /// override the corresponding stored value. The lifetime of the + /// name string needs to be longer than the lifetime of the RPC + /// itself, since it's going to be sent as trailers after the RPC + /// finishes. It is assumed the strings are common names that + /// are global constants. + CallMetricRecorder& RecordUtilizationMetric(string_ref name, double value); + + /// Records a call metric measurement for request cost. + /// Multiple calls to this method with the same name will + /// override the corresponding stored value. The lifetime of the + /// name string needs to be longer than the lifetime of the RPC + /// itself, since it's going to be sent as trailers after the RPC + /// finishes. It is assumed the strings are common names that + /// are global constants. + CallMetricRecorder& RecordRequestCostMetric(string_ref name, double value); + + private: + y_absl::optional<TString> CreateSerializedReport(); + + internal::Mutex mu_; + grpc_core::BackendMetricData* backend_metric_data_ Y_ABSL_GUARDED_BY(&mu_); + friend class experimental::OrcaServerInterceptor; +}; + +} // namespace experimental +} // namespace grpc + +#endif // GRPCPP_EXT_CALL_METRIC_RECORDER_H diff --git a/contrib/libs/grpc/include/grpcpp/generic/async_generic_service.h b/contrib/libs/grpc/include/grpcpp/generic/async_generic_service.h index 01c79428e2..9fb2bb9acb 100644 --- a/contrib/libs/grpc/include/grpcpp/generic/async_generic_service.h +++ b/contrib/libs/grpc/include/grpcpp/generic/async_generic_service.h @@ -19,6 +19,117 @@ #ifndef GRPCPP_GENERIC_ASYNC_GENERIC_SERVICE_H #define GRPCPP_GENERIC_ASYNC_GENERIC_SERVICE_H -#include <grpcpp/impl/codegen/async_generic_service.h> // IWYU pragma: export +#include <grpc/impl/codegen/port_platform.h> + +#include <grpcpp/impl/codegen/server_callback_handlers.h> +#include <grpcpp/support/async_stream.h> +#include <grpcpp/support/byte_buffer.h> +#include <grpcpp/support/server_callback.h> + +struct grpc_server; + +namespace grpc { + +typedef ServerAsyncReaderWriter<ByteBuffer, ByteBuffer> + GenericServerAsyncReaderWriter; +typedef ServerAsyncResponseWriter<ByteBuffer> GenericServerAsyncResponseWriter; +typedef ServerAsyncReader<ByteBuffer, ByteBuffer> GenericServerAsyncReader; +typedef ServerAsyncWriter<ByteBuffer> GenericServerAsyncWriter; + +class GenericServerContext final : public ServerContext { + public: + const TString& method() const { return method_; } + const TString& host() const { return host_; } + + private: + friend class ServerInterface; + + TString method_; + TString host_; +}; + +// A generic service at the server side accepts all RPC methods and hosts. It is +// typically used in proxies. The generic service can be registered to a server +// which also has other services. +// Sample usage: +// ServerBuilder builder; +// auto cq = builder.AddCompletionQueue(); +// AsyncGenericService generic_service; +// builder.RegisterAsyncGenericService(&generic_service); +// auto server = builder.BuildAndStart(); +// +// // request a new call +// GenericServerContext context; +// GenericServerAsyncReaderWriter stream; +// generic_service.RequestCall(&context, &stream, cq.get(), cq.get(), tag); +// +// When tag is retrieved from cq->Next(), context.method() can be used to look +// at the method and the RPC can be handled accordingly. +class AsyncGenericService final { + public: + AsyncGenericService() : server_(nullptr) {} + + void RequestCall(GenericServerContext* ctx, + GenericServerAsyncReaderWriter* reader_writer, + grpc::CompletionQueue* call_cq, + grpc::ServerCompletionQueue* notification_cq, void* tag); + + private: + friend class grpc::Server; + grpc::Server* server_; +}; + +/// \a ServerGenericBidiReactor is the reactor class for bidi streaming RPCs +/// invoked on a CallbackGenericService. It is just a ServerBidi reactor with +/// ByteBuffer arguments. +using ServerGenericBidiReactor = ServerBidiReactor<ByteBuffer, ByteBuffer>; + +class GenericCallbackServerContext final : public grpc::CallbackServerContext { + public: + const TString& method() const { return method_; } + const TString& host() const { return host_; } + + private: + friend class grpc::Server; + + TString method_; + TString host_; +}; + +/// \a CallbackGenericService is the base class for generic services implemented +/// using the callback API and registered through the ServerBuilder using +/// RegisterCallbackGenericService. +class CallbackGenericService { + public: + CallbackGenericService() {} + virtual ~CallbackGenericService() {} + + /// The "method handler" for the generic API. This function should be + /// overridden to provide a ServerGenericBidiReactor that implements the + /// application-level interface for this RPC. Unimplemented by default. + virtual ServerGenericBidiReactor* CreateReactor( + GenericCallbackServerContext* /*ctx*/) { + class Reactor : public ServerGenericBidiReactor { + public: + Reactor() { this->Finish(Status(StatusCode::UNIMPLEMENTED, "")); } + void OnDone() override { delete this; } + }; + return new Reactor; + } + + private: + friend class grpc::Server; + + internal::CallbackBidiHandler<ByteBuffer, ByteBuffer>* Handler() { + return new internal::CallbackBidiHandler<ByteBuffer, ByteBuffer>( + [this](grpc::CallbackServerContext* ctx) { + return CreateReactor(static_cast<GenericCallbackServerContext*>(ctx)); + }); + } + + grpc::Server* server_{nullptr}; +}; + +} // namespace grpc #endif // GRPCPP_GENERIC_ASYNC_GENERIC_SERVICE_H diff --git a/contrib/libs/grpc/include/grpcpp/impl/call.h b/contrib/libs/grpc/include/grpcpp/impl/call.h index 97d8fdfc7c..2a119963f5 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/call.h +++ b/contrib/libs/grpc/include/grpcpp/impl/call.h @@ -19,6 +19,76 @@ #ifndef GRPCPP_IMPL_CALL_H #define GRPCPP_IMPL_CALL_H -#include <grpcpp/impl/codegen/call.h> // IWYU pragma: export +#include <grpc/impl/codegen/grpc_types.h> +#include <grpcpp/impl/call_hook.h> + +namespace grpc { +class CompletionQueue; +namespace experimental { +class ClientRpcInfo; +class ServerRpcInfo; +} // namespace experimental +namespace internal { +class CallHook; +class CallOpSetInterface; + +/// Straightforward wrapping of the C call object +class Call final { + public: + Call() + : call_hook_(nullptr), + cq_(nullptr), + call_(nullptr), + max_receive_message_size_(-1) {} + /** call is owned by the caller */ + Call(grpc_call* call, CallHook* call_hook, grpc::CompletionQueue* cq) + : call_hook_(call_hook), + cq_(cq), + call_(call), + max_receive_message_size_(-1) {} + + Call(grpc_call* call, CallHook* call_hook, grpc::CompletionQueue* cq, + experimental::ClientRpcInfo* rpc_info) + : call_hook_(call_hook), + cq_(cq), + call_(call), + max_receive_message_size_(-1), + client_rpc_info_(rpc_info) {} + + Call(grpc_call* call, CallHook* call_hook, grpc::CompletionQueue* cq, + int max_receive_message_size, experimental::ServerRpcInfo* rpc_info) + : call_hook_(call_hook), + cq_(cq), + call_(call), + max_receive_message_size_(max_receive_message_size), + server_rpc_info_(rpc_info) {} + + void PerformOps(CallOpSetInterface* ops) { + call_hook_->PerformOpsOnCall(ops, this); + } + + grpc_call* call() const { return call_; } + grpc::CompletionQueue* cq() const { return cq_; } + + int max_receive_message_size() const { return max_receive_message_size_; } + + experimental::ClientRpcInfo* client_rpc_info() const { + return client_rpc_info_; + } + + experimental::ServerRpcInfo* server_rpc_info() const { + return server_rpc_info_; + } + + private: + CallHook* call_hook_; + grpc::CompletionQueue* cq_; + grpc_call* call_; + int max_receive_message_size_; + experimental::ClientRpcInfo* client_rpc_info_ = nullptr; + experimental::ServerRpcInfo* server_rpc_info_ = nullptr; +}; +} // namespace internal +} // namespace grpc #endif // GRPCPP_IMPL_CALL_H diff --git a/contrib/libs/grpc/include/grpcpp/impl/call_hook.h b/contrib/libs/grpc/include/grpcpp/impl/call_hook.h new file mode 100644 index 0000000000..8903b9a06e --- /dev/null +++ b/contrib/libs/grpc/include/grpcpp/impl/call_hook.h @@ -0,0 +1,39 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_IMPL_CALL_HOOK_H +#define GRPCPP_IMPL_CALL_HOOK_H + +namespace grpc { + +namespace internal { +class CallOpSetInterface; +class Call; + +/// This is an interface that Channel and Server implement to allow them to hook +/// performing ops. +class CallHook { + public: + virtual ~CallHook() {} + virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0; +}; +} // namespace internal + +} // namespace grpc + +#endif // GRPCPP_IMPL_CALL_HOOK_H diff --git a/contrib/libs/grpc/include/grpcpp/impl/call_op_set_interface.h b/contrib/libs/grpc/include/grpcpp/impl/call_op_set_interface.h new file mode 100644 index 0000000000..43ed4ed94d --- /dev/null +++ b/contrib/libs/grpc/include/grpcpp/impl/call_op_set_interface.h @@ -0,0 +1,61 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_IMPL_CALL_OP_SET_INTERFACE_H +#define GRPCPP_IMPL_CALL_OP_SET_INTERFACE_H + +// IWYU pragma: private + +#include <grpcpp/impl/codegen/completion_queue_tag.h> + +namespace grpc { +namespace internal { + +class Call; + +/// An abstract collection of call ops, used to generate the +/// grpc_call_op structure to pass down to the lower layers, +/// and as it is-a CompletionQueueTag, also massages the final +/// completion into the correct form for consumption in the C++ +/// API. +class CallOpSetInterface : public CompletionQueueTag { + public: + /// Fills in grpc_op, starting from ops[*nops] and moving + /// upwards. + virtual void FillOps(internal::Call* call) = 0; + + /// Get the tag to be used at the core completion queue. Generally, the + /// value of core_cq_tag will be "this". However, it can be overridden if we + /// want core to process the tag differently (e.g., as a core callback) + virtual void* core_cq_tag() = 0; + + // This will be called while interceptors are run if the RPC is a hijacked + // RPC. This should set hijacking state for each of the ops. + virtual void SetHijackingState() = 0; + + // Should be called after interceptors are done running + virtual void ContinueFillOpsAfterInterception() = 0; + + // Should be called after interceptors are done running on the finalize result + // path + virtual void ContinueFinalizeResultAfterInterception() = 0; +}; +} // namespace internal +} // namespace grpc + +#endif // GRPCPP_IMPL_CALL_OP_SET_INTERFACE_H diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/async_generic_service.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/async_generic_service.h index 8aaeb2b824..03a096c52b 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/async_generic_service.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/async_generic_service.h @@ -19,119 +19,9 @@ #ifndef GRPCPP_IMPL_CODEGEN_ASYNC_GENERIC_SERVICE_H #define GRPCPP_IMPL_CODEGEN_ASYNC_GENERIC_SERVICE_H -// IWYU pragma: private, include <grpcpp/generic/async_generic_service.h> +// IWYU pragma: private -#include <grpc/impl/codegen/port_platform.h> - -#include <grpcpp/impl/codegen/async_stream.h> -#include <grpcpp/impl/codegen/byte_buffer.h> -#include <grpcpp/impl/codegen/server_callback.h> -#include <grpcpp/impl/codegen/server_callback_handlers.h> - -struct grpc_server; - -namespace grpc { - -typedef ServerAsyncReaderWriter<ByteBuffer, ByteBuffer> - GenericServerAsyncReaderWriter; -typedef ServerAsyncResponseWriter<ByteBuffer> GenericServerAsyncResponseWriter; -typedef ServerAsyncReader<ByteBuffer, ByteBuffer> GenericServerAsyncReader; -typedef ServerAsyncWriter<ByteBuffer> GenericServerAsyncWriter; - -class GenericServerContext final : public ServerContext { - public: - const TString& method() const { return method_; } - const TString& host() const { return host_; } - - private: - friend class ServerInterface; - - TString method_; - TString host_; -}; - -// A generic service at the server side accepts all RPC methods and hosts. It is -// typically used in proxies. The generic service can be registered to a server -// which also has other services. -// Sample usage: -// ServerBuilder builder; -// auto cq = builder.AddCompletionQueue(); -// AsyncGenericService generic_service; -// builder.RegisterAsyncGenericService(&generic_service); -// auto server = builder.BuildAndStart(); -// -// // request a new call -// GenericServerContext context; -// GenericServerAsyncReaderWriter stream; -// generic_service.RequestCall(&context, &stream, cq.get(), cq.get(), tag); -// -// When tag is retrieved from cq->Next(), context.method() can be used to look -// at the method and the RPC can be handled accordingly. -class AsyncGenericService final { - public: - AsyncGenericService() : server_(nullptr) {} - - void RequestCall(GenericServerContext* ctx, - GenericServerAsyncReaderWriter* reader_writer, - grpc::CompletionQueue* call_cq, - grpc::ServerCompletionQueue* notification_cq, void* tag); - - private: - friend class grpc::Server; - grpc::Server* server_; -}; - -/// \a ServerGenericBidiReactor is the reactor class for bidi streaming RPCs -/// invoked on a CallbackGenericService. It is just a ServerBidi reactor with -/// ByteBuffer arguments. -using ServerGenericBidiReactor = ServerBidiReactor<ByteBuffer, ByteBuffer>; - -class GenericCallbackServerContext final : public grpc::CallbackServerContext { - public: - const TString& method() const { return method_; } - const TString& host() const { return host_; } - - private: - friend class grpc::Server; - - TString method_; - TString host_; -}; - -/// \a CallbackGenericService is the base class for generic services implemented -/// using the callback API and registered through the ServerBuilder using -/// RegisterCallbackGenericService. -class CallbackGenericService { - public: - CallbackGenericService() {} - virtual ~CallbackGenericService() {} - - /// The "method handler" for the generic API. This function should be - /// overridden to provide a ServerGenericBidiReactor that implements the - /// application-level interface for this RPC. Unimplemented by default. - virtual ServerGenericBidiReactor* CreateReactor( - GenericCallbackServerContext* /*ctx*/) { - class Reactor : public ServerGenericBidiReactor { - public: - Reactor() { this->Finish(Status(StatusCode::UNIMPLEMENTED, "")); } - void OnDone() override { delete this; } - }; - return new Reactor; - } - - private: - friend class grpc::Server; - - internal::CallbackBidiHandler<ByteBuffer, ByteBuffer>* Handler() { - return new internal::CallbackBidiHandler<ByteBuffer, ByteBuffer>( - [this](grpc::CallbackServerContext* ctx) { - return CreateReactor(static_cast<GenericCallbackServerContext*>(ctx)); - }); - } - - grpc::Server* server_{nullptr}; -}; - -} // namespace grpc +/// TODO(chengyuc): Remove this file after solving compatibility. +#include <grpcpp/generic/async_generic_service.h> #endif // GRPCPP_IMPL_CODEGEN_ASYNC_GENERIC_SERVICE_H diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/async_stream.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/async_stream.h index 5b63c30173..524f619f81 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/async_stream.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/async_stream.h @@ -18,1115 +18,9 @@ #ifndef GRPCPP_IMPL_CODEGEN_ASYNC_STREAM_H #define GRPCPP_IMPL_CODEGEN_ASYNC_STREAM_H -// IWYU pragma: private, include <grpcpp/support/async_stream.h> +// IWYU pragma: private -#include <grpcpp/impl/codegen/call.h> -#include <grpcpp/impl/codegen/channel_interface.h> -#include <grpcpp/impl/codegen/core_codegen_interface.h> -#include <grpcpp/impl/codegen/server_context.h> -#include <grpcpp/impl/codegen/service_type.h> -#include <grpcpp/impl/codegen/status.h> +/// TODO(chengyuc): Remove this file after solving compatibility. +#include <grpcpp/support/async_stream.h> -namespace grpc { - -namespace internal { -/// Common interface for all client side asynchronous streaming. -class ClientAsyncStreamingInterface { - public: - virtual ~ClientAsyncStreamingInterface() {} - - /// Start the call that was set up by the constructor, but only if the - /// constructor was invoked through the "Prepare" API which doesn't actually - /// start the call - virtual void StartCall(void* tag) = 0; - - /// Request notification of the reading of the initial metadata. Completion - /// will be notified by \a tag on the associated completion queue. - /// This call is optional, but if it is used, it cannot be used concurrently - /// with or after the \a AsyncReaderInterface::Read method. - /// - /// \param[in] tag Tag identifying this request. - virtual void ReadInitialMetadata(void* tag) = 0; - - /// Indicate that the stream is to be finished and request notification for - /// when the call has been ended. - /// Should not be used concurrently with other operations. - /// - /// It is appropriate to call this method exactly once when both: - /// * the client side has no more message to send - /// (this can be declared implicitly by calling this method, or - /// explicitly through an earlier call to the <i>WritesDone</i> method - /// of the class in use, e.g. \a ClientAsyncWriterInterface::WritesDone or - /// \a ClientAsyncReaderWriterInterface::WritesDone). - /// * there are no more messages to be received from the server (this can - /// be known implicitly by the calling code, or explicitly from an - /// earlier call to \a AsyncReaderInterface::Read that yielded a failed - /// result, e.g. cq->Next(&read_tag, &ok) filled in 'ok' with 'false'). - /// - /// The tag will be returned when either: - /// - all incoming messages have been read and the server has returned - /// a status. - /// - the server has returned a non-OK status. - /// - the call failed for some reason and the library generated a - /// status. - /// - /// Note that implementations of this method attempt to receive initial - /// metadata from the server if initial metadata hasn't yet been received. - /// - /// \param[in] tag Tag identifying this request. - /// \param[out] status To be updated with the operation status. - virtual void Finish(grpc::Status* status, void* tag) = 0; -}; - -/// An interface that yields a sequence of messages of type \a R. -template <class R> -class AsyncReaderInterface { - public: - virtual ~AsyncReaderInterface() {} - - /// Read a message of type \a R into \a msg. Completion will be notified by \a - /// tag on the associated completion queue. - /// This is thread-safe with respect to \a Write or \a WritesDone methods. It - /// should not be called concurrently with other streaming APIs - /// on the same stream. It is not meaningful to call it concurrently - /// with another \a AsyncReaderInterface::Read on the same stream since reads - /// on the same stream are delivered in order. - /// - /// \param[out] msg Where to eventually store the read message. - /// \param[in] tag The tag identifying the operation. - /// - /// Side effect: note that this method attempt to receive initial metadata for - /// a stream if it hasn't yet been received. - virtual void Read(R* msg, void* tag) = 0; -}; - -/// An interface that can be fed a sequence of messages of type \a W. -template <class W> -class AsyncWriterInterface { - public: - virtual ~AsyncWriterInterface() {} - - /// Request the writing of \a msg with identifying tag \a tag. - /// - /// Only one write may be outstanding at any given time. This means that - /// after calling Write, one must wait to receive \a tag from the completion - /// queue BEFORE calling Write again. - /// This is thread-safe with respect to \a AsyncReaderInterface::Read - /// - /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to - /// to deallocate once Write returns. - /// - /// \param[in] msg The message to be written. - /// \param[in] tag The tag identifying the operation. - virtual void Write(const W& msg, void* tag) = 0; - - /// Request the writing of \a msg using WriteOptions \a options with - /// identifying tag \a tag. - /// - /// Only one write may be outstanding at any given time. This means that - /// after calling Write, one must wait to receive \a tag from the completion - /// queue BEFORE calling Write again. - /// WriteOptions \a options is used to set the write options of this message. - /// This is thread-safe with respect to \a AsyncReaderInterface::Read - /// - /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to - /// to deallocate once Write returns. - /// - /// \param[in] msg The message to be written. - /// \param[in] options The WriteOptions to be used to write this message. - /// \param[in] tag The tag identifying the operation. - virtual void Write(const W& msg, grpc::WriteOptions options, void* tag) = 0; - - /// Request the writing of \a msg and coalesce it with the writing - /// of trailing metadata, using WriteOptions \a options with - /// identifying tag \a tag. - /// - /// For client, WriteLast is equivalent of performing Write and - /// WritesDone in a single step. - /// For server, WriteLast buffers the \a msg. The writing of \a msg is held - /// until Finish is called, where \a msg and trailing metadata are coalesced - /// and write is initiated. Note that WriteLast can only buffer \a msg up to - /// the flow control window size. If \a msg size is larger than the window - /// size, it will be sent on wire without buffering. - /// - /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to - /// to deallocate once Write returns. - /// - /// \param[in] msg The message to be written. - /// \param[in] options The WriteOptions to be used to write this message. - /// \param[in] tag The tag identifying the operation. - void WriteLast(const W& msg, grpc::WriteOptions options, void* tag) { - Write(msg, options.set_last_message(), tag); - } -}; - -} // namespace internal - -template <class R> -class ClientAsyncReaderInterface - : public internal::ClientAsyncStreamingInterface, - public internal::AsyncReaderInterface<R> {}; - -namespace internal { -template <class R> -class ClientAsyncReaderFactory { - public: - /// Create a stream object. - /// Write the first request out if \a start is set. - /// \a tag will be notified on \a cq when the call has been started and - /// \a request has been written out. If \a start is not set, \a tag must be - /// nullptr and the actual call must be initiated by StartCall - /// Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - template <class W> - static ClientAsyncReader<R>* Create(grpc::ChannelInterface* channel, - grpc::CompletionQueue* cq, - const grpc::internal::RpcMethod& method, - grpc::ClientContext* context, - const W& request, bool start, void* tag) { - grpc::internal::Call call = channel->CreateCall(method, context, cq); - return new (grpc::g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncReader<R>))) - ClientAsyncReader<R>(call, context, request, start, tag); - } -}; -} // namespace internal - -/// Async client-side API for doing server-streaming RPCs, -/// where the incoming message stream coming from the server has -/// messages of type \a R. -template <class R> -class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { - public: - // always allocated against a call arena, no memory free required - static void operator delete(void* /*ptr*/, std::size_t size) { - GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncReader)); - } - - // This operator should never be called as the memory should be freed as part - // of the arena destruction. It only exists to provide a matching operator - // delete to the operator new so that some compilers will not complain (see - // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this - // there are no tests catching the compiler warning. - static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } - - void StartCall(void* tag) override { - GPR_CODEGEN_ASSERT(!started_); - started_ = true; - StartCallInternal(tag); - } - - /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata - /// method for semantics. - /// - /// Side effect: - /// - upon receiving initial metadata from the server, - /// the \a ClientContext associated with this call is updated, and the - /// calling code can access the received metadata through the - /// \a ClientContext. - void ReadInitialMetadata(void* tag) override { - GPR_CODEGEN_ASSERT(started_); - GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - - meta_ops_.set_output_tag(tag); - meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_ops_); - } - - void Read(R* msg, void* tag) override { - GPR_CODEGEN_ASSERT(started_); - read_ops_.set_output_tag(tag); - if (!context_->initial_metadata_received_) { - read_ops_.RecvInitialMetadata(context_); - } - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); - } - - /// See the \a ClientAsyncStreamingInterface.Finish method for semantics. - /// - /// Side effect: - /// - the \a ClientContext associated with this call is updated with - /// possible initial and trailing metadata received from the server. - void Finish(grpc::Status* status, void* tag) override { - GPR_CODEGEN_ASSERT(started_); - finish_ops_.set_output_tag(tag); - if (!context_->initial_metadata_received_) { - finish_ops_.RecvInitialMetadata(context_); - } - finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_ops_); - } - - private: - friend class internal::ClientAsyncReaderFactory<R>; - template <class W> - ClientAsyncReader(grpc::internal::Call call, grpc::ClientContext* context, - const W& request, bool start, void* tag) - : context_(context), call_(call), started_(start) { - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok()); - init_ops_.ClientSendClose(); - if (start) { - StartCallInternal(tag); - } else { - GPR_CODEGEN_ASSERT(tag == nullptr); - } - } - - void StartCallInternal(void* tag) { - init_ops_.SendInitialMetadata(&context_->send_initial_metadata_, - context_->initial_metadata_flags()); - init_ops_.set_output_tag(tag); - call_.PerformOps(&init_ops_); - } - - grpc::ClientContext* context_; - grpc::internal::Call call_; - bool started_; - grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, - grpc::internal::CallOpSendMessage, - grpc::internal::CallOpClientSendClose> - init_ops_; - grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> - meta_ops_; - grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, - grpc::internal::CallOpRecvMessage<R>> - read_ops_; - grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, - grpc::internal::CallOpClientRecvStatus> - finish_ops_; -}; - -/// Common interface for client side asynchronous writing. -template <class W> -class ClientAsyncWriterInterface - : public internal::ClientAsyncStreamingInterface, - public internal::AsyncWriterInterface<W> { - public: - /// Signal the client is done with the writes (half-close the client stream). - /// Thread-safe with respect to \a AsyncReaderInterface::Read - /// - /// \param[in] tag The tag identifying the operation. - virtual void WritesDone(void* tag) = 0; -}; - -namespace internal { -template <class W> -class ClientAsyncWriterFactory { - public: - /// Create a stream object. - /// Start the RPC if \a start is set - /// \a tag will be notified on \a cq when the call has been started (i.e. - /// intitial metadata sent) and \a request has been written out. - /// If \a start is not set, \a tag must be nullptr and the actual call - /// must be initiated by StartCall - /// Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - /// \a response will be filled in with the single expected response - /// message from the server upon a successful call to the \a Finish - /// method of this instance. - template <class R> - static ClientAsyncWriter<W>* Create(grpc::ChannelInterface* channel, - grpc::CompletionQueue* cq, - const grpc::internal::RpcMethod& method, - grpc::ClientContext* context, R* response, - bool start, void* tag) { - grpc::internal::Call call = channel->CreateCall(method, context, cq); - return new (grpc::g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncWriter<W>))) - ClientAsyncWriter<W>(call, context, response, start, tag); - } -}; -} // namespace internal - -/// Async API on the client side for doing client-streaming RPCs, -/// where the outgoing message stream going to the server contains -/// messages of type \a W. -template <class W> -class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { - public: - // always allocated against a call arena, no memory free required - static void operator delete(void* /*ptr*/, std::size_t size) { - GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncWriter)); - } - - // This operator should never be called as the memory should be freed as part - // of the arena destruction. It only exists to provide a matching operator - // delete to the operator new so that some compilers will not complain (see - // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this - // there are no tests catching the compiler warning. - static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } - - void StartCall(void* tag) override { - GPR_CODEGEN_ASSERT(!started_); - started_ = true; - StartCallInternal(tag); - } - - /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method for - /// semantics. - /// - /// Side effect: - /// - upon receiving initial metadata from the server, the \a ClientContext - /// associated with this call is updated, and the calling code can access - /// the received metadata through the \a ClientContext. - void ReadInitialMetadata(void* tag) override { - GPR_CODEGEN_ASSERT(started_); - GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - - meta_ops_.set_output_tag(tag); - meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_ops_); - } - - void Write(const W& msg, void* tag) override { - GPR_CODEGEN_ASSERT(started_); - write_ops_.set_output_tag(tag); - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); - } - - void Write(const W& msg, grpc::WriteOptions options, void* tag) override { - GPR_CODEGEN_ASSERT(started_); - write_ops_.set_output_tag(tag); - if (options.is_last_message()) { - options.set_buffer_hint(); - write_ops_.ClientSendClose(); - } - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); - call_.PerformOps(&write_ops_); - } - - void WritesDone(void* tag) override { - GPR_CODEGEN_ASSERT(started_); - write_ops_.set_output_tag(tag); - write_ops_.ClientSendClose(); - call_.PerformOps(&write_ops_); - } - - /// See the \a ClientAsyncStreamingInterface.Finish method for semantics. - /// - /// Side effect: - /// - the \a ClientContext associated with this call is updated with - /// possible initial and trailing metadata received from the server. - /// - attempts to fill in the \a response parameter passed to this class's - /// constructor with the server's response message. - void Finish(grpc::Status* status, void* tag) override { - GPR_CODEGEN_ASSERT(started_); - finish_ops_.set_output_tag(tag); - if (!context_->initial_metadata_received_) { - finish_ops_.RecvInitialMetadata(context_); - } - finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_ops_); - } - - private: - friend class internal::ClientAsyncWriterFactory<W>; - template <class R> - ClientAsyncWriter(grpc::internal::Call call, grpc::ClientContext* context, - R* response, bool start, void* tag) - : context_(context), call_(call), started_(start) { - finish_ops_.RecvMessage(response); - finish_ops_.AllowNoMessage(); - if (start) { - StartCallInternal(tag); - } else { - GPR_CODEGEN_ASSERT(tag == nullptr); - } - } - - void StartCallInternal(void* tag) { - write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, - context_->initial_metadata_flags()); - // if corked bit is set in context, we just keep the initial metadata - // buffered up to coalesce with later message send. No op is performed. - if (!context_->initial_metadata_corked_) { - write_ops_.set_output_tag(tag); - call_.PerformOps(&write_ops_); - } - } - - grpc::ClientContext* context_; - grpc::internal::Call call_; - bool started_; - grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> - meta_ops_; - grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, - grpc::internal::CallOpSendMessage, - grpc::internal::CallOpClientSendClose> - write_ops_; - grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, - grpc::internal::CallOpGenericRecvMessage, - grpc::internal::CallOpClientRecvStatus> - finish_ops_; -}; - -/// Async client-side interface for bi-directional streaming, -/// where the client-to-server message stream has messages of type \a W, -/// and the server-to-client message stream has messages of type \a R. -template <class W, class R> -class ClientAsyncReaderWriterInterface - : public internal::ClientAsyncStreamingInterface, - public internal::AsyncWriterInterface<W>, - public internal::AsyncReaderInterface<R> { - public: - /// Signal the client is done with the writes (half-close the client stream). - /// Thread-safe with respect to \a AsyncReaderInterface::Read - /// - /// \param[in] tag The tag identifying the operation. - virtual void WritesDone(void* tag) = 0; -}; - -namespace internal { -template <class W, class R> -class ClientAsyncReaderWriterFactory { - public: - /// Create a stream object. - /// Start the RPC request if \a start is set. - /// \a tag will be notified on \a cq when the call has been started (i.e. - /// intitial metadata sent). If \a start is not set, \a tag must be - /// nullptr and the actual call must be initiated by StartCall - /// Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - static ClientAsyncReaderWriter<W, R>* Create( - grpc::ChannelInterface* channel, grpc::CompletionQueue* cq, - const grpc::internal::RpcMethod& method, grpc::ClientContext* context, - bool start, void* tag) { - grpc::internal::Call call = channel->CreateCall(method, context, cq); - - return new (grpc::g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncReaderWriter<W, R>))) - ClientAsyncReaderWriter<W, R>(call, context, start, tag); - } -}; -} // namespace internal - -/// Async client-side interface for bi-directional streaming, -/// where the outgoing message stream going to the server -/// has messages of type \a W, and the incoming message stream coming -/// from the server has messages of type \a R. -template <class W, class R> -class ClientAsyncReaderWriter final - : public ClientAsyncReaderWriterInterface<W, R> { - public: - // always allocated against a call arena, no memory free required - static void operator delete(void* /*ptr*/, std::size_t size) { - GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncReaderWriter)); - } - - // This operator should never be called as the memory should be freed as part - // of the arena destruction. It only exists to provide a matching operator - // delete to the operator new so that some compilers will not complain (see - // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this - // there are no tests catching the compiler warning. - static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } - - void StartCall(void* tag) override { - GPR_CODEGEN_ASSERT(!started_); - started_ = true; - StartCallInternal(tag); - } - - /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method - /// for semantics of this method. - /// - /// Side effect: - /// - upon receiving initial metadata from the server, the \a ClientContext - /// is updated with it, and then the receiving initial metadata can - /// be accessed through this \a ClientContext. - void ReadInitialMetadata(void* tag) override { - GPR_CODEGEN_ASSERT(started_); - GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - - meta_ops_.set_output_tag(tag); - meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_ops_); - } - - void Read(R* msg, void* tag) override { - GPR_CODEGEN_ASSERT(started_); - read_ops_.set_output_tag(tag); - if (!context_->initial_metadata_received_) { - read_ops_.RecvInitialMetadata(context_); - } - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); - } - - void Write(const W& msg, void* tag) override { - GPR_CODEGEN_ASSERT(started_); - write_ops_.set_output_tag(tag); - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); - } - - void Write(const W& msg, grpc::WriteOptions options, void* tag) override { - GPR_CODEGEN_ASSERT(started_); - write_ops_.set_output_tag(tag); - if (options.is_last_message()) { - options.set_buffer_hint(); - write_ops_.ClientSendClose(); - } - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); - call_.PerformOps(&write_ops_); - } - - void WritesDone(void* tag) override { - GPR_CODEGEN_ASSERT(started_); - write_ops_.set_output_tag(tag); - write_ops_.ClientSendClose(); - call_.PerformOps(&write_ops_); - } - - /// See the \a ClientAsyncStreamingInterface.Finish method for semantics. - /// Side effect - /// - the \a ClientContext associated with this call is updated with - /// possible initial and trailing metadata sent from the server. - void Finish(grpc::Status* status, void* tag) override { - GPR_CODEGEN_ASSERT(started_); - finish_ops_.set_output_tag(tag); - if (!context_->initial_metadata_received_) { - finish_ops_.RecvInitialMetadata(context_); - } - finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_ops_); - } - - private: - friend class internal::ClientAsyncReaderWriterFactory<W, R>; - ClientAsyncReaderWriter(grpc::internal::Call call, - grpc::ClientContext* context, bool start, void* tag) - : context_(context), call_(call), started_(start) { - if (start) { - StartCallInternal(tag); - } else { - GPR_CODEGEN_ASSERT(tag == nullptr); - } - } - - void StartCallInternal(void* tag) { - write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, - context_->initial_metadata_flags()); - // if corked bit is set in context, we just keep the initial metadata - // buffered up to coalesce with later message send. No op is performed. - if (!context_->initial_metadata_corked_) { - write_ops_.set_output_tag(tag); - call_.PerformOps(&write_ops_); - } - } - - grpc::ClientContext* context_; - grpc::internal::Call call_; - bool started_; - grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> - meta_ops_; - grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, - grpc::internal::CallOpRecvMessage<R>> - read_ops_; - grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, - grpc::internal::CallOpSendMessage, - grpc::internal::CallOpClientSendClose> - write_ops_; - grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, - grpc::internal::CallOpClientRecvStatus> - finish_ops_; -}; - -template <class W, class R> -class ServerAsyncReaderInterface - : public grpc::internal::ServerAsyncStreamingInterface, - public internal::AsyncReaderInterface<R> { - public: - /// Indicate that the stream is to be finished with a certain status code - /// and also send out \a msg response to the client. - /// Request notification for when the server has sent the response and the - /// appropriate signals to the client to end the call. - /// Should not be used concurrently with other operations. - /// - /// It is appropriate to call this method when: - /// * all messages from the client have been received (either known - /// implictly, or explicitly because a previous - /// \a AsyncReaderInterface::Read operation with a non-ok result, - /// e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'). - /// - /// This operation will end when the server has finished sending out initial - /// metadata (if not sent already), response message, and status, or if - /// some failure occurred when trying to do so. - /// - /// gRPC doesn't take ownership or a reference to \a msg or \a status, so it - /// is safe to deallocate once Finish returns. - /// - /// \param[in] tag Tag identifying this request. - /// \param[in] status To be sent to the client as the result of this call. - /// \param[in] msg To be sent to the client as the response for this call. - virtual void Finish(const W& msg, const grpc::Status& status, void* tag) = 0; - - /// Indicate that the stream is to be finished with a certain - /// non-OK status code. - /// Request notification for when the server has sent the appropriate - /// signals to the client to end the call. - /// Should not be used concurrently with other operations. - /// - /// This call is meant to end the call with some error, and can be called at - /// any point that the server would like to "fail" the call (though note - /// this shouldn't be called concurrently with any other "sending" call, like - /// \a AsyncWriterInterface::Write). - /// - /// This operation will end when the server has finished sending out initial - /// metadata (if not sent already), and status, or if some failure occurred - /// when trying to do so. - /// - /// gRPC doesn't take ownership or a reference to \a status, so it is safe to - /// to deallocate once FinishWithError returns. - /// - /// \param[in] tag Tag identifying this request. - /// \param[in] status To be sent to the client as the result of this call. - /// - Note: \a status must have a non-OK code. - virtual void FinishWithError(const grpc::Status& status, void* tag) = 0; -}; - -/// Async server-side API for doing client-streaming RPCs, -/// where the incoming message stream from the client has messages of type \a R, -/// and the single response message sent from the server is type \a W. -template <class W, class R> -class ServerAsyncReader final : public ServerAsyncReaderInterface<W, R> { - public: - explicit ServerAsyncReader(grpc::ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} - - /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics. - /// - /// Implicit input parameter: - /// - The initial metadata that will be sent to the client from this op will - /// be taken from the \a ServerContext associated with the call. - void SendInitialMetadata(void* tag) override { - GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - - meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - meta_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_ops_); - } - - void Read(R* msg, void* tag) override { - read_ops_.set_output_tag(tag); - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); - } - - /// See the \a ServerAsyncReaderInterface.Read method for semantics - /// - /// Side effect: - /// - also sends initial metadata if not alreay sent. - /// - uses the \a ServerContext associated with this call to send possible - /// initial and trailing metadata. - /// - /// Note: \a msg is not sent if \a status has a non-OK code. - /// - /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it - /// is safe to deallocate once Finish returns. - void Finish(const W& msg, const grpc::Status& status, void* tag) override { - finish_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - finish_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - } - // The response is dropped if the status is not OK. - if (status.ok()) { - finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, - finish_ops_.SendMessage(msg)); - } else { - finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); - } - call_.PerformOps(&finish_ops_); - } - - /// See the \a ServerAsyncReaderInterface.Read method for semantics - /// - /// Side effect: - /// - also sends initial metadata if not alreay sent. - /// - uses the \a ServerContext associated with this call to send possible - /// initial and trailing metadata. - /// - /// gRPC doesn't take ownership or a reference to \a status, so it is safe to - /// to deallocate once FinishWithError returns. - void FinishWithError(const grpc::Status& status, void* tag) override { - GPR_CODEGEN_ASSERT(!status.ok()); - finish_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - finish_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - } - finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_ops_); - } - - private: - void BindCall(grpc::internal::Call* call) override { call_ = *call; } - - grpc::internal::Call call_; - grpc::ServerContext* ctx_; - grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> - meta_ops_; - grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>> read_ops_; - grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, - grpc::internal::CallOpSendMessage, - grpc::internal::CallOpServerSendStatus> - finish_ops_; -}; - -template <class W> -class ServerAsyncWriterInterface - : public grpc::internal::ServerAsyncStreamingInterface, - public internal::AsyncWriterInterface<W> { - public: - /// Indicate that the stream is to be finished with a certain status code. - /// Request notification for when the server has sent the appropriate - /// signals to the client to end the call. - /// Should not be used concurrently with other operations. - /// - /// It is appropriate to call this method when either: - /// * all messages from the client have been received (either known - /// implictly, or explicitly because a previous \a - /// AsyncReaderInterface::Read operation with a non-ok - /// result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'. - /// * it is desired to end the call early with some non-OK status code. - /// - /// This operation will end when the server has finished sending out initial - /// metadata (if not sent already), response message, and status, or if - /// some failure occurred when trying to do so. - /// - /// gRPC doesn't take ownership or a reference to \a status, so it is safe to - /// to deallocate once Finish returns. - /// - /// \param[in] tag Tag identifying this request. - /// \param[in] status To be sent to the client as the result of this call. - virtual void Finish(const grpc::Status& status, void* tag) = 0; - - /// Request the writing of \a msg and coalesce it with trailing metadata which - /// contains \a status, using WriteOptions options with - /// identifying tag \a tag. - /// - /// WriteAndFinish is equivalent of performing WriteLast and Finish - /// in a single step. - /// - /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it - /// is safe to deallocate once WriteAndFinish returns. - /// - /// \param[in] msg The message to be written. - /// \param[in] options The WriteOptions to be used to write this message. - /// \param[in] status The Status that server returns to client. - /// \param[in] tag The tag identifying the operation. - virtual void WriteAndFinish(const W& msg, grpc::WriteOptions options, - const grpc::Status& status, void* tag) = 0; -}; - -/// Async server-side API for doing server streaming RPCs, -/// where the outgoing message stream from the server has messages of type \a W. -template <class W> -class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { - public: - explicit ServerAsyncWriter(grpc::ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} - - /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics. - /// - /// Implicit input parameter: - /// - The initial metadata that will be sent to the client from this op will - /// be taken from the \a ServerContext associated with the call. - /// - /// \param[in] tag Tag identifying this request. - void SendInitialMetadata(void* tag) override { - GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - - meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - meta_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_ops_); - } - - void Write(const W& msg, void* tag) override { - write_ops_.set_output_tag(tag); - EnsureInitialMetadataSent(&write_ops_); - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); - } - - void Write(const W& msg, grpc::WriteOptions options, void* tag) override { - write_ops_.set_output_tag(tag); - if (options.is_last_message()) { - options.set_buffer_hint(); - } - - EnsureInitialMetadataSent(&write_ops_); - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); - call_.PerformOps(&write_ops_); - } - - /// See the \a ServerAsyncWriterInterface.WriteAndFinish method for semantics. - /// - /// Implicit input parameter: - /// - the \a ServerContext associated with this call is used - /// for sending trailing (and initial) metadata to the client. - /// - /// Note: \a status must have an OK code. - /// - /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it - /// is safe to deallocate once WriteAndFinish returns. - void WriteAndFinish(const W& msg, grpc::WriteOptions options, - const grpc::Status& status, void* tag) override { - write_ops_.set_output_tag(tag); - EnsureInitialMetadataSent(&write_ops_); - options.set_buffer_hint(); - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); - write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); - call_.PerformOps(&write_ops_); - } - - /// See the \a ServerAsyncWriterInterface.Finish method for semantics. - /// - /// Implicit input parameter: - /// - the \a ServerContext associated with this call is used for sending - /// trailing (and initial if not already sent) metadata to the client. - /// - /// Note: there are no restrictions are the code of - /// \a status,it may be non-OK - /// - /// gRPC doesn't take ownership or a reference to \a status, so it is safe to - /// to deallocate once Finish returns. - void Finish(const grpc::Status& status, void* tag) override { - finish_ops_.set_output_tag(tag); - EnsureInitialMetadataSent(&finish_ops_); - finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_ops_); - } - - private: - void BindCall(grpc::internal::Call* call) override { call_ = *call; } - - template <class T> - void EnsureInitialMetadataSent(T* ops) { - if (!ctx_->sent_initial_metadata_) { - ops->SendInitialMetadata(&ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - ops->set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - } - } - - grpc::internal::Call call_; - grpc::ServerContext* ctx_; - grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> - meta_ops_; - grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, - grpc::internal::CallOpSendMessage, - grpc::internal::CallOpServerSendStatus> - write_ops_; - grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, - grpc::internal::CallOpServerSendStatus> - finish_ops_; -}; - -/// Server-side interface for asynchronous bi-directional streaming. -template <class W, class R> -class ServerAsyncReaderWriterInterface - : public grpc::internal::ServerAsyncStreamingInterface, - public internal::AsyncWriterInterface<W>, - public internal::AsyncReaderInterface<R> { - public: - /// Indicate that the stream is to be finished with a certain status code. - /// Request notification for when the server has sent the appropriate - /// signals to the client to end the call. - /// Should not be used concurrently with other operations. - /// - /// It is appropriate to call this method when either: - /// * all messages from the client have been received (either known - /// implictly, or explicitly because a previous \a - /// AsyncReaderInterface::Read operation - /// with a non-ok result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' - /// with 'false'. - /// * it is desired to end the call early with some non-OK status code. - /// - /// This operation will end when the server has finished sending out initial - /// metadata (if not sent already), response message, and status, or if some - /// failure occurred when trying to do so. - /// - /// gRPC doesn't take ownership or a reference to \a status, so it is safe to - /// to deallocate once Finish returns. - /// - /// \param[in] tag Tag identifying this request. - /// \param[in] status To be sent to the client as the result of this call. - virtual void Finish(const grpc::Status& status, void* tag) = 0; - - /// Request the writing of \a msg and coalesce it with trailing metadata which - /// contains \a status, using WriteOptions options with - /// identifying tag \a tag. - /// - /// WriteAndFinish is equivalent of performing WriteLast and Finish in a - /// single step. - /// - /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it - /// is safe to deallocate once WriteAndFinish returns. - /// - /// \param[in] msg The message to be written. - /// \param[in] options The WriteOptions to be used to write this message. - /// \param[in] status The Status that server returns to client. - /// \param[in] tag The tag identifying the operation. - virtual void WriteAndFinish(const W& msg, grpc::WriteOptions options, - const grpc::Status& status, void* tag) = 0; -}; - -/// Async server-side API for doing bidirectional streaming RPCs, -/// where the incoming message stream coming from the client has messages of -/// type \a R, and the outgoing message stream coming from the server has -/// messages of type \a W. -template <class W, class R> -class ServerAsyncReaderWriter final - : public ServerAsyncReaderWriterInterface<W, R> { - public: - explicit ServerAsyncReaderWriter(grpc::ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} - - /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics. - /// - /// Implicit input parameter: - /// - The initial metadata that will be sent to the client from this op will - /// be taken from the \a ServerContext associated with the call. - /// - /// \param[in] tag Tag identifying this request. - void SendInitialMetadata(void* tag) override { - GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - - meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - meta_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_ops_); - } - - void Read(R* msg, void* tag) override { - read_ops_.set_output_tag(tag); - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); - } - - void Write(const W& msg, void* tag) override { - write_ops_.set_output_tag(tag); - EnsureInitialMetadataSent(&write_ops_); - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); - } - - void Write(const W& msg, grpc::WriteOptions options, void* tag) override { - write_ops_.set_output_tag(tag); - if (options.is_last_message()) { - options.set_buffer_hint(); - } - EnsureInitialMetadataSent(&write_ops_); - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); - call_.PerformOps(&write_ops_); - } - - /// See the \a ServerAsyncReaderWriterInterface.WriteAndFinish - /// method for semantics. - /// - /// Implicit input parameter: - /// - the \a ServerContext associated with this call is used - /// for sending trailing (and initial) metadata to the client. - /// - /// Note: \a status must have an OK code. - // - /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it - /// is safe to deallocate once WriteAndFinish returns. - void WriteAndFinish(const W& msg, grpc::WriteOptions options, - const grpc::Status& status, void* tag) override { - write_ops_.set_output_tag(tag); - EnsureInitialMetadataSent(&write_ops_); - options.set_buffer_hint(); - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); - write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); - call_.PerformOps(&write_ops_); - } - - /// See the \a ServerAsyncReaderWriterInterface.Finish method for semantics. - /// - /// Implicit input parameter: - /// - the \a ServerContext associated with this call is used for sending - /// trailing (and initial if not already sent) metadata to the client. - /// - /// Note: there are no restrictions are the code of \a status, - /// it may be non-OK - // - /// gRPC doesn't take ownership or a reference to \a status, so it is safe to - /// to deallocate once Finish returns. - void Finish(const grpc::Status& status, void* tag) override { - finish_ops_.set_output_tag(tag); - EnsureInitialMetadataSent(&finish_ops_); - - finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_ops_); - } - - private: - friend class grpc::Server; - - void BindCall(grpc::internal::Call* call) override { call_ = *call; } - - template <class T> - void EnsureInitialMetadataSent(T* ops) { - if (!ctx_->sent_initial_metadata_) { - ops->SendInitialMetadata(&ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - ops->set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - } - } - - grpc::internal::Call call_; - grpc::ServerContext* ctx_; - grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> - meta_ops_; - grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>> read_ops_; - grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, - grpc::internal::CallOpSendMessage, - grpc::internal::CallOpServerSendStatus> - write_ops_; - grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, - grpc::internal::CallOpServerSendStatus> - finish_ops_; -}; - -} // namespace grpc #endif // GRPCPP_IMPL_CODEGEN_ASYNC_STREAM_H diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/async_unary_call.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/async_unary_call.h index 0034d21058..55227b2e4a 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/async_unary_call.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/async_unary_call.h @@ -19,402 +19,9 @@ #ifndef GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_H #define GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_H -// IWYU pragma: private, include <grpcpp/support/async_unary_call.h> +// IWYU pragma: private -#include <grpcpp/impl/codegen/call.h> -#include <grpcpp/impl/codegen/call_op_set.h> -#include <grpcpp/impl/codegen/call_op_set_interface.h> -#include <grpcpp/impl/codegen/channel_interface.h> -#include <grpcpp/impl/codegen/client_context.h> -#include <grpcpp/impl/codegen/server_context.h> -#include <grpcpp/impl/codegen/service_type.h> -#include <grpcpp/impl/codegen/status.h> - -namespace grpc { - -// Forward declaration for use in Helper class -template <class R> -class ClientAsyncResponseReader; - -/// An interface relevant for async client side unary RPCs (which send -/// one request message to a server and receive one response message). -template <class R> -class ClientAsyncResponseReaderInterface { - public: - virtual ~ClientAsyncResponseReaderInterface() {} - - /// Start the call that was set up by the constructor, but only if the - /// constructor was invoked through the "Prepare" API which doesn't actually - /// start the call - virtual void StartCall() = 0; - - /// Request notification of the reading of initial metadata. Completion - /// will be notified by \a tag on the associated completion queue. - /// This call is optional, but if it is used, it cannot be used concurrently - /// with or after the \a Finish method. - /// - /// \param[in] tag Tag identifying this request. - virtual void ReadInitialMetadata(void* tag) = 0; - - /// Request to receive the server's response \a msg and final \a status for - /// the call, and to notify \a tag on this call's completion queue when - /// finished. - /// - /// This function will return when either: - /// - when the server's response message and status have been received. - /// - when the server has returned a non-OK status (no message expected in - /// this case). - /// - when the call failed for some reason and the library generated a - /// non-OK status. - /// - /// \param[in] tag Tag identifying this request. - /// \param[out] status To be updated with the operation status. - /// \param[out] msg To be filled in with the server's response message. - virtual void Finish(R* msg, grpc::Status* status, void* tag) = 0; -}; - -namespace internal { - -class ClientAsyncResponseReaderHelper { - public: - /// Start a call and write the request out if \a start is set. - /// \a tag will be notified on \a cq when the call has been started (i.e. - /// intitial metadata sent) and \a request has been written out. - /// If \a start is not set, the actual call must be initiated by StartCall - /// Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - /// - /// Optionally pass in a base class for request and response types so that the - /// internal functions and structs can be templated based on that, allowing - /// reuse across RPCs (e.g., MessageLite for protobuf). Since constructors - /// can't have an explicit template parameter, the last argument is an - /// extraneous parameter just to provide the needed type information. - template <class R, class W, class BaseR = R, class BaseW = W> - static ClientAsyncResponseReader<R>* Create( - grpc::ChannelInterface* channel, grpc::CompletionQueue* cq, - const grpc::internal::RpcMethod& method, grpc::ClientContext* context, - const W& request) /* __attribute__((noinline)) */ { - grpc::internal::Call call = channel->CreateCall(method, context, cq); - ClientAsyncResponseReader<R>* result = - new (grpc::g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncResponseReader<R>))) - ClientAsyncResponseReader<R>(call, context); - SetupRequest<BaseR, BaseW>( - call.call(), &result->single_buf_, &result->read_initial_metadata_, - &result->finish_, static_cast<const BaseW&>(request)); - - return result; - } - - // Various helper functions to reduce templating use - - template <class R, class W> - static void SetupRequest( - grpc_call* call, - grpc::internal::CallOpSendInitialMetadata** single_buf_ptr, - std::function<void(ClientContext*, internal::Call*, - internal::CallOpSendInitialMetadata*, void*)>* - read_initial_metadata, - std::function< - void(ClientContext*, internal::Call*, bool initial_metadata_read, - internal::CallOpSendInitialMetadata*, - internal::CallOpSetInterface**, void*, Status*, void*)>* finish, - const W& request) { - using SingleBufType = - grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, - grpc::internal::CallOpSendMessage, - grpc::internal::CallOpClientSendClose, - grpc::internal::CallOpRecvInitialMetadata, - grpc::internal::CallOpRecvMessage<R>, - grpc::internal::CallOpClientRecvStatus>; - SingleBufType* single_buf = - new (grpc::g_core_codegen_interface->grpc_call_arena_alloc( - call, sizeof(SingleBufType))) SingleBufType; - *single_buf_ptr = single_buf; - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(single_buf->SendMessage(request).ok()); - single_buf->ClientSendClose(); - - // The purpose of the following functions is to type-erase the actual - // templated type of the CallOpSet being used by hiding that type inside the - // function definition rather than specifying it as an argument of the - // function or a member of the class. The type-erased CallOpSet will get - // static_cast'ed back to the real type so that it can be used properly. - *read_initial_metadata = - [](ClientContext* context, internal::Call* call, - internal::CallOpSendInitialMetadata* single_buf_view, void* tag) { - auto* single_buf = static_cast<SingleBufType*>(single_buf_view); - single_buf->set_output_tag(tag); - single_buf->RecvInitialMetadata(context); - call->PerformOps(single_buf); - }; - - // Note that this function goes one step further than the previous one - // because it type-erases the message being written down to a void*. This - // will be static-cast'ed back to the class specified here by hiding that - // class information inside the function definition. Note that this feature - // expects the class being specified here for R to be a base-class of the - // "real" R without any multiple-inheritance (as applies in protbuf wrt - // MessageLite) - *finish = [](ClientContext* context, internal::Call* call, - bool initial_metadata_read, - internal::CallOpSendInitialMetadata* single_buf_view, - internal::CallOpSetInterface** finish_buf_ptr, void* msg, - Status* status, void* tag) { - if (initial_metadata_read) { - using FinishBufType = - grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>, - grpc::internal::CallOpClientRecvStatus>; - FinishBufType* finish_buf = - new (grpc::g_core_codegen_interface->grpc_call_arena_alloc( - call->call(), sizeof(FinishBufType))) FinishBufType; - *finish_buf_ptr = finish_buf; - finish_buf->set_output_tag(tag); - finish_buf->RecvMessage(static_cast<R*>(msg)); - finish_buf->AllowNoMessage(); - finish_buf->ClientRecvStatus(context, status); - call->PerformOps(finish_buf); - } else { - auto* single_buf = static_cast<SingleBufType*>(single_buf_view); - single_buf->set_output_tag(tag); - single_buf->RecvInitialMetadata(context); - single_buf->RecvMessage(static_cast<R*>(msg)); - single_buf->AllowNoMessage(); - single_buf->ClientRecvStatus(context, status); - call->PerformOps(single_buf); - } - }; - } - - static void StartCall(grpc::ClientContext* context, - grpc::internal::CallOpSendInitialMetadata* single_buf) { - single_buf->SendInitialMetadata(&context->send_initial_metadata_, - context->initial_metadata_flags()); - } -}; - -// TODO(vjpai): This templated factory is deprecated and will be replaced by -//. the non-templated helper as soon as possible. -template <class R> -class ClientAsyncResponseReaderFactory { - public: - template <class W> - static ClientAsyncResponseReader<R>* Create( - grpc::ChannelInterface* channel, grpc::CompletionQueue* cq, - const grpc::internal::RpcMethod& method, grpc::ClientContext* context, - const W& request, bool start) { - auto* result = ClientAsyncResponseReaderHelper::Create<R>( - channel, cq, method, context, request); - if (start) { - result->StartCall(); - } - return result; - } -}; - -} // namespace internal - -/// Async API for client-side unary RPCs, where the message response -/// received from the server is of type \a R. -template <class R> -class ClientAsyncResponseReader final - : public ClientAsyncResponseReaderInterface<R> { - public: - // always allocated against a call arena, no memory free required - static void operator delete(void* /*ptr*/, std::size_t size) { - GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncResponseReader)); - } - - // This operator should never be called as the memory should be freed as part - // of the arena destruction. It only exists to provide a matching operator - // delete to the operator new so that some compilers will not complain (see - // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this - // there are no tests catching the compiler warning. - static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } - - void StartCall() override { - GPR_CODEGEN_DEBUG_ASSERT(!started_); - started_ = true; - internal::ClientAsyncResponseReaderHelper::StartCall(context_, single_buf_); - } - - /// See \a ClientAsyncResponseReaderInterface::ReadInitialMetadata for - /// semantics. - /// - /// Side effect: - /// - the \a ClientContext associated with this call is updated with - /// possible initial and trailing metadata sent from the server. - void ReadInitialMetadata(void* tag) override { - GPR_CODEGEN_DEBUG_ASSERT(started_); - GPR_CODEGEN_DEBUG_ASSERT(!context_->initial_metadata_received_); - read_initial_metadata_(context_, &call_, single_buf_, tag); - initial_metadata_read_ = true; - } - - /// See \a ClientAsyncResponseReaderInterface::Finish for semantics. - /// - /// Side effect: - /// - the \a ClientContext associated with this call is updated with - /// possible initial and trailing metadata sent from the server. - void Finish(R* msg, grpc::Status* status, void* tag) override { - GPR_CODEGEN_DEBUG_ASSERT(started_); - finish_(context_, &call_, initial_metadata_read_, single_buf_, &finish_buf_, - static_cast<void*>(msg), status, tag); - } - - private: - friend class internal::ClientAsyncResponseReaderHelper; - grpc::ClientContext* const context_; - grpc::internal::Call call_; - bool started_ = false; - bool initial_metadata_read_ = false; - - ClientAsyncResponseReader(grpc::internal::Call call, - grpc::ClientContext* context) - : context_(context), call_(call) {} - - // disable operator new - static void* operator new(std::size_t size); - static void* operator new(std::size_t /*size*/, void* p) { return p; } - - internal::CallOpSendInitialMetadata* single_buf_; - internal::CallOpSetInterface* finish_buf_ = nullptr; - std::function<void(ClientContext*, internal::Call*, - internal::CallOpSendInitialMetadata*, void*)> - read_initial_metadata_; - std::function<void(ClientContext*, internal::Call*, - bool initial_metadata_read, - internal::CallOpSendInitialMetadata*, - internal::CallOpSetInterface**, void*, Status*, void*)> - finish_; -}; - -/// Async server-side API for handling unary calls, where the single -/// response message sent to the client is of type \a W. -template <class W> -class ServerAsyncResponseWriter final - : public grpc::internal::ServerAsyncStreamingInterface { - public: - explicit ServerAsyncResponseWriter(grpc::ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} - - /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics. - /// - /// Side effect: - /// The initial metadata that will be sent to the client from this op will - /// be taken from the \a ServerContext associated with the call. - /// - /// \param[in] tag Tag identifying this request. - void SendInitialMetadata(void* tag) override { - GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - - meta_buf_.set_output_tag(tag); - meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - meta_buf_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_buf_); - } - - /// Indicate that the stream is to be finished and request notification - /// when the server has sent the appropriate signals to the client to - /// end the call. Should not be used concurrently with other operations. - /// - /// \param[in] tag Tag identifying this request. - /// \param[in] status To be sent to the client as the result of the call. - /// \param[in] msg Message to be sent to the client. - /// - /// Side effect: - /// - also sends initial metadata if not already sent (using the - /// \a ServerContext associated with this call). - /// - /// Note: if \a status has a non-OK code, then \a msg will not be sent, - /// and the client will receive only the status with possible trailing - /// metadata. - /// - /// gRPC doesn't take ownership or a reference to msg and status, so it is - /// safe to deallocate them once the Finish operation is complete (i.e. a - /// result arrives in the completion queue). - void Finish(const W& msg, const grpc::Status& status, void* tag) { - finish_buf_.set_output_tag(tag); - finish_buf_.set_core_cq_tag(&finish_buf_); - if (!ctx_->sent_initial_metadata_) { - finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - finish_buf_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - } - // The response is dropped if the status is not OK. - if (status.ok()) { - finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, - finish_buf_.SendMessage(msg)); - } else { - finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, status); - } - call_.PerformOps(&finish_buf_); - } - - /// Indicate that the stream is to be finished with a non-OK status, - /// and request notification for when the server has finished sending the - /// appropriate signals to the client to end the call. - /// Should not be used concurrently with other operations. - /// - /// \param[in] tag Tag identifying this request. - /// \param[in] status To be sent to the client as the result of the call. - /// - Note: \a status must have a non-OK code. - /// - /// Side effect: - /// - also sends initial metadata if not already sent (using the - /// \a ServerContext associated with this call). - /// - /// gRPC doesn't take ownership or a reference to status, so it is safe to - /// deallocate them once the Finish operation is complete (i.e. a result - /// arrives in the completion queue). - void FinishWithError(const grpc::Status& status, void* tag) { - GPR_CODEGEN_ASSERT(!status.ok()); - finish_buf_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - finish_buf_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - } - finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_buf_); - } - - private: - void BindCall(grpc::internal::Call* call) override { call_ = *call; } - - grpc::internal::Call call_; - grpc::ServerContext* ctx_; - grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> - meta_buf_; - grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, - grpc::internal::CallOpSendMessage, - grpc::internal::CallOpServerSendStatus> - finish_buf_; -}; - -} // namespace grpc - -namespace std { -template <class R> -class default_delete<grpc::ClientAsyncResponseReader<R>> { - public: - void operator()(void* /*p*/) {} -}; -template <class R> -class default_delete<grpc::ClientAsyncResponseReaderInterface<R>> { - public: - void operator()(void* /*p*/) {} -}; -} // namespace std +/// TODO(chengyuc): Remove this file after solving compatibility. +#include <grpcpp/support/async_unary_call.h> #endif // GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_H diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/byte_buffer.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/byte_buffer.h index a9ba7d16fc..d196995392 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/byte_buffer.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/byte_buffer.h @@ -19,221 +19,9 @@ #ifndef GRPCPP_IMPL_CODEGEN_BYTE_BUFFER_H #define GRPCPP_IMPL_CODEGEN_BYTE_BUFFER_H -// IWYU pragma: private, include <grpcpp/support/byte_buffer.h> +// IWYU pragma: private -#include <vector> - -#include <grpc/impl/codegen/byte_buffer.h> -#include <grpcpp/impl/codegen/config.h> -#include <grpcpp/impl/codegen/core_codegen_interface.h> -#include <grpcpp/impl/codegen/serialization_traits.h> -#include <grpcpp/impl/codegen/slice.h> -#include <grpcpp/impl/codegen/status.h> - -namespace grpc { - -class ServerInterface; -class ByteBuffer; -class ServerInterface; - -namespace internal { -template <class RequestType, class ResponseType> -class CallbackUnaryHandler; -template <class RequestType, class ResponseType> -class CallbackServerStreamingHandler; -template <class RequestType> -void* UnaryDeserializeHelper(grpc_byte_buffer*, grpc::Status*, RequestType*); -template <class ServiceType, class RequestType, class ResponseType> -class ServerStreamingHandler; -template <grpc::StatusCode code> -class ErrorMethodHandler; -class CallOpSendMessage; -template <class R> -class CallOpRecvMessage; -class CallOpGenericRecvMessage; -class ExternalConnectionAcceptorImpl; -template <class R> -class DeserializeFuncType; -class GrpcByteBufferPeer; - -} // namespace internal -/// A sequence of bytes. -class ByteBuffer final { - public: - /// Constuct an empty buffer. - ByteBuffer() : buffer_(nullptr) {} - - /// Construct buffer from \a slices, of which there are \a nslices. - ByteBuffer(const Slice* slices, size_t nslices) { - // The following assertions check that the representation of a grpc::Slice - // is identical to that of a grpc_slice: it has a grpc_slice field, and - // nothing else. - static_assert(std::is_same<decltype(slices[0].slice_), grpc_slice>::value, - "Slice must have same representation as grpc_slice"); - static_assert(sizeof(Slice) == sizeof(grpc_slice), - "Slice must have same representation as grpc_slice"); - // The following assertions check that the representation of a ByteBuffer is - // identical to grpc_byte_buffer*: it has a grpc_byte_buffer* field, - // and nothing else. - static_assert(std::is_same<decltype(buffer_), grpc_byte_buffer*>::value, - "ByteBuffer must have same representation as " - "grpc_byte_buffer*"); - static_assert(sizeof(ByteBuffer) == sizeof(grpc_byte_buffer*), - "ByteBuffer must have same representation as " - "grpc_byte_buffer*"); - // The const_cast is legal if grpc_raw_byte_buffer_create() does no more - // than its advertised side effect of increasing the reference count of the - // slices it processes, and such an increase does not affect the semantics - // seen by the caller of this constructor. - buffer_ = g_core_codegen_interface->grpc_raw_byte_buffer_create( - reinterpret_cast<grpc_slice*>(const_cast<Slice*>(slices)), nslices); - } - - /// Constuct a byte buffer by referencing elements of existing buffer - /// \a buf. Wrapper of core function grpc_byte_buffer_copy . This is not - /// a deep copy; it is just a referencing. As a result, its performance is - /// size-independent. - ByteBuffer(const ByteBuffer& buf) : buffer_(nullptr) { operator=(buf); } - - ~ByteBuffer() { - if (buffer_) { - g_core_codegen_interface->grpc_byte_buffer_destroy(buffer_); - } - } - - /// Wrapper of core function grpc_byte_buffer_copy . This is not - /// a deep copy; it is just a referencing. As a result, its performance is - /// size-independent. - ByteBuffer& operator=(const ByteBuffer& buf) { - if (this != &buf) { - Clear(); // first remove existing data - } - if (buf.buffer_) { - // then copy - buffer_ = g_core_codegen_interface->grpc_byte_buffer_copy(buf.buffer_); - } - return *this; - } - - // If this ByteBuffer's representation is a single flat slice, returns a - // slice referencing that array. - Status TrySingleSlice(Slice* slice) const; - - /// Dump (read) the buffer contents into \a slics. - Status DumpToSingleSlice(Slice* slice) const; - - /// Dump (read) the buffer contents into \a slices. - Status Dump(std::vector<Slice>* slices) const; - - /// Remove all data. - void Clear() { - if (buffer_) { - g_core_codegen_interface->grpc_byte_buffer_destroy(buffer_); - buffer_ = nullptr; - } - } - - /// Make a duplicate copy of the internals of this byte - /// buffer so that we have our own owned version of it. - /// bbuf.Duplicate(); is equivalent to bbuf=bbuf; but is actually readable. - /// This is not a deep copy; it is a referencing and its performance - /// is size-independent. - void Duplicate() { - buffer_ = g_core_codegen_interface->grpc_byte_buffer_copy(buffer_); - } - - /// Forget underlying byte buffer without destroying - /// Use this only for un-owned byte buffers - void Release() { buffer_ = nullptr; } - - /// Buffer size in bytes. - size_t Length() const { - return buffer_ == nullptr - ? 0 - : g_core_codegen_interface->grpc_byte_buffer_length(buffer_); - } - - /// Swap the state of *this and *other. - void Swap(ByteBuffer* other) { - grpc_byte_buffer* tmp = other->buffer_; - other->buffer_ = buffer_; - buffer_ = tmp; - } - - /// Is this ByteBuffer valid? - bool Valid() const { return (buffer_ != nullptr); } - - private: - friend class SerializationTraits<ByteBuffer, void>; - friend class ServerInterface; - friend class internal::CallOpSendMessage; - template <class R> - friend class internal::CallOpRecvMessage; - friend class internal::CallOpGenericRecvMessage; - template <class RequestType> - friend void* internal::UnaryDeserializeHelper(grpc_byte_buffer*, - grpc::Status*, RequestType*); - template <class ServiceType, class RequestType, class ResponseType> - friend class internal::ServerStreamingHandler; - template <class RequestType, class ResponseType> - friend class internal::CallbackUnaryHandler; - template <class RequestType, class ResponseType> - friend class internal::CallbackServerStreamingHandler; - template <StatusCode code> - friend class internal::ErrorMethodHandler; - template <class R> - friend class internal::DeserializeFuncType; - friend class ProtoBufferReader; - friend class ProtoBufferWriter; - friend class internal::GrpcByteBufferPeer; - friend class internal::ExternalConnectionAcceptorImpl; - - grpc_byte_buffer* buffer_; - - // takes ownership - void set_buffer(grpc_byte_buffer* buf) { - if (buffer_) { - Clear(); - } - buffer_ = buf; - } - - grpc_byte_buffer* c_buffer() { return buffer_; } - grpc_byte_buffer** c_buffer_ptr() { return &buffer_; } - - class ByteBufferPointer { - public: - /* NOLINTNEXTLINE(google-explicit-constructor) */ - ByteBufferPointer(const ByteBuffer* b) - : bbuf_(const_cast<ByteBuffer*>(b)) {} - /* NOLINTNEXTLINE(google-explicit-constructor) */ - operator ByteBuffer*() { return bbuf_; } - /* NOLINTNEXTLINE(google-explicit-constructor) */ - operator grpc_byte_buffer*() { return bbuf_->buffer_; } - /* NOLINTNEXTLINE(google-explicit-constructor) */ - operator grpc_byte_buffer**() { return &bbuf_->buffer_; } - - private: - ByteBuffer* bbuf_; - }; - ByteBufferPointer bbuf_ptr() const { return ByteBufferPointer(this); } -}; - -template <> -class SerializationTraits<ByteBuffer, void> { - public: - static Status Deserialize(ByteBuffer* byte_buffer, ByteBuffer* dest) { - dest->set_buffer(byte_buffer->buffer_); - return Status::OK; - } - static Status Serialize(const ByteBuffer& source, ByteBuffer* buffer, - bool* own_buffer) { - *buffer = source; - *own_buffer = true; - return g_core_codegen_interface->ok(); - } -}; - -} // namespace grpc +/// TODO(chengyuc): Remove this file after solving compatibility. +#include <grpcpp/support/byte_buffer.h> #endif // GRPCPP_IMPL_CODEGEN_BYTE_BUFFER_H diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/call.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/call.h index 90d54bc5d8..71f6cb8e4e 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/call.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/call.h @@ -18,78 +18,9 @@ #ifndef GRPCPP_IMPL_CODEGEN_CALL_H #define GRPCPP_IMPL_CODEGEN_CALL_H -// IWYU pragma: private, include <grpcpp/impl/call.h> +// IWYU pragma: private -#include <grpc/impl/codegen/grpc_types.h> -#include <grpcpp/impl/codegen/call_hook.h> - -namespace grpc { -class CompletionQueue; -namespace experimental { -class ClientRpcInfo; -class ServerRpcInfo; -} // namespace experimental -namespace internal { -class CallHook; -class CallOpSetInterface; - -/// Straightforward wrapping of the C call object -class Call final { - public: - Call() - : call_hook_(nullptr), - cq_(nullptr), - call_(nullptr), - max_receive_message_size_(-1) {} - /** call is owned by the caller */ - Call(grpc_call* call, CallHook* call_hook, grpc::CompletionQueue* cq) - : call_hook_(call_hook), - cq_(cq), - call_(call), - max_receive_message_size_(-1) {} - - Call(grpc_call* call, CallHook* call_hook, grpc::CompletionQueue* cq, - experimental::ClientRpcInfo* rpc_info) - : call_hook_(call_hook), - cq_(cq), - call_(call), - max_receive_message_size_(-1), - client_rpc_info_(rpc_info) {} - - Call(grpc_call* call, CallHook* call_hook, grpc::CompletionQueue* cq, - int max_receive_message_size, experimental::ServerRpcInfo* rpc_info) - : call_hook_(call_hook), - cq_(cq), - call_(call), - max_receive_message_size_(max_receive_message_size), - server_rpc_info_(rpc_info) {} - - void PerformOps(CallOpSetInterface* ops) { - call_hook_->PerformOpsOnCall(ops, this); - } - - grpc_call* call() const { return call_; } - grpc::CompletionQueue* cq() const { return cq_; } - - int max_receive_message_size() const { return max_receive_message_size_; } - - experimental::ClientRpcInfo* client_rpc_info() const { - return client_rpc_info_; - } - - experimental::ServerRpcInfo* server_rpc_info() const { - return server_rpc_info_; - } - - private: - CallHook* call_hook_; - grpc::CompletionQueue* cq_; - grpc_call* call_; - int max_receive_message_size_; - experimental::ClientRpcInfo* client_rpc_info_ = nullptr; - experimental::ServerRpcInfo* server_rpc_info_ = nullptr; -}; -} // namespace internal -} // namespace grpc +/// TODO(chengyuc): Remove this file after solving compatibility. +#include <grpcpp/impl/call.h> #endif // GRPCPP_IMPL_CODEGEN_CALL_H diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/call_hook.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/call_hook.h index 8c4278e7dc..db10a0fc2b 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/call_hook.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/call_hook.h @@ -21,21 +21,7 @@ // IWYU pragma: private -namespace grpc { - -namespace internal { -class CallOpSetInterface; -class Call; - -/// This is an interface that Channel and Server implement to allow them to hook -/// performing ops. -class CallHook { - public: - virtual ~CallHook() {} - virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0; -}; -} // namespace internal - -} // namespace grpc +/// TODO(chengyuc): Remove this file after solving compatibility. +#include <grpcpp/impl/call_hook.h> #endif // GRPCPP_IMPL_CODEGEN_CALL_HOOK_H diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/call_op_set.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/call_op_set.h index 283f299f95..ce8b363d6a 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/call_op_set.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/call_op_set.h @@ -31,7 +31,6 @@ #include <grpc/impl/codegen/compression_types.h> #include <grpc/impl/codegen/grpc_types.h> -#include <grpcpp/impl/codegen/byte_buffer.h> #include <grpcpp/impl/codegen/call.h> #include <grpcpp/impl/codegen/call_hook.h> #include <grpcpp/impl/codegen/call_op_set_interface.h> @@ -45,6 +44,7 @@ #include <grpcpp/impl/codegen/serialization_traits.h> #include <grpcpp/impl/codegen/slice.h> #include <grpcpp/impl/codegen/string_ref.h> +#include <grpcpp/support/byte_buffer.h> namespace grpc { diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/call_op_set_interface.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/call_op_set_interface.h index a8eed9f619..ce5f2036d1 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/call_op_set_interface.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/call_op_set_interface.h @@ -21,41 +21,6 @@ // IWYU pragma: private -#include <grpcpp/impl/codegen/completion_queue_tag.h> - -namespace grpc { -namespace internal { - -class Call; - -/// An abstract collection of call ops, used to generate the -/// grpc_call_op structure to pass down to the lower layers, -/// and as it is-a CompletionQueueTag, also massages the final -/// completion into the correct form for consumption in the C++ -/// API. -class CallOpSetInterface : public CompletionQueueTag { - public: - /// Fills in grpc_op, starting from ops[*nops] and moving - /// upwards. - virtual void FillOps(internal::Call* call) = 0; - - /// Get the tag to be used at the core completion queue. Generally, the - /// value of core_cq_tag will be "this". However, it can be overridden if we - /// want core to process the tag differently (e.g., as a core callback) - virtual void* core_cq_tag() = 0; - - // This will be called while interceptors are run if the RPC is a hijacked - // RPC. This should set hijacking state for each of the ops. - virtual void SetHijackingState() = 0; - - // Should be called after interceptors are done running - virtual void ContinueFillOpsAfterInterception() = 0; - - // Should be called after interceptors are done running on the finalize result - // path - virtual void ContinueFinalizeResultAfterInterception() = 0; -}; -} // namespace internal -} // namespace grpc +#include <grpcpp/impl/call_op_set_interface.h> #endif // GRPCPP_IMPL_CODEGEN_CALL_OP_SET_INTERFACE_H diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/callback_common.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/callback_common.h index f0ce27ae8f..9280289d5e 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/callback_common.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/callback_common.h @@ -28,7 +28,7 @@ #include <functional> #include <grpc/impl/codegen/grpc_types.h> -#include <grpcpp/impl/codegen/call.h> +#include <grpcpp/impl/call.h> #include <grpcpp/impl/codegen/channel_interface.h> #include <grpcpp/impl/codegen/completion_queue_tag.h> #include <grpcpp/impl/codegen/config.h> diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/channel_interface.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/channel_interface.h index 106dd56c93..a3f162cc87 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/channel_interface.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/channel_interface.h @@ -26,7 +26,7 @@ // IWYU pragma: private #include <grpc/impl/codegen/connectivity_state.h> -#include <grpcpp/impl/codegen/call.h> +#include <grpcpp/impl/call.h> #include <grpcpp/impl/codegen/status.h> #include <grpcpp/impl/codegen/time.h> diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/client_context.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/client_context.h index fc4d09a297..d24dd38b3c 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/client_context.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/client_context.h @@ -277,18 +277,6 @@ class ClientContext { deadline_ = deadline_tp.raw_time(); } - /// EXPERIMENTAL: Indicate that this request is idempotent. - /// By default, RPCs are assumed to <i>not</i> be idempotent. - /// - /// If true, the gRPC library assumes that it's safe to initiate - /// this RPC multiple times. - void set_idempotent(bool idempotent) { idempotent_ = idempotent; } - - /// EXPERIMENTAL: Set this request to be cacheable. - /// If set, grpc is free to use the HTTP GET verb for sending the request, - /// with the possibility of receiving a cached response. - void set_cacheable(bool cacheable) { cacheable_ = cacheable; } - /// Trigger wait-for-ready or not on this request. /// See https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md. /// If set, if an RPC is made when a channel's connectivity state is @@ -485,13 +473,10 @@ class ClientContext { } uint32_t initial_metadata_flags() const { - return (idempotent_ ? GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST : 0) | - (wait_for_ready_ ? GRPC_INITIAL_METADATA_WAIT_FOR_READY : 0) | - (cacheable_ ? GRPC_INITIAL_METADATA_CACHEABLE_REQUEST : 0) | + return (wait_for_ready_ ? GRPC_INITIAL_METADATA_WAIT_FOR_READY : 0) | (wait_for_ready_explicitly_set_ ? GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET - : 0) | - (initial_metadata_corked_ ? GRPC_INITIAL_METADATA_CORKED : 0); + : 0); } TString authority() { return authority_; } @@ -505,8 +490,6 @@ class ClientContext { bool initial_metadata_received_; bool wait_for_ready_; bool wait_for_ready_explicitly_set_; - bool idempotent_; - bool cacheable_; std::shared_ptr<grpc::Channel> channel_; grpc::internal::Mutex mu_; grpc_call* call_; diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/client_interceptor.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/client_interceptor.h index f3560b5ad2..1c6b8ce51f 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/client_interceptor.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/client_interceptor.h @@ -19,181 +19,9 @@ #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_INTERCEPTOR_H #define GRPCPP_IMPL_CODEGEN_CLIENT_INTERCEPTOR_H -// IWYU pragma: private, include <grpcpp/support/client_interceptor.h> +// IWYU pragma: private -#include <memory> -#include <vector> - -#include <grpcpp/impl/codegen/interceptor.h> -#include <grpcpp/impl/codegen/rpc_method.h> -#include <grpcpp/impl/codegen/string_ref.h> - -namespace grpc { - -class Channel; -class ClientContext; - -namespace internal { -class InterceptorBatchMethodsImpl; -} - -namespace experimental { -class ClientRpcInfo; - -// A factory interface for creation of client interceptors. A vector of -// factories can be provided at channel creation which will be used to create a -// new vector of client interceptors per RPC. Client interceptor authors should -// create a subclass of ClientInterceptorFactorInterface which creates objects -// of their interceptors. -class ClientInterceptorFactoryInterface { - public: - virtual ~ClientInterceptorFactoryInterface() {} - // Returns a pointer to an Interceptor object on successful creation, nullptr - // otherwise. If nullptr is returned, this server interceptor factory is - // ignored for the purposes of that RPC. - virtual Interceptor* CreateClientInterceptor(ClientRpcInfo* info) = 0; -}; -} // namespace experimental - -namespace internal { -extern experimental::ClientInterceptorFactoryInterface* - g_global_client_interceptor_factory; -} - -/// ClientRpcInfo represents the state of a particular RPC as it -/// appears to an interceptor. It is created and owned by the library and -/// passed to the CreateClientInterceptor method of the application's -/// ClientInterceptorFactoryInterface implementation -namespace experimental { -class ClientRpcInfo { - public: - // TODO(yashykt): Stop default-constructing ClientRpcInfo and remove UNKNOWN - // from the list of possible Types. - /// Type categorizes RPCs by unary or streaming type - enum class Type { - UNARY, - CLIENT_STREAMING, - SERVER_STREAMING, - BIDI_STREAMING, - UNKNOWN // UNKNOWN is not API and will be removed later - }; - - ~ClientRpcInfo() {} - - // Delete copy constructor but allow default move constructor - ClientRpcInfo(const ClientRpcInfo&) = delete; - ClientRpcInfo(ClientRpcInfo&&) = default; - - // Getter methods - - /// Return the fully-specified method name - const char* method() const { return method_; } - - /// Return an identifying suffix for the client stub, or nullptr if one wasn't - /// specified. - const char* suffix_for_stats() const { return suffix_for_stats_; } - - /// Return a pointer to the channel on which the RPC is being sent - ChannelInterface* channel() { return channel_; } - - /// Return a pointer to the underlying ClientContext structure associated - /// with the RPC to support features that apply to it - grpc::ClientContext* client_context() { return ctx_; } - - /// Return the type of the RPC (unary or a streaming flavor) - Type type() const { return type_; } - - private: - static_assert(Type::UNARY == - static_cast<Type>(internal::RpcMethod::NORMAL_RPC), - "violated expectation about Type enum"); - static_assert(Type::CLIENT_STREAMING == - static_cast<Type>(internal::RpcMethod::CLIENT_STREAMING), - "violated expectation about Type enum"); - static_assert(Type::SERVER_STREAMING == - static_cast<Type>(internal::RpcMethod::SERVER_STREAMING), - "violated expectation about Type enum"); - static_assert(Type::BIDI_STREAMING == - static_cast<Type>(internal::RpcMethod::BIDI_STREAMING), - "violated expectation about Type enum"); - - // Default constructor should only be used by ClientContext - ClientRpcInfo() = default; - - // Constructor will only be called from ClientContext - ClientRpcInfo(grpc::ClientContext* ctx, internal::RpcMethod::RpcType type, - const char* method, const char* suffix_for_stats, - grpc::ChannelInterface* channel) - : ctx_(ctx), - type_(static_cast<Type>(type)), - method_(method), - suffix_for_stats_(suffix_for_stats), - channel_(channel) {} - - // Move assignment should only be used by ClientContext - // TODO(yashykt): Delete move assignment - ClientRpcInfo& operator=(ClientRpcInfo&&) = default; - - // Runs interceptor at pos \a pos. - void RunInterceptor( - experimental::InterceptorBatchMethods* interceptor_methods, size_t pos) { - GPR_CODEGEN_ASSERT(pos < interceptors_.size()); - interceptors_[pos]->Intercept(interceptor_methods); - } - - void RegisterInterceptors( - const std::vector<std::unique_ptr< - experimental::ClientInterceptorFactoryInterface>>& creators, - size_t interceptor_pos) { - if (interceptor_pos > creators.size()) { - // No interceptors to register - return; - } - // NOTE: The following is not a range-based for loop because it will only - // iterate over a portion of the creators vector. - for (auto it = creators.begin() + interceptor_pos; it != creators.end(); - ++it) { - auto* interceptor = (*it)->CreateClientInterceptor(this); - if (interceptor != nullptr) { - interceptors_.push_back( - std::unique_ptr<experimental::Interceptor>(interceptor)); - } - } - if (internal::g_global_client_interceptor_factory != nullptr) { - interceptors_.push_back(std::unique_ptr<experimental::Interceptor>( - internal::g_global_client_interceptor_factory - ->CreateClientInterceptor(this))); - } - } - - grpc::ClientContext* ctx_ = nullptr; - // TODO(yashykt): make type_ const once move-assignment is deleted - Type type_{Type::UNKNOWN}; - const char* method_ = nullptr; - const char* suffix_for_stats_ = nullptr; - grpc::ChannelInterface* channel_ = nullptr; - std::vector<std::unique_ptr<experimental::Interceptor>> interceptors_; - bool hijacked_ = false; - size_t hijacked_interceptor_ = 0; - - friend class internal::InterceptorBatchMethodsImpl; - friend class grpc::ClientContext; -}; - -// PLEASE DO NOT USE THIS. ALWAYS PREFER PER CHANNEL INTERCEPTORS OVER A GLOBAL -// INTERCEPTOR. IF USAGE IS ABSOLUTELY NECESSARY, PLEASE READ THE SAFETY NOTES. -// Registers a global client interceptor factory object, which is used for all -// RPCs made in this process. The application is responsible for maintaining the -// life of the object while gRPC operations are in progress. The global -// interceptor factory should only be registered once at the start of the -// process before any gRPC operations have begun. -void RegisterGlobalClientInterceptorFactory( - ClientInterceptorFactoryInterface* factory); - -// For testing purposes only -void TestOnlyResetGlobalClientInterceptorFactory(); - -} // namespace experimental -} // namespace grpc +/// TODO(chengyuc): Remove this file after solving compatibility. +#include <grpcpp/support/client_interceptor.h> #endif // GRPCPP_IMPL_CODEGEN_CLIENT_INTERCEPTOR_H diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/core_codegen.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/core_codegen.h index e0440ffe3b..340dc47faa 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/core_codegen.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/core_codegen.h @@ -107,6 +107,8 @@ class CoreCodegen final : public CoreCodegenInterface { grpc_slice grpc_slice_sub(grpc_slice s, size_t begin, size_t end) override; void grpc_slice_buffer_add(grpc_slice_buffer* sb, grpc_slice slice) override; void grpc_slice_buffer_pop(grpc_slice_buffer* sb) override; + void grpc_slice_buffer_add_indexed(grpc_slice_buffer* sb, + grpc_slice slice) override; grpc_slice grpc_slice_from_static_buffer(const void* buffer, size_t length) override; grpc_slice grpc_slice_from_copied_buffer(const void* buffer, diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/core_codegen_interface.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/core_codegen_interface.h index 0b23bf46e6..5ca1feff08 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/core_codegen_interface.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/core_codegen_interface.h @@ -127,6 +127,8 @@ class CoreCodegenInterface { virtual grpc_slice grpc_slice_sub(grpc_slice s, size_t begin, size_t end) = 0; virtual void grpc_slice_buffer_add(grpc_slice_buffer* sb, grpc_slice slice) = 0; + virtual void grpc_slice_buffer_add_indexed(grpc_slice_buffer* sb, + grpc_slice slice) = 0; virtual void grpc_slice_buffer_pop(grpc_slice_buffer* sb) = 0; virtual grpc_slice grpc_slice_from_static_buffer(const void* buffer, size_t length) = 0; diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/interceptor.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/interceptor.h index 0818d5c534..b66c6656ae 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/interceptor.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/interceptor.h @@ -21,13 +21,17 @@ // IWYU pragma: private, include <grpcpp/support/interceptor.h> +#include <map> #include <memory> +#include <util/generic/string.h> +#include <util/string/cast.h> #include <grpc/impl/codegen/grpc_types.h> -#include <grpcpp/impl/codegen/byte_buffer.h> #include <grpcpp/impl/codegen/config.h> #include <grpcpp/impl/codegen/core_codegen_interface.h> #include <grpcpp/impl/codegen/metadata_map.h> +#include <grpcpp/impl/codegen/string_ref.h> +#include <grpcpp/support/byte_buffer.h> namespace grpc { diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/interceptor_common.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/interceptor_common.h index 46a5d1ac6b..f26b82b27e 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/interceptor_common.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/interceptor_common.h @@ -29,8 +29,8 @@ #include <functional> #include <grpc/impl/codegen/grpc_types.h> +#include <grpcpp/impl/call_op_set_interface.h> #include <grpcpp/impl/codegen/call.h> -#include <grpcpp/impl/codegen/call_op_set_interface.h> #include <grpcpp/impl/codegen/client_interceptor.h> #include <grpcpp/impl/codegen/intercepted_channel.h> #include <grpcpp/impl/codegen/server_interceptor.h> diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/method_handler.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/method_handler.h index 3b6afaa2c7..4f00728b57 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/method_handler.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/method_handler.h @@ -21,10 +21,10 @@ // IWYU pragma: private, include <grpcpp/support/method_handler.h> -#include <grpcpp/impl/codegen/byte_buffer.h> #include <grpcpp/impl/codegen/core_codegen_interface.h> #include <grpcpp/impl/codegen/rpc_service_method.h> #include <grpcpp/impl/codegen/sync_stream.h> +#include <grpcpp/support/byte_buffer.h> namespace grpc { diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/proto_buffer_reader.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/proto_buffer_reader.h index ecc0e30109..38df095ca0 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/proto_buffer_reader.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/proto_buffer_reader.h @@ -26,11 +26,11 @@ #include <grpc/impl/codegen/byte_buffer_reader.h> #include <grpc/impl/codegen/grpc_types.h> #include <grpc/impl/codegen/slice.h> -#include <grpcpp/impl/codegen/byte_buffer.h> #include <grpcpp/impl/codegen/config_protobuf.h> #include <grpcpp/impl/codegen/core_codegen_interface.h> #include <grpcpp/impl/codegen/serialization_traits.h> #include <grpcpp/impl/codegen/status.h> +#include <grpcpp/support/byte_buffer.h> /// This header provides an object that reads bytes directly from a /// grpc::ByteBuffer, via the ZeroCopyInputStream interface diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/proto_buffer_writer.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/proto_buffer_writer.h index 552c6a70fa..34730bd205 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/proto_buffer_writer.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/proto_buffer_writer.h @@ -25,11 +25,11 @@ #include <grpc/impl/codegen/grpc_types.h> #include <grpc/impl/codegen/slice.h> -#include <grpcpp/impl/codegen/byte_buffer.h> #include <grpcpp/impl/codegen/config_protobuf.h> #include <grpcpp/impl/codegen/core_codegen_interface.h> #include <grpcpp/impl/codegen/serialization_traits.h> #include <grpcpp/impl/codegen/status.h> +#include <grpcpp/support/byte_buffer.h> /// This header provides an object that writes bytes directly into a /// grpc::ByteBuffer, via the ZeroCopyOutputStream interface @@ -110,7 +110,12 @@ class ProtoBufferWriter : public grpc::protobuf::io::ZeroCopyOutputStream { // On win x64, int is only 32bit GPR_CODEGEN_ASSERT(GRPC_SLICE_LENGTH(slice_) <= INT_MAX); byte_count_ += * size = static_cast<int>(GRPC_SLICE_LENGTH(slice_)); - g_core_codegen_interface->grpc_slice_buffer_add(slice_buffer_, slice_); + // Using grpc_slice_buffer_add could modify slice_ and merge it with the + // previous slice. Therefore, use grpc_slice_buffer_add_indexed method to + // ensure the slice gets added at a separate index. It can then be kept + // around and popped later in the BackUp function. + g_core_codegen_interface->grpc_slice_buffer_add_indexed(slice_buffer_, + slice_); return true; } diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/proto_utils.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/proto_utils.h index 4f6d87415a..9f9eb77b53 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/proto_utils.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/proto_utils.h @@ -26,7 +26,6 @@ #include <grpc/impl/codegen/byte_buffer_reader.h> #include <grpc/impl/codegen/grpc_types.h> #include <grpc/impl/codegen/slice.h> -#include <grpcpp/impl/codegen/byte_buffer.h> #include <grpcpp/impl/codegen/config_protobuf.h> #include <grpcpp/impl/codegen/core_codegen_interface.h> #include <grpcpp/impl/codegen/proto_buffer_reader.h> @@ -34,6 +33,7 @@ #include <grpcpp/impl/codegen/serialization_traits.h> #include <grpcpp/impl/codegen/slice.h> #include <grpcpp/impl/codegen/status.h> +#include <grpcpp/support/byte_buffer.h> /// This header provides serialization and deserialization between gRPC /// messages serialized using protobuf and the C++ objects they represent. diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/rpc_service_method.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/rpc_service_method.h index 6f6730a1f1..2aa7774ab5 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/rpc_service_method.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/rpc_service_method.h @@ -28,10 +28,10 @@ #include <vector> #include <grpc/impl/codegen/log.h> -#include <grpcpp/impl/codegen/byte_buffer.h> #include <grpcpp/impl/codegen/config.h> #include <grpcpp/impl/codegen/rpc_method.h> #include <grpcpp/impl/codegen/status.h> +#include <grpcpp/support/byte_buffer.h> namespace grpc { class ServerContextBase; diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/security/auth_context.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/security/auth_context.h index 3079fecbc4..8343832c27 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/security/auth_context.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/security/auth_context.h @@ -36,9 +36,14 @@ class SecureAuthContext; typedef std::pair<string_ref, string_ref> AuthProperty; -class AuthPropertyIterator - : public std::iterator<std::input_iterator_tag, const AuthProperty> { +class AuthPropertyIterator { public: + using iterator_category = std::forward_iterator_tag; + using value_type = const AuthProperty; + using pointer = void; + using reference = void; + using difference_type = std::ptrdiff_t; + ~AuthPropertyIterator(); AuthPropertyIterator& operator++(); AuthPropertyIterator operator++(int); diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/server_context.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/server_context.h index 7a67ecd052..406fc90802 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/server_context.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/server_context.h @@ -117,6 +117,11 @@ class ServerContextTestSpouse; class DefaultReactorTestPeer; } // namespace testing +namespace experimental { +class OrcaServerInterceptor; +class CallMetricRecorder; +} // namespace experimental + /// Base class of ServerContext. class ServerContextBase { public: @@ -283,6 +288,19 @@ class ServerContextBase { /// Applications never need to call this method. grpc_call* c_call() { return call_.call; } + /// Get the \a CallMetricRecorder object for the current RPC. + /// Use it to record metrics during your RPC to send back to the + /// client in order to make load balancing decisions. This will + /// return nullptr if the feature hasn't been enabled using + /// \a EnableCallMetricRecording. + experimental::CallMetricRecorder* ExperimentalGetCallMetricRecorder() { + return call_metric_recorder_; + } + + /// EXPERIMENTAL API + /// Returns the call's authority. + grpc::string_ref ExperimentalGetAuthority() const; + protected: /// Async only. Has to be called before the rpc starts. /// Returns the tag in completion queue when the rpc finishes. @@ -388,6 +406,7 @@ class ServerContextBase { friend class grpc::ClientContext; friend class grpc::GenericServerContext; friend class grpc::GenericCallbackServerContext; + friend class grpc::experimental::OrcaServerInterceptor; /// Prevent copying. ServerContextBase(const ServerContextBase&); @@ -429,6 +448,8 @@ class ServerContextBase { } } + void CreateCallMetricRecorder(); + struct CallWrapper { ~CallWrapper(); @@ -466,6 +487,7 @@ class ServerContextBase { grpc::experimental::ServerRpcInfo* rpc_info_ = nullptr; RpcAllocatorState* message_allocator_state_ = nullptr; ContextAllocator* context_allocator_ = nullptr; + experimental::CallMetricRecorder* call_metric_recorder_ = nullptr; class Reactor : public grpc::ServerUnaryReactor { public: diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/server_interface.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/server_interface.h index 031b377e4b..62a43b1bb4 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/server_interface.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/server_interface.h @@ -24,7 +24,6 @@ #include <grpc/impl/codegen/port_platform.h> #include <grpc/impl/codegen/grpc_types.h> -#include <grpcpp/impl/codegen/byte_buffer.h> #include <grpcpp/impl/codegen/call.h> #include <grpcpp/impl/codegen/call_hook.h> #include <grpcpp/impl/codegen/completion_queue_tag.h> @@ -32,6 +31,7 @@ #include <grpcpp/impl/codegen/interceptor_common.h> #include <grpcpp/impl/codegen/rpc_service_method.h> #include <grpcpp/impl/codegen/server_context.h> +#include <grpcpp/support/byte_buffer.h> namespace grpc { diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/status.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/status.h index 358f2bb7f9..a3ab974a93 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/status.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/status.h @@ -21,6 +21,8 @@ // IWYU pragma: private, include <grpcpp/support/status.h> +#include <grpc/impl/codegen/port_platform.h> + #include <grpc/impl/codegen/status.h> #include <grpcpp/impl/codegen/config.h> #include <grpcpp/impl/codegen/status_code_enum.h> @@ -30,7 +32,7 @@ namespace grpc { /// Did it work? If it didn't, why? /// /// See \a grpc::StatusCode for details on the available code and their meaning. -class Status { +class GRPC_MUST_USE_RESULT_WHEN_USE_STRICT_WARNING Status { public: /// Construct an OK instance. Status() : code_(StatusCode::OK) { @@ -90,6 +92,10 @@ class Status { /// Construct an instance with associated \a code and \a error_message. /// It is an error to construct an OK status with non-empty \a error_message. + /// Note that \a message is intentionally accepted as a const reference + /// instead of a value (which results in a copy instead of a move) to allow + /// for easy transition to y_absl::Status in the future which accepts an + /// y_absl::string_view as a parameter. Status(StatusCode code, const TString& error_message) : code_(code), error_message_(error_message) {} diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/status_code_enum.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/status_code_enum.h index 36ea24fdfd..6950eab57d 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/status_code_enum.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/status_code_enum.h @@ -19,7 +19,7 @@ #ifndef GRPCPP_IMPL_CODEGEN_STATUS_CODE_ENUM_H #define GRPCPP_IMPL_CODEGEN_STATUS_CODE_ENUM_H -// IWYU pragma: private, include <grpcpp/support/status_code_enum.h> +// IWYU pragma: private, include <grpcpp/support/status.h> namespace grpc { diff --git a/contrib/libs/grpc/include/grpcpp/impl/codegen/sync_stream.h b/contrib/libs/grpc/include/grpcpp/impl/codegen/sync_stream.h index 3ed2fddcb8..1235b8de44 100644 --- a/contrib/libs/grpc/include/grpcpp/impl/codegen/sync_stream.h +++ b/contrib/libs/grpc/include/grpcpp/impl/codegen/sync_stream.h @@ -20,7 +20,7 @@ // IWYU pragma: private, include <grpcpp/support/sync_stream.h> -#include <grpcpp/impl/codegen/call.h> +#include <grpcpp/impl/call.h> #include <grpcpp/impl/codegen/channel_interface.h> #include <grpcpp/impl/codegen/client_context.h> #include <grpcpp/impl/codegen/completion_queue.h> @@ -222,7 +222,12 @@ class ClientReader final : public ClientReaderInterface<R> { /// The \a ClientContext associated with this call is updated with /// possible metadata received from the server. grpc::Status Finish() override { - grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> ops; + grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, + grpc::internal::CallOpClientRecvStatus> + ops; + if (!context_->initial_metadata_received_) { + ops.RecvInitialMetadata(context_); + } grpc::Status status; ops.ClientRecvStatus(context_, &status); call_.PerformOps(&ops); diff --git a/contrib/libs/grpc/include/grpcpp/security/auth_metadata_processor.h b/contrib/libs/grpc/include/grpcpp/security/auth_metadata_processor.h index f5321e877c..3251c8a9b8 100644 --- a/contrib/libs/grpc/include/grpcpp/security/auth_metadata_processor.h +++ b/contrib/libs/grpc/include/grpcpp/security/auth_metadata_processor.h @@ -30,6 +30,8 @@ namespace grpc { /// Interface allowing custom server-side authorization based on credentials /// encoded in metadata. Objects of this type can be passed to /// \a ServerCredentials::SetAuthMetadataProcessor(). +/// Please also check out \a grpc::experimental::Interceptor for another way to +/// do customized operations on the information provided by a specific call. class AuthMetadataProcessor { public: typedef std::multimap<grpc::string_ref, grpc::string_ref> InputMetadata; @@ -41,15 +43,25 @@ class AuthMetadataProcessor { /// a different thread from the one processing the call. virtual bool IsBlocking() const { return true; } - /// context is read/write: it contains the properties of the channel peer and - /// it is the job of the Process method to augment it with properties derived - /// from the passed-in auth_metadata. - /// consumed_auth_metadata needs to be filled with metadata that has been - /// consumed by the processor and will be removed from the call. - /// response_metadata is the metadata that will be sent as part of the - /// response. - /// If the return value is not Status::OK, the rpc call will be aborted with - /// the error code and error message sent back to the client. + /// Processes a Call associated with a connection. + /// auth_metadata: the authentication metadata associated with the particular + /// call + /// context: contains the connection-level info, e.g. the peer identity. This + /// parameter is readable and writable. Note that since the information is + /// shared for all calls associated with the connection, if the + /// implementation updates the info in a specific call, all the subsequent + /// calls will see the updates. A typical usage of context is to use + /// |auth_metadata| to infer the peer identity, and augment it with + /// properties. + /// consumed_auth_metadata: contains the metadata that the implementation + /// wants to remove from the current call, so that the server application is + /// no longer able to see it anymore. A typical usage would be to do token + /// authentication in the first call, and then remove the token information + /// for all subsequent calls. + /// response_metadata(CURRENTLY NOT SUPPORTED): the metadata that will be sent + /// as part of the response. + /// return: if the return value is not Status::OK, the rpc call will be + /// aborted with the error code and error message sent back to the client. virtual grpc::Status Process(const InputMetadata& auth_metadata, grpc::AuthContext* context, OutputMetadata* consumed_auth_metadata, diff --git a/contrib/libs/grpc/include/grpcpp/security/credentials.h b/contrib/libs/grpc/include/grpcpp/security/credentials.h index d6fb5a42be..ec194aecc3 100644 --- a/contrib/libs/grpc/include/grpcpp/security/credentials.h +++ b/contrib/libs/grpc/include/grpcpp/security/credentials.h @@ -29,11 +29,11 @@ #include <grpc/grpc_security_constants.h> #include <grpcpp/channel.h> -#include <grpcpp/impl/codegen/client_interceptor.h> #include <grpcpp/impl/codegen/grpc_library.h> #include <grpcpp/security/auth_context.h> #include <grpcpp/security/tls_credentials_options.h> #include <grpcpp/support/channel_arguments.h> +#include <grpcpp/support/client_interceptor.h> #include <grpcpp/support/status.h> #include <grpcpp/support/string_ref.h> diff --git a/contrib/libs/grpc/include/grpcpp/security/tls_certificate_verifier.h b/contrib/libs/grpc/include/grpcpp/security/tls_certificate_verifier.h index 63f285ece4..7635916eb7 100644 --- a/contrib/libs/grpc/include/grpcpp/security/tls_certificate_verifier.h +++ b/contrib/libs/grpc/include/grpcpp/security/tls_certificate_verifier.h @@ -214,6 +214,19 @@ class ExternalCertificateVerifier { request_map_ Y_ABSL_GUARDED_BY(mu_); }; +// A CertificateVerifier that doesn't perform any additional checks other than +// certificate verification, if specified. +// Note: using this solely without any other authentication mechanisms on the +// peer identity will leave your applications to the MITM(Man-In-The-Middle) +// attacks. Users should avoid doing so in production environments. +class NoOpCertificateVerifier : public CertificateVerifier { + public: + NoOpCertificateVerifier(); +}; + +// A CertificateVerifier that will perform hostname verification, to see if the +// target name set from the client side matches the identity information +// specified on the server's certificate. class HostNameCertificateVerifier : public CertificateVerifier { public: HostNameCertificateVerifier(); diff --git a/contrib/libs/grpc/include/grpcpp/server.h b/contrib/libs/grpc/include/grpcpp/server.h index 379a465ee3..50ee46f70e 100644 --- a/contrib/libs/grpc/include/grpcpp/server.h +++ b/contrib/libs/grpc/include/grpcpp/server.h @@ -35,13 +35,13 @@ #include <grpcpp/completion_queue.h> #include <grpcpp/health_check_service_interface.h> #include <grpcpp/impl/call.h> -#include <grpcpp/impl/codegen/client_interceptor.h> #include <grpcpp/impl/codegen/completion_queue.h> #include <grpcpp/impl/codegen/grpc_library.h> #include <grpcpp/impl/codegen/server_interface.h> #include <grpcpp/impl/rpc_service_method.h> #include <grpcpp/security/server_credentials.h> #include <grpcpp/support/channel_arguments.h> +#include <grpcpp/support/client_interceptor.h> #include <grpcpp/support/config.h> #include <grpcpp/support/status.h> diff --git a/contrib/libs/grpc/include/grpcpp/server_builder.h b/contrib/libs/grpc/include/grpcpp/server_builder.h index 22634ab260..3619018c5b 100644 --- a/contrib/libs/grpc/include/grpcpp/server_builder.h +++ b/contrib/libs/grpc/include/grpcpp/server_builder.h @@ -59,6 +59,7 @@ class ExternalConnectionAcceptorImpl; class CallbackGenericService; namespace experimental { +class OrcaServerInterceptorFactory; // EXPERIMENTAL API: // Interface for a grpc server to build transports with connections created out // of band. @@ -352,6 +353,7 @@ class ServerBuilder { private: friend class grpc::testing::ServerBuilderPluginTest; + friend class grpc::experimental::OrcaServerInterceptorFactory; struct SyncServerSettings { SyncServerSettings() @@ -402,6 +404,9 @@ class ServerBuilder { std::vector< std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>> interceptor_creators_; + std::vector< + std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>> + internal_interceptor_creators_; std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>> acceptors_; grpc_server_config_fetcher* server_config_fetcher_ = nullptr; diff --git a/contrib/libs/grpc/include/grpcpp/support/async_stream.h b/contrib/libs/grpc/include/grpcpp/support/async_stream.h index 5f71701417..18e84dfcdb 100644 --- a/contrib/libs/grpc/include/grpcpp/support/async_stream.h +++ b/contrib/libs/grpc/include/grpcpp/support/async_stream.h @@ -19,6 +19,1115 @@ #ifndef GRPCPP_SUPPORT_ASYNC_STREAM_H #define GRPCPP_SUPPORT_ASYNC_STREAM_H -#include <grpcpp/impl/codegen/async_stream.h> // IWYU pragma: export +#include <grpc/grpc.h> +#include <grpcpp/impl/call.h> +#include <grpcpp/impl/codegen/channel_interface.h> +#include <grpcpp/impl/codegen/core_codegen_interface.h> +#include <grpcpp/impl/service_type.h> +#include <grpcpp/server_context.h> +#include <grpcpp/support/status.h> + +namespace grpc { + +namespace internal { +/// Common interface for all client side asynchronous streaming. +class ClientAsyncStreamingInterface { + public: + virtual ~ClientAsyncStreamingInterface() {} + + /// Start the call that was set up by the constructor, but only if the + /// constructor was invoked through the "Prepare" API which doesn't actually + /// start the call + virtual void StartCall(void* tag) = 0; + + /// Request notification of the reading of the initial metadata. Completion + /// will be notified by \a tag on the associated completion queue. + /// This call is optional, but if it is used, it cannot be used concurrently + /// with or after the \a AsyncReaderInterface::Read method. + /// + /// \param[in] tag Tag identifying this request. + virtual void ReadInitialMetadata(void* tag) = 0; + + /// Indicate that the stream is to be finished and request notification for + /// when the call has been ended. + /// Should not be used concurrently with other operations. + /// + /// It is appropriate to call this method exactly once when both: + /// * the client side has no more message to send + /// (this can be declared implicitly by calling this method, or + /// explicitly through an earlier call to the <i>WritesDone</i> method + /// of the class in use, e.g. \a ClientAsyncWriterInterface::WritesDone or + /// \a ClientAsyncReaderWriterInterface::WritesDone). + /// * there are no more messages to be received from the server (this can + /// be known implicitly by the calling code, or explicitly from an + /// earlier call to \a AsyncReaderInterface::Read that yielded a failed + /// result, e.g. cq->Next(&read_tag, &ok) filled in 'ok' with 'false'). + /// + /// The tag will be returned when either: + /// - all incoming messages have been read and the server has returned + /// a status. + /// - the server has returned a non-OK status. + /// - the call failed for some reason and the library generated a + /// status. + /// + /// Note that implementations of this method attempt to receive initial + /// metadata from the server if initial metadata hasn't yet been received. + /// + /// \param[in] tag Tag identifying this request. + /// \param[out] status To be updated with the operation status. + virtual void Finish(grpc::Status* status, void* tag) = 0; +}; + +/// An interface that yields a sequence of messages of type \a R. +template <class R> +class AsyncReaderInterface { + public: + virtual ~AsyncReaderInterface() {} + + /// Read a message of type \a R into \a msg. Completion will be notified by \a + /// tag on the associated completion queue. + /// This is thread-safe with respect to \a Write or \a WritesDone methods. It + /// should not be called concurrently with other streaming APIs + /// on the same stream. It is not meaningful to call it concurrently + /// with another \a AsyncReaderInterface::Read on the same stream since reads + /// on the same stream are delivered in order. + /// + /// \param[out] msg Where to eventually store the read message. + /// \param[in] tag The tag identifying the operation. + /// + /// Side effect: note that this method attempt to receive initial metadata for + /// a stream if it hasn't yet been received. + virtual void Read(R* msg, void* tag) = 0; +}; + +/// An interface that can be fed a sequence of messages of type \a W. +template <class W> +class AsyncWriterInterface { + public: + virtual ~AsyncWriterInterface() {} + + /// Request the writing of \a msg with identifying tag \a tag. + /// + /// Only one write may be outstanding at any given time. This means that + /// after calling Write, one must wait to receive \a tag from the completion + /// queue BEFORE calling Write again. + /// This is thread-safe with respect to \a AsyncReaderInterface::Read + /// + /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to + /// to deallocate once Write returns. + /// + /// \param[in] msg The message to be written. + /// \param[in] tag The tag identifying the operation. + virtual void Write(const W& msg, void* tag) = 0; + + /// Request the writing of \a msg using WriteOptions \a options with + /// identifying tag \a tag. + /// + /// Only one write may be outstanding at any given time. This means that + /// after calling Write, one must wait to receive \a tag from the completion + /// queue BEFORE calling Write again. + /// WriteOptions \a options is used to set the write options of this message. + /// This is thread-safe with respect to \a AsyncReaderInterface::Read + /// + /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to + /// to deallocate once Write returns. + /// + /// \param[in] msg The message to be written. + /// \param[in] options The WriteOptions to be used to write this message. + /// \param[in] tag The tag identifying the operation. + virtual void Write(const W& msg, grpc::WriteOptions options, void* tag) = 0; + + /// Request the writing of \a msg and coalesce it with the writing + /// of trailing metadata, using WriteOptions \a options with + /// identifying tag \a tag. + /// + /// For client, WriteLast is equivalent of performing Write and + /// WritesDone in a single step. + /// For server, WriteLast buffers the \a msg. The writing of \a msg is held + /// until Finish is called, where \a msg and trailing metadata are coalesced + /// and write is initiated. Note that WriteLast can only buffer \a msg up to + /// the flow control window size. If \a msg size is larger than the window + /// size, it will be sent on wire without buffering. + /// + /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to + /// to deallocate once Write returns. + /// + /// \param[in] msg The message to be written. + /// \param[in] options The WriteOptions to be used to write this message. + /// \param[in] tag The tag identifying the operation. + void WriteLast(const W& msg, grpc::WriteOptions options, void* tag) { + Write(msg, options.set_last_message(), tag); + } +}; + +} // namespace internal + +template <class R> +class ClientAsyncReaderInterface + : public internal::ClientAsyncStreamingInterface, + public internal::AsyncReaderInterface<R> {}; + +namespace internal { +template <class R> +class ClientAsyncReaderFactory { + public: + /// Create a stream object. + /// Write the first request out if \a start is set. + /// \a tag will be notified on \a cq when the call has been started and + /// \a request has been written out. If \a start is not set, \a tag must be + /// nullptr and the actual call must be initiated by StartCall + /// Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + template <class W> + static ClientAsyncReader<R>* Create(grpc::ChannelInterface* channel, + grpc::CompletionQueue* cq, + const grpc::internal::RpcMethod& method, + grpc::ClientContext* context, + const W& request, bool start, void* tag) { + grpc::internal::Call call = channel->CreateCall(method, context, cq); + return new (grpc::g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncReader<R>))) + ClientAsyncReader<R>(call, context, request, start, tag); + } +}; +} // namespace internal + +/// Async client-side API for doing server-streaming RPCs, +/// where the incoming message stream coming from the server has +/// messages of type \a R. +template <class R> +class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { + public: + // always allocated against a call arena, no memory free required + static void operator delete(void* /*ptr*/, std::size_t size) { + GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncReader)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } + + void StartCall(void* tag) override { + GPR_CODEGEN_ASSERT(!started_); + started_ = true; + StartCallInternal(tag); + } + + /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata + /// method for semantics. + /// + /// Side effect: + /// - upon receiving initial metadata from the server, + /// the \a ClientContext associated with this call is updated, and the + /// calling code can access the received metadata through the + /// \a ClientContext. + void ReadInitialMetadata(void* tag) override { + GPR_CODEGEN_ASSERT(started_); + GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); + + meta_ops_.set_output_tag(tag); + meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&meta_ops_); + } + + void Read(R* msg, void* tag) override { + GPR_CODEGEN_ASSERT(started_); + read_ops_.set_output_tag(tag); + if (!context_->initial_metadata_received_) { + read_ops_.RecvInitialMetadata(context_); + } + read_ops_.RecvMessage(msg); + call_.PerformOps(&read_ops_); + } + + /// See the \a ClientAsyncStreamingInterface.Finish method for semantics. + /// + /// Side effect: + /// - the \a ClientContext associated with this call is updated with + /// possible initial and trailing metadata received from the server. + void Finish(grpc::Status* status, void* tag) override { + GPR_CODEGEN_ASSERT(started_); + finish_ops_.set_output_tag(tag); + if (!context_->initial_metadata_received_) { + finish_ops_.RecvInitialMetadata(context_); + } + finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&finish_ops_); + } + + private: + friend class internal::ClientAsyncReaderFactory<R>; + template <class W> + ClientAsyncReader(grpc::internal::Call call, grpc::ClientContext* context, + const W& request, bool start, void* tag) + : context_(context), call_(call), started_(start) { + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok()); + init_ops_.ClientSendClose(); + if (start) { + StartCallInternal(tag); + } else { + GPR_CODEGEN_ASSERT(tag == nullptr); + } + } + + void StartCallInternal(void* tag) { + init_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + init_ops_.set_output_tag(tag); + call_.PerformOps(&init_ops_); + } + + grpc::ClientContext* context_; + grpc::internal::Call call_; + bool started_; + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpSendMessage, + grpc::internal::CallOpClientSendClose> + init_ops_; + grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> + meta_ops_; + grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, + grpc::internal::CallOpRecvMessage<R>> + read_ops_; + grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, + grpc::internal::CallOpClientRecvStatus> + finish_ops_; +}; + +/// Common interface for client side asynchronous writing. +template <class W> +class ClientAsyncWriterInterface + : public internal::ClientAsyncStreamingInterface, + public internal::AsyncWriterInterface<W> { + public: + /// Signal the client is done with the writes (half-close the client stream). + /// Thread-safe with respect to \a AsyncReaderInterface::Read + /// + /// \param[in] tag The tag identifying the operation. + virtual void WritesDone(void* tag) = 0; +}; + +namespace internal { +template <class W> +class ClientAsyncWriterFactory { + public: + /// Create a stream object. + /// Start the RPC if \a start is set + /// \a tag will be notified on \a cq when the call has been started (i.e. + /// intitial metadata sent) and \a request has been written out. + /// If \a start is not set, \a tag must be nullptr and the actual call + /// must be initiated by StartCall + /// Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + /// \a response will be filled in with the single expected response + /// message from the server upon a successful call to the \a Finish + /// method of this instance. + template <class R> + static ClientAsyncWriter<W>* Create(grpc::ChannelInterface* channel, + grpc::CompletionQueue* cq, + const grpc::internal::RpcMethod& method, + grpc::ClientContext* context, R* response, + bool start, void* tag) { + grpc::internal::Call call = channel->CreateCall(method, context, cq); + return new (grpc::g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncWriter<W>))) + ClientAsyncWriter<W>(call, context, response, start, tag); + } +}; +} // namespace internal + +/// Async API on the client side for doing client-streaming RPCs, +/// where the outgoing message stream going to the server contains +/// messages of type \a W. +template <class W> +class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { + public: + // always allocated against a call arena, no memory free required + static void operator delete(void* /*ptr*/, std::size_t size) { + GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncWriter)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } + + void StartCall(void* tag) override { + GPR_CODEGEN_ASSERT(!started_); + started_ = true; + StartCallInternal(tag); + } + + /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method for + /// semantics. + /// + /// Side effect: + /// - upon receiving initial metadata from the server, the \a ClientContext + /// associated with this call is updated, and the calling code can access + /// the received metadata through the \a ClientContext. + void ReadInitialMetadata(void* tag) override { + GPR_CODEGEN_ASSERT(started_); + GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); + + meta_ops_.set_output_tag(tag); + meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&meta_ops_); + } + + void Write(const W& msg, void* tag) override { + GPR_CODEGEN_ASSERT(started_); + write_ops_.set_output_tag(tag); + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&write_ops_); + } + + void Write(const W& msg, grpc::WriteOptions options, void* tag) override { + GPR_CODEGEN_ASSERT(started_); + write_ops_.set_output_tag(tag); + if (options.is_last_message()) { + options.set_buffer_hint(); + write_ops_.ClientSendClose(); + } + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + call_.PerformOps(&write_ops_); + } + + void WritesDone(void* tag) override { + GPR_CODEGEN_ASSERT(started_); + write_ops_.set_output_tag(tag); + write_ops_.ClientSendClose(); + call_.PerformOps(&write_ops_); + } + + /// See the \a ClientAsyncStreamingInterface.Finish method for semantics. + /// + /// Side effect: + /// - the \a ClientContext associated with this call is updated with + /// possible initial and trailing metadata received from the server. + /// - attempts to fill in the \a response parameter passed to this class's + /// constructor with the server's response message. + void Finish(grpc::Status* status, void* tag) override { + GPR_CODEGEN_ASSERT(started_); + finish_ops_.set_output_tag(tag); + if (!context_->initial_metadata_received_) { + finish_ops_.RecvInitialMetadata(context_); + } + finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&finish_ops_); + } + + private: + friend class internal::ClientAsyncWriterFactory<W>; + template <class R> + ClientAsyncWriter(grpc::internal::Call call, grpc::ClientContext* context, + R* response, bool start, void* tag) + : context_(context), call_(call), started_(start) { + finish_ops_.RecvMessage(response); + finish_ops_.AllowNoMessage(); + if (start) { + StartCallInternal(tag); + } else { + GPR_CODEGEN_ASSERT(tag == nullptr); + } + } + + void StartCallInternal(void* tag) { + write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + // if corked bit is set in context, we just keep the initial metadata + // buffered up to coalesce with later message send. No op is performed. + if (!context_->initial_metadata_corked_) { + write_ops_.set_output_tag(tag); + call_.PerformOps(&write_ops_); + } + } + + grpc::ClientContext* context_; + grpc::internal::Call call_; + bool started_; + grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> + meta_ops_; + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpSendMessage, + grpc::internal::CallOpClientSendClose> + write_ops_; + grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, + grpc::internal::CallOpGenericRecvMessage, + grpc::internal::CallOpClientRecvStatus> + finish_ops_; +}; + +/// Async client-side interface for bi-directional streaming, +/// where the client-to-server message stream has messages of type \a W, +/// and the server-to-client message stream has messages of type \a R. +template <class W, class R> +class ClientAsyncReaderWriterInterface + : public internal::ClientAsyncStreamingInterface, + public internal::AsyncWriterInterface<W>, + public internal::AsyncReaderInterface<R> { + public: + /// Signal the client is done with the writes (half-close the client stream). + /// Thread-safe with respect to \a AsyncReaderInterface::Read + /// + /// \param[in] tag The tag identifying the operation. + virtual void WritesDone(void* tag) = 0; +}; + +namespace internal { +template <class W, class R> +class ClientAsyncReaderWriterFactory { + public: + /// Create a stream object. + /// Start the RPC request if \a start is set. + /// \a tag will be notified on \a cq when the call has been started (i.e. + /// intitial metadata sent). If \a start is not set, \a tag must be + /// nullptr and the actual call must be initiated by StartCall + /// Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + static ClientAsyncReaderWriter<W, R>* Create( + grpc::ChannelInterface* channel, grpc::CompletionQueue* cq, + const grpc::internal::RpcMethod& method, grpc::ClientContext* context, + bool start, void* tag) { + grpc::internal::Call call = channel->CreateCall(method, context, cq); + + return new (grpc::g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncReaderWriter<W, R>))) + ClientAsyncReaderWriter<W, R>(call, context, start, tag); + } +}; +} // namespace internal + +/// Async client-side interface for bi-directional streaming, +/// where the outgoing message stream going to the server +/// has messages of type \a W, and the incoming message stream coming +/// from the server has messages of type \a R. +template <class W, class R> +class ClientAsyncReaderWriter final + : public ClientAsyncReaderWriterInterface<W, R> { + public: + // always allocated against a call arena, no memory free required + static void operator delete(void* /*ptr*/, std::size_t size) { + GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncReaderWriter)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } + + void StartCall(void* tag) override { + GPR_CODEGEN_ASSERT(!started_); + started_ = true; + StartCallInternal(tag); + } + + /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method + /// for semantics of this method. + /// + /// Side effect: + /// - upon receiving initial metadata from the server, the \a ClientContext + /// is updated with it, and then the receiving initial metadata can + /// be accessed through this \a ClientContext. + void ReadInitialMetadata(void* tag) override { + GPR_CODEGEN_ASSERT(started_); + GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); + + meta_ops_.set_output_tag(tag); + meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&meta_ops_); + } + + void Read(R* msg, void* tag) override { + GPR_CODEGEN_ASSERT(started_); + read_ops_.set_output_tag(tag); + if (!context_->initial_metadata_received_) { + read_ops_.RecvInitialMetadata(context_); + } + read_ops_.RecvMessage(msg); + call_.PerformOps(&read_ops_); + } + + void Write(const W& msg, void* tag) override { + GPR_CODEGEN_ASSERT(started_); + write_ops_.set_output_tag(tag); + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&write_ops_); + } + + void Write(const W& msg, grpc::WriteOptions options, void* tag) override { + GPR_CODEGEN_ASSERT(started_); + write_ops_.set_output_tag(tag); + if (options.is_last_message()) { + options.set_buffer_hint(); + write_ops_.ClientSendClose(); + } + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + call_.PerformOps(&write_ops_); + } + + void WritesDone(void* tag) override { + GPR_CODEGEN_ASSERT(started_); + write_ops_.set_output_tag(tag); + write_ops_.ClientSendClose(); + call_.PerformOps(&write_ops_); + } + + /// See the \a ClientAsyncStreamingInterface.Finish method for semantics. + /// Side effect + /// - the \a ClientContext associated with this call is updated with + /// possible initial and trailing metadata sent from the server. + void Finish(grpc::Status* status, void* tag) override { + GPR_CODEGEN_ASSERT(started_); + finish_ops_.set_output_tag(tag); + if (!context_->initial_metadata_received_) { + finish_ops_.RecvInitialMetadata(context_); + } + finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&finish_ops_); + } + + private: + friend class internal::ClientAsyncReaderWriterFactory<W, R>; + ClientAsyncReaderWriter(grpc::internal::Call call, + grpc::ClientContext* context, bool start, void* tag) + : context_(context), call_(call), started_(start) { + if (start) { + StartCallInternal(tag); + } else { + GPR_CODEGEN_ASSERT(tag == nullptr); + } + } + + void StartCallInternal(void* tag) { + write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + // if corked bit is set in context, we just keep the initial metadata + // buffered up to coalesce with later message send. No op is performed. + if (!context_->initial_metadata_corked_) { + write_ops_.set_output_tag(tag); + call_.PerformOps(&write_ops_); + } + } + + grpc::ClientContext* context_; + grpc::internal::Call call_; + bool started_; + grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> + meta_ops_; + grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, + grpc::internal::CallOpRecvMessage<R>> + read_ops_; + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpSendMessage, + grpc::internal::CallOpClientSendClose> + write_ops_; + grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, + grpc::internal::CallOpClientRecvStatus> + finish_ops_; +}; + +template <class W, class R> +class ServerAsyncReaderInterface + : public grpc::internal::ServerAsyncStreamingInterface, + public internal::AsyncReaderInterface<R> { + public: + /// Indicate that the stream is to be finished with a certain status code + /// and also send out \a msg response to the client. + /// Request notification for when the server has sent the response and the + /// appropriate signals to the client to end the call. + /// Should not be used concurrently with other operations. + /// + /// It is appropriate to call this method when: + /// * all messages from the client have been received (either known + /// implictly, or explicitly because a previous + /// \a AsyncReaderInterface::Read operation with a non-ok result, + /// e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'). + /// + /// This operation will end when the server has finished sending out initial + /// metadata (if not sent already), response message, and status, or if + /// some failure occurred when trying to do so. + /// + /// gRPC doesn't take ownership or a reference to \a msg or \a status, so it + /// is safe to deallocate once Finish returns. + /// + /// \param[in] tag Tag identifying this request. + /// \param[in] status To be sent to the client as the result of this call. + /// \param[in] msg To be sent to the client as the response for this call. + virtual void Finish(const W& msg, const grpc::Status& status, void* tag) = 0; + + /// Indicate that the stream is to be finished with a certain + /// non-OK status code. + /// Request notification for when the server has sent the appropriate + /// signals to the client to end the call. + /// Should not be used concurrently with other operations. + /// + /// This call is meant to end the call with some error, and can be called at + /// any point that the server would like to "fail" the call (though note + /// this shouldn't be called concurrently with any other "sending" call, like + /// \a AsyncWriterInterface::Write). + /// + /// This operation will end when the server has finished sending out initial + /// metadata (if not sent already), and status, or if some failure occurred + /// when trying to do so. + /// + /// gRPC doesn't take ownership or a reference to \a status, so it is safe to + /// to deallocate once FinishWithError returns. + /// + /// \param[in] tag Tag identifying this request. + /// \param[in] status To be sent to the client as the result of this call. + /// - Note: \a status must have a non-OK code. + virtual void FinishWithError(const grpc::Status& status, void* tag) = 0; +}; + +/// Async server-side API for doing client-streaming RPCs, +/// where the incoming message stream from the client has messages of type \a R, +/// and the single response message sent from the server is type \a W. +template <class W, class R> +class ServerAsyncReader final : public ServerAsyncReaderInterface<W, R> { + public: + explicit ServerAsyncReader(grpc::ServerContext* ctx) + : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} + + /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics. + /// + /// Implicit input parameter: + /// - The initial metadata that will be sent to the client from this op will + /// be taken from the \a ServerContext associated with the call. + void SendInitialMetadata(void* tag) override { + GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); + + meta_ops_.set_output_tag(tag); + meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + meta_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + call_.PerformOps(&meta_ops_); + } + + void Read(R* msg, void* tag) override { + read_ops_.set_output_tag(tag); + read_ops_.RecvMessage(msg); + call_.PerformOps(&read_ops_); + } + + /// See the \a ServerAsyncReaderInterface.Read method for semantics + /// + /// Side effect: + /// - also sends initial metadata if not alreay sent. + /// - uses the \a ServerContext associated with this call to send possible + /// initial and trailing metadata. + /// + /// Note: \a msg is not sent if \a status has a non-OK code. + /// + /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it + /// is safe to deallocate once Finish returns. + void Finish(const W& msg, const grpc::Status& status, void* tag) override { + finish_ops_.set_output_tag(tag); + if (!ctx_->sent_initial_metadata_) { + finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + // The response is dropped if the status is not OK. + if (status.ok()) { + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, + finish_ops_.SendMessage(msg)); + } else { + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); + } + call_.PerformOps(&finish_ops_); + } + + /// See the \a ServerAsyncReaderInterface.Read method for semantics + /// + /// Side effect: + /// - also sends initial metadata if not alreay sent. + /// - uses the \a ServerContext associated with this call to send possible + /// initial and trailing metadata. + /// + /// gRPC doesn't take ownership or a reference to \a status, so it is safe to + /// to deallocate once FinishWithError returns. + void FinishWithError(const grpc::Status& status, void* tag) override { + GPR_CODEGEN_ASSERT(!status.ok()); + finish_ops_.set_output_tag(tag); + if (!ctx_->sent_initial_metadata_) { + finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_ops_); + } + + private: + void BindCall(grpc::internal::Call* call) override { call_ = *call; } + + grpc::internal::Call call_; + grpc::ServerContext* ctx_; + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> + meta_ops_; + grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>> read_ops_; + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpSendMessage, + grpc::internal::CallOpServerSendStatus> + finish_ops_; +}; + +template <class W> +class ServerAsyncWriterInterface + : public grpc::internal::ServerAsyncStreamingInterface, + public internal::AsyncWriterInterface<W> { + public: + /// Indicate that the stream is to be finished with a certain status code. + /// Request notification for when the server has sent the appropriate + /// signals to the client to end the call. + /// Should not be used concurrently with other operations. + /// + /// It is appropriate to call this method when either: + /// * all messages from the client have been received (either known + /// implictly, or explicitly because a previous \a + /// AsyncReaderInterface::Read operation with a non-ok + /// result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'. + /// * it is desired to end the call early with some non-OK status code. + /// + /// This operation will end when the server has finished sending out initial + /// metadata (if not sent already), response message, and status, or if + /// some failure occurred when trying to do so. + /// + /// gRPC doesn't take ownership or a reference to \a status, so it is safe to + /// to deallocate once Finish returns. + /// + /// \param[in] tag Tag identifying this request. + /// \param[in] status To be sent to the client as the result of this call. + virtual void Finish(const grpc::Status& status, void* tag) = 0; + + /// Request the writing of \a msg and coalesce it with trailing metadata which + /// contains \a status, using WriteOptions options with + /// identifying tag \a tag. + /// + /// WriteAndFinish is equivalent of performing WriteLast and Finish + /// in a single step. + /// + /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it + /// is safe to deallocate once WriteAndFinish returns. + /// + /// \param[in] msg The message to be written. + /// \param[in] options The WriteOptions to be used to write this message. + /// \param[in] status The Status that server returns to client. + /// \param[in] tag The tag identifying the operation. + virtual void WriteAndFinish(const W& msg, grpc::WriteOptions options, + const grpc::Status& status, void* tag) = 0; +}; + +/// Async server-side API for doing server streaming RPCs, +/// where the outgoing message stream from the server has messages of type \a W. +template <class W> +class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { + public: + explicit ServerAsyncWriter(grpc::ServerContext* ctx) + : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} + + /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics. + /// + /// Implicit input parameter: + /// - The initial metadata that will be sent to the client from this op will + /// be taken from the \a ServerContext associated with the call. + /// + /// \param[in] tag Tag identifying this request. + void SendInitialMetadata(void* tag) override { + GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); + + meta_ops_.set_output_tag(tag); + meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + meta_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + call_.PerformOps(&meta_ops_); + } + + void Write(const W& msg, void* tag) override { + write_ops_.set_output_tag(tag); + EnsureInitialMetadataSent(&write_ops_); + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&write_ops_); + } + + void Write(const W& msg, grpc::WriteOptions options, void* tag) override { + write_ops_.set_output_tag(tag); + if (options.is_last_message()) { + options.set_buffer_hint(); + } + + EnsureInitialMetadataSent(&write_ops_); + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + call_.PerformOps(&write_ops_); + } + + /// See the \a ServerAsyncWriterInterface.WriteAndFinish method for semantics. + /// + /// Implicit input parameter: + /// - the \a ServerContext associated with this call is used + /// for sending trailing (and initial) metadata to the client. + /// + /// Note: \a status must have an OK code. + /// + /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it + /// is safe to deallocate once WriteAndFinish returns. + void WriteAndFinish(const W& msg, grpc::WriteOptions options, + const grpc::Status& status, void* tag) override { + write_ops_.set_output_tag(tag); + EnsureInitialMetadataSent(&write_ops_); + options.set_buffer_hint(); + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); + call_.PerformOps(&write_ops_); + } + + /// See the \a ServerAsyncWriterInterface.Finish method for semantics. + /// + /// Implicit input parameter: + /// - the \a ServerContext associated with this call is used for sending + /// trailing (and initial if not already sent) metadata to the client. + /// + /// Note: there are no restrictions are the code of + /// \a status,it may be non-OK + /// + /// gRPC doesn't take ownership or a reference to \a status, so it is safe to + /// to deallocate once Finish returns. + void Finish(const grpc::Status& status, void* tag) override { + finish_ops_.set_output_tag(tag); + EnsureInitialMetadataSent(&finish_ops_); + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_ops_); + } + + private: + void BindCall(grpc::internal::Call* call) override { call_ = *call; } + + template <class T> + void EnsureInitialMetadataSent(T* ops) { + if (!ctx_->sent_initial_metadata_) { + ops->SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ops->set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + } + + grpc::internal::Call call_; + grpc::ServerContext* ctx_; + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> + meta_ops_; + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpSendMessage, + grpc::internal::CallOpServerSendStatus> + write_ops_; + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpServerSendStatus> + finish_ops_; +}; + +/// Server-side interface for asynchronous bi-directional streaming. +template <class W, class R> +class ServerAsyncReaderWriterInterface + : public grpc::internal::ServerAsyncStreamingInterface, + public internal::AsyncWriterInterface<W>, + public internal::AsyncReaderInterface<R> { + public: + /// Indicate that the stream is to be finished with a certain status code. + /// Request notification for when the server has sent the appropriate + /// signals to the client to end the call. + /// Should not be used concurrently with other operations. + /// + /// It is appropriate to call this method when either: + /// * all messages from the client have been received (either known + /// implictly, or explicitly because a previous \a + /// AsyncReaderInterface::Read operation + /// with a non-ok result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' + /// with 'false'. + /// * it is desired to end the call early with some non-OK status code. + /// + /// This operation will end when the server has finished sending out initial + /// metadata (if not sent already), response message, and status, or if some + /// failure occurred when trying to do so. + /// + /// gRPC doesn't take ownership or a reference to \a status, so it is safe to + /// to deallocate once Finish returns. + /// + /// \param[in] tag Tag identifying this request. + /// \param[in] status To be sent to the client as the result of this call. + virtual void Finish(const grpc::Status& status, void* tag) = 0; + + /// Request the writing of \a msg and coalesce it with trailing metadata which + /// contains \a status, using WriteOptions options with + /// identifying tag \a tag. + /// + /// WriteAndFinish is equivalent of performing WriteLast and Finish in a + /// single step. + /// + /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it + /// is safe to deallocate once WriteAndFinish returns. + /// + /// \param[in] msg The message to be written. + /// \param[in] options The WriteOptions to be used to write this message. + /// \param[in] status The Status that server returns to client. + /// \param[in] tag The tag identifying the operation. + virtual void WriteAndFinish(const W& msg, grpc::WriteOptions options, + const grpc::Status& status, void* tag) = 0; +}; + +/// Async server-side API for doing bidirectional streaming RPCs, +/// where the incoming message stream coming from the client has messages of +/// type \a R, and the outgoing message stream coming from the server has +/// messages of type \a W. +template <class W, class R> +class ServerAsyncReaderWriter final + : public ServerAsyncReaderWriterInterface<W, R> { + public: + explicit ServerAsyncReaderWriter(grpc::ServerContext* ctx) + : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} + + /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics. + /// + /// Implicit input parameter: + /// - The initial metadata that will be sent to the client from this op will + /// be taken from the \a ServerContext associated with the call. + /// + /// \param[in] tag Tag identifying this request. + void SendInitialMetadata(void* tag) override { + GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); + + meta_ops_.set_output_tag(tag); + meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + meta_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + call_.PerformOps(&meta_ops_); + } + + void Read(R* msg, void* tag) override { + read_ops_.set_output_tag(tag); + read_ops_.RecvMessage(msg); + call_.PerformOps(&read_ops_); + } + + void Write(const W& msg, void* tag) override { + write_ops_.set_output_tag(tag); + EnsureInitialMetadataSent(&write_ops_); + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&write_ops_); + } + + void Write(const W& msg, grpc::WriteOptions options, void* tag) override { + write_ops_.set_output_tag(tag); + if (options.is_last_message()) { + options.set_buffer_hint(); + } + EnsureInitialMetadataSent(&write_ops_); + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + call_.PerformOps(&write_ops_); + } + + /// See the \a ServerAsyncReaderWriterInterface.WriteAndFinish + /// method for semantics. + /// + /// Implicit input parameter: + /// - the \a ServerContext associated with this call is used + /// for sending trailing (and initial) metadata to the client. + /// + /// Note: \a status must have an OK code. + // + /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it + /// is safe to deallocate once WriteAndFinish returns. + void WriteAndFinish(const W& msg, grpc::WriteOptions options, + const grpc::Status& status, void* tag) override { + write_ops_.set_output_tag(tag); + EnsureInitialMetadataSent(&write_ops_); + options.set_buffer_hint(); + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); + call_.PerformOps(&write_ops_); + } + + /// See the \a ServerAsyncReaderWriterInterface.Finish method for semantics. + /// + /// Implicit input parameter: + /// - the \a ServerContext associated with this call is used for sending + /// trailing (and initial if not already sent) metadata to the client. + /// + /// Note: there are no restrictions are the code of \a status, + /// it may be non-OK + // + /// gRPC doesn't take ownership or a reference to \a status, so it is safe to + /// to deallocate once Finish returns. + void Finish(const grpc::Status& status, void* tag) override { + finish_ops_.set_output_tag(tag); + EnsureInitialMetadataSent(&finish_ops_); + + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_ops_); + } + + private: + friend class grpc::Server; + + void BindCall(grpc::internal::Call* call) override { call_ = *call; } + + template <class T> + void EnsureInitialMetadataSent(T* ops) { + if (!ctx_->sent_initial_metadata_) { + ops->SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ops->set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + } + + grpc::internal::Call call_; + grpc::ServerContext* ctx_; + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> + meta_ops_; + grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>> read_ops_; + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpSendMessage, + grpc::internal::CallOpServerSendStatus> + write_ops_; + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpServerSendStatus> + finish_ops_; +}; + +} // namespace grpc #endif // GRPCPP_SUPPORT_ASYNC_STREAM_H diff --git a/contrib/libs/grpc/include/grpcpp/support/async_unary_call.h b/contrib/libs/grpc/include/grpcpp/support/async_unary_call.h index 02071e48c5..a9886031f4 100644 --- a/contrib/libs/grpc/include/grpcpp/support/async_unary_call.h +++ b/contrib/libs/grpc/include/grpcpp/support/async_unary_call.h @@ -19,6 +19,402 @@ #ifndef GRPCPP_SUPPORT_ASYNC_UNARY_CALL_H #define GRPCPP_SUPPORT_ASYNC_UNARY_CALL_H -#include <grpcpp/impl/codegen/async_unary_call.h> // IWYU pragma: export +#include <grpc/grpc.h> +#include <grpcpp/client_context.h> +#include <grpcpp/impl/call.h> +#include <grpcpp/impl/call_op_set_interface.h> +#include <grpcpp/impl/codegen/call_op_set.h> +#include <grpcpp/impl/codegen/channel_interface.h> +#include <grpcpp/impl/codegen/core_codegen_interface.h> +#include <grpcpp/impl/service_type.h> +#include <grpcpp/server_context.h> +#include <grpcpp/support/status.h> + +namespace grpc { + +// Forward declaration for use in Helper class +template <class R> +class ClientAsyncResponseReader; + +/// An interface relevant for async client side unary RPCs (which send +/// one request message to a server and receive one response message). +template <class R> +class ClientAsyncResponseReaderInterface { + public: + virtual ~ClientAsyncResponseReaderInterface() {} + + /// Start the call that was set up by the constructor, but only if the + /// constructor was invoked through the "Prepare" API which doesn't actually + /// start the call + virtual void StartCall() = 0; + + /// Request notification of the reading of initial metadata. Completion + /// will be notified by \a tag on the associated completion queue. + /// This call is optional, but if it is used, it cannot be used concurrently + /// with or after the \a Finish method. + /// + /// \param[in] tag Tag identifying this request. + virtual void ReadInitialMetadata(void* tag) = 0; + + /// Request to receive the server's response \a msg and final \a status for + /// the call, and to notify \a tag on this call's completion queue when + /// finished. + /// + /// This function will return when either: + /// - when the server's response message and status have been received. + /// - when the server has returned a non-OK status (no message expected in + /// this case). + /// - when the call failed for some reason and the library generated a + /// non-OK status. + /// + /// \param[in] tag Tag identifying this request. + /// \param[out] status To be updated with the operation status. + /// \param[out] msg To be filled in with the server's response message. + virtual void Finish(R* msg, grpc::Status* status, void* tag) = 0; +}; + +namespace internal { + +class ClientAsyncResponseReaderHelper { + public: + /// Start a call and write the request out if \a start is set. + /// \a tag will be notified on \a cq when the call has been started (i.e. + /// intitial metadata sent) and \a request has been written out. + /// If \a start is not set, the actual call must be initiated by StartCall + /// Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + /// + /// Optionally pass in a base class for request and response types so that the + /// internal functions and structs can be templated based on that, allowing + /// reuse across RPCs (e.g., MessageLite for protobuf). Since constructors + /// can't have an explicit template parameter, the last argument is an + /// extraneous parameter just to provide the needed type information. + template <class R, class W, class BaseR = R, class BaseW = W> + static ClientAsyncResponseReader<R>* Create( + grpc::ChannelInterface* channel, grpc::CompletionQueue* cq, + const grpc::internal::RpcMethod& method, grpc::ClientContext* context, + const W& request) /* __attribute__((noinline)) */ { + grpc::internal::Call call = channel->CreateCall(method, context, cq); + ClientAsyncResponseReader<R>* result = + new (grpc::g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncResponseReader<R>))) + ClientAsyncResponseReader<R>(call, context); + SetupRequest<BaseR, BaseW>( + call.call(), &result->single_buf_, &result->read_initial_metadata_, + &result->finish_, static_cast<const BaseW&>(request)); + + return result; + } + + // Various helper functions to reduce templating use + + template <class R, class W> + static void SetupRequest( + grpc_call* call, + grpc::internal::CallOpSendInitialMetadata** single_buf_ptr, + std::function<void(ClientContext*, internal::Call*, + internal::CallOpSendInitialMetadata*, void*)>* + read_initial_metadata, + std::function< + void(ClientContext*, internal::Call*, bool initial_metadata_read, + internal::CallOpSendInitialMetadata*, + internal::CallOpSetInterface**, void*, Status*, void*)>* finish, + const W& request) { + using SingleBufType = + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpSendMessage, + grpc::internal::CallOpClientSendClose, + grpc::internal::CallOpRecvInitialMetadata, + grpc::internal::CallOpRecvMessage<R>, + grpc::internal::CallOpClientRecvStatus>; + SingleBufType* single_buf = + new (grpc::g_core_codegen_interface->grpc_call_arena_alloc( + call, sizeof(SingleBufType))) SingleBufType; + *single_buf_ptr = single_buf; + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(single_buf->SendMessage(request).ok()); + single_buf->ClientSendClose(); + + // The purpose of the following functions is to type-erase the actual + // templated type of the CallOpSet being used by hiding that type inside the + // function definition rather than specifying it as an argument of the + // function or a member of the class. The type-erased CallOpSet will get + // static_cast'ed back to the real type so that it can be used properly. + *read_initial_metadata = + [](ClientContext* context, internal::Call* call, + internal::CallOpSendInitialMetadata* single_buf_view, void* tag) { + auto* single_buf = static_cast<SingleBufType*>(single_buf_view); + single_buf->set_output_tag(tag); + single_buf->RecvInitialMetadata(context); + call->PerformOps(single_buf); + }; + + // Note that this function goes one step further than the previous one + // because it type-erases the message being written down to a void*. This + // will be static-cast'ed back to the class specified here by hiding that + // class information inside the function definition. Note that this feature + // expects the class being specified here for R to be a base-class of the + // "real" R without any multiple-inheritance (as applies in protbuf wrt + // MessageLite) + *finish = [](ClientContext* context, internal::Call* call, + bool initial_metadata_read, + internal::CallOpSendInitialMetadata* single_buf_view, + internal::CallOpSetInterface** finish_buf_ptr, void* msg, + Status* status, void* tag) { + if (initial_metadata_read) { + using FinishBufType = + grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>, + grpc::internal::CallOpClientRecvStatus>; + FinishBufType* finish_buf = + new (grpc::g_core_codegen_interface->grpc_call_arena_alloc( + call->call(), sizeof(FinishBufType))) FinishBufType; + *finish_buf_ptr = finish_buf; + finish_buf->set_output_tag(tag); + finish_buf->RecvMessage(static_cast<R*>(msg)); + finish_buf->AllowNoMessage(); + finish_buf->ClientRecvStatus(context, status); + call->PerformOps(finish_buf); + } else { + auto* single_buf = static_cast<SingleBufType*>(single_buf_view); + single_buf->set_output_tag(tag); + single_buf->RecvInitialMetadata(context); + single_buf->RecvMessage(static_cast<R*>(msg)); + single_buf->AllowNoMessage(); + single_buf->ClientRecvStatus(context, status); + call->PerformOps(single_buf); + } + }; + } + + static void StartCall(grpc::ClientContext* context, + grpc::internal::CallOpSendInitialMetadata* single_buf) { + single_buf->SendInitialMetadata(&context->send_initial_metadata_, + context->initial_metadata_flags()); + } +}; + +// TODO(vjpai): This templated factory is deprecated and will be replaced by +//. the non-templated helper as soon as possible. +template <class R> +class ClientAsyncResponseReaderFactory { + public: + template <class W> + static ClientAsyncResponseReader<R>* Create( + grpc::ChannelInterface* channel, grpc::CompletionQueue* cq, + const grpc::internal::RpcMethod& method, grpc::ClientContext* context, + const W& request, bool start) { + auto* result = ClientAsyncResponseReaderHelper::Create<R>( + channel, cq, method, context, request); + if (start) { + result->StartCall(); + } + return result; + } +}; + +} // namespace internal + +/// Async API for client-side unary RPCs, where the message response +/// received from the server is of type \a R. +template <class R> +class ClientAsyncResponseReader final + : public ClientAsyncResponseReaderInterface<R> { + public: + // always allocated against a call arena, no memory free required + static void operator delete(void* /*ptr*/, std::size_t size) { + GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncResponseReader)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } + + void StartCall() override { + GPR_CODEGEN_DEBUG_ASSERT(!started_); + started_ = true; + internal::ClientAsyncResponseReaderHelper::StartCall(context_, single_buf_); + } + + /// See \a ClientAsyncResponseReaderInterface::ReadInitialMetadata for + /// semantics. + /// + /// Side effect: + /// - the \a ClientContext associated with this call is updated with + /// possible initial and trailing metadata sent from the server. + void ReadInitialMetadata(void* tag) override { + GPR_CODEGEN_DEBUG_ASSERT(started_); + GPR_CODEGEN_DEBUG_ASSERT(!context_->initial_metadata_received_); + read_initial_metadata_(context_, &call_, single_buf_, tag); + initial_metadata_read_ = true; + } + + /// See \a ClientAsyncResponseReaderInterface::Finish for semantics. + /// + /// Side effect: + /// - the \a ClientContext associated with this call is updated with + /// possible initial and trailing metadata sent from the server. + void Finish(R* msg, grpc::Status* status, void* tag) override { + GPR_CODEGEN_DEBUG_ASSERT(started_); + finish_(context_, &call_, initial_metadata_read_, single_buf_, &finish_buf_, + static_cast<void*>(msg), status, tag); + } + + private: + friend class internal::ClientAsyncResponseReaderHelper; + grpc::ClientContext* const context_; + grpc::internal::Call call_; + bool started_ = false; + bool initial_metadata_read_ = false; + + ClientAsyncResponseReader(grpc::internal::Call call, + grpc::ClientContext* context) + : context_(context), call_(call) {} + + // disable operator new + static void* operator new(std::size_t size); + static void* operator new(std::size_t /*size*/, void* p) { return p; } + + internal::CallOpSendInitialMetadata* single_buf_; + internal::CallOpSetInterface* finish_buf_ = nullptr; + std::function<void(ClientContext*, internal::Call*, + internal::CallOpSendInitialMetadata*, void*)> + read_initial_metadata_; + std::function<void(ClientContext*, internal::Call*, + bool initial_metadata_read, + internal::CallOpSendInitialMetadata*, + internal::CallOpSetInterface**, void*, Status*, void*)> + finish_; +}; + +/// Async server-side API for handling unary calls, where the single +/// response message sent to the client is of type \a W. +template <class W> +class ServerAsyncResponseWriter final + : public grpc::internal::ServerAsyncStreamingInterface { + public: + explicit ServerAsyncResponseWriter(grpc::ServerContext* ctx) + : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} + + /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics. + /// + /// Side effect: + /// The initial metadata that will be sent to the client from this op will + /// be taken from the \a ServerContext associated with the call. + /// + /// \param[in] tag Tag identifying this request. + void SendInitialMetadata(void* tag) override { + GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); + + meta_buf_.set_output_tag(tag); + meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + meta_buf_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + call_.PerformOps(&meta_buf_); + } + + /// Indicate that the stream is to be finished and request notification + /// when the server has sent the appropriate signals to the client to + /// end the call. Should not be used concurrently with other operations. + /// + /// \param[in] tag Tag identifying this request. + /// \param[in] status To be sent to the client as the result of the call. + /// \param[in] msg Message to be sent to the client. + /// + /// Side effect: + /// - also sends initial metadata if not already sent (using the + /// \a ServerContext associated with this call). + /// + /// Note: if \a status has a non-OK code, then \a msg will not be sent, + /// and the client will receive only the status with possible trailing + /// metadata. + /// + /// gRPC doesn't take ownership or a reference to msg and status, so it is + /// safe to deallocate them once the Finish operation is complete (i.e. a + /// result arrives in the completion queue). + void Finish(const W& msg, const grpc::Status& status, void* tag) { + finish_buf_.set_output_tag(tag); + finish_buf_.set_core_cq_tag(&finish_buf_); + if (!ctx_->sent_initial_metadata_) { + finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_buf_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + // The response is dropped if the status is not OK. + if (status.ok()) { + finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, + finish_buf_.SendMessage(msg)); + } else { + finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, status); + } + call_.PerformOps(&finish_buf_); + } + + /// Indicate that the stream is to be finished with a non-OK status, + /// and request notification for when the server has finished sending the + /// appropriate signals to the client to end the call. + /// Should not be used concurrently with other operations. + /// + /// \param[in] tag Tag identifying this request. + /// \param[in] status To be sent to the client as the result of the call. + /// - Note: \a status must have a non-OK code. + /// + /// Side effect: + /// - also sends initial metadata if not already sent (using the + /// \a ServerContext associated with this call). + /// + /// gRPC doesn't take ownership or a reference to status, so it is safe to + /// deallocate them once the Finish operation is complete (i.e. a result + /// arrives in the completion queue). + void FinishWithError(const grpc::Status& status, void* tag) { + GPR_CODEGEN_ASSERT(!status.ok()); + finish_buf_.set_output_tag(tag); + if (!ctx_->sent_initial_metadata_) { + finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_buf_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_buf_); + } + + private: + void BindCall(grpc::internal::Call* call) override { call_ = *call; } + + grpc::internal::Call call_; + grpc::ServerContext* ctx_; + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> + meta_buf_; + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpSendMessage, + grpc::internal::CallOpServerSendStatus> + finish_buf_; +}; + +} // namespace grpc + +namespace std { +template <class R> +class default_delete<grpc::ClientAsyncResponseReader<R>> { + public: + void operator()(void* /*p*/) {} +}; +template <class R> +class default_delete<grpc::ClientAsyncResponseReaderInterface<R>> { + public: + void operator()(void* /*p*/) {} +}; +} // namespace std #endif // GRPCPP_SUPPORT_ASYNC_UNARY_CALL_H diff --git a/contrib/libs/grpc/include/grpcpp/support/byte_buffer.h b/contrib/libs/grpc/include/grpcpp/support/byte_buffer.h index 6f566e76da..7ca7c4b258 100644 --- a/contrib/libs/grpc/include/grpcpp/support/byte_buffer.h +++ b/contrib/libs/grpc/include/grpcpp/support/byte_buffer.h @@ -19,13 +19,221 @@ #ifndef GRPCPP_SUPPORT_BYTE_BUFFER_H #define GRPCPP_SUPPORT_BYTE_BUFFER_H +#include <vector> + #include <grpc/byte_buffer.h> #include <grpc/grpc.h> #include <grpc/support/log.h> -#include <grpcpp/impl/codegen/byte_buffer.h> // IWYU pragma: export +#include <grpcpp/impl/codegen/core_codegen_interface.h> #include <grpcpp/impl/serialization_traits.h> #include <grpcpp/support/config.h> #include <grpcpp/support/slice.h> #include <grpcpp/support/status.h> +namespace grpc { + +class ServerInterface; +class ByteBuffer; +class ServerInterface; + +namespace internal { +template <class RequestType, class ResponseType> +class CallbackUnaryHandler; +template <class RequestType, class ResponseType> +class CallbackServerStreamingHandler; +template <class RequestType> +void* UnaryDeserializeHelper(grpc_byte_buffer*, grpc::Status*, RequestType*); +template <class ServiceType, class RequestType, class ResponseType> +class ServerStreamingHandler; +template <grpc::StatusCode code> +class ErrorMethodHandler; +class CallOpSendMessage; +template <class R> +class CallOpRecvMessage; +class CallOpGenericRecvMessage; +class ExternalConnectionAcceptorImpl; +template <class R> +class DeserializeFuncType; +class GrpcByteBufferPeer; + +} // namespace internal +/// A sequence of bytes. +class ByteBuffer final { + public: + /// Constuct an empty buffer. + ByteBuffer() : buffer_(nullptr) {} + + /// Construct buffer from \a slices, of which there are \a nslices. + ByteBuffer(const Slice* slices, size_t nslices) { + // The following assertions check that the representation of a grpc::Slice + // is identical to that of a grpc_slice: it has a grpc_slice field, and + // nothing else. + static_assert(std::is_same<decltype(slices[0].slice_), grpc_slice>::value, + "Slice must have same representation as grpc_slice"); + static_assert(sizeof(Slice) == sizeof(grpc_slice), + "Slice must have same representation as grpc_slice"); + // The following assertions check that the representation of a ByteBuffer is + // identical to grpc_byte_buffer*: it has a grpc_byte_buffer* field, + // and nothing else. + static_assert(std::is_same<decltype(buffer_), grpc_byte_buffer*>::value, + "ByteBuffer must have same representation as " + "grpc_byte_buffer*"); + static_assert(sizeof(ByteBuffer) == sizeof(grpc_byte_buffer*), + "ByteBuffer must have same representation as " + "grpc_byte_buffer*"); + // The const_cast is legal if grpc_raw_byte_buffer_create() does no more + // than its advertised side effect of increasing the reference count of the + // slices it processes, and such an increase does not affect the semantics + // seen by the caller of this constructor. + buffer_ = g_core_codegen_interface->grpc_raw_byte_buffer_create( + reinterpret_cast<grpc_slice*>(const_cast<Slice*>(slices)), nslices); + } + + /// Constuct a byte buffer by referencing elements of existing buffer + /// \a buf. Wrapper of core function grpc_byte_buffer_copy . This is not + /// a deep copy; it is just a referencing. As a result, its performance is + /// size-independent. + ByteBuffer(const ByteBuffer& buf) : buffer_(nullptr) { operator=(buf); } + + ~ByteBuffer() { + if (buffer_) { + g_core_codegen_interface->grpc_byte_buffer_destroy(buffer_); + } + } + + /// Wrapper of core function grpc_byte_buffer_copy . This is not + /// a deep copy; it is just a referencing. As a result, its performance is + /// size-independent. + ByteBuffer& operator=(const ByteBuffer& buf) { + if (this != &buf) { + Clear(); // first remove existing data + } + if (buf.buffer_) { + // then copy + buffer_ = g_core_codegen_interface->grpc_byte_buffer_copy(buf.buffer_); + } + return *this; + } + + // If this ByteBuffer's representation is a single flat slice, returns a + // slice referencing that array. + Status TrySingleSlice(Slice* slice) const; + + /// Dump (read) the buffer contents into \a slics. + Status DumpToSingleSlice(Slice* slice) const; + + /// Dump (read) the buffer contents into \a slices. + Status Dump(std::vector<Slice>* slices) const; + + /// Remove all data. + void Clear() { + if (buffer_) { + g_core_codegen_interface->grpc_byte_buffer_destroy(buffer_); + buffer_ = nullptr; + } + } + + /// Make a duplicate copy of the internals of this byte + /// buffer so that we have our own owned version of it. + /// bbuf.Duplicate(); is equivalent to bbuf=bbuf; but is actually readable. + /// This is not a deep copy; it is a referencing and its performance + /// is size-independent. + void Duplicate() { + buffer_ = g_core_codegen_interface->grpc_byte_buffer_copy(buffer_); + } + + /// Forget underlying byte buffer without destroying + /// Use this only for un-owned byte buffers + void Release() { buffer_ = nullptr; } + + /// Buffer size in bytes. + size_t Length() const { + return buffer_ == nullptr + ? 0 + : g_core_codegen_interface->grpc_byte_buffer_length(buffer_); + } + + /// Swap the state of *this and *other. + void Swap(ByteBuffer* other) { + grpc_byte_buffer* tmp = other->buffer_; + other->buffer_ = buffer_; + buffer_ = tmp; + } + + /// Is this ByteBuffer valid? + bool Valid() const { return (buffer_ != nullptr); } + + private: + friend class SerializationTraits<ByteBuffer, void>; + friend class ServerInterface; + friend class internal::CallOpSendMessage; + template <class R> + friend class internal::CallOpRecvMessage; + friend class internal::CallOpGenericRecvMessage; + template <class RequestType> + friend void* internal::UnaryDeserializeHelper(grpc_byte_buffer*, + grpc::Status*, RequestType*); + template <class ServiceType, class RequestType, class ResponseType> + friend class internal::ServerStreamingHandler; + template <class RequestType, class ResponseType> + friend class internal::CallbackUnaryHandler; + template <class RequestType, class ResponseType> + friend class internal::CallbackServerStreamingHandler; + template <StatusCode code> + friend class internal::ErrorMethodHandler; + template <class R> + friend class internal::DeserializeFuncType; + friend class ProtoBufferReader; + friend class ProtoBufferWriter; + friend class internal::GrpcByteBufferPeer; + friend class internal::ExternalConnectionAcceptorImpl; + + grpc_byte_buffer* buffer_; + + // takes ownership + void set_buffer(grpc_byte_buffer* buf) { + if (buffer_) { + Clear(); + } + buffer_ = buf; + } + + grpc_byte_buffer* c_buffer() { return buffer_; } + grpc_byte_buffer** c_buffer_ptr() { return &buffer_; } + + class ByteBufferPointer { + public: + /* NOLINTNEXTLINE(google-explicit-constructor) */ + ByteBufferPointer(const ByteBuffer* b) + : bbuf_(const_cast<ByteBuffer*>(b)) {} + /* NOLINTNEXTLINE(google-explicit-constructor) */ + operator ByteBuffer*() { return bbuf_; } + /* NOLINTNEXTLINE(google-explicit-constructor) */ + operator grpc_byte_buffer*() { return bbuf_->buffer_; } + /* NOLINTNEXTLINE(google-explicit-constructor) */ + operator grpc_byte_buffer**() { return &bbuf_->buffer_; } + + private: + ByteBuffer* bbuf_; + }; + ByteBufferPointer bbuf_ptr() const { return ByteBufferPointer(this); } +}; + +template <> +class SerializationTraits<ByteBuffer, void> { + public: + static Status Deserialize(ByteBuffer* byte_buffer, ByteBuffer* dest) { + dest->set_buffer(byte_buffer->buffer_); + return Status::OK; + } + static Status Serialize(const ByteBuffer& source, ByteBuffer* buffer, + bool* own_buffer) { + *buffer = source; + *own_buffer = true; + return g_core_codegen_interface->ok(); + } +}; + +} // namespace grpc + #endif // GRPCPP_SUPPORT_BYTE_BUFFER_H diff --git a/contrib/libs/grpc/include/grpcpp/support/client_callback.h b/contrib/libs/grpc/include/grpcpp/support/client_callback.h new file mode 100644 index 0000000000..c15bca0dbe --- /dev/null +++ b/contrib/libs/grpc/include/grpcpp/support/client_callback.h @@ -0,0 +1,24 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_SUPPORT_CLIENT_CALLBACK_H +#define GRPCPP_SUPPORT_CLIENT_CALLBACK_H + +#include <grpcpp/impl/codegen/client_callback.h> // IWYU pragma: export + +#endif // GRPCPP_SUPPORT_CLIENT_CALLBACK_H diff --git a/contrib/libs/grpc/include/grpcpp/support/client_interceptor.h b/contrib/libs/grpc/include/grpcpp/support/client_interceptor.h new file mode 100644 index 0000000000..8e5e1ce67b --- /dev/null +++ b/contrib/libs/grpc/include/grpcpp/support/client_interceptor.h @@ -0,0 +1,197 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_SUPPORT_CLIENT_INTERCEPTOR_H +#define GRPCPP_SUPPORT_CLIENT_INTERCEPTOR_H + +#include <memory> +#include <vector> + +#include <grpcpp/impl/rpc_method.h> +#include <grpcpp/support/interceptor.h> +#include <grpcpp/support/string_ref.h> + +namespace grpc { + +class Channel; +class ClientContext; + +namespace internal { +class InterceptorBatchMethodsImpl; +} + +namespace experimental { +class ClientRpcInfo; + +// A factory interface for creation of client interceptors. A vector of +// factories can be provided at channel creation which will be used to create a +// new vector of client interceptors per RPC. Client interceptor authors should +// create a subclass of ClientInterceptorFactorInterface which creates objects +// of their interceptors. +class ClientInterceptorFactoryInterface { + public: + virtual ~ClientInterceptorFactoryInterface() {} + // Returns a pointer to an Interceptor object on successful creation, nullptr + // otherwise. If nullptr is returned, this server interceptor factory is + // ignored for the purposes of that RPC. + virtual Interceptor* CreateClientInterceptor(ClientRpcInfo* info) = 0; +}; +} // namespace experimental + +namespace internal { +extern experimental::ClientInterceptorFactoryInterface* + g_global_client_interceptor_factory; +} + +/// ClientRpcInfo represents the state of a particular RPC as it +/// appears to an interceptor. It is created and owned by the library and +/// passed to the CreateClientInterceptor method of the application's +/// ClientInterceptorFactoryInterface implementation +namespace experimental { +class ClientRpcInfo { + public: + // TODO(yashykt): Stop default-constructing ClientRpcInfo and remove UNKNOWN + // from the list of possible Types. + /// Type categorizes RPCs by unary or streaming type + enum class Type { + UNARY, + CLIENT_STREAMING, + SERVER_STREAMING, + BIDI_STREAMING, + UNKNOWN // UNKNOWN is not API and will be removed later + }; + + ~ClientRpcInfo() {} + + // Delete copy constructor but allow default move constructor + ClientRpcInfo(const ClientRpcInfo&) = delete; + ClientRpcInfo(ClientRpcInfo&&) = default; + + // Getter methods + + /// Return the fully-specified method name + const char* method() const { return method_; } + + /// Return an identifying suffix for the client stub, or nullptr if one wasn't + /// specified. + const char* suffix_for_stats() const { return suffix_for_stats_; } + + /// Return a pointer to the channel on which the RPC is being sent + ChannelInterface* channel() { return channel_; } + + /// Return a pointer to the underlying ClientContext structure associated + /// with the RPC to support features that apply to it + grpc::ClientContext* client_context() { return ctx_; } + + /// Return the type of the RPC (unary or a streaming flavor) + Type type() const { return type_; } + + private: + static_assert(Type::UNARY == + static_cast<Type>(internal::RpcMethod::NORMAL_RPC), + "violated expectation about Type enum"); + static_assert(Type::CLIENT_STREAMING == + static_cast<Type>(internal::RpcMethod::CLIENT_STREAMING), + "violated expectation about Type enum"); + static_assert(Type::SERVER_STREAMING == + static_cast<Type>(internal::RpcMethod::SERVER_STREAMING), + "violated expectation about Type enum"); + static_assert(Type::BIDI_STREAMING == + static_cast<Type>(internal::RpcMethod::BIDI_STREAMING), + "violated expectation about Type enum"); + + // Default constructor should only be used by ClientContext + ClientRpcInfo() = default; + + // Constructor will only be called from ClientContext + ClientRpcInfo(grpc::ClientContext* ctx, internal::RpcMethod::RpcType type, + const char* method, const char* suffix_for_stats, + grpc::ChannelInterface* channel) + : ctx_(ctx), + type_(static_cast<Type>(type)), + method_(method), + suffix_for_stats_(suffix_for_stats), + channel_(channel) {} + + // Move assignment should only be used by ClientContext + // TODO(yashykt): Delete move assignment + ClientRpcInfo& operator=(ClientRpcInfo&&) = default; + + // Runs interceptor at pos \a pos. + void RunInterceptor( + experimental::InterceptorBatchMethods* interceptor_methods, size_t pos) { + GPR_CODEGEN_ASSERT(pos < interceptors_.size()); + interceptors_[pos]->Intercept(interceptor_methods); + } + + void RegisterInterceptors( + const std::vector<std::unique_ptr< + experimental::ClientInterceptorFactoryInterface>>& creators, + size_t interceptor_pos) { + if (interceptor_pos > creators.size()) { + // No interceptors to register + return; + } + // NOTE: The following is not a range-based for loop because it will only + // iterate over a portion of the creators vector. + for (auto it = creators.begin() + interceptor_pos; it != creators.end(); + ++it) { + auto* interceptor = (*it)->CreateClientInterceptor(this); + if (interceptor != nullptr) { + interceptors_.push_back( + std::unique_ptr<experimental::Interceptor>(interceptor)); + } + } + if (internal::g_global_client_interceptor_factory != nullptr) { + interceptors_.push_back(std::unique_ptr<experimental::Interceptor>( + internal::g_global_client_interceptor_factory + ->CreateClientInterceptor(this))); + } + } + + grpc::ClientContext* ctx_ = nullptr; + // TODO(yashykt): make type_ const once move-assignment is deleted + Type type_{Type::UNKNOWN}; + const char* method_ = nullptr; + const char* suffix_for_stats_ = nullptr; + grpc::ChannelInterface* channel_ = nullptr; + std::vector<std::unique_ptr<experimental::Interceptor>> interceptors_; + bool hijacked_ = false; + size_t hijacked_interceptor_ = 0; + + friend class internal::InterceptorBatchMethodsImpl; + friend class grpc::ClientContext; +}; + +// PLEASE DO NOT USE THIS. ALWAYS PREFER PER CHANNEL INTERCEPTORS OVER A GLOBAL +// INTERCEPTOR. IF USAGE IS ABSOLUTELY NECESSARY, PLEASE READ THE SAFETY NOTES. +// Registers a global client interceptor factory object, which is used for all +// RPCs made in this process. The application is responsible for maintaining the +// life of the object while gRPC operations are in progress. The global +// interceptor factory should only be registered once at the start of the +// process before any gRPC operations have begun. +void RegisterGlobalClientInterceptorFactory( + ClientInterceptorFactoryInterface* factory); + +// For testing purposes only +void TestOnlyResetGlobalClientInterceptorFactory(); + +} // namespace experimental +} // namespace grpc + +#endif // GRPCPP_SUPPORT_CLIENT_INTERCEPTOR_H diff --git a/contrib/libs/grpc/include/grpcpp/support/interceptor.h b/contrib/libs/grpc/include/grpcpp/support/interceptor.h new file mode 100644 index 0000000000..d4f2ea180e --- /dev/null +++ b/contrib/libs/grpc/include/grpcpp/support/interceptor.h @@ -0,0 +1,24 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_SUPPORT_INTERCEPTOR_H +#define GRPCPP_SUPPORT_INTERCEPTOR_H + +#include <grpcpp/impl/codegen/interceptor.h> // IWYU pragma: export + +#endif // GRPCPP_SUPPORT_INTERCEPTOR_H diff --git a/contrib/libs/grpc/include/grpcpp/support/method_handler.h b/contrib/libs/grpc/include/grpcpp/support/method_handler.h new file mode 100644 index 0000000000..0b97a7af03 --- /dev/null +++ b/contrib/libs/grpc/include/grpcpp/support/method_handler.h @@ -0,0 +1,24 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_SUPPORT_METHOD_HANDLER_H +#define GRPCPP_SUPPORT_METHOD_HANDLER_H + +#include <grpcpp/impl/codegen/method_handler.h> // IWYU pragma: export + +#endif // GRPCPP_SUPPORT_METHOD_HANDLER_H diff --git a/contrib/libs/grpc/include/grpcpp/support/server_interceptor.h b/contrib/libs/grpc/include/grpcpp/support/server_interceptor.h new file mode 100644 index 0000000000..ad9c7a1869 --- /dev/null +++ b/contrib/libs/grpc/include/grpcpp/support/server_interceptor.h @@ -0,0 +1,24 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_SUPPORT_SERVER_INTERCEPTOR_H +#define GRPCPP_SUPPORT_SERVER_INTERCEPTOR_H + +#include <grpcpp/impl/codegen/server_interceptor.h> // IWYU pragma: export + +#endif // GRPCPP_SUPPORT_SERVER_INTERCEPTOR_H |