about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author galaxycrab <UgnineSirdis@ydb.tech> 2023-01-26 14:58:54 +0300
committer galaxycrab <UgnineSirdis@ydb.tech> 2023-01-26 14:58:54 +0300
commit 42c1023cad173ae9e7737355fedf76859d7e20c6 (patch)
tree ed8a83a2c8f9b9b0d70346fb8d834e0aa1f358e5
parent 9b69fdda4c660d1e32a1b1c6cbf3b30d63a6882c (diff)
download ydb-42c1023cad173ae9e7737355fedf76859d7e20c6.tar.gz
Properly retry problems in reading from S3. Don't retry limit errors from S3. Parse and analyse errors from S3. Remove user data from logs
-rw-r--r-- ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp | 6
-rw-r--r-- ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp | 57
-rw-r--r-- ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h | 26
-rw-r--r-- ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp | 32
-rw-r--r-- ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp | 47
5 files changed, 122 insertions, 46 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
index 5cf99777a1e..4e700956ed3 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
@@ -136,7 +136,7 @@ public:
if (DnsCache != nullptr) {
curl_easy_setopt(Handle, CURLOPT_RESOLVE, DnsCache.get());
}
-
+
if (!Headers.empty()) {
CurlHeaders = std::accumulate(Headers.cbegin(), Headers.cend(), CurlHeaders,
std::bind(&curl_slist_append, std::placeholders::_1, std::bind(&TString::c_str, std::placeholders::_2)));
@@ -177,7 +177,7 @@ public:
return Handle;
}
- EMethod GetMetthod() const {
+ EMethod GetMethod() const {
return Method;
}
@@ -669,7 +669,7 @@ private:
}
TIntrusivePtr<::NMonitoring::TDynamicCounters> group;
- switch (easy->GetMetthod()) {
+ switch (easy->GetMethod()) {
case TEasyCurl::EMethod::GET:
group = GroupForGET->GetSubgroup(codeLabel, codeValue);
break;
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp
index 6fb2891f3f7..1623fb7ff48 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp
@@ -10,17 +10,11 @@
namespace NYql::NDq {
+// Fills errorCode and message from an S3 XML error response body.
+// Returns true only when the body was parsed and describes an error;
+// otherwise returns false and leaves both output arguments untouched.
bool ParseS3ErrorResponse(const TString& response, TString& errorCode, TString& message) {
- try {
- const NXml::TDocument xml(response, NXml::TDocument::String);
- if (const auto& root = xml.Root(); root.Name() == "Error") {
- auto tmpMessage = root.Node("Message", true).Value<TString>();
- errorCode = root.Node("Code", true).Value<TString>();
- message = tmpMessage;
- return true;
- }
- }
- catch (std::exception&) {
- // just suppress any possible errors
+ TS3Result s3Result(response);
+ if (s3Result.Parsed && s3Result.IsError) {
+ errorCode = std::move(s3Result.S3ErrorCode);
+ message = std::move(s3Result.ErrorMessage);
+ return true;
}
return false;
}
@@ -41,4 +35,45 @@ TIssues BuildIssues(long httpCode, const TString& s3ErrorCode, const TString& me
return issues;
}
+// Lookup table from S3 error code strings to DQ status codes, consulted by
+// StatusFromS3ErrorCode(). Keys are string_view literals (""sv).
+static const THashMap<TStringBuf, NDqProto::StatusIds::StatusCode> S3ErrorToStatusCode = {
+ { "BucketMaxSizeExceeded"sv, NDqProto::StatusIds::LIMIT_EXCEEDED },
+ { "CloudTotalAliveSizeQuotaExceed"sv, NDqProto::StatusIds::LIMIT_EXCEEDED },
+ { "EntityTooSmall"sv, NDqProto::StatusIds::LIMIT_EXCEEDED },
+ { "EntityTooLarge"sv, NDqProto::StatusIds::LIMIT_EXCEEDED },
+ { "KeyTooLongError"sv, NDqProto::StatusIds::LIMIT_EXCEEDED },
+ { "InvalidStorageClass"sv, NDqProto::StatusIds::PRECONDITION_FAILED },
+ { "AccessDenied"sv, NDqProto::StatusIds::BAD_REQUEST },
+ { "NoSuchBucket"sv, NDqProto::StatusIds::BAD_REQUEST }
+};
+
+// Translates an S3 error code into a DQ status code.
+//  - empty code            -> UNSPECIFIED
+//  - code listed in table  -> the mapped status
+//  - any other code        -> EXTERNAL_ERROR
+NDqProto::StatusIds::StatusCode StatusFromS3ErrorCode(const TString& s3ErrorCode) {
+    if (s3ErrorCode.empty()) {
+        return NDqProto::StatusIds::UNSPECIFIED;
+    }
+    if (const auto known = S3ErrorToStatusCode.find(TStringBuf(s3ErrorCode)); known != S3ErrorToStatusCode.end()) {
+        return known->second;
+    }
+    return NDqProto::StatusIds::EXTERNAL_ERROR;
+}
+
+// Parses an S3 response body as XML and, when the root element is "Error",
+// extracts the S3 error code and message.
+//
+// The constructor does not let std::exception escape from XML handling:
+//  - if the body is not valid XML, Parsed stays false, IsError is set and
+//    ErrorMessage holds the parser's exception text;
+//  - if the root is "Error" but Code/Message cannot be read (Node() is called
+//    in quiet mode, so reading a missing element fails in Value()), IsError is
+//    set and ErrorMessage holds the exception text instead of propagating it.
+//    This keeps ParseS3ErrorResponse() exception-free, as it was before.
+TS3Result::TS3Result(const TString& body)
+    : Body(body)
+{
+    try {
+        Xml.emplace(Body, NXml::TDocument::String);
+        Parsed = true;
+        if (const auto& root = Xml->Root(); root.Name() == "Error") {
+            // Mark the error first so the flag survives a failed field read below.
+            IsError = true;
+            S3ErrorCode = root.Node("Code", true).Value<TString>();
+            ErrorMessage = root.Node("Message", true).Value<TString>();
+        }
+    } catch (const std::exception& ex) {
+        ErrorMessage = ex.what();
+        IsError = true;
+    }
+}
+
}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h
index e30332a8799..7af3f890104 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h
@@ -1,12 +1,38 @@
#pragma once
#include <ydb/library/yql/public/issue/yql_issue.h>
+#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h>
+
+#include <library/cpp/xml/document/xml-document-decl.h>
#include <util/generic/string.h>
+#include <optional>
+
namespace NYql::NDq {
bool ParseS3ErrorResponse(const TString& response, TString& errorCode, TString& message);
TIssues BuildIssues(long httpCode, const TString& s3ErrorCode, const TString& message);
+NDqProto::StatusIds::StatusCode StatusFromS3ErrorCode(const TString& s3ErrorCode);
+
+// Result of interpreting an HTTP response body received from S3 as XML.
+struct TS3Result {
+    const TString Body;                  // Copy of the raw response body.
+    bool Parsed = false;                 // True when Body was parsed into Xml.
+    std::optional<NXml::TDocument> Xml;  // Parsed document; empty when parsing failed.
+
+    bool IsError = false;                // Set on a parse failure or when the root element is "Error".
+    TString S3ErrorCode;                 // Text of the "Code" element of an "Error" document.
+    TString ErrorMessage;                // Text of the "Message" element, or the parser's exception text.
+
+    // explicit: a response body must not silently convert into a TS3Result.
+    explicit TS3Result(const TString& body);
+
+    // True when the body was parsed as XML (regardless of IsError).
+    // explicit: usable in conditions, but not in arithmetic or comparisons.
+    explicit operator bool() const {
+        return Parsed;
+    }
+
+    // Root node of the parsed document, or a default-constructed node when parsing failed.
+    NXml::TConstNode GetRootNode() const {
+        return Parsed ? Xml->Root() : NXml::TConstNode{};
+    }
+};
}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index ffabe50549e..ed28794b032 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -525,7 +525,7 @@ struct TRetryStuff {
, SizeLimit(sizeLimit)
, TxId(txId)
, RequestId(requestId)
- , RetryState(retryPolicy->CreateRetryState())
+ , RetryPolicy(retryPolicy)
, Cancelled(false)
{}
@@ -535,11 +535,19 @@ struct TRetryStuff {
std::size_t Offset, SizeLimit;
const TTxId TxId;
const TString RequestId;
- const IRetryPolicy<long>::IRetryState::TPtr RetryState;
+ const IRetryPolicy<long>::TPtr RetryPolicy;
+ IRetryPolicy<long>::IRetryState::TPtr RetryState;
IHTTPGateway::TCancelHook CancelHook;
TMaybe<TDuration> NextRetryDelay;
std::atomic_bool Cancelled;
+ // Returns the current retry state, creating it from RetryPolicy when there is
+ // none (first use, or after RetryState has been reset to nullptr).
+ const IRetryPolicy<long>::IRetryState::TPtr& GetRetryState() {
+ if (!RetryState) {
+ RetryState = RetryPolicy->CreateRetryState();
+ }
+ return RetryState;
+ }
+
void Cancel() {
Cancelled.store(true);
if (const auto cancelHook = std::move(CancelHook)) {
@@ -703,15 +711,15 @@ private:
static constexpr std::string_view TruncatedSuffix = "... [truncated]"sv;
public:
- TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId,
- const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex,
- const TString& path, const TString& url, const std::size_t maxBlocksInFly,
+ TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId,
+ const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex,
+ const TString& path, const TString& url, const std::size_t maxBlocksInFly,
const TS3ReadActorFactoryConfig& readActorFactoryCfg,
const ::NMonitoring::TDynamicCounters::TCounterPtr& deferredQueueSize,
const ::NMonitoring::TDynamicCounters::TCounterPtr& httpInflightSize,
const ::NMonitoring::TDynamicCounters::TCounterPtr& httpDataRps)
- : TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex),
- TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId),
+ : TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex),
+ TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId),
PathIndex(pathIndex), Path(path), Url(url), MaxBlocksInFly(maxBlocksInFly),
DeferredQueueSize(deferredQueueSize), HttpInflightSize(httpInflightSize), HttpDataRps(httpDataRps)
{}
@@ -770,13 +778,17 @@ public:
ErrorText.clear();
Issues.Clear();
value.clear();
- RetryStuff->NextRetryDelay = RetryStuff->RetryState->GetNextRetryDelay(HttpResponseCode = ev->Get<TEvPrivate::TEvReadStarted>()->HttpResponseCode);
+ RetryStuff->NextRetryDelay = RetryStuff->GetRetryState()->GetNextRetryDelay(HttpResponseCode = ev->Get<TEvPrivate::TEvReadStarted>()->HttpResponseCode);
LOG_CORO_D("TS3ReadCoroImpl", "TEvReadStarted, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", Http code: " << HttpResponseCode << ", retry after: " << RetryStuff->NextRetryDelay << ", request id: [" << RetryStuff->RequestId << "]");
+ if (!RetryStuff->NextRetryDelay) { // Success or not retryable
+ RetryStuff->RetryState = nullptr;
+ }
return true;
case TEvPrivate::TEvReadFinished::EventType:
Issues = std::move(ev->Get<TEvPrivate::TEvReadFinished>()->Issues);
if (Issues) {
- if (RetryStuff->NextRetryDelay = RetryStuff->RetryState->GetNextRetryDelay(0L); !RetryStuff->NextRetryDelay) {
+ LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished. Url: " << RetryStuff->Url << ". Issues: " << Issues.ToOneLineString());
+ if (RetryStuff->NextRetryDelay = RetryStuff->GetRetryState()->GetNextRetryDelay(0L); !RetryStuff->NextRetryDelay) {
InputFinished = true;
LOG_CORO_W("TS3ReadCoroImpl", "ReadError: " << Issues.ToOneLineString() << ", Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", LastData: " << GetLastDataAsText() << ", request id: [" << RetryStuff->RequestId << "]");
throw TS3ReadError(); // Don't pass control to data parsing, because it may validate eof and show wrong issues about incorrect data format
@@ -788,7 +800,7 @@ public:
GetActorSystem()->Schedule(*RetryStuff->NextRetryDelay, new IEventHandle(ParentActorId, SelfActorId, new TEvPrivate::TEvRetryEventFunc(std::bind(&DownloadStart, RetryStuff, GetActorSystem(), SelfActorId, ParentActorId, PathIndex, HttpInflightSize))));
value.clear();
} else {
- LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", Error: " << ServerReturnedError << ", LastData: " << GetLastDataAsText() << ", request id: [" << RetryStuff->RequestId << "]");
+ LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", Error: " << ServerReturnedError << ", request id: [" << RetryStuff->RequestId << "]");
InputFinished = true;
if (ServerReturnedError) {
throw TS3ReadError(); // Don't pass control to data parsing, because it may validate eof and show wrong issues about incorrect data format
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
index 3bc4bf46b78..c27e275be09 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
@@ -222,11 +222,10 @@ private:
static void OnUploadsCreated(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& requestId, IHTTPGateway::TResult&& result) {
switch (result.index()) {
case 0U: try {
- const NXml::TDocument xml(std::get<IHTTPGateway::TContent>(std::move(result)).Extract(), NXml::TDocument::String);
- if (const auto& root = xml.Root(); root.Name() == "Error") {
- const auto& code = root.Node("Code", true).Value<TString>();
- const auto& message = root.Node("Message", true).Value<TString>();
- actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(code, TStringBuilder{} << message << ", request id: [" << requestId << "]")));
+ TS3Result s3Result(std::get<IHTTPGateway::TContent>(std::move(result)).Extract());
+ const auto& root = s3Result.GetRootNode();
+ if (s3Result.IsError) {
+ actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(s3Result.S3ErrorCode, TStringBuilder{} << s3Result.ErrorMessage << ", request id: [" << requestId << "]")));
} else if (root.Name() != "InitiateMultipartUploadResult")
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected response on create upload: " << root.Name() << ", request id: [" << requestId << "]")));
else {
@@ -255,8 +254,20 @@ private:
const auto& str = std::get<IHTTPGateway::TContent>(response).Headers;
if (const NHttp::THeaders headers(str.substr(str.rfind("HTTP/"))); headers.Has("Etag"))
actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadPartFinished(size, index, TString(headers.Get("Etag")))));
- else
- actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected response: " << str << ", request id: [" << requestId << "]")));
+ else {
+ TS3Result s3Result(std::get<IHTTPGateway::TContent>(std::move(response)).Extract());
+ if (s3Result.IsError && s3Result.Parsed) {
+ actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(s3Result.S3ErrorCode, TStringBuilder{} << "Upload failed: " << s3Result.ErrorMessage << ", request id: [" << requestId << "]")));
+ } else {
+ constexpr size_t BODY_MAX_SIZE = 1_KB;
+ actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR,
+ TStringBuilder() << "Unexpected response"
+ << ". Headers: " << str
+ << ". Body: \"" << TStringBuf(s3Result.Body).Trunc(BODY_MAX_SIZE)
+ << (s3Result.Body.size() > BODY_MAX_SIZE ? "\"..." : "\"")
+ << ". Request id: [" << requestId << "]")));
+ }
+ }
}
break;
case 1U: {
@@ -270,11 +281,10 @@ private:
static void OnMultipartUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& key, const TString& url, const TString& requestId, ui64 sentSize, IHTTPGateway::TResult&& result) {
switch (result.index()) {
case 0U: try {
- const NXml::TDocument xml(std::get<IHTTPGateway::TContent>(std::move(result)).Extract(), NXml::TDocument::String);
- if (const auto& root = xml.Root(); root.Name() == "Error") {
- const auto& code = root.Node("Code", true).Value<TString>();
- const auto& message = root.Node("Message", true).Value<TString>();
- actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(code, TStringBuilder{} << message << ", request id: [" << requestId << "]")));
+ TS3Result s3Result(std::get<IHTTPGateway::TContent>(std::move(result)).Extract());
+ const auto& root = s3Result.GetRootNode();
+ if (s3Result.IsError) {
+ actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(s3Result.S3ErrorCode, TStringBuilder{} << s3Result.ErrorMessage << ", request id: [" << requestId << "]")));
} else if (root.Name() != "CompleteMultipartUploadResult")
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected response on finish upload: " << root.Name() << ", request id: [" << requestId << "]")));
else
@@ -514,16 +524,9 @@ private:
void Handle(TEvPrivate::TEvUploadError::TPtr& result) {
LOG_W("TS3WriteActor", "TEvUploadError " << result->Get()->Issues.ToOneLineString());
- auto statusCode = result->Get()->StatusCode;
- if (statusCode == NYql::NDqProto::StatusIds::UNSPECIFIED) {
-
- // add err code analysis here
-
- if (result->Get()->S3ErrorCode == "BucketMaxSizeExceeded") {
- statusCode = NYql::NDqProto::StatusIds::LIMIT_EXCEEDED;
- } else {
- statusCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR;
- }
+ NDqProto::StatusIds::StatusCode statusCode = result->Get()->StatusCode;
+ if (statusCode == NDqProto::StatusIds::UNSPECIFIED) {
+ statusCode = StatusFromS3ErrorCode(result->Get()->S3ErrorCode);
}
Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Issues, statusCode);