aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp
blob: cee62cf1f79a3924abfcad511803a2a61e338cc2 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#include <yt/cpp/mapreduce/raw_client/raw_batch_request.h>

#include <yt/cpp/mapreduce/http/context.h>
#include <yt/cpp/mapreduce/interface/client_method_options.h>
#include <yt/cpp/mapreduce/interface/errors.h>
#include <yt/cpp/mapreduce/common/retry_lib.h>

#include <library/cpp/testing/gtest/gtest.h>

using namespace NYT;
using namespace NYT::NDetail;
using namespace NYT::NDetail::NRawClient;


class TTestRetryPolicy
    : public IRequestRetryPolicy
{
private:
    static constexpr int RetriableCode = 904;

public:
    void NotifyNewAttempt() override
    { }

    TMaybe<TDuration> OnGenericError(const std::exception& /*e*/) override
    {
        return TDuration::Seconds(42);
    }

    void OnIgnoredError(const TErrorResponse& /*e*/) override
    { }

    TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) override
    {
        if (e.GetError().GetCode() == RetriableCode) {
            return TDuration::Seconds(e.GetError().GetAttributes().at("retry_interval").AsUint64());
        } else {
            return Nothing();
        }
    }

    TString GetAttemptDescription() const override
    {
        return "attempt";
    }

    static TNode GenerateRetriableError(TDuration retryDuration)
    {
        Y_ABORT_UNLESS(retryDuration - TDuration::Seconds(retryDuration.Seconds()) == TDuration::Zero());

        return TNode()
            ("code", RetriableCode)
            ("attributes",
                TNode()
                    ("retry_interval", retryDuration.Seconds()));
    }
};


TString GetPathFromRequest(const TNode& params)
{
    return params.AsMap().at("parameters").AsMap().at("path").AsString();
}

TVector<TString> GetAllPathsFromRequestList(const TNode& requestList)
{
    TVector<TString> result;
    for (const auto& request : requestList.AsList()) {
        result.push_back(GetPathFromRequest(request)); }
    return result;
}


TEST(TBatchRequestImplTest, ParseResponse) {
    TClientContext context;
    TRawBatchRequest batchRequest(context.Config);

    EXPECT_EQ(batchRequest.BatchSize(), 0u);

    auto get1 = batchRequest.Get(
        TTransactionId(),
        "//getOk",
        TGetOptions());

    auto get2 = batchRequest.Get(
        TTransactionId(),
        "//getError-3",
        TGetOptions());

    auto get3 = batchRequest.Get(
        TTransactionId(),
        "//getError-5",
        TGetOptions());

    EXPECT_EQ(batchRequest.BatchSize(), 3u);

    auto testRetryPolicy = MakeIntrusive<TTestRetryPolicy>();
    const TInstant now = TInstant::Seconds(100500);

    TRawBatchRequest retryBatch(context.Config);
    batchRequest.ParseResponse(
        TNode()
            .Add(TNode()("output", 5))
            .Add(TNode()("error",
                    TTestRetryPolicy::GenerateRetriableError(TDuration::Seconds(3))))
            .Add(TNode()("error",
                    TTestRetryPolicy::GenerateRetriableError(TDuration::Seconds(5)))),
            "<no-request-id>",
            testRetryPolicy,
            &retryBatch,
            now);

    EXPECT_EQ(batchRequest.BatchSize(), 0u);
    EXPECT_EQ(retryBatch.BatchSize(), 2u);

    TNode retryParameterList;
    TInstant nextTry;
    retryBatch.FillParameterList(3, &retryParameterList, &nextTry);
    EXPECT_EQ(
        GetAllPathsFromRequestList(retryParameterList),
        TVector<TString>({"//getError-3", "//getError-5"}));

    EXPECT_EQ(nextTry, now + TDuration::Seconds(5));
}