diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2023-01-26 14:58:54 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2023-01-26 14:58:54 +0300 |
commit | 42c1023cad173ae9e7737355fedf76859d7e20c6 (patch) | |
tree | ed8a83a2c8f9b9b0d70346fb8d834e0aa1f358e5 | |
parent | 9b69fdda4c660d1e32a1b1c6cbf3b30d63a6882c (diff) | |
download | ydb-42c1023cad173ae9e7737355fedf76859d7e20c6.tar.gz |
Properly retry problems in reading from S3. Don't retry limit errors from S3. Parse and analyse errors from S3. Remove user data from logs
5 files changed, 122 insertions, 46 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index 5cf99777a1e..4e700956ed3 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -136,7 +136,7 @@ public: if (DnsCache != nullptr) { curl_easy_setopt(Handle, CURLOPT_RESOLVE, DnsCache.get()); } - + if (!Headers.empty()) { CurlHeaders = std::accumulate(Headers.cbegin(), Headers.cend(), CurlHeaders, std::bind(&curl_slist_append, std::placeholders::_1, std::bind(&TString::c_str, std::placeholders::_2))); @@ -177,7 +177,7 @@ public: return Handle; } - EMethod GetMetthod() const { + EMethod GetMethod() const { return Method; } @@ -669,7 +669,7 @@ private: } TIntrusivePtr<::NMonitoring::TDynamicCounters> group; - switch (easy->GetMetthod()) { + switch (easy->GetMethod()) { case TEasyCurl::EMethod::GET: group = GroupForGET->GetSubgroup(codeLabel, codeValue); break; diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp index 6fb2891f3f7..1623fb7ff48 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp @@ -10,17 +10,11 @@ namespace NYql::NDq { bool ParseS3ErrorResponse(const TString& response, TString& errorCode, TString& message) { - try { - const NXml::TDocument xml(response, NXml::TDocument::String); - if (const auto& root = xml.Root(); root.Name() == "Error") { - auto tmpMessage = root.Node("Message", true).Value<TString>(); - errorCode = root.Node("Code", true).Value<TString>(); - message = tmpMessage; - return true; - } - } - catch (std::exception&) { - // just suppress any possible errors + TS3Result s3Result(response); + if (s3Result.Parsed && s3Result.IsError) { + errorCode = std::move(s3Result.S3ErrorCode); + message = std::move(s3Result.ErrorMessage); + return true; } return false; } @@ -41,4 +35,45 @@ TIssues BuildIssues(long httpCode, const TString& s3ErrorCode, const TString& me return issues; } +static const THashMap<TStringBuf, NDqProto::StatusIds::StatusCode> S3ErrorToStatusCode = { + { "BucketMaxSizeExceeded"sv, NDqProto::StatusIds::LIMIT_EXCEEDED }, + { "CloudTotalAliveSizeQuotaExceed"sv, NDqProto::StatusIds::LIMIT_EXCEEDED }, + { "EntityTooSmall"sv, NDqProto::StatusIds::LIMIT_EXCEEDED }, + { "EntityTooLarge"sv, NDqProto::StatusIds::LIMIT_EXCEEDED }, + { "KeyTooLongError"sv, NDqProto::StatusIds::LIMIT_EXCEEDED }, + { "InvalidStorageClass"sv, NDqProto::StatusIds::PRECONDITION_FAILED }, + { "AccessDenied"sv, NDqProto::StatusIds::BAD_REQUEST }, + { "NoSuchBucket"sv, NDqProto::StatusIds::BAD_REQUEST } +}; + +NDqProto::StatusIds::StatusCode StatusFromS3ErrorCode(const TString& s3ErrorCode) { + if (s3ErrorCode.empty()) { + return NDqProto::StatusIds::UNSPECIFIED; + } + const auto it = S3ErrorToStatusCode.find(TStringBuf(s3ErrorCode)); + if (it != S3ErrorToStatusCode.end()) { + return it->second; + } + return NYql::NDqProto::StatusIds::EXTERNAL_ERROR; +} + +TS3Result::TS3Result(const TString& body) + : Body(body) +{ + try { + Xml.emplace(Body, NXml::TDocument::String); + Parsed = true; + } catch (const std::exception& ex) { + ErrorMessage = ex.what(); + IsError = true; + } + if (Parsed) { + if (const auto& root = Xml->Root(); root.Name() == "Error") { + IsError = true; + S3ErrorCode = root.Node("Code", true).Value<TString>(); + ErrorMessage = root.Node("Message", true).Value<TString>(); + } + } +} + } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h index e30332a8799..7af3f890104 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h @@ -1,12 +1,38 @@ #pragma once #include <ydb/library/yql/public/issue/yql_issue.h> +#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h> + +#include <library/cpp/xml/document/xml-document-decl.h> #include <util/generic/string.h> +#include <optional> + namespace NYql::NDq { bool ParseS3ErrorResponse(const TString& response, TString& errorCode, TString& message); TIssues BuildIssues(long httpCode, const TString& s3ErrorCode, const TString& message); +NDqProto::StatusIds::StatusCode StatusFromS3ErrorCode(const TString& s3ErrorCode); + +struct TS3Result { + const TString Body; + bool Parsed = false; + std::optional<NXml::TDocument> Xml; + + bool IsError = false; + TString S3ErrorCode; + TString ErrorMessage; + + TS3Result(const TString& body); + + operator bool() const { + return Parsed; + } + + NXml::TConstNode GetRootNode() const { + return Parsed ? Xml->Root() : NXml::TConstNode{}; + } +}; } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index ffabe50549e..ed28794b032 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -525,7 +525,7 @@ struct TRetryStuff { , SizeLimit(sizeLimit) , TxId(txId) , RequestId(requestId) - , RetryState(retryPolicy->CreateRetryState()) + , RetryPolicy(retryPolicy) , Cancelled(false) {} @@ -535,11 +535,19 @@ struct TRetryStuff { std::size_t Offset, SizeLimit; const TTxId TxId; const TString RequestId; - const IRetryPolicy<long>::IRetryState::TPtr RetryState; + const IRetryPolicy<long>::TPtr RetryPolicy; + IRetryPolicy<long>::IRetryState::TPtr RetryState; IHTTPGateway::TCancelHook CancelHook; TMaybe<TDuration> NextRetryDelay; std::atomic_bool Cancelled; + const IRetryPolicy<long>::IRetryState::TPtr& GetRetryState() { + if (!RetryState) { + RetryState = RetryPolicy->CreateRetryState(); + } + return RetryState; + } + void Cancel() { Cancelled.store(true); if (const auto cancelHook = std::move(CancelHook)) { @@ -703,15 +711,15 @@ private: static constexpr std::string_view TruncatedSuffix = "... [truncated]"sv; public: - TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId, - const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex, - const TString& path, const TString& url, const std::size_t maxBlocksInFly, + TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId, + const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex, + const TString& path, const TString& url, const std::size_t maxBlocksInFly, const TS3ReadActorFactoryConfig& readActorFactoryCfg, const ::NMonitoring::TDynamicCounters::TCounterPtr& deferredQueueSize, const ::NMonitoring::TDynamicCounters::TCounterPtr& httpInflightSize, const ::NMonitoring::TDynamicCounters::TCounterPtr& httpDataRps) - : TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex), - TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId), + : TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex), + TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId), PathIndex(pathIndex), Path(path), Url(url), MaxBlocksInFly(maxBlocksInFly), DeferredQueueSize(deferredQueueSize), HttpInflightSize(httpInflightSize), HttpDataRps(httpDataRps) {} @@ -770,13 +778,17 @@ public: ErrorText.clear(); Issues.Clear(); value.clear(); - RetryStuff->NextRetryDelay = RetryStuff->RetryState->GetNextRetryDelay(HttpResponseCode = ev->Get<TEvPrivate::TEvReadStarted>()->HttpResponseCode); + RetryStuff->NextRetryDelay = RetryStuff->GetRetryState()->GetNextRetryDelay(HttpResponseCode = ev->Get<TEvPrivate::TEvReadStarted>()->HttpResponseCode); LOG_CORO_D("TS3ReadCoroImpl", "TEvReadStarted, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", Http code: " << HttpResponseCode << ", retry after: " << RetryStuff->NextRetryDelay << ", request id: [" << RetryStuff->RequestId << "]"); + if (!RetryStuff->NextRetryDelay) { // Success or not retryable + RetryStuff->RetryState = nullptr; + } return true; case TEvPrivate::TEvReadFinished::EventType: Issues = std::move(ev->Get<TEvPrivate::TEvReadFinished>()->Issues); if (Issues) { - if (RetryStuff->NextRetryDelay = RetryStuff->RetryState->GetNextRetryDelay(0L); !RetryStuff->NextRetryDelay) { + LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished. Url: " << RetryStuff->Url << ". Issues: " << Issues.ToOneLineString()); + if (RetryStuff->NextRetryDelay = RetryStuff->GetRetryState()->GetNextRetryDelay(0L); !RetryStuff->NextRetryDelay) { InputFinished = true; LOG_CORO_W("TS3ReadCoroImpl", "ReadError: " << Issues.ToOneLineString() << ", Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", LastData: " << GetLastDataAsText() << ", request id: [" << RetryStuff->RequestId << "]"); throw TS3ReadError(); // Don't pass control to data parsing, because it may validate eof and show wrong issues about incorrect data format @@ -788,7 +800,7 @@ public: GetActorSystem()->Schedule(*RetryStuff->NextRetryDelay, new IEventHandle(ParentActorId, SelfActorId, new TEvPrivate::TEvRetryEventFunc(std::bind(&DownloadStart, RetryStuff, GetActorSystem(), SelfActorId, ParentActorId, PathIndex, HttpInflightSize)))); value.clear(); } else { - LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", Error: " << ServerReturnedError << ", LastData: " << GetLastDataAsText() << ", request id: [" << RetryStuff->RequestId << "]"); + LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", Error: " << ServerReturnedError << ", request id: [" << RetryStuff->RequestId << "]"); InputFinished = true; if (ServerReturnedError) { throw TS3ReadError(); // Don't pass control to data parsing, because it may validate eof and show wrong issues about incorrect data format diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index 3bc4bf46b78..c27e275be09 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -222,11 +222,10 @@ private: static void OnUploadsCreated(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& requestId, IHTTPGateway::TResult&& result) { switch (result.index()) { case 0U: try { - const NXml::TDocument xml(std::get<IHTTPGateway::TContent>(std::move(result)).Extract(), NXml::TDocument::String); - if (const auto& root = xml.Root(); root.Name() == "Error") { - const auto& code = root.Node("Code", true).Value<TString>(); - const auto& message = root.Node("Message", true).Value<TString>(); - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(code, TStringBuilder{} << message << ", request id: [" << requestId << "]"))); + TS3Result s3Result(std::get<IHTTPGateway::TContent>(std::move(result)).Extract()); + const auto& root = s3Result.GetRootNode(); + if (s3Result.IsError) { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(s3Result.S3ErrorCode, TStringBuilder{} << s3Result.ErrorMessage << ", request id: [" << requestId << "]"))); } else if (root.Name() != "InitiateMultipartUploadResult") actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected response on create upload: " << root.Name() << ", request id: [" << requestId << "]"))); else { @@ -255,8 +254,20 @@ private: const auto& str = std::get<IHTTPGateway::TContent>(response).Headers; if (const NHttp::THeaders headers(str.substr(str.rfind("HTTP/"))); headers.Has("Etag")) actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadPartFinished(size, index, TString(headers.Get("Etag"))))); - else - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected response: " << str << ", request id: [" << requestId << "]"))); + else { + TS3Result s3Result(std::get<IHTTPGateway::TContent>(std::move(response)).Extract()); + if (s3Result.IsError && s3Result.Parsed) { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(s3Result.S3ErrorCode, TStringBuilder{} << "Upload failed: " << s3Result.ErrorMessage << ", request id: [" << requestId << "]"))); + } else { + constexpr size_t BODY_MAX_SIZE = 1_KB; + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, + TStringBuilder() << "Unexpected response" + << ". Headers: " << str + << ". Body: \"" << TStringBuf(s3Result.Body).Trunc(BODY_MAX_SIZE) + << (s3Result.Body.size() > BODY_MAX_SIZE ? "\"..." : "\"") + << ". Request id: [" << requestId << "]"))); + } + } } break; case 1U: { @@ -270,11 +281,10 @@ private: static void OnMultipartUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& key, const TString& url, const TString& requestId, ui64 sentSize, IHTTPGateway::TResult&& result) { switch (result.index()) { case 0U: try { - const NXml::TDocument xml(std::get<IHTTPGateway::TContent>(std::move(result)).Extract(), NXml::TDocument::String); - if (const auto& root = xml.Root(); root.Name() == "Error") { - const auto& code = root.Node("Code", true).Value<TString>(); - const auto& message = root.Node("Message", true).Value<TString>(); - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(code, TStringBuilder{} << message << ", request id: [" << requestId << "]"))); + TS3Result s3Result(std::get<IHTTPGateway::TContent>(std::move(result)).Extract()); + const auto& root = s3Result.GetRootNode(); + if (s3Result.IsError) { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(s3Result.S3ErrorCode, TStringBuilder{} << s3Result.ErrorMessage << ", request id: [" << requestId << "]"))); } else if (root.Name() != "CompleteMultipartUploadResult") actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected response on finish upload: " << root.Name() << ", request id: [" << requestId << "]"))); else @@ -514,16 +524,9 @@ private: void Handle(TEvPrivate::TEvUploadError::TPtr& result) { LOG_W("TS3WriteActor", "TEvUploadError " << result->Get()->Issues.ToOneLineString()); - auto statusCode = result->Get()->StatusCode; - if (statusCode == NYql::NDqProto::StatusIds::UNSPECIFIED) { - - // add err code analysis here - - if (result->Get()->S3ErrorCode == "BucketMaxSizeExceeded") { - statusCode = NYql::NDqProto::StatusIds::LIMIT_EXCEEDED; - } else { - statusCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR; - } + NDqProto::StatusIds::StatusCode statusCode = result->Get()->StatusCode; + if (statusCode == NDqProto::StatusIds::UNSPECIFIED) { + statusCode = StatusFromS3ErrorCode(result->Get()->S3ErrorCode); } Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Issues, statusCode); |