aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/wilson/wilson_uploader.cpp
blob: e87776717bb8401484708ec46c936f83f351e12e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#include "wilson_uploader.h"
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/wilson/protos/service.pb.h>
#include <library/cpp/actors/wilson/protos/service.grpc.pb.h>
#include <util/stream/file.h>
#include <grpc++/grpc++.h>
#include <chrono>

namespace NWilson {

    using namespace NActors;

    namespace NServiceProto = opentelemetry::proto::collector::trace::v1;

    namespace {

        class TWilsonUploader
            : public TActorBootstrapped<TWilsonUploader>
        {
            TString Host;
            ui16 Port;
            TString RootCA;

            std::shared_ptr<grpc::Channel> Channel;
            std::unique_ptr<NServiceProto::TraceService::Stub> Stub;
            grpc::CompletionQueue CQ;

            std::unique_ptr<grpc::ClientContext> Context;
            std::unique_ptr<grpc::ClientAsyncResponseReader<NServiceProto::ExportTraceServiceResponse>> Reader;
            NServiceProto::ExportTraceServiceRequest Request;
            NServiceProto::ExportTraceServiceResponse Response;
            grpc::Status Status;

        public:
            TWilsonUploader(TString host, ui16 port, TString rootCA)
                : Host(std::move(host))
                , Port(std::move(port))
                , RootCA(std::move(rootCA))
            {}

            ~TWilsonUploader() {
                CQ.Shutdown();
            }

            void Bootstrap() {
                Become(&TThis::StateFunc);

                Channel = grpc::CreateChannel(TStringBuilder() << Host << ":" << Port, grpc::SslCredentials({
                    .pem_root_certs = TFileInput(RootCA).ReadAll(),
                }));
                Stub = NServiceProto::TraceService::NewStub(Channel);
            }

            void Handle(TEvWilson::TPtr ev) {
                CheckIfDone();

                auto *rspan = Request.resource_spans_size() ? Request.mutable_resource_spans(0) : Request.add_resource_spans();
                auto *sspan = rspan->scope_spans_size() ? rspan->mutable_scope_spans(0) : rspan->add_scope_spans();
                ev->Get()->Span.Swap(sspan->add_spans());

                if (!Context) {
                    SendRequest();
                }
            }

            void SendRequest() {
                Y_VERIFY(!Reader && !Context);
                Context = std::make_unique<grpc::ClientContext>();
                Reader = Stub->AsyncExport(Context.get(), std::exchange(Request, {}), &CQ);
                Reader->Finish(&Response, &Status, nullptr);
            }

            void CheckIfDone() {
                if (Context) {
                    void *tag;
                    bool ok;
                    if (CQ.AsyncNext(&tag, &ok, std::chrono::system_clock::now()) == grpc::CompletionQueue::GOT_EVENT) {
                        Reader.reset();
                        Context.reset();

                        if (Request.resource_spans_size()) {
                            SendRequest();
                        }
                    }
                }
            }

            STRICT_STFUNC(StateFunc,
                hFunc(TEvWilson, Handle);
            );
        };

    } // anonymous

    IActor *CreateWilsonUploader(TString host, ui16 port, TString rootCA) {
        return new TWilsonUploader(std::move(host), port, std::move(rootCA));
    }

} // NWilson