aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/unified_agent_client/grpc_io.h
blob: 07f4b31d949e52439d695ea99225cc92355d53a6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#pragma once

#include <library/cpp/unified_agent_client/async_joiner.h>
#include <library/cpp/unified_agent_client/f_maybe.h>

#include <contrib/libs/grpc/include/grpcpp/alarm.h>
#include <contrib/libs/grpc/include/grpcpp/grpcpp.h>

#include <atomic>
#include <thread>
#include <type_traits>
#include <utility>

struct grpc_cq_completion;

namespace NUnifiedAgent {
    // Status value handed to IIOCallback::OnIOCompleted.
    enum class EIOStatus {
        Ok,     // NOTE(review): presumably the operation completed normally.
        Error,  // NOTE(review): presumably the operation failed or was cancelled - confirm in grpc_io.cpp.
    };

    // Abstract completion callback.
    //
    // TIOCallback in this header is the generic implementation: its Ref()
    // bumps a reference counter and returns `this`, and its OnIOCompleted()
    // runs the user callable and then releases that reference.
    class IIOCallback {
    public:
        virtual ~IIOCallback() = default;

        // Returns a callback pointer.
        // NOTE(review): presumably called each time the callback is handed to
        // the completion queue as a tag, so that every Ref() is balanced by
        // exactly one OnIOCompleted() - confirm against the callers.
        virtual IIOCallback* Ref() = 0;

        // Invoked with the status of the finished operation.
        virtual void OnIOCompleted(EIOStatus status) = 0;
    };

    // IIOCallback adapter that pairs a callable with a reference counter.
    //
    // TCallback must be invocable as `callback(EIOStatus)`.
    // TCounter must provide `Ref()` and `UnRef()` member functions.
    template<typename TCallback, typename TCounter>
    class TIOCallback: public IIOCallback {
    public:
        // Moves `callback` into this object and remembers `counter`.
        // The counter pointer is dereferenced without a null check.
        explicit TIOCallback(TCallback&& callback, TCounter* counter)
            : Callback(std::move(callback)), Counter(counter) {}

        // Takes one reference on the counter and returns this callback.
        IIOCallback* Ref() override {
            Counter->Ref();
            return this;
        }

        // Runs the stored callable with `status`, then drops one counter
        // reference. The release happens strictly after the callable returns.
        void OnIOCompleted(EIOStatus status) override {
            Callback(status);
            Counter->UnRef();
        }

    private:
        TCallback Callback;
        TCounter* Counter;  // Raw pointer; never deleted by this class.
    };

    // Wraps an arbitrary callable into a heap-allocated IIOCallback.
    //
    // callback: anything invocable as `callback(EIOStatus)`. It is always
    //           stored by value: rvalues are moved in, lvalues are copied.
    // counter:  object with `Ref()` / `UnRef()`; see TIOCallback for how it is
    //           used. Passed through unchanged.
    //
    // `TCallback&&` is a forwarding reference, so for an lvalue argument
    // TCallback deduces to a reference type. The stored type is therefore
    // decayed explicitly; otherwise TIOCallback would either fail to compile
    // (non-const lvalue) or keep a reference to the caller's object
    // (const lvalue) that may dangle once the caller's scope ends.
    template<typename TCallback, typename TCounter>
    THolder<IIOCallback> MakeIOCallback(TCallback&& callback, TCounter* counter) {
        using TStoredCallback = typename std::decay<TCallback>::type;
        return MakeHolder<TIOCallback<TStoredCallback, TCounter>>(
            TStoredCallback(std::forward<TCallback>(callback)), counter);
    }

    // Builds an IIOCallback that calls `method` on `target` with the
    // completion status.
    //
    // target:  object whose member function is invoked; captured as a raw
    //          pointer, so it must outlive the returned callback.
    // method:  member function of TTarget taking an EIOStatus.
    // counter: reference counter for the callback; when null, `target` itself
    //          is used as the counter.
    template<typename TTarget, typename TCounter = TTarget>
    THolder<IIOCallback> MakeIOCallback(TTarget* target, void (TTarget::*method)(EIOStatus),
                                        TCounter* counter = nullptr)
    {
        auto invokeMethod = [target, method](EIOStatus ioStatus) {
            (target->*method)(ioStatus);
        };
        // Fall back to the target when no dedicated counter was supplied.
        auto* effectiveCounter = counter ? counter : target;
        return MakeIOCallback(std::move(invokeMethod), effectiveCounter);
    }

    // A completion-queue tag bound to a specific grpc::CompletionQueue and an
    // IIOCallback. Implemented as a private grpc::internal::CompletionQueueTag.
    // NOTE(review): presumably Trigger() posts this tag to the queue and the
    // callback fires when the poller dequeues it - confirm in grpc_io.cpp.
    class TGrpcNotification: private ::grpc::internal::CompletionQueueTag {
    public:
        // completionQueue is held by reference and must outlive this object;
        // ownership of ioCallback is transferred to this object.
        TGrpcNotification(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback);

        ~TGrpcNotification();

        // NOTE(review): presumably requests a (single) callback invocation via
        // the completion queue; see InQueue below - confirm in grpc_io.cpp.
        void Trigger();

    private:
        // CompletionQueueTag hook, overridden from the private base.
        bool FinalizeResult(void** tag, bool* status) override;

    private:
        grpc::CompletionQueue& CompletionQueue;
        THolder<IIOCallback> IOCallback;
        // grpc_cq_completion is only forward-declared in this header.
        THolder<grpc_cq_completion> Completion;
        // NOTE(review): looks like a flag marking that the tag is currently
        // enqueued, so repeated Trigger() calls coalesce - confirm.
        std::atomic<bool> InQueue;
    };

    // Timer built on grpc::Alarm that reports through an IIOCallback.
    // The class itself privately implements IIOCallback.
    // NOTE(review): presumably it passes itself to the alarm as the tag and
    // forwards completions to the user-supplied IOCallback - confirm in
    // grpc_io.cpp.
    class TGrpcTimer: private IIOCallback {
    public:
        // completionQueue is held by reference and must outlive this object;
        // ownership of ioCallback is transferred to this object.
        TGrpcTimer(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback);

        // Schedules the timer for triggerTime.
        // NOTE(review): behaviour when an alarm is already pending (see
        // AlarmIsSet / NextTriggerTime) is defined in grpc_io.cpp - confirm.
        void Set(TInstant triggerTime);

        // Cancels the timer.
        void Cancel();

    private:
        // IIOCallback implementation, hidden from users of the timer.
        IIOCallback* Ref() override;

        void OnIOCompleted(EIOStatus status) override;

    private:
        grpc::CompletionQueue& CompletionQueue;
        THolder<IIOCallback> IOCallback;
        grpc::Alarm Alarm;
        // No in-class initializer; must be initialized by the constructor.
        bool AlarmIsSet;
        // NOTE(review): presumably a trigger time requested while an alarm was
        // still pending - confirm.
        TFMaybe<TInstant> NextTriggerTime;
    };

    // Couples a grpc::CompletionQueue (held by reference) with a std::thread.
    // NOTE(review): presumably Start() launches Thread to drain Queue and
    // dispatch IIOCallback tags - confirm in grpc_io.cpp.
    class TGrpcCompletionQueuePoller {
    public:
        // queue must outlive the poller; only a reference is stored.
        explicit TGrpcCompletionQueuePoller(grpc::CompletionQueue& queue);

        void Start();

        // NOTE(review): no destructor is declared here, so a still-joinable
        // std::thread member would call std::terminate on destruction;
        // callers presumably must call Join() after Start() - confirm.
        void Join();

    private:
        grpc::CompletionQueue& Queue;
        std::thread Thread;
    };

    // Owns a grpc::CompletionQueue together with the poller that serves it.
    class TGrpcCompletionQueueHost {
    public:
        TGrpcCompletionQueueHost();

        void Start();
        void Stop();

        // Gives access to the owned queue; the reference stays valid for the
        // lifetime of this host.
        grpc::CompletionQueue& GetCompletionQueue() noexcept {
            return CompletionQueue;
        }

    private:
        // Declared before Poller, so the queue is constructed first and
        // destroyed last.
        grpc::CompletionQueue CompletionQueue;
        TGrpcCompletionQueuePoller Poller;
    };

    // Converts a TInstant into gRPC's gpr_timespec representation.
    // NOTE(review): the clock type of the result is chosen in grpc_io.cpp - confirm.
    gpr_timespec InstantToTimespec(TInstant instant);

    // NOTE(review): presumably performs process-wide gRPC setup and is safe to
    // call repeatedly - confirm in grpc_io.cpp.
    void EnsureGrpcConfigured();

    // NOTE(review): presumably turns gRPC tracing on; expected to be paired
    // with FinishGrpcTracing() - confirm.
    void StartGrpcTracing();

    // NOTE(review): presumably undoes StartGrpcTracing() - confirm.
    void FinishGrpcTracing();
}