aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/neh/rpc.h
blob: 482ff7ce5384a5618d482b5dcf02d12d92eae788 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#pragma once

#include <util/generic/vector.h>
#include <util/generic/ptr.h>
#include <util/generic/string.h>
#include <util/generic/strbuf.h>
#include <util/generic/maybe.h>
#include <util/stream/output.h>
#include <util/datetime/base.h>
#include <functional>

namespace NNeh {
    /// Payload buffer: a vector of chars (this is the type IRequest::SendReply() accepts).
    using TData = TVector<char>;

    /// A TData buffer that can also be filled through the IOutputStream
    /// interface: every chunk written to the stream is appended to the buffer.
    /// Movable; no copy operations are declared.
    class TDataSaver: public TData, public IOutputStream {
    public:
        TDataSaver() = default;
        ~TDataSaver() override = default;
        TDataSaver(TDataSaver&&) noexcept = default;
        TDataSaver& operator=(TDataSaver&&) noexcept = default;

        /// Stream write hook: appends the len bytes starting at buf
        /// to the end of the underlying buffer.
        void DoWrite(const void* buf, size_t len) override {
            const char* const first = static_cast<const char*>(buf);
            const char* const last = first + len;

            insert(end(), first, last);
        }
    };

    /// Abstract interface to a single request: accessors for its attributes
    /// plus the means to answer it (with a reply or with an error).
    class IRequest {
    public:
        /// Kinds of error that can be reported via SendError().
        enum TResponseError {
            BadRequest,             // bad request data - http_code 400
            Forbidden,              // forbidden request - http_code 403
            NotExistService,        // not found request handler - http_code 404
            TooManyRequests,        // too many requests for the handler - http_code 429
            InternalError,          // something went wrong internally - http_code 500
            NotImplemented,         // not implemented - http_code 501
            BadGateway,             // remote backend not available - http_code 502
            ServiceUnavailable,     // overload - http_code 503
            BandwidthLimitExceeded, // 5xx version of 429
            MaxResponseError        // count error types
        };

        /// Remembers the moment of construction as the request arrival time.
        IRequest()
            : ArrivalTime_(TInstant::Now())
        {
        }

        virtual ~IRequest() = default;

        virtual TStringBuf Scheme() const = 0;
        /// Remote host in one of the forms: IP-literal / IPv4address / reg-name()
        virtual TString RemoteHost() const = 0;
        virtual TStringBuf Service() const = 0;
        virtual TStringBuf Data() const = 0;
        virtual TStringBuf RequestId() const = 0;
        virtual bool Canceled() const = 0;

        /// Answers the request with data. The buffer is taken by non-const
        /// reference, so an implementation is allowed to modify it
        /// (NOTE(review): presumably to take the contents over - confirm);
        /// do not rely on its contents after the call.
        virtual void SendReply(TData& data) = 0;

        /// Answers the request with an error of kind err; details is an
        /// optional text, empty by default.
        virtual void SendError(TResponseError err, const TString& details = TString()) = 0;

        /// Time captured by the constructor, unless an implementation overrides this.
        virtual TInstant ArrivalTime() const {
            return ArrivalTime_;
        }

    private:
        TInstant ArrivalTime_;
    };

    /// Pointer type used to hand requests over to handlers.
    /// NOTE(review): TAutoPtr is assumed to be a single-owner pointer - confirm in util/generic/ptr.h.
    using IRequestRef = TAutoPtr<IRequest>;

    /// Callback interface that receives requests one at a time.
    struct IOnRequest {
        /// Virtual so that implementations can be safely destroyed through
        /// a pointer to this interface.
        virtual ~IOnRequest() = default;

        /// Called with a request; req is passed by value, i.e. the handle is
        /// handed over to the callee.
        virtual void OnRequest(IRequestRef req) = 0;
    };

    class TRequestOut: public TDataSaver {
    public:
        inline TRequestOut(IRequest* req)
            : Req_(req)
        {
        }

        ~TRequestOut() override {
            try {
                Finish();
            } catch (...) {
            }
        }

        void DoFinish() override {
            if (Req_) {
                Req_->SendReply(*this);
                Req_ = nullptr;
            }
        }

    private:
        IRequest* Req_;
    };

    /// Opaque interface: it exposes nothing but a virtual destructor, so a
    /// holder can only keep the object alive or destroy it.
    struct IRequester {
        virtual ~IRequester() = default;
    };

    /// Shared handle to an IRequester.
    using IRequesterRef = TAtomicSharedPtr<IRequester>;

    /// Request handler interface. Derives from TThrRefBase and is held
    /// through TIntrusivePtr (see IServiceRef).
    struct IService: public TThrRefBase {
        /// Handles one request. The handle is passed by const reference.
        virtual void ServeRequest(const IRequestRef& request) = 0;
    };

    /// Intrusive pointer to an IService.
    using IServiceRef = TIntrusivePtr<IService>;
    /// Plain callable with the same signature as IService::ServeRequest().
    using TServiceFunction = std::function<void(const IRequestRef&)>;

    /// Produces an IService from a callable (defined elsewhere).
    /// NOTE(review): presumably the returned service forwards ServeRequest() to func - confirm in the implementation.
    IServiceRef Wrap(const TServiceFunction& func);

    /// A set of named services together with control over the loop that
    /// serves them. The Add() overloads return *this, so registrations can
    /// be chained.
    class IServices {
    public:
        virtual ~IServices() = default;

        /// serve on the calling thread, plus #threads-1 additional threads
        virtual void Loop(size_t threads) = 0;
        /// start #threads serving threads and return control to the caller
        virtual void ForkLoop(size_t threads) = 0;
        /// request a stop and wait until all services have stopped
        virtual void SyncStopFork() = 0;
        /// request a stop and return immediately (asynchronous)
        virtual void Stop() = 0;
        /// only start listening, without starting any threads
        virtual void Listen() = 0;

        /// Registers srv under the name service.
        IServices& Add(const TString& service, IServiceRef srv) {
            DoAdd(service, srv);
            return *this;
        }

        /// Registers a plain callable: it is turned into an IService with Wrap().
        IServices& Add(const TString& service, const TServiceFunction& func) {
            return Add(service, Wrap(func));
        }

        /// Registers an arbitrary object whose ServeRequest() member handles
        /// the requests. Only a reference to t is stored, so t has to outlive
        /// the registration.
        template <class T>
        IServices& Add(const TString& service, T& t) {
            using std::placeholders::_1;

            return Add(service, TServiceFunction(std::bind(&T::ServeRequest, std::ref(t), _1)));
        }

        /// Same as above, but the handling member function M is chosen
        /// explicitly by the caller: Add<T, &T::Method>(name, object).
        template <class T, void (T::*M)(const IRequestRef&)>
        IServices& Add(const TString& service, T& t) {
            using std::placeholders::_1;

            return Add(service, TServiceFunction(std::bind(M, std::ref(t), _1)));
        }

    private:
        /// Implementation hook behind every Add() overload.
        virtual void DoAdd(const TString& service, IServiceRef srv) = 0;
    };

    /// Pointer type in which CreateLoop() returns the services object.
    using IServicesRef = TAutoPtr<IServices>;
    /// Per-request check: returns either an error kind or an empty TMaybe.
    /// NOTE(review): presumably an empty result means the request passed the check - confirm in the implementation.
    using TCheck = std::function<TMaybe<IRequest::TResponseError>(const IRequestRef&)>;

    /// Creates a services object (defined elsewhere).
    IServicesRef CreateLoop();
    /// Same, with a check applied to requests: if a request fails the check, it will be cancelled.
    IServicesRef CreateLoop(TCheck check);
}