1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
#pragma once
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/core/log.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/grpc/client/grpc_client_low.h>
#include <library/cpp/digest/crc32c/crc32c.h>
#include "grpc_service_settings.h"
#define BLOG_GRPC_D(stream) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::GRPC_CLIENT, stream)
#define BLOG_GRPC_DC(context, stream) LOG_DEBUG_S(context, NKikimrServices::GRPC_CLIENT, stream)
inline IOutputStream& operator <<(IOutputStream& out, const NGrpc::TGrpcStatus& status) {
return out << status.GRpcStatusCode << " " << status.Msg;
}
template <typename TGrpcService>
class TGrpcServiceClient {
using TServiceConnection = NGrpc::TServiceConnection<TGrpcService>;
NGrpc::TGRpcClientConfig Config;
NGrpc::TGRpcClientLow Client;
std::unique_ptr<TServiceConnection> Connection;
TString Prefix(const TString& requestId = {}) const {
if (requestId) {
return Sprintf("[%08lx]{%s} ", (ptrdiff_t)this, requestId.c_str());
} else {
return Sprintf("[%08lx] ", (ptrdiff_t)this);
}
}
static TString Trim(const TString& line) {
if (line.size() > 512) {
return line.substr(0, 512) + "...(truncated)";
}
return line;
}
template <typename TProtoMessageType>
static TString Trim(const TProtoMessageType& message) {
TStringBuilder log;
log << message.GetDescriptor()->name() << " { " << Trim(message.ShortDebugString()) << " }";
return log;
}
public:
static TString MaskToken(const TString& token) {
TStringBuilder mask;
if (token.size() >= 16) {
mask << token.substr(0, 4);
mask << "****";
mask << token.substr(token.size() - 4, 4);
} else {
mask << "****";
}
mask << " (";
mask << Sprintf("%08X", Crc32c(token.data(), token.size()));
mask << ")";
return mask;
}
static constexpr TDuration DEFAULT_TIMEOUT = TDuration::Seconds(10);
struct TGrpcRequest {
static const google::protobuf::Message& Obfuscate(const google::protobuf::Message& p) {
return p;
}
};
template <typename TCallType>
void MakeCall(typename TCallType::TRequestEventType::TPtr ev) {
using TRequestType = decltype(typename TCallType::TRequestEventType().Request);
using TResponseType = decltype(typename TCallType::TResponseEventType().Response);
const auto& requestId = ev->Get()->RequestId;
if (!Connection) {
BLOG_GRPC_D(Prefix(requestId) << "Connect to "
<< ((Config.EnableSsl || !Config.SslCaCert.empty()) ? "grpcs://" : "grpc://")
<< Config.Locator);
Connection = Client.CreateGRpcServiceConnection<TGrpcService>(Config);
}
const TRequestType& request = ev->Get()->Request;
NGrpc::TCallMeta meta;
meta.Timeout = Config.Timeout;
if (const auto& token = ev->Get()->Token) {
meta.Aux.push_back({"authorization", "Bearer " + token});
}
if (requestId) {
meta.Aux.push_back({"x-request-id", requestId});
}
NGrpc::TResponseCallback<TResponseType> callback =
[actorSystem = NActors::TActivationContext::ActorSystem(), prefix = Prefix(requestId), request = ev](NGrpc::TGrpcStatus&& status, TResponseType&& response) -> void {
if (status.Ok()) {
BLOG_GRPC_DC(*actorSystem, prefix << "Response " << Trim(TCallType::Obfuscate(response)));
} else {
BLOG_GRPC_DC(*actorSystem, prefix << "Status " << status);
}
auto respEv = MakeHolder<typename TCallType::TResponseEventType>();
respEv->Request = request;
respEv->Status = status;
respEv->Response = response;
actorSystem->Send(respEv->Request->Sender, respEv.Release());
};
BLOG_GRPC_D(Prefix(requestId) << "Request " << Trim(TCallType::Obfuscate(request)));
Connection->DoRequest(request, std::move(callback), TCallType::Request, meta);
}
static NGrpc::TGRpcClientConfig InitGrpcConfig(const NCloud::TGrpcClientSettings& settings) {
NGrpc::TGRpcClientConfig config(settings.Endpoint, DEFAULT_TIMEOUT, DEFAULT_GRPC_MESSAGE_SIZE_LIMIT, 0, settings.CertificateRootCA);
config.EnableSsl = settings.EnableSsl;
config.IntChannelParams[GRPC_ARG_KEEPALIVE_TIME_MS] = settings.GrpcKeepAliveTimeMs;
config.IntChannelParams[GRPC_ARG_KEEPALIVE_TIMEOUT_MS] = settings.GrpcKeepAliveTimeoutMs;
config.IntChannelParams[GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS] = 1;
config.IntChannelParams[GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA] = 0;
config.IntChannelParams[GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS] = settings.GrpcKeepAlivePingInterval;
config.IntChannelParams[GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS] = settings.GrpcKeepAlivePingInterval;
return config;
}
TGrpcServiceClient(const NCloud::TGrpcClientSettings& settings)
: Config(InitGrpcConfig(settings))
{}
};
|