about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author dinmukhammed <dinmukhammed@yandex-team.ru> 2022-02-18 19:38:55 +0300
committer dinmukhammed <dinmukhammed@yandex-team.ru> 2022-02-18 19:38:55 +0300
commit 68155b41eaf4cc306f8286e7ce8da6f6af7d93f5 (patch)
tree 0f81df1b6f5aef471df4c7a2bc0a33c3bfa281f7
parent 08a3cf2b6e7bf0e39970b37a80370f2a22adc6db (diff)
download ydb-68155b41eaf4cc306f8286e7ce8da6f6af7d93f5.tar.gz
YQ-560 Added S3 slow down retries
Added retries for s3 slow down ref:6fcd1760780ca5633d880057148a48ab15d299fb
-rw-r--r-- ydb/library/yql/providers/s3/provider/CMakeLists.txt | 1
-rw-r--r-- ydb/library/yql/providers/s3/provider/ya.make | 1
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp | 32
3 files changed, 27 insertions, 7 deletions
diff --git a/ydb/library/yql/providers/s3/provider/CMakeLists.txt b/ydb/library/yql/providers/s3/provider/CMakeLists.txt
index 9cccc15b0c0..013f4ed3ad0 100644
--- a/ydb/library/yql/providers/s3/provider/CMakeLists.txt
+++ b/ydb/library/yql/providers/s3/provider/CMakeLists.txt
@@ -8,6 +8,7 @@ target_link_libraries(providers-s3-provider PUBLIC
contrib-libs-re2
library-cpp-json
library-cpp-random_provider
+ library-cpp-retry
cpp-string_utils-quote
library-cpp-time_provider
cpp-xml-document
diff --git a/ydb/library/yql/providers/s3/provider/ya.make b/ydb/library/yql/providers/s3/provider/ya.make
index 075bad7b0b2..54f3cd07069 100644
--- a/ydb/library/yql/providers/s3/provider/ya.make
+++ b/ydb/library/yql/providers/s3/provider/ya.make
@@ -25,6 +25,7 @@ PEERDIR(
contrib/libs/re2
library/cpp/json
library/cpp/random_provider
+ library/cpp/retry
library/cpp/string_utils/quote
library/cpp/time_provider
library/cpp/xml/document
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
index 3e0384c9d7b..0b519984c7a 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
@@ -13,6 +13,7 @@
#undef THROW
#endif
#include <library/cpp/xml/document/xml-document.h>
+#include <library/cpp/retry/retry_policy.h>
namespace NYql {
@@ -31,6 +32,10 @@ std::array<TExprNode::TPtr, 2U> GetSchema(const TExprNode& settings) {
using TItemsMap = std::map<TString, std::size_t>;
using TPendingBuckets = std::unordered_map<std::tuple<TString, TString, TString>, std::tuple<TNodeSet, TItemsMap, TIssues>, THash<std::tuple<TString, TString, TString>>>;
+ERetryErrorClass RetryS3SlowDown(long httpResponseCode) {
+ return httpResponseCode != 503 ? ERetryErrorClass::NoRetry : ERetryErrorClass::LongRetry; // HTTP 503 is S3 "Slow Down": the only status classified as LongRetry, everything else is NoRetry
+}
+
void OnDiscovery(
IHTTPGateway::TWeakPtr gateway,
TPosition pos,
@@ -39,7 +44,8 @@ void OnDiscovery(
TPendingBuckets::mapped_type& output,
NThreading::TPromise<void> promise,
std::weak_ptr<TPendingBuckets> pendingBucketsWPtr,
- int promiseInd) {
+ int promiseInd,
+ const IRetryPolicy<long>::TPtr& retryPolicy) {
auto pendingBuckets = pendingBucketsWPtr.lock(); // keys and output could be used only when TPendingBuckets is alive
if (!pendingBuckets) {
return;
@@ -79,8 +85,13 @@ void OnDiscovery(
IHTTPGateway::THeaders headers;
if (const auto& token = std::get<2U>(keys); !token.empty())
headers.emplace_back(token);
- return g->Download(std::get<0U>(keys) + "?list-type=2&prefix=" + prefix + "&continuation-token=" + next + "&max-keys=" + maxKeys, std::move(headers), 0U,
- std::bind(&OnDiscovery, gateway, pos, std::placeholders::_1, std::cref(keys), std::ref(output), std::move(promise), pendingBucketsWPtr, promiseInd));
+ return g->Download(
+ std::get<0U>(keys) + "?list-type=2&prefix=" + prefix + "&continuation-token=" + next + "&max-keys=" + maxKeys,
+ std::move(headers),
+ 0U,
+ std::bind(&OnDiscovery, gateway, pos, std::placeholders::_1, std::cref(keys), std::ref(output), std::move(promise), pendingBucketsWPtr, promiseInd, retryPolicy),
+ /*data=*/"",
+ retryPolicy);
}
YQL_CLOG(INFO, ProviderS3) << "Gateway disappeared.";
}
@@ -106,7 +117,6 @@ void OnDiscovery(
promise.SetValue();
}
-
TString RegexFromWildcards(const std::string_view& pattern) {
const auto& escaped = RE2::QuoteMeta(re2::StringPiece(pattern));
TStringBuilder result;
@@ -195,6 +205,7 @@ public:
handles.reserve(PendingBuckets_->size());
int i = 0;
+ const auto retryPolicy = IRetryPolicy<long>::GetExponentialBackoffPolicy(RetryS3SlowDown);
for (auto& bucket : *PendingBuckets_) {
auto promise = NThreading::NewPromise();
handles.emplace_back(promise.GetFuture());
@@ -204,9 +215,16 @@ public:
if (const auto& token = std::get<2U>(bucket.first); !token.empty())
headers.emplace_back(token);
std::weak_ptr<TPendingBuckets> pendingBucketsWPtr = PendingBuckets_;
- Gateway_->Download(std::get<0U>(bucket.first) + "?list-type=2&prefix=" + prefix, headers, 0U, std::bind(&OnDiscovery,
- IHTTPGateway::TWeakPtr(Gateway_), ctx.GetPosition((*std::get<TNodeSet>(bucket.second).cbegin())->Pos()), std::placeholders::_1,
- std::cref(bucket.first), std::ref(bucket.second), std::move(promise), pendingBucketsWPtr, i++));
+ Gateway_->Download(
+ std::get<0U>(bucket.first) + "?list-type=2&prefix=" + prefix,
+ headers,
+ 0U,
+ std::bind(&OnDiscovery,
+ IHTTPGateway::TWeakPtr(Gateway_), ctx.GetPosition((*std::get<TNodeSet>(bucket.second).cbegin())->Pos()), std::placeholders::_1,
+ std::cref(bucket.first), std::ref(bucket.second), std::move(promise), pendingBucketsWPtr, i++, retryPolicy),
+ /*data=*/"",
+ retryPolicy
+ );
YQL_CLOG(INFO, ProviderS3) << "Enumerate items in " << std::get<0U>(bucket.first) << std::get<1U>(bucket.first);
}