aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/grpc/server/grpc_async_ctx_base.h
blob: 51356d4ce5af6d6b5abced249de00ee34550c379 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#pragma once

#include "grpc_server.h"

#include <util/generic/vector.h>
#include <util/generic/string.h>
#include <util/system/yassert.h>
#include <util/generic/set.h>

#include <grpc++/server.h>
#include <grpc++/server_context.h>

#include <chrono>

namespace NGrpc {

template<typename TService>
class TBaseAsyncContext: public ICancelableContext {
public:
    TBaseAsyncContext(typename TService::TCurrentGRpcService::AsyncService* service, grpc::ServerCompletionQueue* cq)
        : Service(service)
        , CQ(cq)
    {
    }

    TString GetPeerName() const {
        return TString(Context.peer());
    }

    TInstant Deadline() const {
        // The timeout transferred in "grpc-timeout" header [1] and calculated from the deadline
        // right before the request is getting to be send.
        // 1. https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
        //
        // After this timeout calculated back to the deadline on the server side
        // using server grpc GPR_CLOCK_MONOTONIC time (raw_deadline() method).
        // deadline() method convert this to epoch related deadline GPR_CLOCK_REALTIME
        //

        std::chrono::system_clock::time_point t = Context.deadline();
        if (t == std::chrono::system_clock::time_point::max()) {
            return TInstant::Max();
        }
        auto us = std::chrono::time_point_cast<std::chrono::microseconds>(t);
        return TInstant::MicroSeconds(us.time_since_epoch().count());
    }

    TSet<TStringBuf> GetPeerMetaKeys() const {
        TSet<TStringBuf> keys;
        for (const auto& [key, _]: Context.client_metadata()) {
            keys.emplace(key.data(), key.size());
        }
        return keys;
    }

    TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const {
        const auto& clientMetadata = Context.client_metadata();
        const auto range = clientMetadata.equal_range(grpc::string_ref{key.data(), key.size()});
        if (range.first == range.second) {
            return {};
        }

        TVector<TStringBuf> values;
        values.reserve(std::distance(range.first, range.second));

        for (auto it = range.first; it != range.second; ++it) {
            values.emplace_back(it->second.data(), it->second.size());
        }
        return values;
    }

    grpc_compression_level GetCompressionLevel() const {
        return Context.compression_level();
    }

    void Shutdown() override {
        // Shutdown may only be called after request has started successfully
        if (Context.c_call())
            Context.TryCancel();
    }

protected:
    //! The means of communication with the gRPC runtime for an asynchronous
    //! server.
    typename TService::TCurrentGRpcService::AsyncService* const Service;
    //! The producer-consumer queue where for asynchronous server notifications.
    grpc::ServerCompletionQueue* const CQ;
    //! Context for the rpc, allowing to tweak aspects of it such as the use
    //! of compression, authentication, as well as to send metadata back to the
    //! client.
    grpc::ServerContext Context;
};

} // namespace NGrpc