diff options
author | dinmukhammed <dinmukhammed@yandex-team.ru> | 2022-02-18 19:38:55 +0300 |
---|---|---|
committer | dinmukhammed <dinmukhammed@yandex-team.ru> | 2022-02-18 19:38:55 +0300 |
commit | 68155b41eaf4cc306f8286e7ce8da6f6af7d93f5 (patch) | |
tree | 0f81df1b6f5aef471df4c7a2bc0a33c3bfa281f7 | |
parent | 08a3cf2b6e7bf0e39970b37a80370f2a22adc6db (diff) | |
download | ydb-68155b41eaf4cc306f8286e7ce8da6f6af7d93f5.tar.gz |
YQ-560 Added S3 slow down retries
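Added retries with exponential backoff for S3 "Slow Down" (HTTP 503) responses to the bucket listing requests made during S3 discovery.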
ref:6fcd1760780ca5633d880057148a48ab15d299fb
-rw-r--r-- | ydb/library/yql/providers/s3/provider/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/library/yql/providers/s3/provider/ya.make | 1 | ||||
-rw-r--r-- | ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp | 32 |
3 files changed, 27 insertions, 7 deletions
diff --git a/ydb/library/yql/providers/s3/provider/CMakeLists.txt b/ydb/library/yql/providers/s3/provider/CMakeLists.txt index 9cccc15b0c0..013f4ed3ad0 100644 --- a/ydb/library/yql/providers/s3/provider/CMakeLists.txt +++ b/ydb/library/yql/providers/s3/provider/CMakeLists.txt @@ -8,6 +8,7 @@ target_link_libraries(providers-s3-provider PUBLIC contrib-libs-re2 library-cpp-json library-cpp-random_provider + library-cpp-retry cpp-string_utils-quote library-cpp-time_provider cpp-xml-document diff --git a/ydb/library/yql/providers/s3/provider/ya.make b/ydb/library/yql/providers/s3/provider/ya.make index 075bad7b0b2..54f3cd07069 100644 --- a/ydb/library/yql/providers/s3/provider/ya.make +++ b/ydb/library/yql/providers/s3/provider/ya.make @@ -25,6 +25,7 @@ PEERDIR( contrib/libs/re2 library/cpp/json library/cpp/random_provider + library/cpp/retry library/cpp/string_utils/quote library/cpp/time_provider library/cpp/xml/document diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp index 3e0384c9d7b..0b519984c7a 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp @@ -13,6 +13,7 @@ #undef THROW #endif #include <library/cpp/xml/document/xml-document.h> +#include <library/cpp/retry/retry_policy.h> namespace NYql { @@ -31,6 +32,10 @@ std::array<TExprNode::TPtr, 2U> GetSchema(const TExprNode& settings) { using TItemsMap = std::map<TString, std::size_t>; using TPendingBuckets = std::unordered_map<std::tuple<TString, TString, TString>, std::tuple<TNodeSet, TItemsMap, TIssues>, THash<std::tuple<TString, TString, TString>>>; +ERetryErrorClass RetryS3SlowDown(long httpResponseCode) { + return httpResponseCode == 503 ? 
ERetryErrorClass::LongRetry : ERetryErrorClass::NoRetry; // S3 Slow Down == 503 +} + void OnDiscovery( IHTTPGateway::TWeakPtr gateway, TPosition pos, @@ -39,7 +44,8 @@ void OnDiscovery( TPendingBuckets::mapped_type& output, NThreading::TPromise<void> promise, std::weak_ptr<TPendingBuckets> pendingBucketsWPtr, - int promiseInd) { + int promiseInd, + const IRetryPolicy<long>::TPtr& retryPolicy) { auto pendingBuckets = pendingBucketsWPtr.lock(); // keys and output could be used only when TPendingBuckets is alive if (!pendingBuckets) { return; @@ -79,8 +85,13 @@ void OnDiscovery( IHTTPGateway::THeaders headers; if (const auto& token = std::get<2U>(keys); !token.empty()) headers.emplace_back(token); - return g->Download(std::get<0U>(keys) + "?list-type=2&prefix=" + prefix + "&continuation-token=" + next + "&max-keys=" + maxKeys, std::move(headers), 0U, - std::bind(&OnDiscovery, gateway, pos, std::placeholders::_1, std::cref(keys), std::ref(output), std::move(promise), pendingBucketsWPtr, promiseInd)); + return g->Download( + std::get<0U>(keys) + "?list-type=2&prefix=" + prefix + "&continuation-token=" + next + "&max-keys=" + maxKeys, + std::move(headers), + 0U, + std::bind(&OnDiscovery, gateway, pos, std::placeholders::_1, std::cref(keys), std::ref(output), std::move(promise), pendingBucketsWPtr, promiseInd, retryPolicy), + /*data=*/"", + retryPolicy); } YQL_CLOG(INFO, ProviderS3) << "Gateway disappeared."; } @@ -106,7 +117,6 @@ void OnDiscovery( promise.SetValue(); } - TString RegexFromWildcards(const std::string_view& pattern) { const auto& escaped = RE2::QuoteMeta(re2::StringPiece(pattern)); TStringBuilder result; @@ -195,6 +205,7 @@ public: handles.reserve(PendingBuckets_->size()); int i = 0; + const auto retryPolicy = IRetryPolicy<long>::GetExponentialBackoffPolicy(RetryS3SlowDown); for (auto& bucket : *PendingBuckets_) { auto promise = NThreading::NewPromise(); handles.emplace_back(promise.GetFuture()); @@ -204,9 +215,16 @@ public: if (const auto& token = 
std::get<2U>(bucket.first); !token.empty()) headers.emplace_back(token); std::weak_ptr<TPendingBuckets> pendingBucketsWPtr = PendingBuckets_; - Gateway_->Download(std::get<0U>(bucket.first) + "?list-type=2&prefix=" + prefix, headers, 0U, std::bind(&OnDiscovery, - IHTTPGateway::TWeakPtr(Gateway_), ctx.GetPosition((*std::get<TNodeSet>(bucket.second).cbegin())->Pos()), std::placeholders::_1, - std::cref(bucket.first), std::ref(bucket.second), std::move(promise), pendingBucketsWPtr, i++)); + Gateway_->Download( + std::get<0U>(bucket.first) + "?list-type=2&prefix=" + prefix, + headers, + 0U, + std::bind(&OnDiscovery, + IHTTPGateway::TWeakPtr(Gateway_), ctx.GetPosition((*std::get<TNodeSet>(bucket.second).cbegin())->Pos()), std::placeholders::_1, + std::cref(bucket.first), std::ref(bucket.second), std::move(promise), pendingBucketsWPtr, i++, retryPolicy), + /*data=*/"", + retryPolicy + ); YQL_CLOG(INFO, ProviderS3) << "Enumerate items in " << std::get<0U>(bucket.first) << std::get<1U>(bucket.first); } |