aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/http/abortable_http_response.h
blob: e9b1483bf70465130acc9e09c931e2330e602b94 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#pragma once

#include "http.h"

#include <util/generic/intrlist.h>

#include <atomic>
#include <limits>
#include <memory>
#include <utility>

namespace NYT {

////////////////////////////////////////////////////////////////////////////////

// Forward declaration: this header only refers to the registry by reference.
class TAbortableHttpResponseRegistry;

// Identifier of an outage; an instance is stored in TAbortableHttpResponse::TOutage.
using TOutageId = size_t;

////////////////////////////////////////////////////////////////////////////////

/// @brief Exception type marking a failure that was injected for test purposes.
///
/// NOTE(review): presumably thrown by the abortable response wrappers declared in this header
/// once a response has been aborted; the throw sites are not visible here -- confirm in the implementation.
class TAbortedForTestPurpose
    : public yexception
{ };

/// @brief Parameters of an outage; accepted by @ref NYT::TAbortableHttpResponse::StartOutage.
struct TOutageOptions
{
    // Self type alias; presumably consumed by the FLUENT_FIELD_DEFAULT macro below -- confirm against the macro definition.
    using TSelf = TOutageOptions;

    /// @brief Number of responses to abort.
    ///
    /// Defaults to the largest `size_t` value.
    FLUENT_FIELD_DEFAULT(size_t, ResponseCount, std::numeric_limits<size_t>::max());

    /// @brief Number of bytes to read before abortion. If zero, abort immediately.
    FLUENT_FIELD_DEFAULT(size_t, LengthLimit, 0);
};

////////////////////////////////////////////////////////////////////////////////

/// @brief Interface of a response that can be aborted on demand.
///
/// Derives from TIntrusiveListItem so that instances can be linked into an intrusive list
/// (NOTE(review): presumably the one kept by TAbortableHttpResponseRegistry -- confirm).
class IAbortableHttpResponse
    : public TIntrusiveListItem<IAbortableHttpResponse>
{
public:
    /// @brief Abort this response.
    virtual void Abort() = 0;
    /// @brief Url associated with this response.
    virtual const TString& GetUrl() const = 0;
    /// @brief Whether this response has been aborted.
    virtual bool IsAborted() const = 0;
    /// @brief Set the length limit of this response.
    ///
    /// NOTE(review): presumably the number of bytes that may be read before abortion,
    /// mirroring TOutageOptions::LengthLimit -- confirm in the implementation.
    virtual void SetLengthLimit(size_t limit) = 0;

    // Virtual destructor: the type is a polymorphic base.
    virtual ~IAbortableHttpResponse() = default;
};

class TAbortableHttpResponseBase
    : public IAbortableHttpResponse
{
public:
    TAbortableHttpResponseBase(const TString& url);
    ~TAbortableHttpResponseBase();

    void Abort() override;
    const TString& GetUrl() const override;
    bool IsAborted() const override;
    void SetLengthLimit(size_t limit) override;

protected:
    TString Url_;
    std::atomic<bool> Aborted_ = {false};
    size_t LengthLimit_ = std::numeric_limits<size_t>::max();
};

////////////////////////////////////////////////////////////////////////////////

/// @brief Stream wrapper for @ref NYT::NHttpClient::TCoreHttpResponse with possibility to emulate errors.
/// @brief Stream wrapper for @ref NYT::NHttpClient::TCoreHttpResponse with possibility to emulate errors.
///
/// Is both an IInputStream and an abortable response (via TAbortableHttpResponseBase).
class TAbortableCoreHttpResponse
    : public IInputStream
    , public TAbortableHttpResponseBase
{
public:
    /// @param stream Underlying input stream; ownership is transferred to the wrapper.
    /// @param url Url associated with the response.
    TAbortableCoreHttpResponse(
        std::unique_ptr<IInputStream> stream,
        const TString& url);

private:
    // IInputStream hooks.
    // NOTE(review): presumably forward to Stream_ and fail once the response is aborted -- confirm in the implementation.
    size_t DoRead(void* buf, size_t len) override;
    size_t DoSkip(size_t len) override;

private:
    // Owned underlying stream.
    std::unique_ptr<IInputStream> Stream_;
};

////////////////////////////////////////////////////////////////////////////////

/// @brief Class extends @ref NYT::THttpResponse with possibility to emulate errors.
class TAbortableHttpResponse
    : public THttpResponse
    , public TAbortableHttpResponseBase
{
public:
    class TOutage
    {
    public:
        TOutage(TString urlPattern, TAbortableHttpResponseRegistry& registry, const TOutageOptions& options);
        TOutage(TOutage&&) = default;
        TOutage(const TOutage&) = delete;
        ~TOutage();

        void Stop();

    private:
        TString UrlPattern_;
        TAbortableHttpResponseRegistry& Registry_;
        TOutageId Id_;
        bool Stopped_ = false;
    };

public:
    TAbortableHttpResponse(
        TRequestContext context,
        IInputStream* socketStream,
        const TString& url);

    /// @brief Abort any responses which match `urlPattern` (i.e. contain it in url).
    ///
    /// @return number of aborted responses.
    static int AbortAll(const TString& urlPattern);

    /// @brief Start outage. Future responses which match `urlPattern` (i.e. contain it in url) will fail.
    ///
    /// @return outage object controlling the lifetime of outage (outage stops when object is destroyed)
    [[nodiscard]] static TOutage StartOutage(
        const TString& urlPattern,
        const TOutageOptions& options = TOutageOptions());

    /// @brief Start outage. Future `responseCount` responses which match `urlPattern` (i.e. contain it in url) will fail.
    ///
    /// @return outage object controlling the lifetime of outage (outage stops when object is destroyed)
    [[nodiscard]] static TOutage StartOutage(
        const TString& urlPattern,
        size_t responseCount);

private:
    size_t DoRead(void* buf, size_t len) override;
    size_t DoSkip(size_t len) override;
};

////////////////////////////////////////////////////////////////////////////////

} // namespace NYT