1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
#include <yt/cpp/mapreduce/raw_client/raw_batch_request.h>
#include <yt/cpp/mapreduce/http/context.h>
#include <yt/cpp/mapreduce/interface/client_method_options.h>
#include <yt/cpp/mapreduce/interface/errors.h>
#include <yt/cpp/mapreduce/common/retry_lib.h>
#include <library/cpp/testing/gtest/gtest.h>
using namespace NYT;
using namespace NYT::NDetail;
using namespace NYT::NDetail::NRawClient;
class TTestRetryPolicy
: public IRequestRetryPolicy
{
private:
static constexpr int RetriableCode = 904;
public:
void NotifyNewAttempt() override
{ }
TMaybe<TDuration> OnGenericError(const std::exception& /*e*/) override
{
return TDuration::Seconds(42);
}
void OnIgnoredError(const TErrorResponse& /*e*/) override
{ }
TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) override
{
if (e.GetError().GetCode() == RetriableCode) {
return TDuration::Seconds(e.GetError().GetAttributes().at("retry_interval").AsUint64());
} else {
return Nothing();
}
}
TString GetAttemptDescription() const override
{
return "attempt";
}
static TNode GenerateRetriableError(TDuration retryDuration)
{
Y_ABORT_UNLESS(retryDuration - TDuration::Seconds(retryDuration.Seconds()) == TDuration::Zero());
return TNode()
("code", RetriableCode)
("attributes",
TNode()
("retry_interval", retryDuration.Seconds()));
}
};
TString GetPathFromRequest(const TNode& params)
{
return params.AsMap().at("parameters").AsMap().at("path").AsString();
}
TVector<TString> GetAllPathsFromRequestList(const TNode& requestList)
{
TVector<TString> result;
for (const auto& request : requestList.AsList()) {
result.push_back(GetPathFromRequest(request)); }
return result;
}
TEST(TBatchRequestImplTest, ParseResponse) {
TClientContext context;
TRawBatchRequest batchRequest(context.Config);
EXPECT_EQ(batchRequest.BatchSize(), 0u);
auto get1 = batchRequest.Get(
TTransactionId(),
"//getOk",
TGetOptions());
auto get2 = batchRequest.Get(
TTransactionId(),
"//getError-3",
TGetOptions());
auto get3 = batchRequest.Get(
TTransactionId(),
"//getError-5",
TGetOptions());
EXPECT_EQ(batchRequest.BatchSize(), 3u);
auto testRetryPolicy = MakeIntrusive<TTestRetryPolicy>();
const TInstant now = TInstant::Seconds(100500);
TRawBatchRequest retryBatch(context.Config);
batchRequest.ParseResponse(
TNode()
.Add(TNode()("output", 5))
.Add(TNode()("error",
TTestRetryPolicy::GenerateRetriableError(TDuration::Seconds(3))))
.Add(TNode()("error",
TTestRetryPolicy::GenerateRetriableError(TDuration::Seconds(5)))),
"<no-request-id>",
testRetryPolicy,
&retryBatch,
now);
EXPECT_EQ(batchRequest.BatchSize(), 0u);
EXPECT_EQ(retryBatch.BatchSize(), 2u);
TNode retryParameterList;
TInstant nextTry;
retryBatch.FillParameterList(3, &retryParameterList, &nextTry);
EXPECT_EQ(
GetAllPathsFromRequestList(retryParameterList),
TVector<TString>({"//getError-3", "//getError-5"}));
EXPECT_EQ(nextTry, now + TDuration::Seconds(5));
}
|