1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
#pragma once
#include <util/generic/vector.h>
#include <util/generic/ptr.h>
#include <util/generic/string.h>
#include <util/generic/strbuf.h>
#include <util/generic/maybe.h>
#include <util/stream/output.h>
#include <util/datetime/base.h>
#include <functional>
#include <utility>
namespace NNeh {
// Growable char buffer (a TVector<char>). IRequest::SendReply() takes the
// reply payload as a TData.
using TData = TVector<char>;
/// A TData buffer that is at the same time an IOutputStream: whatever is
/// written to the stream is appended to the end of the buffer.
class TDataSaver: public TData, public IOutputStream {
public:
    TDataSaver() = default;
    ~TDataSaver() override = default;

    // Move operations are defaulted; no copy operations are declared here.
    TDataSaver(TDataSaver&&) noexcept = default;
    TDataSaver& operator=(TDataSaver&&) noexcept = default;

    /// Stream write hook: appends the len bytes starting at buf to the buffer.
    void DoWrite(const void* buf, size_t len) override {
        const char* const first = static_cast<const char*>(buf);
        insert(end(), first, first + len);
    }
};
/// Abstract request: accessors for the request's attributes plus the two ways
/// to answer it, SendReply() for a payload and SendError() for a failure.
/// The exact meaning of each accessor is defined by the implementation.
class IRequest {
public:
    /// Records the current time (TInstant::Now()) as the arrival time.
    IRequest()
        : ArrivalTime_(TInstant::Now())
    {
    }

    virtual ~IRequest() = default;

    virtual TStringBuf Scheme() const = 0;
    /// Host in one of the forms: IP-literal / IPv4address / reg-name
    /// (presumably the RFC 3986 host productions - confirm).
    virtual TString RemoteHost() const = 0;
    virtual TStringBuf Service() const = 0;
    virtual TStringBuf Data() const = 0;
    virtual TStringBuf RequestId() const = 0;
    virtual bool Canceled() const = 0;

    /// Answers the request with data. The buffer is taken by non-const
    /// reference. NOTE(review): implementations presumably may consume or
    /// modify it - confirm before reusing data after the call.
    virtual void SendReply(TData& data) = 0;

    /// Error categories for SendError(), with the HTTP status each maps to.
    /// Enumerator order defines the numeric values; do not reorder.
    enum TResponseError {
        BadRequest,             // bad request data - http_code 400
        Forbidden,              // forbidden request - http_code 403
        NotExistService,        // request handler not found - http_code 404
        TooManyRequests,        // too many requests for the handler - http_code 429
        InternalError,          // something went wrong - http_code 500
        NotImplemented,         // not implemented - http_code 501
        BadGateway,             // remote backend not available - http_code 502
        ServiceUnavailable,     // overload - http_code 503
        BandwidthLimitExceeded, // 5xx version of 429
        MaxResponseError        // number of error types (not an error itself)
    };

    /// Answers the request with an error; details defaults to an empty string.
    virtual void SendError(TResponseError err, const TString& details = TString()) = 0;

    /// Time captured by the constructor, unless an implementation overrides this.
    virtual TInstant ArrivalTime() const {
        return ArrivalTime_;
    }

private:
    TInstant ArrivalTime_; // set once, in the constructor
};
// Pointer type through which requests are passed to handlers (TAutoPtr<IRequest>).
using IRequestRef = TAutoPtr<IRequest>;
/// Callback interface for receiving requests.
struct IOnRequest {
    /// Virtual so that an implementation can be destroyed safely through an
    /// IOnRequest pointer or reference.
    virtual ~IOnRequest() = default;

    /// Called with a request. req is an IRequestRef (TAutoPtr<IRequest>)
    /// passed by value - presumably the callee takes over the request; confirm
    /// against TAutoPtr semantics.
    virtual void OnRequest(IRequestRef req) = 0;
};
/// Output stream bound to a request: bytes written to it accumulate in the
/// inherited TDataSaver buffer, and finishing the stream passes that buffer to
/// IRequest::SendReply() of the bound request.
class TRequestOut: public TDataSaver {
public:
    /// Binds the stream to req. The pointer is stored as-is; this class never
    /// deletes it.
    inline TRequestOut(IRequest* req)
        : Req_(req)
    {
    }

    /// Finishes the stream on destruction. Anything thrown by Finish() is
    /// swallowed so that the destructor itself never throws.
    ~TRequestOut() override {
        try {
            Finish();
        } catch (...) {
            // best effort: errors while replying cannot be reported from here
        }
    }

    /// Sends the buffered data as the reply. After a successful send the
    /// request pointer is cleared, so later calls do nothing. If SendReply()
    /// throws, the pointer stays set.
    void DoFinish() override {
        if (!Req_) {
            return;
        }
        Req_->SendReply(*this);
        Req_ = nullptr;
    }

private:
    IRequest* Req_; // request to reply to; nullptr once the reply has been sent
};
// Opaque handle interface: it declares nothing except a virtual destructor.
struct IRequester {
virtual ~IRequester() = default;
};
// TAtomicSharedPtr handle to an IRequester.
using IRequesterRef = TAtomicSharedPtr<IRequester>;
// Request handler interface. Derives from TThrRefBase and is held through
// TIntrusivePtr (see IServiceRef).
struct IService: public TThrRefBase {
// Called with a request to handle; the request is passed by const reference.
virtual void ServeRequest(const IRequestRef& request) = 0;
};
// Intrusive pointer to an IService.
using IServiceRef = TIntrusivePtr<IService>;
// Callable form of a request handler; same signature as IService::ServeRequest.
using TServiceFunction = std::function<void(const IRequestRef&)>;
// Returns an IService built from func (defined outside this header).
// IServices::Add uses it to register function handlers.
IServiceRef Wrap(const TServiceFunction& func);
class IServices {
public:
virtual ~IServices() = default;
/// use current thread and run #threads-1 in addition
virtual void Loop(size_t threads) = 0;
/// run #threads and return control
virtual void ForkLoop(size_t threads) = 0;
/// send stopping request and wait stopping all services
virtual void SyncStopFork() = 0;
/// send stopping request and return control (async call)
virtual void Stop() = 0;
/// just listen, don't start any threads
virtual void Listen() = 0;
inline IServices& Add(const TString& service, IServiceRef srv) {
DoAdd(service, srv);
return *this;
}
inline IServices& Add(const TString& service, const TServiceFunction& func) {
return Add(service, Wrap(func));
}
template <class T>
inline IServices& Add(const TString& service, T& t) {
return this->Add(service, std::bind(&T::ServeRequest, std::ref(t), std::placeholders::_1));
}
template <class T, void (T::*M)(const IRequestRef&)>
inline IServices& Add(const TString& service, T& t) {
return this->Add(service, std::bind(M, std::ref(t), std::placeholders::_1));
}
private:
virtual void DoAdd(const TString& service, IServiceRef srv) = 0;
};
// Pointer type returned by the CreateLoop() factories (TAutoPtr<IServices>).
using IServicesRef = TAutoPtr<IServices>;
// Per-request check: a callable that takes a request and returns a
// TMaybe<IRequest::TResponseError>.
// NOTE(review): presumably an empty TMaybe means the request passed and a
// value is the error to answer with - confirm against the implementation.
using TCheck = std::function<TMaybe<IRequest::TResponseError>(const IRequestRef&)>;
// Creates an IServices instance (implemented outside this header).
IServicesRef CreateLoop();
// Same, with a per-request check: a request that fails the check will be cancelled.
IServicesRef CreateLoop(TCheck check);
}
|