diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/grpc/server/grpc_async_ctx_base.h | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/grpc/server/grpc_async_ctx_base.h')
-rw-r--r-- | library/cpp/grpc/server/grpc_async_ctx_base.h | 94 |
1 files changed, 94 insertions, 0 deletions
diff --git a/library/cpp/grpc/server/grpc_async_ctx_base.h b/library/cpp/grpc/server/grpc_async_ctx_base.h new file mode 100644 index 0000000000..51356d4ce5 --- /dev/null +++ b/library/cpp/grpc/server/grpc_async_ctx_base.h @@ -0,0 +1,94 @@ +#pragma once + +#include "grpc_server.h" + +#include <util/generic/vector.h> +#include <util/generic/string.h> +#include <util/system/yassert.h> +#include <util/generic/set.h> + +#include <grpc++/server.h> +#include <grpc++/server_context.h> + +#include <chrono> + +namespace NGrpc { + +template<typename TService> +class TBaseAsyncContext: public ICancelableContext { +public: + TBaseAsyncContext(typename TService::TCurrentGRpcService::AsyncService* service, grpc::ServerCompletionQueue* cq) + : Service(service) + , CQ(cq) + { + } + + TString GetPeerName() const { + return TString(Context.peer()); + } + + TInstant Deadline() const { + // The timeout transferred in "grpc-timeout" header [1] and calculated from the deadline + // right before the request is getting to be send. + // 1. https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md + // + // After this timeout calculated back to the deadline on the server side + // using server grpc GPR_CLOCK_MONOTONIC time (raw_deadline() method). + // deadline() method convert this to epoch related deadline GPR_CLOCK_REALTIME + // + + std::chrono::system_clock::time_point t = Context.deadline(); + if (t == std::chrono::system_clock::time_point::max()) { + return TInstant::Max(); + } + auto us = std::chrono::time_point_cast<std::chrono::microseconds>(t); + return TInstant::MicroSeconds(us.time_since_epoch().count()); + } + + TSet<TStringBuf> GetPeerMetaKeys() const { + TSet<TStringBuf> keys; + for (const auto& [key, _]: Context.client_metadata()) { + keys.emplace(key.data(), key.size()); + } + return keys; + } + + TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const { + const auto& clientMetadata = Context.client_metadata(); + const auto range = clientMetadata.equal_range(grpc::string_ref{key.data(), key.size()}); + if (range.first == range.second) { + return {}; + } + + TVector<TStringBuf> values; + values.reserve(std::distance(range.first, range.second)); + + for (auto it = range.first; it != range.second; ++it) { + values.emplace_back(it->second.data(), it->second.size()); + } + return values; + } + + grpc_compression_level GetCompressionLevel() const { + return Context.compression_level(); + } + + void Shutdown() override { + // Shutdown may only be called after request has started successfully + if (Context.c_call()) + Context.TryCancel(); + } + +protected: + //! The means of communication with the gRPC runtime for an asynchronous + //! server. + typename TService::TCurrentGRpcService::AsyncService* const Service; + //! The producer-consumer queue where for asynchronous server notifications. + grpc::ServerCompletionQueue* const CQ; + //! Context for the rpc, allowing to tweak aspects of it such as the use + //! of compression, authentication, as well as to send metadata back to the + //! client. + grpc::ServerContext Context; +}; + +} // namespace NGrpc |