1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
#pragma once
#include <google/protobuf/message.h>
#include <library/cpp/threading/future/future.h>
#include <grpc++/server_context.h>
// Forward declaration of grpc::ByteBuffer. This header only names the type
// through a pointer parameter (see IRequestContextBase::Reply), so the
// complete type is not needed at this point.
namespace grpc {
class ByteBuffer;
}
namespace NGrpc {
//! Declaration only; the definition is not part of this header.
//! NOTE(review): presumably the name of the metadata header that carries the
//! client user agent - confirm at the definition site.
extern const char* GRPC_USER_AGENT_HEADER;
//! Authentication state of a request, as exposed by
//! IRequestContextBase::GetAuthState().
struct TAuthState {
    //! Result of the authentication step.
    //! NOTE(review): the exact meaning of AS_OK / AS_FAIL / AS_UNAVAILABLE is
    //! decided by the code that assigns State and is not visible here.
    enum EAuthState {
        AS_NOT_PERFORMED,
        AS_OK,
        AS_FAIL,
        AS_UNAVAILABLE
    };

    //! Copied from the constructor argument.
    bool NeedAuth;
    //! Every freshly constructed object starts in AS_NOT_PERFORMED.
    EAuthState State = AS_NOT_PERFORMED;

    //! Stores needAuth and leaves State at its initial value.
    //! The constructor is not marked explicit, so a bool converts implicitly;
    //! this is kept unchanged for existing callers.
    TAuthState(bool needAuth)
        : NeedAuth(needAuth)
    {
    }
};
//! An interface that may be used to limit concurrency of requests.
//! Instances are normally held through IGRpcRequestLimiterPtr.
class IGRpcRequestLimiter: public TThrRefBase {
public:
//! NOTE(review): presumably accounts for one more in-flight request and
//! returns false when the request must be rejected - confirm against an
//! implementation; the contract is not stated in this header.
virtual bool IncRequest() = 0;
//! NOTE(review): presumably the counterpart of a successful IncRequest() -
//! confirm against an implementation.
virtual void DecRequest() = 0;
};
//! TIntrusivePtr handle to an IGRpcRequestLimiter.
using IGRpcRequestLimiterPtr = TIntrusivePtr<IGRpcRequestLimiter>;
//! State of the current request.
//! Pure interface: every member function is pure virtual, so all behaviour
//! described below is the contract stated by this header; the concrete
//! semantics live in the implementing class.
class IRequestContextBase: public TThrRefBase {
public:
//! Value delivered through the future returned by GetFinishFuture().
enum class EFinishStatus {
OK,
// NOTE(review): ERROR is also defined as a macro by some platform headers
// (e.g. wingdi.h on Windows); if such a header is included before this one
// the enumerator will not compile. Renaming would break existing callers.
ERROR,
CANCEL
};
//! Future type returned by GetFinishFuture().
using TAsyncFinishResult = NThreading::TFuture<EFinishStatus>;
//! Callback type accepted by SetNextReplyCallback().
//! NOTE(review): the meaning of `left` is not defined in this header -
//! confirm against the implementation.
using TOnNextReply = std::function<void (size_t left)>;
//! Get pointer to the request's message.
virtual const NProtoBuf::Message* GetRequest() const = 0;
//! Get mutable pointer to the request's message.
virtual NProtoBuf::Message* GetRequestMut() = 0;
//! Get current auth state (returned by reference, so it can be modified in place).
virtual TAuthState& GetAuthState() = 0;
//! Send common response (the request should be created for protobuf response type).
//! The implementation is allowed to swap the protobuf message.
//! NOTE(review): the meaning of `status` is not defined in this header.
//! The default argument of a virtual function is taken from the static type
//! used at the call site, so overriders should repeat the same default.
virtual void Reply(NProtoBuf::Message* resp, ui32 status = 0) = 0;
//! Send serialised response (the request should be created for bytes response type).
//! The implementation is allowed to swap the ByteBuffer.
//! NOTE(review): the meaning of `status` is not defined in this header.
virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0) = 0;
//! Send grpc UNAUTHENTICATED status.
//! NOTE(review): `in` is presumably the message sent along with the status - confirm.
virtual void ReplyUnauthenticated(const TString& in) = 0;
//! Send grpc error with the given status code, message and optional details.
virtual void ReplyError(grpc::StatusCode code, const TString& msg, const TString& details = "") = 0;
//! Returns deadline (server epoch related) if peer set it on its side, or TInstant::Max() otherwise.
virtual TInstant Deadline() const = 0;
//! Returns available peer metadata keys.
virtual TSet<TStringBuf> GetPeerMetaKeys() const = 0;
//! Returns the values of the optional peer metadata entry `key`.
//! NOTE(review): presumably empty when the key is absent - confirm.
virtual TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const = 0;
//! NOTE(review): undocumented in the original header; judging by the name it
//! looks up the client certificate of the peer. What each element of the
//! returned vector holds must be confirmed against the implementation.
virtual TVector<TStringBuf> FindClientCert() const = 0;
//! Returns request compression level.
virtual grpc_compression_level GetCompressionLevel() const = 0;
//! Returns protobuf arena allocator associated with current request.
//! Lifetime of the arena is the lifetime of the context.
virtual google::protobuf::Arena* GetArena() = 0;
//! Add trailing metadata into the grpc context.
//! The metadata will be sent at the time of rpc finish.
virtual void AddTrailingMetadata(const TString& key, const TString& value) = 0;
//! Use validated database name for counters.
virtual void UseDatabase(const TString& database) = 0;
// Streaming part
//! Set callback. The callback will be called when the response has been delivered
//! to the client; after that Reply can be called again in streaming mode.
//! gRPC allows only one reply in flight.
virtual void SetNextReplyCallback(TOnNextReply&& cb) = 0;
//! Finish streaming reply.
virtual void FinishStreamingOk() = 0;
//! Returns a future that delivers the cancel or finish notification (an EFinishStatus).
virtual TAsyncFinishResult GetFinishFuture() = 0;
//! Returns peer address.
virtual TString GetPeer() const = 0;
//! Returns true if server is using ssl.
virtual bool SslServer() const = 0;
};
} // namespace NGrpc
|