aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2023-04-26 17:18:36 +0300
committerhor911 <hor911@ydb.tech>2023-04-26 17:18:36 +0300
commitbf749f3ac3722adebe8244692adfc2d8d07208c5 (patch)
treee598e330daa4218a48d9f9b03578f0d4eb3152ee
parent6d73358874922366b2611962ff0179b01fb838a0 (diff)
downloadydb-bf749f3ac3722adebe8244692adfc2d8d07208c5.tar.gz
Retry policy for CURLcode
-rw-r--r--ydb/core/fq/libs/test_connection/test_object_storage.cpp6
-rw-r--r--ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp2
-rw-r--r--ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp2
-rw-r--r--ydb/library/yql/providers/common/arrow/interface/arrow_reader.h4
-rw-r--r--ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp6
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.cpp29
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h4
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp53
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h18
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.cpp14
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp48
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h2
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp2
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h2
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp2
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h2
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp10
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h2
-rw-r--r--ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp2
19 files changed, 122 insertions, 88 deletions
diff --git a/ydb/core/fq/libs/test_connection/test_object_storage.cpp b/ydb/core/fq/libs/test_connection/test_object_storage.cpp
index 1d8f2ec9ad..a627c6f0fc 100644
--- a/ydb/core/fq/libs/test_connection/test_object_storage.cpp
+++ b/ydb/core/fq/libs/test_connection/test_object_storage.cpp
@@ -147,8 +147,8 @@ private:
}
}
- static ERetryErrorClass RetryS3SlowDown(long httpResponseCode) {
- return httpResponseCode == 503 ? ERetryErrorClass::LongRetry : ERetryErrorClass::NoRetry; // S3 Slow Down == 503
+ static ERetryErrorClass RetryS3SlowDown(CURLcode curlResponseCode, long httpResponseCode) {
+ return curlResponseCode == CURLE_OK && httpResponseCode == 503 ? ERetryErrorClass::LongRetry : ERetryErrorClass::NoRetry; // S3 Slow Down == 503
}
void SendDiscover() {
@@ -164,7 +164,7 @@ private:
TString requestId = CreateGuidAsString();
headers.emplace_back(TString{"X-Request-ID:"} + requestId);
- const auto retryPolicy = IRetryPolicy<long>::GetExponentialBackoffPolicy(RetryS3SlowDown);
+ const auto retryPolicy = NYql::IHTTPGateway::TRetryPolicy::GetExponentialBackoffPolicy(RetryS3SlowDown);
NYql::TUrlBuilder urlBuilder(ClusterConfig.GetUrl());
const auto url = urlBuilder.AddUrlParam("list-type", "2")
diff --git a/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp b/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp
index 9c89e26e1c..4f7c7ecc28 100644
--- a/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp
+++ b/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp
@@ -112,7 +112,7 @@ private:
IHTTPGateway::TPtr Gateway;
IHTTPGateway::THeaders Headers;
- IRetryPolicy<long>::TPtr RetryPolicy;
+ IHTTPGateway::TRetryPolicy::TPtr RetryPolicy;
const TString Url;
const size_t FileSize;
int64_t InnerPos = 0;
diff --git a/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp b/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp
index 68d75e34e3..a3a95b2587 100644
--- a/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp
+++ b/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp
@@ -5,7 +5,7 @@ TArrowFileDesc::TArrowFileDesc(
const TString& url,
IHTTPGateway::TPtr gateway,
IHTTPGateway::THeaders headers,
- const IRetryPolicy<long>::TPtr& retryPolicy,
+ const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
size_t size,
const TString& format
)
diff --git a/ydb/library/yql/providers/common/arrow/interface/arrow_reader.h b/ydb/library/yql/providers/common/arrow/interface/arrow_reader.h
index 556d584962..99a40fc363 100644
--- a/ydb/library/yql/providers/common/arrow/interface/arrow_reader.h
+++ b/ydb/library/yql/providers/common/arrow/interface/arrow_reader.h
@@ -18,7 +18,7 @@ public:
const TString& url,
IHTTPGateway::TPtr gateway,
IHTTPGateway::THeaders headers,
- const IRetryPolicy<long>::TPtr& retryPolicy,
+ const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
size_t size,
const TString& format = ""
);
@@ -26,7 +26,7 @@ public:
TString Url;
IHTTPGateway::TPtr Gateway;
IHTTPGateway::THeaders Headers;
- IRetryPolicy<long>::TPtr RetryPolicy;
+ IHTTPGateway::TRetryPolicy::TPtr RetryPolicy;
TString Format;
size_t Size;
bool IsLocal;
diff --git a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
index 1c2fe083af..7ef5a47b71 100644
--- a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
@@ -43,9 +43,9 @@ public:
return std::move(ret);
}
- void Upload(TString, THeaders, TString, TOnResult, bool, IRetryPolicy<long>::TPtr) {}
+ void Upload(TString, THeaders, TString, TOnResult, bool, TRetryPolicy::TPtr) final {}
- void Delete(TString, THeaders, TOnResult, IRetryPolicy<long>::TPtr) {}
+ void Delete(TString, THeaders, TOnResult, TRetryPolicy::TPtr) final {}
void Download(
TString url,
@@ -54,7 +54,7 @@ public:
std::size_t sizeLimit,
TOnResult callback,
TString data,
- IRetryPolicy<long>::TPtr retryPolicy)
+ TRetryPolicy::TPtr retryPolicy) final
{
Y_UNUSED(sizeLimit);
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.cpp
index 01edb1587d..eb94f33826 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.cpp
@@ -2,14 +2,37 @@
namespace NYql {
-IRetryPolicy<long>::TPtr GetHTTPDefaultRetryPolicy(TDuration maxTime, size_t maxRetries) {
+IHTTPGateway::TRetryPolicy::TPtr GetHTTPDefaultRetryPolicy(TDuration maxTime, size_t maxRetries) {
if (!maxTime) {
maxTime = TDuration::Minutes(5);
}
- return IRetryPolicy<long>::GetExponentialBackoffPolicy([](long httpCode) {
+ return IHTTPGateway::TRetryPolicy::GetExponentialBackoffPolicy([](CURLcode curlCode, long httpCode) {
+
+ switch (curlCode) {
+ case CURLE_OK:
+ // look to http code
+ break;
+ case CURLE_COULDNT_CONNECT:
+ case CURLE_WEIRD_SERVER_REPLY:
+ case CURLE_WRITE_ERROR:
+ case CURLE_READ_ERROR:
+ case CURLE_OPERATION_TIMEDOUT:
+ case CURLE_SSL_CONNECT_ERROR:
+ case CURLE_BAD_DOWNLOAD_RESUME:
+ case CURLE_SEND_ERROR:
+ case CURLE_RECV_ERROR:
+ case CURLE_NO_CONNECTION_AVAILABLE:
+ // retry small number of known errors
+ return ERetryErrorClass::ShortRetry;
+ default:
+ // do not retry others
+ return ERetryErrorClass::NoRetry;
+ }
+
switch (httpCode) {
case 0:
- return ERetryErrorClass::ShortRetry;
+ // rare case when curl code is not available like manual cancelling, not retriable anymore
+ return ERetryErrorClass::NoRetry;
case 408: // Request Timeout
case 425: // Too Early
case 429: // Too Many Requests
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h b/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h
index 4a245a8b4f..74169c215a 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h
@@ -1,9 +1,9 @@
#pragma once
-#include <library/cpp/retry/retry_policy.h>
+#include "yql_http_gateway.h"
namespace NYql {
-IRetryPolicy<long>::TPtr GetHTTPDefaultRetryPolicy(TDuration maxTime = TDuration::Zero(), size_t maxRetries = std::numeric_limits<size_t>::max()); // Zero means default maxTime
+IHTTPGateway::TRetryPolicy::TPtr GetHTTPDefaultRetryPolicy(TDuration maxTime = TDuration::Zero(), size_t maxRetries = std::numeric_limits<size_t>::max()); // Zero means default maxTime
}
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
index 773e4cc882..5deb77748a 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
@@ -2,7 +2,6 @@
#include "yql_dns_gateway.h"
#include <ydb/library/yql/utils/log/log.h>
-#include <contrib/libs/curl/include/curl/curl.h>
#include <util/stream/str.h>
#include <util/string/builder.h>
#include <util/generic/size_literals.h>
@@ -219,7 +218,7 @@ public:
return Method;
}
- virtual void Fail(const TIssue& error) = 0;
+ virtual void Fail(CURLcode result, const TIssue& error) = 0;
virtual void Done(CURLcode result, long httpResponseCode) = 0;
virtual size_t Write(void* contents, size_t size, size_t nmemb) = 0;
@@ -295,7 +294,7 @@ public:
size_t offset,
size_t sizeLimit,
IHTTPGateway::TOnResult callback,
- IRetryPolicy<long>::IRetryState::TPtr retryState,
+ IHTTPGateway::TRetryPolicy::IRetryState::TPtr retryState,
const TCurlInitConfig& config = TCurlInitConfig(),
TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr)
: TEasyCurl(
@@ -330,7 +329,7 @@ public:
size_t offset,
size_t sizeLimit,
IHTTPGateway::TOnResult callback,
- IRetryPolicy<long>::IRetryState::TPtr retryState,
+ IHTTPGateway::TRetryPolicy::IRetryState::TPtr retryState,
const TCurlInitConfig& config = TCurlInitConfig(),
TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr) {
return std::make_shared<TEasyCurlBuffer>(
@@ -358,9 +357,9 @@ public:
return true;
}
- TMaybe<TDuration> GetNextRetryDelay(long httpResponseCode) const {
+ TMaybe<TDuration> GetNextRetryDelay(CURLcode curlResponseCode, long httpResponseCode) const {
if (RetryState)
- return RetryState->GetNextRetryDelay(httpResponseCode);
+ return RetryState->GetNextRetryDelay(curlResponseCode, httpResponseCode);
return {};
}
@@ -375,18 +374,18 @@ public:
InitHandles();
}
private:
- void Fail(const TIssue& error) final {
+ void Fail(CURLcode result, const TIssue& error) final {
TIssues issues{error};
const std::unique_lock lock(SyncCallbacks);
while (!Callbacks.empty()) {
- Callbacks.top()(issues);
+ Callbacks.top()(IHTTPGateway::TResult(result, issues));
Callbacks.pop();
}
}
void Done(CURLcode result, long httpResponseCode) final {
if (CURLE_OK != result)
- return Fail(TIssue( TStringBuilder{} << curl_easy_strerror(result) << ". Detailed: " << GetDetailedErrorText()));
+ return Fail(result, TIssue( TStringBuilder{} << curl_easy_strerror(result) << ". Detailed: " << GetDetailedErrorText()));
const std::unique_lock lock(SyncCallbacks);
while (!Callbacks.empty()) {
@@ -421,7 +420,7 @@ private:
std::mutex SyncCallbacks;
std::stack<IHTTPGateway::TOnResult> Callbacks;
- const IRetryPolicy<long>::IRetryState::TPtr RetryState;
+ const IHTTPGateway::TRetryPolicy::IRetryState::TPtr RetryState;
};
class TEasyCurlStream : public TEasyCurl {
@@ -495,35 +494,35 @@ public:
void Cancel(TIssue issue) {
Cancelled = true;
- OnFinish(TIssues{issue});
+ OnFinish(CURLE_OK, TIssues{issue});
}
private:
- void Fail(const TIssue& error) final {
+ void Fail(CURLcode result, const TIssue& error) final {
if (!Cancelled)
- OnFinish(TIssues{error});
+ OnFinish(result, TIssues{error});
}
- void MaybeStart(long httpResponseCode = 0) {
+ void MaybeStart(CURLcode result, long httpResponseCode = 0) {
if (!HttpResponseCode) {
if (!httpResponseCode) {
curl_easy_getinfo(GetHandle(), CURLINFO_RESPONSE_CODE, &httpResponseCode);
}
HttpResponseCode = httpResponseCode;
- OnStart(HttpResponseCode);
+ OnStart(result, HttpResponseCode);
}
}
void Done(CURLcode result, long httpResponseCode) final {
- MaybeStart(httpResponseCode);
+ MaybeStart(result, httpResponseCode);
if (CURLE_OK != result) {
- return Fail(TIssue(TStringBuilder{} << "error: " << curl_easy_strerror(result) << " detailed: " << GetDetailedErrorText()));
+ return Fail(result, TIssue(TStringBuilder{} << "error: " << curl_easy_strerror(result) << " detailed: " << GetDetailedErrorText()));
}
if (!Cancelled)
- OnFinish(TIssues());
+ OnFinish(result, TIssues());
}
size_t Write(void* contents, size_t size, size_t nmemb) final {
- MaybeStart();
+ MaybeStart(CURLE_OK);
const auto realsize = size * nmemb;
if (!Cancelled)
OnNewData(IHTTPGateway::TCountedContent(TString(static_cast<char*>(contents), realsize), Counter, InflightCounter));
@@ -545,7 +544,7 @@ private:
long HttpResponseCode = 0L;
};
-using TKeyType = std::tuple<TString, size_t, IHTTPGateway::THeaders, TString, IRetryPolicy<long>::TPtr>;
+using TKeyType = std::tuple<TString, size_t, IHTTPGateway::THeaders, TString, IHTTPGateway::TRetryPolicy::TPtr>;
class TKeyHash {
public:
@@ -560,7 +559,7 @@ public:
}
private:
const std::hash<TString> Hash;
- const std::hash<IRetryPolicy<long>::TPtr> HashPtr;
+ const std::hash<IHTTPGateway::TRetryPolicy::TPtr> HashPtr;
};
class THTTPMultiGateway : public IHTTPGateway {
@@ -788,7 +787,7 @@ private:
if (auto buffer = std::dynamic_pointer_cast<TEasyCurlBuffer>(easy)) {
AllocatedSize -= buffer->GetSizeLimit();
- if (const auto& nextRetryDelay = buffer->GetNextRetryDelay(httpResponseCode)) {
+ if (const auto& nextRetryDelay = buffer->GetNextRetryDelay(result, httpResponseCode)) {
buffer->Reset();
Delayed.emplace(nextRetryDelay->ToDeadLine(), std::move(buffer));
easy.reset();
@@ -818,12 +817,12 @@ private:
const TIssue error(curl_multi_strerror(result));
while (!works.empty()) {
curl_multi_remove_handle(Handle, works.top()->GetHandle());
- works.top()->Fail(error);
+ works.top()->Fail(CURLE_OK, error);
works.pop();
}
}
- void Upload(TString url, THeaders headers, TString body, TOnResult callback, bool put, IRetryPolicy<long>::TPtr retryPolicy) final {
+ void Upload(TString url, THeaders headers, TString body, TOnResult callback, bool put, TRetryPolicy::TPtr retryPolicy) final {
Rps->Inc();
const std::unique_lock lock(Sync);
@@ -832,7 +831,7 @@ private:
Wakeup(0U);
}
- void Delete(TString url, THeaders headers, TOnResult callback, IRetryPolicy<long>::TPtr retryPolicy) final {
+ void Delete(TString url, THeaders headers, TOnResult callback, TRetryPolicy::TPtr retryPolicy) final {
Rps->Inc();
const std::unique_lock lock(Sync);
@@ -848,12 +847,12 @@ private:
size_t sizeLimit,
TOnResult callback,
TString data,
- IRetryPolicy<long>::TPtr retryPolicy) final
+ TRetryPolicy::TPtr retryPolicy) final
{
Rps->Inc();
if (sizeLimit > MaxSimulatenousDownloadsSize) {
TIssue error(TStringBuilder() << "Too big file for downloading: size " << sizeLimit << ", but limit is " << MaxSimulatenousDownloadsSize);
- callback(TIssues{error});
+ callback(TResult(CURLE_OK, TIssues{error}));
return;
}
const std::unique_lock lock(Sync);
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
index bef5a2bc26..8b48f54efe 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
@@ -7,6 +7,8 @@
#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/retry/retry_policy.h>
+#include <contrib/libs/curl/include/curl/curl.h>
+
#include <atomic>
#include <variant>
#include <functional>
@@ -17,6 +19,7 @@ class IHTTPGateway {
public:
using TPtr = std::shared_ptr<IHTTPGateway>;
using TWeakPtr = std::weak_ptr<IHTTPGateway>;
+ using TRetryPolicy = IRetryPolicy<CURLcode, long>;
virtual ~IHTTPGateway() = default;
@@ -63,9 +66,10 @@ public:
using THeaders = TSmallVec<TString>;
struct TResult {
- TResult(TContent&& content) : Content(std::move(content)) {}
- TResult(const TIssues& issues) : Content(""), Issues(issues) {}
+ TResult(TContent&& content) : Content(std::move(content)), CurlResponseCode(CURLE_OK) {}
+ TResult(CURLcode curlResponseCode, const TIssues& issues) : Content(""), CurlResponseCode(curlResponseCode), Issues(issues) {}
TContent Content;
+ CURLcode CurlResponseCode;
TIssues Issues;
};
using TOnResult = std::function<void(TResult&&)>;
@@ -76,13 +80,13 @@ public:
TString body,
TOnResult callback,
bool put = false,
- IRetryPolicy</*http response code*/long>::TPtr retryPolicy = IRetryPolicy<long>::GetNoRetryPolicy()) = 0;
+ TRetryPolicy::TPtr retryPolicy = TRetryPolicy::GetNoRetryPolicy()) = 0;
virtual void Delete(
TString url,
THeaders headers,
TOnResult callback,
- IRetryPolicy</*http response code*/long>::TPtr retryPolicy = IRetryPolicy<long>::GetNoRetryPolicy()) = 0;
+ TRetryPolicy::TPtr retryPolicy = TRetryPolicy::GetNoRetryPolicy()) = 0;
virtual void Download(
TString url,
@@ -91,7 +95,7 @@ public:
std::size_t sizeLimit,
TOnResult callback,
TString data = {},
- IRetryPolicy</*http response code*/long>::TPtr retryPolicy = IRetryPolicy<long>::GetNoRetryPolicy()) = 0;
+ TRetryPolicy::TPtr retryPolicy = TRetryPolicy::GetNoRetryPolicy()) = 0;
class TCountedContent : public TContentBase {
public:
@@ -107,9 +111,9 @@ public:
const ::NMonitoring::TDynamicCounters::TCounterPtr InflightCounter;
};
- using TOnDownloadStart = std::function<void(long)>; // http code.
+ using TOnDownloadStart = std::function<void(CURLcode, long)>; // http code.
using TOnNewDataPart = std::function<void(TCountedContent&&)>;
- using TOnDownloadFinish = std::function<void(TIssues)>;
+ using TOnDownloadFinish = std::function<void(CURLcode, TIssues)>;
using TCancelHook = std::function<void(TIssue)>;
virtual TCancelHook Download(
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.cpp
index 73eb1dc9a6..bf1dfaeccb 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.cpp
@@ -261,8 +261,8 @@ public:
hFunc(TEvPrivate::TEvListParts, Handle);
)
- bool RetryOperation(ui32 httpResponseCode) {
- auto result = RetryCount && RetryPolicy->CreateRetryState()->GetNextRetryDelay(httpResponseCode);
+ bool RetryOperation(CURLcode curlResponseCode, ui32 httpResponseCode) {
+ auto result = RetryCount && RetryPolicy->CreateRetryState()->GetNextRetryDelay(curlResponseCode, httpResponseCode);
if (result) {
RetryCount--;
} else {
@@ -351,7 +351,7 @@ public:
}
}
LOG_D("CommitMultipartUpload ERROR " << ev->Get()->State->BuildUrl());
- if (RetryOperation(result.Content.HttpResponseCode)) {
+ if (RetryOperation(result.CurlResponseCode, result.Content.HttpResponseCode)) {
PushCommitMultipartUpload(ev->Get()->State);
}
}
@@ -425,7 +425,7 @@ public:
return;
}
LOG_D("ListMultipartUploads ERROR " << ev->Get()->State->BuildUrl());
- if (RetryOperation(result.Content.HttpResponseCode)) {
+ if (RetryOperation(result.CurlResponseCode, result.Content.HttpResponseCode)) {
PushListMultipartUploads(ev->Get()->State);
}
}
@@ -442,7 +442,7 @@ public:
return;
}
LOG_D("AbortMultipartUpload ERROR " << ev->Get()->State->BuildUrl());
- if (RetryOperation(result.Content.HttpResponseCode)) {
+ if (RetryOperation(result.CurlResponseCode, result.Content.HttpResponseCode)) {
PushAbortMultipartUpload(ev->Get()->State);
}
}
@@ -482,7 +482,7 @@ public:
return;
}
LOG_D("ListParts ERROR " << ev->Get()->State->BuildUrl());
- if (RetryOperation(result.Content.HttpResponseCode)) {
+ if (RetryOperation(result.CurlResponseCode, result.Content.HttpResponseCode)) {
PushListParts(ev->Get()->State);
}
}
@@ -560,7 +560,7 @@ private:
ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
NYql::NDqProto::TExternalEffect ExternalEffect;
NActors::TActorSystem* const ActorSystem;
- const IRetryPolicy<long>::TPtr RetryPolicy;
+ const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy;
ui64 HttpRequestInflight = 0;
ui64 RetryCount;
THashSet<TString> UnknownPrefixes;
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 4bcdcadc74..bb97cee6be 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -199,15 +199,18 @@ struct TEvPrivate {
};
struct TEvReadStarted : public TEventLocal<TEvReadStarted, EvReadStarted> {
- explicit TEvReadStarted(long httpResponseCode) : HttpResponseCode(httpResponseCode) {}
+ TEvReadStarted(CURLcode curlResponseCode, long httpResponseCode)
+ : CurlResponseCode(curlResponseCode), HttpResponseCode(httpResponseCode) {}
+ const CURLcode CurlResponseCode;
const long HttpResponseCode;
};
struct TEvReadFinished : public TEventLocal<TEvReadFinished, EvReadFinished> {
- TEvReadFinished(size_t pathIndex, TIssues&& issues)
- : PathIndex(pathIndex), Issues(std::move(issues)) {
+ TEvReadFinished(size_t pathIndex, CURLcode curlResponseCode, TIssues&& issues)
+ : PathIndex(pathIndex), CurlResponseCode(curlResponseCode), Issues(std::move(issues)) {
}
const size_t PathIndex;
+ const CURLcode CurlResponseCode;
TIssues Issues;
};
@@ -692,7 +695,7 @@ public:
ui64 startPathIndex,
const NActors::TActorId& computeActorId,
ui64 sizeLimit,
- const IRetryPolicy<long>::TPtr& retryPolicy,
+ const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
const TS3ReadActorFactoryConfig& readActorFactoryCfg,
::NMonitoring::TDynamicCounterPtr counters,
::NMonitoring::TDynamicCounterPtr taskCounters,
@@ -965,7 +968,7 @@ private:
const ui64 InputIndex;
const TTxId TxId;
const NActors::TActorId ComputeActorId;
- const IRetryPolicy<long>::TPtr RetryPolicy;
+ const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy;
TActorSystem* const ActorSystem;
@@ -1030,7 +1033,7 @@ struct TRetryStuff {
std::size_t sizeLimit,
const TTxId& txId,
const TString& requestId,
- const IRetryPolicy<long>::TPtr& retryPolicy
+ const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy
) : Gateway(std::move(gateway))
, Url(UrlEscapeRet(url, true))
, Headers(headers)
@@ -1048,13 +1051,13 @@ struct TRetryStuff {
std::size_t Offset, SizeLimit;
const TTxId TxId;
const TString RequestId;
- IRetryPolicy<long>::IRetryState::TPtr RetryState;
- IRetryPolicy<long>::TPtr RetryPolicy;
+ IHTTPGateway::TRetryPolicy::IRetryState::TPtr RetryState;
+ IHTTPGateway::TRetryPolicy::TPtr RetryPolicy;
IHTTPGateway::TCancelHook CancelHook;
TMaybe<TDuration> NextRetryDelay;
std::atomic_bool Cancelled;
- const IRetryPolicy<long>::IRetryState::TPtr& GetRetryState() {
+ const IHTTPGateway::TRetryPolicy::IRetryState::TPtr& GetRetryState() {
if (!RetryState) {
RetryState = RetryPolicy->CreateRetryState();
}
@@ -1074,16 +1077,16 @@ struct TRetryStuff {
}
};
-void OnDownloadStart(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, long httpResponseCode) {
- actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadStarted(httpResponseCode)));
+void OnDownloadStart(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, CURLcode curlResponseCode, long httpResponseCode) {
+ actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadStarted(curlResponseCode, httpResponseCode)));
}
void OnNewData(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, IHTTPGateway::TCountedContent&& data) {
actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvDataPart(std::move(data))));
}
-void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, size_t pathIndex, TIssues issues) {
- actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished(pathIndex, std::move(issues))));
+void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, size_t pathIndex, CURLcode curlResponseCode, TIssues issues) {
+ actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished(pathIndex, curlResponseCode, std::move(issues))));
}
void DownloadStart(const TRetryStuff::TPtr& retryStuff, TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, size_t pathIndex, const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter) {
@@ -1092,10 +1095,9 @@ void DownloadStart(const TRetryStuff::TPtr& retryStuff, TActorSystem* actorSyste
retryStuff->Headers,
retryStuff->Offset,
retryStuff->SizeLimit,
- std::bind(&OnDownloadStart, actorSystem, self, parent, std::placeholders::_1),
+ std::bind(&OnDownloadStart, actorSystem, self, parent, std::placeholders::_1, std::placeholders::_2),
std::bind(&OnNewData, actorSystem, self, parent, std::placeholders::_1),
- std::bind(
- &OnDownloadFinished, actorSystem, self, parent, pathIndex, std::placeholders::_1),
+ std::bind(&OnDownloadFinished, actorSystem, self, parent, pathIndex, std::placeholders::_1, std::placeholders::_2),
inflightCounter);
}
@@ -1802,6 +1804,7 @@ public:
void Handle(TEvPrivate::TEvReadStarted::TPtr& ev) {
HttpResponseCode = ev->Get()->HttpResponseCode;
+ CurlResponseCode = ev->Get()->CurlResponseCode;
LOG_CORO_D("TEvReadStarted, Http code: " << HttpResponseCode);
}
@@ -1830,6 +1833,10 @@ public:
void Handle(TEvPrivate::TEvReadFinished::TPtr& ev) {
+ if (CurlResponseCode == CURLE_OK) {
+ CurlResponseCode = ev->Get()->CurlResponseCode;
+ }
+
Issues.Clear();
if (!ErrorText.empty()) {
TString errorCode;
@@ -1850,7 +1857,7 @@ public:
}
if (Issues) {
- RetryStuff->NextRetryDelay = RetryStuff->GetRetryState()->GetNextRetryDelay(HttpResponseCode >= 300 ? HttpResponseCode : 0);
+ RetryStuff->NextRetryDelay = RetryStuff->GetRetryState()->GetNextRetryDelay(CurlResponseCode, HttpResponseCode);
LOG_CORO_D("TEvReadFinished with Issues (try to retry): " << Issues.ToOneLineString());
if (RetryStuff->NextRetryDelay) {
// inplace retry: report problem to TransientIssues and repeat
@@ -2071,6 +2078,7 @@ private:
bool InputFinished = false;
long HttpResponseCode = 0L;
+ CURLcode CurlResponseCode = CURLE_OK;
bool ServerReturnedError = false;
TString ErrorText;
TIssues Issues;
@@ -2118,7 +2126,7 @@ public:
ui64 startPathIndex,
const TReadSpec::TPtr& readSpec,
const NActors::TActorId& computeActorId,
- const IRetryPolicy<long>::TPtr& retryPolicy,
+ const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
const std::size_t maxBlocksInFly,
IArrowReader::TPtr arrowReader,
const TS3ReadActorFactoryConfig& readActorFactoryCfg,
@@ -2561,7 +2569,7 @@ private:
const ui64 InputIndex;
const TTxId TxId;
const NActors::TActorId ComputeActorId;
- const IRetryPolicy<long>::TPtr RetryPolicy;
+ const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy;
const TString Url;
const TString Token;
@@ -2748,7 +2756,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
const THashMap<TString, TString>& taskParams,
const NActors::TActorId& computeActorId,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
- const IRetryPolicy<long>::TPtr& retryPolicy,
+ const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
const TS3ReadActorFactoryConfig& cfg,
IArrowReader::TPtr arrowReader,
::NMonitoring::TDynamicCounterPtr counters,
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
index 60c227cfec..019a326a98 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
@@ -23,7 +23,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateS3ReadA
const THashMap<TString, TString>& taskParams,
const NActors::TActorId& computeActorId,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
- const IRetryPolicy<long>::TPtr& retryPolicy,
+ const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
const TS3ReadActorFactoryConfig& cfg,
IArrowReader::TPtr arrowReader,
::NMonitoring::TDynamicCounterPtr counters,
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp
index 5de1839338..d42a800278 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp
@@ -5,7 +5,7 @@
namespace NYql::NDq {
-void RegisterS3WriteActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway, const IRetryPolicy<long>::TPtr& retryPolicy) {
+void RegisterS3WriteActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway, const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy) {
factory.RegisterSink<NS3::TSink>("S3Sink",
[credentialsFactory, gateway, retryPolicy](NS3::TSink&& settings, IDqAsyncIoFactory::TSinkArguments&& args) {
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h
index 99522d7815..2f5cc9a0f9 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h
@@ -15,5 +15,5 @@ void RegisterS3WriteActorFactory(
TDqAsyncIoFactory& factory,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
IHTTPGateway::TPtr gateway,
- const IRetryPolicy<long>::TPtr& retryPolicy = GetHTTPDefaultRetryPolicy());
+ const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy = GetHTTPDefaultRetryPolicy());
}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
index fef83090b9..5eae61bc38 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
@@ -14,7 +14,7 @@ void RegisterS3ReadActorFactory(
TDqAsyncIoFactory& factory,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
IHTTPGateway::TPtr gateway,
- const IRetryPolicy<long>::TPtr& retryPolicy,
+ const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
const TS3ReadActorFactoryConfig& cfg,
::NMonitoring::TDynamicCounterPtr counters,
IArrowReader::TPtr arrowReader) {
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
index f70b13a96d..c78ff01a98 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
@@ -26,7 +26,7 @@ void RegisterS3ReadActorFactory(
TDqAsyncIoFactory& factory,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
IHTTPGateway::TPtr gateway = IHTTPGateway::Make(),
- const IRetryPolicy<long>::TPtr& retryPolicy = GetHTTPDefaultRetryPolicy(),
+ const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy = GetHTTPDefaultRetryPolicy(),
const TS3ReadActorFactoryConfig& = {},
::NMonitoring::TDynamicCounterPtr counters = nullptr,
IArrowReader::TPtr arrowReader = MakeArrowReader(TArrowReaderSettings()));
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
index fd627a702c..dcb2761327 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
@@ -133,7 +133,7 @@ public:
const TString& key,
const TString& url,
const std::string_view& compression,
- const IRetryPolicy<long>::TPtr& retryPolicy,
+ const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
bool dirtyWrite,
const TString& token)
: TxId(txId)
@@ -396,7 +396,7 @@ private:
const TTxId TxId;
const IHTTPGateway::TPtr Gateway;
const NYdb::TCredentialsProviderPtr CredProvider;
- const IRetryPolicy<long>::TPtr RetryPolicy;
+ const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy;
TActorSystem* const ActorSystem;
TActorId ParentId;
@@ -429,7 +429,7 @@ public:
const TString& compression,
bool multipart,
IDqComputeActorAsyncOutput::ICallbacks* callbacks,
- const IRetryPolicy<long>::TPtr& retryPolicy,
+ const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
bool dirtyWrite,
const TString& token)
: Gateway(std::move(gateway))
@@ -602,7 +602,7 @@ private:
const NYdb::TCredentialsProviderPtr CredProvider;
IRandomProvider* RandomProvider;
TIntrusivePtr<IRandomProvider> DefaultRandomProvider;
- const IRetryPolicy<long>::TPtr RetryPolicy;
+ const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy;
const ui64 OutputIndex;
const TTxId TxId;
@@ -639,7 +639,7 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor(
const THashMap<TString, TString>& secureParams,
IDqComputeActorAsyncOutput::ICallbacks* callbacks,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
- const IRetryPolicy<long>::TPtr& retryPolicy)
+ const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy)
{
const auto token = secureParams.Value(params.GetToken(), TString{});
const auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token);
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h
index 47eaafc414..3787a9f849 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h
@@ -21,6 +21,6 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor(
const THashMap<TString, TString>& secureParams,
IDqComputeActorAsyncOutput::ICallbacks* callbacks,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
- const IRetryPolicy<long>::TPtr& retryPolicy);
+ const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy);
} // namespace NYql::NDq
diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
index 986a0b2a64..161b3c1d7b 100644
--- a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
+++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
@@ -231,7 +231,7 @@ public:
const std::shared_ptr<TListEntries> Output;
// HTTP control
const IHTTPGateway::TWeakPtr GatewayWeak;
- const IRetryPolicy<long>::TPtr RetryPolicy;
+ const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy;
const TString RequestId;
const TListingRequest ListingRequest;
const TMaybe<TString> Delimiter;