diff options
author | leonidlazarev <leonidlazarev@yandex-team.com> | 2023-07-17 19:35:29 +0300 |
---|---|---|
committer | leonidlazarev <leonidlazarev@yandex-team.com> | 2023-07-17 19:35:29 +0300 |
commit | cb8e9a6330e4e5d9a0e2f8506e7469bbd641ec63 (patch) | |
tree | eddb7b81e7d1f5a7ac8078591799509e95572f4e /contrib/libs/grpc/include/grpcpp/support/method_handler.h | |
parent | 029cf29f3669091012394221f00dfa0f3631d91b (diff) | |
download | ydb-cb8e9a6330e4e5d9a0e2f8506e7469bbd641ec63.tar.gz |
feat grpc: update to grpc 1.53.1
update grpc to 1.53.1
update grpcio/py3 to 1.53.1
Added patches:
22-grpc-code-output.patch - allow translation of grpc code to internal string type.
23-max-thread-limitation.patch - to provide interface for settings of thread number limit, as
grpc::DynamicThreadPool doesn't provide interface to limit thread number anymore.
24-support_for-non-abort-grpc.patch - generate exception instead of application crash
25-forkable-destruction-order.patch - correct forkable logic for TimerManager
27-skip-child-post-fork-operations.patch - allow to skip child post fork operations to exclude UB (used for unified agent only)
pr33495_fox_nested_fork.patch - fix issues with nested forks
pr33582_fork_handler.patch - disable fork handler support if it is not requested intentionally
Diffstat (limited to 'contrib/libs/grpc/include/grpcpp/support/method_handler.h')
-rw-r--r-- | contrib/libs/grpc/include/grpcpp/support/method_handler.h | 417 |
1 files changed, 399 insertions, 18 deletions
diff --git a/contrib/libs/grpc/include/grpcpp/support/method_handler.h b/contrib/libs/grpc/include/grpcpp/support/method_handler.h index 0b97a7af03..e4ec4a88d8 100644 --- a/contrib/libs/grpc/include/grpcpp/support/method_handler.h +++ b/contrib/libs/grpc/include/grpcpp/support/method_handler.h @@ -1,24 +1,405 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// #ifndef GRPCPP_SUPPORT_METHOD_HANDLER_H #define GRPCPP_SUPPORT_METHOD_HANDLER_H -#include <grpcpp/impl/codegen/method_handler.h> // IWYU pragma: export +#include <grpc/byte_buffer.h> +#include <grpc/support/log.h> +#include <grpcpp/impl/rpc_service_method.h> +#include <grpcpp/support/byte_buffer.h> +#include <grpcpp/support/sync_stream.h> + +namespace grpc { + +namespace internal { + +// Invoke the method handler, fill in the status, and +// return whether or not we finished safely (without an exception). +// Note that exception handling is 0-cost in most compiler/library +// implementations (except when an exception is actually thrown), +// so this process doesn't require additional overhead in the common case. +// Additionally, we don't need to return if we caught an exception or not; +// the handling is the same in either case. +template <class Callable> +::grpc::Status CatchingFunctionHandler(Callable&& handler) { +#if GRPC_ALLOW_EXCEPTIONS + try { + return handler(); + } catch (...) { + return grpc::Status(grpc::StatusCode::UNKNOWN, + "Unexpected error in RPC handling"); + } +#else // GRPC_ALLOW_EXCEPTIONS + return handler(); +#endif // GRPC_ALLOW_EXCEPTIONS +} + +/// A helper function with reduced templating to do the common work needed to +/// actually send the server response. Uses non-const parameter for Status since +/// this should only ever be called from the end of the RunHandler method. + +template <class ResponseType> +void UnaryRunHandlerHelper(const MethodHandler::HandlerParameter& param, + ResponseType* rsp, grpc::Status& status) { + GPR_ASSERT(!param.server_context->sent_initial_metadata_); + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpSendMessage, + grpc::internal::CallOpServerSendStatus> + ops; + ops.SendInitialMetadata(¶m.server_context->initial_metadata_, + param.server_context->initial_metadata_flags()); + if (param.server_context->compression_level_set()) { + ops.set_compression_level(param.server_context->compression_level()); + } + if (status.ok()) { + status = ops.SendMessagePtr(rsp); + } + ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); +} + +/// A helper function with reduced templating to do deserializing. + +template <class RequestType> +void* UnaryDeserializeHelper(grpc_byte_buffer* req, grpc::Status* status, + RequestType* request) { + grpc::ByteBuffer buf; + buf.set_buffer(req); + *status = grpc::SerializationTraits<RequestType>::Deserialize( + &buf, static_cast<RequestType*>(request)); + buf.Release(); + if (status->ok()) { + return request; + } + request->~RequestType(); + return nullptr; +} + +/// A wrapper class of an application provided rpc method handler. +template <class ServiceType, class RequestType, class ResponseType, + class BaseRequestType = RequestType, + class BaseResponseType = ResponseType> +class RpcMethodHandler : public grpc::internal::MethodHandler { + public: + RpcMethodHandler( + std::function<grpc::Status(ServiceType*, grpc::ServerContext*, + const RequestType*, ResponseType*)> + func, + ServiceType* service) + : func_(func), service_(service) {} + + void RunHandler(const HandlerParameter& param) final { + ResponseType rsp; + grpc::Status status = param.status; + if (status.ok()) { + status = CatchingFunctionHandler([this, ¶m, &rsp] { + return func_(service_, + static_cast<grpc::ServerContext*>(param.server_context), + static_cast<RequestType*>(param.request), &rsp); + }); + static_cast<RequestType*>(param.request)->~RequestType(); + } + UnaryRunHandlerHelper(param, static_cast<BaseResponseType*>(&rsp), status); + } + + void* Deserialize(grpc_call* call, grpc_byte_buffer* req, + grpc::Status* status, void** /*handler_data*/) final { + auto* request = + new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType; + return UnaryDeserializeHelper(req, status, + static_cast<BaseRequestType*>(request)); + } + + private: + /// Application provided rpc handler function. + std::function<grpc::Status(ServiceType*, grpc::ServerContext*, + const RequestType*, ResponseType*)> + func_; + // The class the above handler function lives in. + ServiceType* service_; +}; + +/// A wrapper class of an application provided client streaming handler. +template <class ServiceType, class RequestType, class ResponseType> +class ClientStreamingHandler : public grpc::internal::MethodHandler { + public: + ClientStreamingHandler( + std::function<grpc::Status(ServiceType*, grpc::ServerContext*, + ServerReader<RequestType>*, ResponseType*)> + func, + ServiceType* service) + : func_(func), service_(service) {} + + void RunHandler(const HandlerParameter& param) final { + ServerReader<RequestType> reader( + param.call, static_cast<grpc::ServerContext*>(param.server_context)); + ResponseType rsp; + grpc::Status status = + CatchingFunctionHandler([this, ¶m, &reader, &rsp] { + return func_(service_, + static_cast<grpc::ServerContext*>(param.server_context), + &reader, &rsp); + }); + + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpSendMessage, + grpc::internal::CallOpServerSendStatus> + ops; + if (!param.server_context->sent_initial_metadata_) { + ops.SendInitialMetadata(¶m.server_context->initial_metadata_, + param.server_context->initial_metadata_flags()); + if (param.server_context->compression_level_set()) { + ops.set_compression_level(param.server_context->compression_level()); + } + } + if (status.ok()) { + status = ops.SendMessagePtr(&rsp); + } + ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); + } + + private: + std::function<grpc::Status(ServiceType*, grpc::ServerContext*, + ServerReader<RequestType>*, ResponseType*)> + func_; + ServiceType* service_; +}; + +/// A wrapper class of an application provided server streaming handler. +template <class ServiceType, class RequestType, class ResponseType> +class ServerStreamingHandler : public grpc::internal::MethodHandler { + public: + ServerStreamingHandler(std::function<grpc::Status( + ServiceType*, grpc::ServerContext*, + const RequestType*, ServerWriter<ResponseType>*)> + func, + ServiceType* service) + : func_(func), service_(service) {} + + void RunHandler(const HandlerParameter& param) final { + grpc::Status status = param.status; + if (status.ok()) { + ServerWriter<ResponseType> writer( + param.call, static_cast<grpc::ServerContext*>(param.server_context)); + status = CatchingFunctionHandler([this, ¶m, &writer] { + return func_(service_, + static_cast<grpc::ServerContext*>(param.server_context), + static_cast<RequestType*>(param.request), &writer); + }); + static_cast<RequestType*>(param.request)->~RequestType(); + } + + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpServerSendStatus> + ops; + if (!param.server_context->sent_initial_metadata_) { + ops.SendInitialMetadata(¶m.server_context->initial_metadata_, + param.server_context->initial_metadata_flags()); + if (param.server_context->compression_level_set()) { + ops.set_compression_level(param.server_context->compression_level()); + } + } + ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + if (param.server_context->has_pending_ops_) { + param.call->cq()->Pluck(¶m.server_context->pending_ops_); + } + param.call->cq()->Pluck(&ops); + } + + void* Deserialize(grpc_call* call, grpc_byte_buffer* req, + grpc::Status* status, void** /*handler_data*/) final { + grpc::ByteBuffer buf; + buf.set_buffer(req); + auto* request = + new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType(); + *status = + grpc::SerializationTraits<RequestType>::Deserialize(&buf, request); + buf.Release(); + if (status->ok()) { + return request; + } + request->~RequestType(); + return nullptr; + } + + private: + std::function<grpc::Status(ServiceType*, grpc::ServerContext*, + const RequestType*, ServerWriter<ResponseType>*)> + func_; + ServiceType* service_; +}; + +/// A wrapper class of an application provided bidi-streaming handler. +/// This also applies to server-streamed implementation of a unary method +/// with the additional requirement that such methods must have done a +/// write for status to be ok +/// Since this is used by more than 1 class, the service is not passed in. +/// Instead, it is expected to be an implicitly-captured argument of func +/// (through bind or something along those lines) +template <class Streamer, bool WriteNeeded> +class TemplatedBidiStreamingHandler : public grpc::internal::MethodHandler { + public: + explicit TemplatedBidiStreamingHandler( + std::function<grpc::Status(grpc::ServerContext*, Streamer*)> func) + : func_(func), write_needed_(WriteNeeded) {} + + void RunHandler(const HandlerParameter& param) final { + Streamer stream(param.call, + static_cast<grpc::ServerContext*>(param.server_context)); + grpc::Status status = CatchingFunctionHandler([this, ¶m, &stream] { + return func_(static_cast<grpc::ServerContext*>(param.server_context), + &stream); + }); + + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpServerSendStatus> + ops; + if (!param.server_context->sent_initial_metadata_) { + ops.SendInitialMetadata(¶m.server_context->initial_metadata_, + param.server_context->initial_metadata_flags()); + if (param.server_context->compression_level_set()) { + ops.set_compression_level(param.server_context->compression_level()); + } + if (write_needed_ && status.ok()) { + // If we needed a write but never did one, we need to mark the + // status as a fail + status = grpc::Status(grpc::StatusCode::INTERNAL, + "Service did not provide response message"); + } + } + ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + if (param.server_context->has_pending_ops_) { + param.call->cq()->Pluck(¶m.server_context->pending_ops_); + } + param.call->cq()->Pluck(&ops); + } + + private: + std::function<grpc::Status(grpc::ServerContext*, Streamer*)> func_; + const bool write_needed_; +}; + +template <class ServiceType, class RequestType, class ResponseType> +class BidiStreamingHandler + : public TemplatedBidiStreamingHandler< + ServerReaderWriter<ResponseType, RequestType>, false> { + public: + BidiStreamingHandler(std::function<grpc::Status( + ServiceType*, grpc::ServerContext*, + ServerReaderWriter<ResponseType, RequestType>*)> + func, + ServiceType* service) + // TODO(vjpai): When gRPC supports C++14, move-capture func in the below + : TemplatedBidiStreamingHandler< + ServerReaderWriter<ResponseType, RequestType>, false>( + [func, service]( + grpc::ServerContext* ctx, + ServerReaderWriter<ResponseType, RequestType>* streamer) { + return func(service, ctx, streamer); + }) {} +}; + +template <class RequestType, class ResponseType> +class StreamedUnaryHandler + : public TemplatedBidiStreamingHandler< + ServerUnaryStreamer<RequestType, ResponseType>, true> { + public: + explicit StreamedUnaryHandler( + std::function< + grpc::Status(grpc::ServerContext*, + ServerUnaryStreamer<RequestType, ResponseType>*)> + func) + : TemplatedBidiStreamingHandler< + ServerUnaryStreamer<RequestType, ResponseType>, true>( + std::move(func)) {} +}; + +template <class RequestType, class ResponseType> +class SplitServerStreamingHandler + : public TemplatedBidiStreamingHandler< + ServerSplitStreamer<RequestType, ResponseType>, false> { + public: + explicit SplitServerStreamingHandler( + std::function< + grpc::Status(grpc::ServerContext*, + ServerSplitStreamer<RequestType, ResponseType>*)> + func) + : TemplatedBidiStreamingHandler< + ServerSplitStreamer<RequestType, ResponseType>, false>( + std::move(func)) {} +}; + +/// General method handler class for errors that prevent real method use +/// e.g., handle unknown method by returning UNIMPLEMENTED error. +template <grpc::StatusCode code> +class ErrorMethodHandler : public grpc::internal::MethodHandler { + public: + explicit ErrorMethodHandler(const TString& message) : message_(message) {} + + template <class T> + static void FillOps(grpc::ServerContextBase* context, + const TString& message, T* ops) { + grpc::Status status(code, message); + if (!context->sent_initial_metadata_) { + ops->SendInitialMetadata(&context->initial_metadata_, + context->initial_metadata_flags()); + if (context->compression_level_set()) { + ops->set_compression_level(context->compression_level()); + } + context->sent_initial_metadata_ = true; + } + ops->ServerSendStatus(&context->trailing_metadata_, status); + } + + void RunHandler(const HandlerParameter& param) final { + grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpServerSendStatus> + ops; + FillOps(param.server_context, message_, &ops); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); + } + + void* Deserialize(grpc_call* /*call*/, grpc_byte_buffer* req, + grpc::Status* /*status*/, void** /*handler_data*/) final { + // We have to destroy any request payload + if (req != nullptr) { + grpc_byte_buffer_destroy(req); + } + return nullptr; + } + + private: + const TString message_; +}; + +typedef ErrorMethodHandler<grpc::StatusCode::UNIMPLEMENTED> + UnknownMethodHandler; +typedef ErrorMethodHandler<grpc::StatusCode::RESOURCE_EXHAUSTED> + ResourceExhaustedHandler; + +} // namespace internal +} // namespace grpc #endif // GRPCPP_SUPPORT_METHOD_HANDLER_H |