summaryrefslogtreecommitdiffstats
path: root/ydb/library/ycloud/impl/grpc_service_client.h
blob: c89515d8654f83346e6b2f10622cec6acceb8524 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#pragma once
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/core/log.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/grpc/client/grpc_client_low.h>
#include <library/cpp/digest/crc32c/crc32c.h>
#include "grpc_service_settings.h"

#define BLOG_GRPC_D(stream) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::GRPC_CLIENT, stream)
#define BLOG_GRPC_DC(context, stream) LOG_DEBUG_S(context, NKikimrServices::GRPC_CLIENT, stream)

inline IOutputStream& operator <<(IOutputStream& out, const NGrpc::TGrpcStatus& status) {
    return out << status.GRpcStatusCode << " " << status.Msg;
}

template <typename TGrpcService>
class TGrpcServiceClient  {
    using TServiceConnection = NGrpc::TServiceConnection<TGrpcService>;

    NGrpc::TGRpcClientConfig Config;
    NGrpc::TGRpcClientLow Client;
    std::unique_ptr<TServiceConnection> Connection;

    TString Prefix(const TString& requestId = {}) const {
        if (requestId) {
            return Sprintf("[%08lx]{%s} ", (ptrdiff_t)this, requestId.c_str());
        } else {
            return Sprintf("[%08lx] ", (ptrdiff_t)this);
        }
    }

    static TString Trim(const TString& line) {
        if (line.size() > 512) {
            return line.substr(0, 512) + "...(truncated)";
        }
        return line;
    }

    template <typename TProtoMessageType>
    static TString Trim(const TProtoMessageType& message) {
        TStringBuilder log;
        log << message.GetDescriptor()->name() << " { " << Trim(message.ShortDebugString()) << " }";
        return log;
    }

public:
    static TString MaskToken(const TString& token) {
        TStringBuilder mask;
        if (token.size() >= 16) {
            mask << token.substr(0, 4);
            mask << "****";
            mask << token.substr(token.size() - 4, 4);
        } else {
            mask << "****";
        }
        mask << " (";
        mask << Sprintf("%08X", Crc32c(token.data(), token.size()));
        mask << ")";
        return mask;
    }

    static constexpr TDuration DEFAULT_TIMEOUT = TDuration::Seconds(10);

    struct TGrpcRequest {
        static const google::protobuf::Message& Obfuscate(const google::protobuf::Message& p) {
            return p;
        }
    };

    template <typename TCallType>
    void MakeCall(typename TCallType::TRequestEventType::TPtr ev) {
        using TRequestType = decltype(typename TCallType::TRequestEventType().Request);
        using TResponseType = decltype(typename TCallType::TResponseEventType().Response);
        const auto& requestId = ev->Get()->RequestId;
        if (!Connection) {
            BLOG_GRPC_D(Prefix(requestId) << "Connect to "
                        << ((Config.EnableSsl || !Config.SslCaCert.empty()) ? "grpcs://" : "grpc://")
                        << Config.Locator);
            Connection = Client.CreateGRpcServiceConnection<TGrpcService>(Config);
        }

        const TRequestType& request = ev->Get()->Request;
        NGrpc::TCallMeta meta;
        meta.Timeout = Config.Timeout;
        if (const auto& token = ev->Get()->Token) {
            meta.Aux.push_back({"authorization", "Bearer " + token});
        }
        if (requestId) {
            meta.Aux.push_back({"x-request-id", requestId});
        }

        NGrpc::TResponseCallback<TResponseType> callback =
            [actorSystem = NActors::TActivationContext::ActorSystem(), prefix = Prefix(requestId), request = ev](NGrpc::TGrpcStatus&& status, TResponseType&& response) -> void {
                if (status.Ok()) {
                    BLOG_GRPC_DC(*actorSystem, prefix << "Response " << Trim(TCallType::Obfuscate(response)));
                } else {
                    BLOG_GRPC_DC(*actorSystem, prefix << "Status " << status);
                }
                auto respEv = MakeHolder<typename TCallType::TResponseEventType>();
                respEv->Request = request;
                respEv->Status = status;
                respEv->Response = response;
                actorSystem->Send(respEv->Request->Sender, respEv.Release());
            };

        BLOG_GRPC_D(Prefix(requestId) << "Request " << Trim(TCallType::Obfuscate(request)));
        Connection->DoRequest(request, std::move(callback), TCallType::Request, meta);
    }

    static NGrpc::TGRpcClientConfig InitGrpcConfig(const NCloud::TGrpcClientSettings& settings) {
        NGrpc::TGRpcClientConfig config(settings.Endpoint, DEFAULT_TIMEOUT, DEFAULT_GRPC_MESSAGE_SIZE_LIMIT, 0, settings.CertificateRootCA);
        config.EnableSsl = settings.EnableSsl;
        config.IntChannelParams[GRPC_ARG_KEEPALIVE_TIME_MS] = settings.GrpcKeepAliveTimeMs;
        config.IntChannelParams[GRPC_ARG_KEEPALIVE_TIMEOUT_MS] = settings.GrpcKeepAliveTimeoutMs;
        config.IntChannelParams[GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS] = 1;
        config.IntChannelParams[GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA] = 0;
        config.IntChannelParams[GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS] = settings.GrpcKeepAlivePingInterval;
        config.IntChannelParams[GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS] = settings.GrpcKeepAlivePingInterval;
        return config;
    }

    TGrpcServiceClient(const NCloud::TGrpcClientSettings& settings)
        : Config(InitGrpcConfig(settings))
    {}
};