diff options
author | hor911 <hor911@ydb.tech> | 2023-04-26 17:18:36 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-04-26 17:18:36 +0300 |
commit | bf749f3ac3722adebe8244692adfc2d8d07208c5 (patch) | |
tree | e598e330daa4218a48d9f9b03578f0d4eb3152ee | |
parent | 6d73358874922366b2611962ff0179b01fb838a0 (diff) | |
download | ydb-bf749f3ac3722adebe8244692adfc2d8d07208c5.tar.gz |
Retry policy for CURLcode
19 files changed, 122 insertions, 88 deletions
diff --git a/ydb/core/fq/libs/test_connection/test_object_storage.cpp b/ydb/core/fq/libs/test_connection/test_object_storage.cpp index 1d8f2ec9ad..a627c6f0fc 100644 --- a/ydb/core/fq/libs/test_connection/test_object_storage.cpp +++ b/ydb/core/fq/libs/test_connection/test_object_storage.cpp @@ -147,8 +147,8 @@ private: } } - static ERetryErrorClass RetryS3SlowDown(long httpResponseCode) { - return httpResponseCode == 503 ? ERetryErrorClass::LongRetry : ERetryErrorClass::NoRetry; // S3 Slow Down == 503 + static ERetryErrorClass RetryS3SlowDown(CURLcode curlResponseCode, long httpResponseCode) { + return curlResponseCode == CURLE_OK && httpResponseCode == 503 ? ERetryErrorClass::LongRetry : ERetryErrorClass::NoRetry; // S3 Slow Down == 503 } void SendDiscover() { @@ -164,7 +164,7 @@ private: TString requestId = CreateGuidAsString(); headers.emplace_back(TString{"X-Request-ID:"} + requestId); - const auto retryPolicy = IRetryPolicy<long>::GetExponentialBackoffPolicy(RetryS3SlowDown); + const auto retryPolicy = NYql::IHTTPGateway::TRetryPolicy::GetExponentialBackoffPolicy(RetryS3SlowDown); NYql::TUrlBuilder urlBuilder(ClusterConfig.GetUrl()); const auto url = urlBuilder.AddUrlParam("list-type", "2") diff --git a/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp b/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp index 9c89e26e1c..4f7c7ecc28 100644 --- a/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp +++ b/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp @@ -112,7 +112,7 @@ private: IHTTPGateway::TPtr Gateway; IHTTPGateway::THeaders Headers; - IRetryPolicy<long>::TPtr RetryPolicy; + IHTTPGateway::TRetryPolicy::TPtr RetryPolicy; const TString Url; const size_t FileSize; int64_t InnerPos = 0; diff --git a/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp b/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp index 68d75e34e3..a3a95b2587 100644 --- a/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp +++ b/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp @@ -5,7 +5,7 @@ TArrowFileDesc::TArrowFileDesc( const TString& url, IHTTPGateway::TPtr gateway, IHTTPGateway::THeaders headers, - const IRetryPolicy<long>::TPtr& retryPolicy, + const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, size_t size, const TString& format ) diff --git a/ydb/library/yql/providers/common/arrow/interface/arrow_reader.h b/ydb/library/yql/providers/common/arrow/interface/arrow_reader.h index 556d584962..99a40fc363 100644 --- a/ydb/library/yql/providers/common/arrow/interface/arrow_reader.h +++ b/ydb/library/yql/providers/common/arrow/interface/arrow_reader.h @@ -18,7 +18,7 @@ public: const TString& url, IHTTPGateway::TPtr gateway, IHTTPGateway::THeaders headers, - const IRetryPolicy<long>::TPtr& retryPolicy, + const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, size_t size, const TString& format = "" ); @@ -26,7 +26,7 @@ public: TString Url; IHTTPGateway::TPtr Gateway; IHTTPGateway::THeaders Headers; - IRetryPolicy<long>::TPtr RetryPolicy; + IHTTPGateway::TRetryPolicy::TPtr RetryPolicy; TString Format; size_t Size; bool IsLocal; diff --git a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp index 1c2fe083af..7ef5a47b71 100644 --- a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp @@ -43,9 +43,9 @@ public: return std::move(ret); } - void Upload(TString, THeaders, TString, TOnResult, bool, IRetryPolicy<long>::TPtr) {} + void Upload(TString, THeaders, TString, TOnResult, bool, TRetryPolicy::TPtr) final {} - void Delete(TString, THeaders, TOnResult, IRetryPolicy<long>::TPtr) {} + void Delete(TString, THeaders, TOnResult, TRetryPolicy::TPtr) final {} void Download( TString url, @@ -54,7 +54,7 @@ public: std::size_t sizeLimit, TOnResult callback, TString data, - IRetryPolicy<long>::TPtr retryPolicy) + TRetryPolicy::TPtr retryPolicy) final { Y_UNUSED(sizeLimit); diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.cpp index 01edb1587d..eb94f33826 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.cpp @@ -2,14 +2,37 @@ namespace NYql { -IRetryPolicy<long>::TPtr GetHTTPDefaultRetryPolicy(TDuration maxTime, size_t maxRetries) { +IHTTPGateway::TRetryPolicy::TPtr GetHTTPDefaultRetryPolicy(TDuration maxTime, size_t maxRetries) { if (!maxTime) { maxTime = TDuration::Minutes(5); } - return IRetryPolicy<long>::GetExponentialBackoffPolicy([](long httpCode) { + return IHTTPGateway::TRetryPolicy::GetExponentialBackoffPolicy([](CURLcode curlCode, long httpCode) { + + switch (curlCode) { + case CURLE_OK: + // look to http code + break; + case CURLE_COULDNT_CONNECT: + case CURLE_WEIRD_SERVER_REPLY: + case CURLE_WRITE_ERROR: + case CURLE_READ_ERROR: + case CURLE_OPERATION_TIMEDOUT: + case CURLE_SSL_CONNECT_ERROR: + case CURLE_BAD_DOWNLOAD_RESUME: + case CURLE_SEND_ERROR: + case CURLE_RECV_ERROR: + case CURLE_NO_CONNECTION_AVAILABLE: + // retry small number of known errors + return ERetryErrorClass::ShortRetry; + default: + // do not retry others + return ERetryErrorClass::NoRetry; + } + switch (httpCode) { case 0: - return ERetryErrorClass::ShortRetry; + // rare case when curl code is not available like manual cancelling, not retriable anymore + return ERetryErrorClass::NoRetry; case 408: // Request Timeout case 425: // Too Early case 429: // Too Many Requests diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h b/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h index 4a245a8b4f..74169c215a 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h @@ -1,9 +1,9 @@ #pragma once -#include <library/cpp/retry/retry_policy.h> +#include "yql_http_gateway.h" namespace NYql { -IRetryPolicy<long>::TPtr GetHTTPDefaultRetryPolicy(TDuration maxTime = TDuration::Zero(), size_t maxRetries = std::numeric_limits<size_t>::max()); // Zero means default maxTime +IHTTPGateway::TRetryPolicy::TPtr GetHTTPDefaultRetryPolicy(TDuration maxTime = TDuration::Zero(), size_t maxRetries = std::numeric_limits<size_t>::max()); // Zero means default maxTime } diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index 773e4cc882..5deb77748a 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -2,7 +2,6 @@ #include "yql_dns_gateway.h" #include <ydb/library/yql/utils/log/log.h> -#include <contrib/libs/curl/include/curl/curl.h> #include <util/stream/str.h> #include <util/string/builder.h> #include <util/generic/size_literals.h> @@ -219,7 +218,7 @@ public: return Method; } - virtual void Fail(const TIssue& error) = 0; + virtual void Fail(CURLcode result, const TIssue& error) = 0; virtual void Done(CURLcode result, long httpResponseCode) = 0; virtual size_t Write(void* contents, size_t size, size_t nmemb) = 0; @@ -295,7 +294,7 @@ public: size_t offset, size_t sizeLimit, IHTTPGateway::TOnResult callback, - IRetryPolicy<long>::IRetryState::TPtr retryState, + IHTTPGateway::TRetryPolicy::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig(), TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr) : TEasyCurl( @@ -330,7 +329,7 @@ public: size_t offset, size_t sizeLimit, IHTTPGateway::TOnResult callback, - IRetryPolicy<long>::IRetryState::TPtr retryState, + IHTTPGateway::TRetryPolicy::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig(), TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr) { return std::make_shared<TEasyCurlBuffer>( @@ -358,9 +357,9 @@ public: return true; } - TMaybe<TDuration> GetNextRetryDelay(long httpResponseCode) const { + TMaybe<TDuration> GetNextRetryDelay(CURLcode curlResponseCode, long httpResponseCode) const { if (RetryState) - return RetryState->GetNextRetryDelay(httpResponseCode); + return RetryState->GetNextRetryDelay(curlResponseCode, httpResponseCode); return {}; } @@ -375,18 +374,18 @@ public: InitHandles(); } private: - void Fail(const TIssue& error) final { + void Fail(CURLcode result, const TIssue& error) final { TIssues issues{error}; const std::unique_lock lock(SyncCallbacks); while (!Callbacks.empty()) { - Callbacks.top()(issues); + Callbacks.top()(IHTTPGateway::TResult(result, issues)); Callbacks.pop(); } } void Done(CURLcode result, long httpResponseCode) final { if (CURLE_OK != result) - return Fail(TIssue( TStringBuilder{} << curl_easy_strerror(result) << ". Detailed: " << GetDetailedErrorText())); + return Fail(result, TIssue( TStringBuilder{} << curl_easy_strerror(result) << ". Detailed: " << GetDetailedErrorText())); const std::unique_lock lock(SyncCallbacks); while (!Callbacks.empty()) { @@ -421,7 +420,7 @@ private: std::mutex SyncCallbacks; std::stack<IHTTPGateway::TOnResult> Callbacks; - const IRetryPolicy<long>::IRetryState::TPtr RetryState; + const IHTTPGateway::TRetryPolicy::IRetryState::TPtr RetryState; }; class TEasyCurlStream : public TEasyCurl { @@ -495,35 +494,35 @@ public: void Cancel(TIssue issue) { Cancelled = true; - OnFinish(TIssues{issue}); + OnFinish(CURLE_OK, TIssues{issue}); } private: - void Fail(const TIssue& error) final { + void Fail(CURLcode result, const TIssue& error) final { if (!Cancelled) - OnFinish(TIssues{error}); + OnFinish(result, TIssues{error}); } - void MaybeStart(long httpResponseCode = 0) { + void MaybeStart(CURLcode result, long httpResponseCode = 0) { if (!HttpResponseCode) { if (!httpResponseCode) { curl_easy_getinfo(GetHandle(), CURLINFO_RESPONSE_CODE, &httpResponseCode); } HttpResponseCode = httpResponseCode; - OnStart(HttpResponseCode); + OnStart(result, HttpResponseCode); } } void Done(CURLcode result, long httpResponseCode) final { - MaybeStart(httpResponseCode); + MaybeStart(result, httpResponseCode); if (CURLE_OK != result) { - return Fail(TIssue(TStringBuilder{} << "error: " << curl_easy_strerror(result) << " detailed: " << GetDetailedErrorText())); + return Fail(result, TIssue(TStringBuilder{} << "error: " << curl_easy_strerror(result) << " detailed: " << GetDetailedErrorText())); } if (!Cancelled) - OnFinish(TIssues()); + OnFinish(result, TIssues()); } size_t Write(void* contents, size_t size, size_t nmemb) final { - MaybeStart(); + MaybeStart(CURLE_OK); const auto realsize = size * nmemb; if (!Cancelled) OnNewData(IHTTPGateway::TCountedContent(TString(static_cast<char*>(contents), realsize), Counter, InflightCounter)); @@ -545,7 +544,7 @@ private: long HttpResponseCode = 0L; }; -using TKeyType = std::tuple<TString, size_t, IHTTPGateway::THeaders, TString, IRetryPolicy<long>::TPtr>; +using TKeyType = std::tuple<TString, size_t, IHTTPGateway::THeaders, TString, IHTTPGateway::TRetryPolicy::TPtr>; class TKeyHash { public: @@ -560,7 +559,7 @@ public: } private: const std::hash<TString> Hash; - const std::hash<IRetryPolicy<long>::TPtr> HashPtr; + const std::hash<IHTTPGateway::TRetryPolicy::TPtr> HashPtr; }; class THTTPMultiGateway : public IHTTPGateway { @@ -788,7 +787,7 @@ private: if (auto buffer = std::dynamic_pointer_cast<TEasyCurlBuffer>(easy)) { AllocatedSize -= buffer->GetSizeLimit(); - if (const auto& nextRetryDelay = buffer->GetNextRetryDelay(httpResponseCode)) { + if (const auto& nextRetryDelay = buffer->GetNextRetryDelay(result, httpResponseCode)) { buffer->Reset(); Delayed.emplace(nextRetryDelay->ToDeadLine(), std::move(buffer)); easy.reset(); @@ -818,12 +817,12 @@ private: const TIssue error(curl_multi_strerror(result)); while (!works.empty()) { curl_multi_remove_handle(Handle, works.top()->GetHandle()); - works.top()->Fail(error); + works.top()->Fail(CURLE_OK, error); works.pop(); } } - void Upload(TString url, THeaders headers, TString body, TOnResult callback, bool put, IRetryPolicy<long>::TPtr retryPolicy) final { + void Upload(TString url, THeaders headers, TString body, TOnResult callback, bool put, TRetryPolicy::TPtr retryPolicy) final { Rps->Inc(); const std::unique_lock lock(Sync); @@ -832,7 +831,7 @@ private: Wakeup(0U); } - void Delete(TString url, THeaders headers, TOnResult callback, IRetryPolicy<long>::TPtr retryPolicy) final { + void Delete(TString url, THeaders headers, TOnResult callback, TRetryPolicy::TPtr retryPolicy) final { Rps->Inc(); const std::unique_lock lock(Sync); @@ -848,12 +847,12 @@ private: size_t sizeLimit, TOnResult callback, TString data, - IRetryPolicy<long>::TPtr retryPolicy) final + TRetryPolicy::TPtr retryPolicy) final { Rps->Inc(); if (sizeLimit > MaxSimulatenousDownloadsSize) { TIssue error(TStringBuilder() << "Too big file for downloading: size " << sizeLimit << ", but limit is " << MaxSimulatenousDownloadsSize); - callback(TIssues{error}); + callback(TResult(CURLE_OK, TIssues{error})); return; } const std::unique_lock lock(Sync); diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h index bef5a2bc26..8b48f54efe 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h @@ -7,6 +7,8 @@ #include <library/cpp/monlib/dynamic_counters/counters.h> #include <library/cpp/retry/retry_policy.h> +#include <contrib/libs/curl/include/curl/curl.h> + #include <atomic> #include <variant> #include <functional> @@ -17,6 +19,7 @@ class IHTTPGateway { public: using TPtr = std::shared_ptr<IHTTPGateway>; using TWeakPtr = std::weak_ptr<IHTTPGateway>; + using TRetryPolicy = IRetryPolicy<CURLcode, long>; virtual ~IHTTPGateway() = default; @@ -63,9 +66,10 @@ public: using THeaders = TSmallVec<TString>; struct TResult { - TResult(TContent&& content) : Content(std::move(content)) {} - TResult(const TIssues& issues) : Content(""), Issues(issues) {} + TResult(TContent&& content) : Content(std::move(content)), CurlResponseCode(CURLE_OK) {} + TResult(CURLcode curlResponseCode, const TIssues& issues) : Content(""), CurlResponseCode(curlResponseCode), Issues(issues) {} TContent Content; + CURLcode CurlResponseCode; TIssues Issues; }; using TOnResult = std::function<void(TResult&&)>; @@ -76,13 +80,13 @@ public: TString body, TOnResult callback, bool put = false, - IRetryPolicy</*http response code*/long>::TPtr retryPolicy = IRetryPolicy<long>::GetNoRetryPolicy()) = 0; + TRetryPolicy::TPtr retryPolicy = TRetryPolicy::GetNoRetryPolicy()) = 0; virtual void Delete( TString url, THeaders headers, TOnResult callback, - IRetryPolicy</*http response code*/long>::TPtr retryPolicy = IRetryPolicy<long>::GetNoRetryPolicy()) = 0; + TRetryPolicy::TPtr retryPolicy = TRetryPolicy::GetNoRetryPolicy()) = 0; virtual void Download( TString url, @@ -91,7 +95,7 @@ public: std::size_t sizeLimit, TOnResult callback, TString data = {}, - IRetryPolicy</*http response code*/long>::TPtr retryPolicy = IRetryPolicy<long>::GetNoRetryPolicy()) = 0; + TRetryPolicy::TPtr retryPolicy = TRetryPolicy::GetNoRetryPolicy()) = 0; class TCountedContent : public TContentBase { public: @@ -107,9 +111,9 @@ public: const ::NMonitoring::TDynamicCounters::TCounterPtr InflightCounter; }; - using TOnDownloadStart = std::function<void(long)>; // http code. + using TOnDownloadStart = std::function<void(CURLcode, long)>; // http code. using TOnNewDataPart = std::function<void(TCountedContent&&)>; - using TOnDownloadFinish = std::function<void(TIssues)>; + using TOnDownloadFinish = std::function<void(CURLcode, TIssues)>; using TCancelHook = std::function<void(TIssue)>; virtual TCancelHook Download( diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.cpp index 73eb1dc9a6..bf1dfaeccb 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.cpp @@ -261,8 +261,8 @@ public: hFunc(TEvPrivate::TEvListParts, Handle); ) - bool RetryOperation(ui32 httpResponseCode) { - auto result = RetryCount && RetryPolicy->CreateRetryState()->GetNextRetryDelay(httpResponseCode); + bool RetryOperation(CURLcode curlResponseCode, ui32 httpResponseCode) { + auto result = RetryCount && RetryPolicy->CreateRetryState()->GetNextRetryDelay(curlResponseCode, httpResponseCode); if (result) { RetryCount--; } else { @@ -351,7 +351,7 @@ public: } } LOG_D("CommitMultipartUpload ERROR " << ev->Get()->State->BuildUrl()); - if (RetryOperation(result.Content.HttpResponseCode)) { + if (RetryOperation(result.CurlResponseCode, result.Content.HttpResponseCode)) { PushCommitMultipartUpload(ev->Get()->State); } } @@ -425,7 +425,7 @@ public: return; } LOG_D("ListMultipartUploads ERROR " << ev->Get()->State->BuildUrl()); - if (RetryOperation(result.Content.HttpResponseCode)) { + if (RetryOperation(result.CurlResponseCode, result.Content.HttpResponseCode)) { PushListMultipartUploads(ev->Get()->State); } } @@ -442,7 +442,7 @@ public: return; } LOG_D("AbortMultipartUpload ERROR " << ev->Get()->State->BuildUrl()); - if (RetryOperation(result.Content.HttpResponseCode)) { + if (RetryOperation(result.CurlResponseCode, result.Content.HttpResponseCode)) { PushAbortMultipartUpload(ev->Get()->State); } } @@ -482,7 +482,7 @@ public: return; } LOG_D("ListParts ERROR " << ev->Get()->State->BuildUrl()); - if (RetryOperation(result.Content.HttpResponseCode)) { + if (RetryOperation(result.CurlResponseCode, result.Content.HttpResponseCode)) { PushListParts(ev->Get()->State); } } @@ -560,7 +560,7 @@ private: ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; NYql::NDqProto::TExternalEffect ExternalEffect; NActors::TActorSystem* const ActorSystem; - const IRetryPolicy<long>::TPtr RetryPolicy; + const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy; ui64 HttpRequestInflight = 0; ui64 RetryCount; THashSet<TString> UnknownPrefixes; diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 4bcdcadc74..bb97cee6be 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -199,15 +199,18 @@ struct TEvPrivate { }; struct TEvReadStarted : public TEventLocal<TEvReadStarted, EvReadStarted> { - explicit TEvReadStarted(long httpResponseCode) : HttpResponseCode(httpResponseCode) {} + TEvReadStarted(CURLcode curlResponseCode, long httpResponseCode) + : CurlResponseCode(curlResponseCode), HttpResponseCode(httpResponseCode) {} + const CURLcode CurlResponseCode; const long HttpResponseCode; }; struct TEvReadFinished : public TEventLocal<TEvReadFinished, EvReadFinished> { - TEvReadFinished(size_t pathIndex, TIssues&& issues) - : PathIndex(pathIndex), Issues(std::move(issues)) { + TEvReadFinished(size_t pathIndex, CURLcode curlResponseCode, TIssues&& issues) + : PathIndex(pathIndex), CurlResponseCode(curlResponseCode), Issues(std::move(issues)) { } const size_t PathIndex; + const CURLcode CurlResponseCode; TIssues Issues; }; @@ -692,7 +695,7 @@ public: ui64 startPathIndex, const NActors::TActorId& computeActorId, ui64 sizeLimit, - const IRetryPolicy<long>::TPtr& retryPolicy, + const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, const TS3ReadActorFactoryConfig& readActorFactoryCfg, ::NMonitoring::TDynamicCounterPtr counters, ::NMonitoring::TDynamicCounterPtr taskCounters, @@ -965,7 +968,7 @@ private: const ui64 InputIndex; const TTxId TxId; const NActors::TActorId ComputeActorId; - const IRetryPolicy<long>::TPtr RetryPolicy; + const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy; TActorSystem* const ActorSystem; @@ -1030,7 +1033,7 @@ struct TRetryStuff { std::size_t sizeLimit, const TTxId& txId, const TString& requestId, - const IRetryPolicy<long>::TPtr& retryPolicy + const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy ) : Gateway(std::move(gateway)) , Url(UrlEscapeRet(url, true)) , Headers(headers) @@ -1048,13 +1051,13 @@ struct TRetryStuff { std::size_t Offset, SizeLimit; const TTxId TxId; const TString RequestId; - IRetryPolicy<long>::IRetryState::TPtr RetryState; - IRetryPolicy<long>::TPtr RetryPolicy; + IHTTPGateway::TRetryPolicy::IRetryState::TPtr RetryState; + IHTTPGateway::TRetryPolicy::TPtr RetryPolicy; IHTTPGateway::TCancelHook CancelHook; TMaybe<TDuration> NextRetryDelay; std::atomic_bool Cancelled; - const IRetryPolicy<long>::IRetryState::TPtr& GetRetryState() { + const IHTTPGateway::TRetryPolicy::IRetryState::TPtr& GetRetryState() { if (!RetryState) { RetryState = RetryPolicy->CreateRetryState(); } @@ -1074,16 +1077,16 @@ struct TRetryStuff { } }; -void OnDownloadStart(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, long httpResponseCode) { - actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadStarted(httpResponseCode))); +void OnDownloadStart(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, CURLcode curlResponseCode, long httpResponseCode) { + actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadStarted(curlResponseCode, httpResponseCode))); } void OnNewData(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, IHTTPGateway::TCountedContent&& data) { actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvDataPart(std::move(data)))); } -void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, size_t pathIndex, TIssues issues) { - actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished(pathIndex, std::move(issues)))); +void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, size_t pathIndex, CURLcode curlResponseCode, TIssues issues) { + actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished(pathIndex, curlResponseCode, std::move(issues)))); } void DownloadStart(const TRetryStuff::TPtr& retryStuff, TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, size_t pathIndex, const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter) { @@ -1092,10 +1095,9 @@ void DownloadStart(const TRetryStuff::TPtr& retryStuff, TActorSystem* actorSyste retryStuff->Headers, retryStuff->Offset, retryStuff->SizeLimit, - std::bind(&OnDownloadStart, actorSystem, self, parent, std::placeholders::_1), + std::bind(&OnDownloadStart, actorSystem, self, parent, std::placeholders::_1, std::placeholders::_2), std::bind(&OnNewData, actorSystem, self, parent, std::placeholders::_1), - std::bind( - &OnDownloadFinished, actorSystem, self, parent, pathIndex, std::placeholders::_1), + std::bind(&OnDownloadFinished, actorSystem, self, parent, pathIndex, std::placeholders::_1, std::placeholders::_2), inflightCounter); } @@ -1802,6 +1804,7 @@ public: void Handle(TEvPrivate::TEvReadStarted::TPtr& ev) { HttpResponseCode = ev->Get()->HttpResponseCode; + CurlResponseCode = ev->Get()->CurlResponseCode; LOG_CORO_D("TEvReadStarted, Http code: " << HttpResponseCode); } @@ -1830,6 +1833,10 @@ public: void Handle(TEvPrivate::TEvReadFinished::TPtr& ev) { + if (CurlResponseCode == CURLE_OK) { + CurlResponseCode = ev->Get()->CurlResponseCode; + } + Issues.Clear(); if (!ErrorText.empty()) { TString errorCode; @@ -1850,7 +1857,7 @@ public: } if (Issues) { - RetryStuff->NextRetryDelay = RetryStuff->GetRetryState()->GetNextRetryDelay(HttpResponseCode >= 300 ? HttpResponseCode : 0); + RetryStuff->NextRetryDelay = RetryStuff->GetRetryState()->GetNextRetryDelay(CurlResponseCode, HttpResponseCode); LOG_CORO_D("TEvReadFinished with Issues (try to retry): " << Issues.ToOneLineString()); if (RetryStuff->NextRetryDelay) { // inplace retry: report problem to TransientIssues and repeat @@ -2071,6 +2078,7 @@ private: bool InputFinished = false; long HttpResponseCode = 0L; + CURLcode CurlResponseCode = CURLE_OK; bool ServerReturnedError = false; TString ErrorText; TIssues Issues; @@ -2118,7 +2126,7 @@ public: ui64 startPathIndex, const TReadSpec::TPtr& readSpec, const NActors::TActorId& computeActorId, - const IRetryPolicy<long>::TPtr& retryPolicy, + const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, const std::size_t maxBlocksInFly, IArrowReader::TPtr arrowReader, const TS3ReadActorFactoryConfig& readActorFactoryCfg, @@ -2561,7 +2569,7 @@ private: const ui64 InputIndex; const TTxId TxId; const NActors::TActorId ComputeActorId; - const IRetryPolicy<long>::TPtr RetryPolicy; + const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy; const TString Url; const TString Token; @@ -2748,7 +2756,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( const THashMap<TString, TString>& taskParams, const NActors::TActorId& computeActorId, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, - const IRetryPolicy<long>::TPtr& retryPolicy, + const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, const TS3ReadActorFactoryConfig& cfg, IArrowReader::TPtr arrowReader, ::NMonitoring::TDynamicCounterPtr counters, diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h index 60c227cfec..019a326a98 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h @@ -23,7 +23,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateS3ReadA const THashMap<TString, TString>& taskParams, const NActors::TActorId& computeActorId, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, - const IRetryPolicy<long>::TPtr& retryPolicy, + const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, const TS3ReadActorFactoryConfig& cfg, IArrowReader::TPtr arrowReader, ::NMonitoring::TDynamicCounterPtr counters, diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp index 5de1839338..d42a800278 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp @@ -5,7 +5,7 @@ namespace NYql::NDq { -void RegisterS3WriteActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway, const IRetryPolicy<long>::TPtr& retryPolicy) { +void RegisterS3WriteActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway, const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy) { factory.RegisterSink<NS3::TSink>("S3Sink", [credentialsFactory, gateway, retryPolicy](NS3::TSink&& settings, IDqAsyncIoFactory::TSinkArguments&& args) { diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h index 99522d7815..2f5cc9a0f9 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h @@ -15,5 +15,5 @@ void RegisterS3WriteActorFactory( TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway, - const IRetryPolicy<long>::TPtr& retryPolicy = GetHTTPDefaultRetryPolicy()); + const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy = GetHTTPDefaultRetryPolicy()); } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp index fef83090b9..5eae61bc38 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp @@ -14,7 +14,7 @@ void RegisterS3ReadActorFactory( TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway, - const IRetryPolicy<long>::TPtr& retryPolicy, + const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, const TS3ReadActorFactoryConfig& cfg, ::NMonitoring::TDynamicCounterPtr counters, IArrowReader::TPtr arrowReader) { diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h index f70b13a96d..c78ff01a98 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h @@ -26,7 +26,7 @@ void RegisterS3ReadActorFactory( TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway = IHTTPGateway::Make(), - const IRetryPolicy<long>::TPtr& retryPolicy = GetHTTPDefaultRetryPolicy(), + const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy = GetHTTPDefaultRetryPolicy(), const TS3ReadActorFactoryConfig& = {}, ::NMonitoring::TDynamicCounterPtr counters = nullptr, IArrowReader::TPtr arrowReader = MakeArrowReader(TArrowReaderSettings())); diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index fd627a702c..dcb2761327 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -133,7 +133,7 @@ public: const TString& key, const TString& url, const std::string_view& compression, - const IRetryPolicy<long>::TPtr& retryPolicy, + const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, bool dirtyWrite, const TString& token) : TxId(txId) @@ -396,7 +396,7 @@ private: const TTxId TxId; const IHTTPGateway::TPtr Gateway; const NYdb::TCredentialsProviderPtr CredProvider; - const IRetryPolicy<long>::TPtr RetryPolicy; + const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy; TActorSystem* const ActorSystem; TActorId ParentId; @@ -429,7 +429,7 @@ public: const TString& compression, bool multipart, IDqComputeActorAsyncOutput::ICallbacks* callbacks, - const IRetryPolicy<long>::TPtr& retryPolicy, + const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, bool dirtyWrite, const TString& token) : Gateway(std::move(gateway)) @@ -602,7 +602,7 @@ private: const NYdb::TCredentialsProviderPtr CredProvider; IRandomProvider* RandomProvider; TIntrusivePtr<IRandomProvider> DefaultRandomProvider; - const IRetryPolicy<long>::TPtr RetryPolicy; + const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy; const ui64 OutputIndex; const TTxId TxId; @@ -639,7 +639,7 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor( const THashMap<TString, TString>& secureParams, IDqComputeActorAsyncOutput::ICallbacks* callbacks, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, - const IRetryPolicy<long>::TPtr& retryPolicy) + const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy) { const auto token = secureParams.Value(params.GetToken(), TString{}); const auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token); diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h index 47eaafc414..3787a9f849 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h @@ -21,6 +21,6 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor( const THashMap<TString, TString>& secureParams, IDqComputeActorAsyncOutput::ICallbacks* callbacks, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, - const IRetryPolicy<long>::TPtr& retryPolicy); + const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy); } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp index 986a0b2a64..161b3c1d7b 100644 --- a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp +++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp @@ -231,7 +231,7 @@ public: const std::shared_ptr<TListEntries> Output; // HTTP control const IHTTPGateway::TWeakPtr GatewayWeak; - const IRetryPolicy<long>::TPtr RetryPolicy; + const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy; const TString RequestId; const TListingRequest ListingRequest; const TMaybe<TString> Delimiter; |