aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/http/http.h
blob: 95595959ad448fa9e8cfddce4a9f810144a376f4 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
#pragma once

#include "fwd.h"

#include <yt/cpp/mapreduce/interface/common.h>
#include <yt/cpp/mapreduce/interface/errors.h>
#include <yt/cpp/mapreduce/interface/format.h>
#include <yt/cpp/mapreduce/interface/io.h>
#include <yt/cpp/mapreduce/interface/node.h>

#include <library/cpp/deprecated/atomic/atomic.h>
#include <library/cpp/http/io/stream.h>

#include <util/generic/hash.h>
#include <util/generic/hash_multi_map.h>
#include <util/generic/strbuf.h>
#include <util/generic/guid.h>
#include <util/network/socket.h>
#include <util/stream/input.h>
#include <util/system/mutex.h>
#include <util/system/rwlock.h>
#include <util/generic/ptr.h>

namespace NYT {

// Forward declaration of TNode, which THttpHeader uses for request parameters.
class TNode;

namespace NHttp {

// Forward declaration only: this type is the return type of
// THttpHeader::GetHeader(); its definition is not visible in this header.
struct THeadersPtrWrapper;

} // NHttp

////////////////////////////////////////////////////////////////////////////////

// Numeric tag identifying the kind of a frame.
//
// NOTE(review): presumably these tags are consumed by the framed-response
// reading code (see THttpResponse::UnframeRead / RefreshFrameIfNecessary) --
// confirm against the implementation.
//
// The underlying type is spelled out explicitly; `int` is what a scoped enum
// gets by default, so the type and its values are unchanged.
enum class EFrameType : int
{
    // Tag value 0x01.
    Data = 0x01,

    // Tag value 0x02.
    KeepAlive = 0x02,
};


// Describes a single HTTP request: the HTTP method, the command name, the
// command parameters, credentials, input/output formats and compression
// settings.
//
// NOTE(review): only declarations are visible in this header. Where a comment
// below describes behavior rather than a signature or a default value, it is
// inferred from names and must be confirmed against the implementation.
class THttpHeader
{
public:
    // `method` and `command` are stored for the lifetime of the object (see the
    // const members below). `isApi` defaults to true.
    // NOTE(review): `isApi` presumably selects between API and non-API URLs -- confirm.
    THttpHeader(const TString& method, const TString& command, bool isApi = true);

    // Parameter management. `overwrite` defaults to false in every method that
    // takes it.
    // NOTE(review): what happens when a key already exists and `overwrite` is
    // false is decided by the implementation (not visible here).
    void AddParameter(const TString& key, TNode value, bool overwrite = false);
    void RemoveParameter(const TString& key);
    void MergeParameters(const TNode& parameters, bool overwrite = false);
    // Returns the parameters as a TNode, by value.
    TNode GetParameters() const;

    // Convenience helpers for commonly used identifiers.
    // NOTE(review): presumably thin wrappers over AddParameter() -- confirm.
    void AddTransactionId(const TTransactionId& transactionId, bool overwrite = false);
    void AddPath(const TString& path, bool overwrite = false);
    void AddOperationId(const TOperationId& operationId, bool overwrite = false);
    void AddMutationId();
    bool HasMutationId() const;

    // Credentials and addressing.
    void SetToken(const TString& token);
    void SetProxyAddress(const TString& proxyAddress);
    void SetHostPort(const TString& hostPort);
    void SetImpersonationUser(const TString& impersonationUser);

    void SetServiceTicket(const TString& ticket);

    // Input format of the request body; the stored value defaults to
    // TFormat::YsonText() (see InputFormat below).
    void SetInputFormat(const TMaybe<TFormat>& format);

    // Output format of the response; the stored value defaults to
    // TFormat::YsonText() (see OutputFormat below).
    void SetOutputFormat(const TMaybe<TFormat>& format);
    TMaybe<TFormat> GetOutputFormat() const;

    // Compression names for the request and response bodies; both stored
    // values default to "identity".
    void SetRequestCompression(const TString& compression);
    void SetResponseCompression(const TString& compression);

    TString GetCommand() const;
    // `needProxy` defaults to false.
    TString GetUrl(bool needProxy = false) const;
    // Note the asymmetry between the two methods below: `includeParameters`
    // defaults to true here but has no default in GetHeader().
    TString GetHeaderAsString(const TString& hostName, const TString& requestId, bool includeParameters = true) const;
    NHttp::THeadersPtrWrapper GetHeader(const TString& hostName, const TString& requestId, bool includeParameters) const;

    const TString& GetMethod() const;

private:
    // NOTE(review): presumably decides whether the request advertises support
    // for framed responses (cf. EFrameType) -- confirm.
    bool ShouldAcceptFraming() const;

private:
    // Fixed at construction time.
    const TString Method;
    const TString Command;
    const bool IsApi;

    TNode::TMapType Parameters;
    TString ImpersonationUser;
    TString Token;
    TString ServiceTicket;
    // No accessor for Attributes is declared in this class.
    TNode Attributes;
    TString ProxyAddress;
    TString HostPort;

private:
    // Both formats start out as text YSON.
    TMaybe<TFormat> InputFormat = TFormat::YsonText();
    TMaybe<TFormat> OutputFormat = TFormat::YsonText();

    // Both compression settings start out as "identity".
    TString RequestCompression = "identity";
    TString ResponseCompression = "identity";
};

////////////////////////////////////////////////////////////////////////////////

// Cache of resolved network addresses keyed by a string (the methods below
// name it `hostName`).
//
// NOTE(review): only declarations are visible here; resolution and expiration
// policy live in the implementation.
class TAddressCache
{
public:
    // Shared-ownership handle to a resolved address.
    using TAddressPtr = TAtomicSharedPtr<TNetworkAddress>;

    // Returns a pointer to a cache instance.
    // NOTE(review): presumably a process-wide singleton -- confirm.
    static TAddressCache* Get();

    // Returns the address for `hostName`.
    // NOTE(review): presumably served from Cache_ when a non-expired entry
    // exists and resolved otherwise -- confirm.
    TAddressPtr Resolve(const TString& hostName);

private:
    // A cached address together with the instant associated with its expiration.
    struct TCacheEntry {
        TAddressPtr Address;
        TInstant ExpirationTime;
    };

private:
    // Lookup helper; const, so it does not modify the cache object.
    TAddressPtr FindAddress(const TString& hostName) const;
    // Insertion helper; takes `hostName` by value.
    void AddAddress(TString hostName, TAddressPtr address);

private:
    // NOTE(review): presumably guards Cache_ -- confirm.
    TRWMutex Lock_;
    THashMap<TString, TCacheEntry> Cache_;
};

////////////////////////////////////////////////////////////////////////////////

// State of one socket connection tracked by TConnectionPool.
//
// Every scalar member has a default member initializer, so a
// default-initialized object (e.g. `new TConnection`) never carries an
// indeterminate value.
struct TConnection
{
    // Owning handle to the socket; empty until one is assigned.
    THolder<TSocket> Socket;
    // Starts at 1.
    // NOTE(review): presumably non-zero means "in use" -- confirm against the
    // pool implementation.
    TAtomic Busy = 1;
    // NOTE(review): presumably the instant after which the connection must not
    // be reused -- confirm.
    TInstant DeadLine;
    // Connection identifier. Initialized to 0 so that reading it before an
    // identifier has been assigned is well-defined.
    ui32 Id = 0;
};

// Shared-ownership handle to a TConnection.
using TConnectionPtr = TAtomicSharedPtr<TConnection>;

// Pool of connections stored in a multimap keyed by a string (the methods
// below name it `hostName`), so several connections can exist per key.
//
// NOTE(review): only declarations are visible here; reuse and eviction policy
// live in the implementation.
class TConnectionPool
{
public:
    using TConnectionMap = THashMultiMap<TString, TConnectionPtr>;

    // Returns a pointer to a pool instance.
    // NOTE(review): presumably a process-wide singleton -- confirm.
    static TConnectionPool* Get();

    // Returns a connection for `hostName`.
    // NOTE(review): presumably reuses an idle pooled connection when possible
    // and opens a new one otherwise -- confirm.
    TConnectionPtr Connect(const TString& hostName, TDuration socketTimeout);
    // NOTE(review): presumably marks `connection` as available for reuse -- confirm.
    void Release(TConnectionPtr connection);
    // NOTE(review): presumably removes `connection` from the pool entries of
    // `hostName` -- confirm.
    void Invalidate(const TString& hostName, TConnectionPtr connection);

private:
    // NOTE(review): presumably drops stale connections -- confirm.
    void Refresh();
    // Returns a raw SOCKET for `address`; static, so it does not touch pool state.
    static SOCKET DoConnect(TAddressCache::TAddressPtr address);

private:
    // NOTE(review): presumably guards Connections_ -- confirm.
    TMutex Lock_;
    TConnectionMap Connections_;
};

////////////////////////////////////////////////////////////////////////////////

//
// Input stream that handles YT-specific header/trailer errors
// and throws TErrorResponse if it finds any.
//
// Reading goes through the IInputStream interface (DoRead / DoSkip are
// overridden below).
// NOTE(review): only declarations are visible here; the framing-related
// members suggest that the body may arrive as a sequence of frames
// (cf. EFrameType) -- confirm against the implementation.
class THttpResponse
    : public IInputStream
{
public:
    // 'requestId' and 'hostName' are provided for debug reasons
    // (they will appear in some error messages).
    // 'socketStream' is passed as a raw pointer.
    // NOTE(review): presumably non-owning, so it must outlive this object -- confirm.
    THttpResponse(
        IInputStream* socketStream,
        const TString& requestId,
        const TString& hostName);

    // Response headers, returned by const reference.
    const THttpHeaders& Headers() const;

    // NOTE(review): presumably throws the stored error response, if any -- confirm.
    void CheckErrorResponse() const;
    bool IsExhausted() const;
    int GetHttpCode() const;
    const TString& GetHostName() const;
    bool IsKeepAlive() const;

protected:
    // IInputStream implementation.
    size_t DoRead(void* buf, size_t len) override;
    size_t DoSkip(size_t len) override;

private:
    void CheckTrailers(const THttpHeaders& trailers);
    // Returns an error response built from `headers`, or an empty TMaybe.
    TMaybe<TErrorResponse> ParseError(const THttpHeaders& headers);
    // Framing-aware counterparts of read/skip.
    // NOTE(review): presumably used only when Unframe_ is true -- confirm.
    size_t UnframeRead(void* buf, size_t len);
    size_t UnframeSkip(size_t len);
    bool RefreshFrameIfNecessary();

private:
    THttpInput HttpInput_;
    // Kept for diagnostics (see the constructor comment).
    const TString RequestId_;
    const TString HostName_;
    // Starts at 0.
    int HttpCode_ = 0;
    TMaybe<TErrorResponse> ErrorResponse_;
    bool IsExhausted_ = false;
    // Fixed at construction time (no default member initializer).
    const bool Unframe_;
    // Starts at 0.
    // NOTE(review): presumably the number of bytes left in the current frame -- confirm.
    size_t RemainingFrameSize_ = 0;
};

////////////////////////////////////////////////////////////////////////////////

// Client-side object for a single HTTP exchange: connect, send a request,
// then read the response.
//
// NOTE(review): only declarations are visible here; the call order suggested
// by the names (Connect -> StartRequest/FinishRequest or SmallRequest ->
// GetResponseStream/GetResponse) must be confirmed against the implementation.
class THttpRequest
{
public:
    THttpRequest();
    // NOTE(review): this single-argument constructor is not `explicit`, so a
    // TString converts implicitly to THttpRequest.
    THttpRequest(const TString& requestId);
    ~THttpRequest();

    TString GetRequestId() const;

    // `socketTimeout` defaults to TDuration::Zero(); how a zero timeout is
    // interpreted is decided by the implementation (not visible here).
    void Connect(TString hostName, TDuration socketTimeout = TDuration::Zero());

    // Returns a raw pointer to the output stream for the request body.
    // NOTE(review): presumably non-owning and backed by RequestStream_ -- confirm.
    IOutputStream* StartRequest(const THttpHeader& header);
    void FinishRequest();

    // Sends a request whose optional body is supplied as a whole.
    void SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> body);

    // Returns a raw pointer to the response stream.
    // NOTE(review): presumably non-owning and backed by Input -- confirm.
    THttpResponse* GetResponseStream();

    // Returns the response as a string.
    TString GetResponse();

    void InvalidateConnection();

    // Not const, unlike THttpResponse::GetHttpCode().
    int GetHttpCode();

private:
    IOutputStream* StartRequestImpl(const THttpHeader& header, bool includeParameters);

private:
    // Defined out of this header.
    class TRequestStream;

private:
    // Member naming is mixed in this class: some members carry a trailing
    // underscore and some do not.
    TString HostName;
    TString RequestId;
    TString Url_;
    TInstant StartTime_;
    TString LoggedAttributes_;

    TConnectionPtr Connection;

    THolder<TRequestStream> RequestStream_;

    THolder<TSocketInput> SocketInput;
    THolder<THttpResponse> Input;

    // Starts as false.
    bool LogResponse = false;
};

////////////////////////////////////////////////////////////////////////////////

} // namespace NYT