diff options
author | hor911 <hor911@ydb.tech> | 2022-09-20 13:38:27 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2022-09-20 13:38:27 +0300 |
commit | 82d81702d4bd8e8e781c5f358c50e32013459779 (patch) | |
tree | dc69637341faa3ec27016d63bfe0a931a6a44595 | |
parent | c13f67fff632dd2ad5759df843bdb7f65676478d (diff) | |
download | ydb-82d81702d4bd8e8e781c5f358c50e32013459779.tar.gz |
Err specification in s3 read/write
4 files changed, 108 insertions, 29 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp index 24003b9ede2..6fb2891f3f7 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp @@ -9,24 +9,36 @@ namespace NYql::NDq { -TString ParseS3ErrorResponse(long httpCode, const TString& response) { - TStringBuilder str; - str << "HTTP response code: " << httpCode ; +bool ParseS3ErrorResponse(const TString& response, TString& errorCode, TString& message) { try { const NXml::TDocument xml(response, NXml::TDocument::String); if (const auto& root = xml.Root(); root.Name() == "Error") { - const auto& code = root.Node("Code", true).Value<TString>(); - const auto& message = root.Node("Message", true).Value<TString>(); - str << ", " << message << ", error code: " << code; - } else { - str << Endl << response; + auto tmpMessage = root.Node("Message", true).Value<TString>(); + errorCode = root.Node("Code", true).Value<TString>(); + message = tmpMessage; + return true; } } catch (std::exception&) { - str << Endl << response; + // just suppress any possible errors + } + return false; +} + +TIssues BuildIssues(long httpCode, const TString& s3ErrorCode, const TString& message) { + + TIssues issues; + + if (httpCode) { + issues.AddIssue(TStringBuilder() << "HTTP Code: " << httpCode); + } + if (s3ErrorCode) { + issues.AddIssue(TStringBuilder() << "Object Storage Code: " << s3ErrorCode << ", " << message); + } else { + issues.AddIssue(message); } - return str; + return issues; } } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h index e808eea1fdb..e30332a8799 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h @@ -1,9 +1,12 @@ #pragma once +#include <ydb/library/yql/public/issue/yql_issue.h> + #include <util/generic/string.h> namespace NYql::NDq { -TString ParseS3ErrorResponse(long httpCode, const TString& response); +bool ParseS3ErrorResponse(const TString& response, TString& errorCode, TString& message); +TIssues BuildIssues(long httpCode, const TString& s3ErrorCode, const TString& message); } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 51c63e42b60..e73156e98ed 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -267,7 +267,13 @@ private: Blocks.emplace(std::make_tuple(std::move(result->Get()->Result), id)); Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); } else { - Send(ComputeActorId, new TEvAsyncInputError(InputIndex, {TIssue(ParseS3ErrorResponse(httpCode, result->Get()->Result.Extract()))}, NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); + TString errorText = result->Get()->Result.Extract(); + TString errorCode; + TString message; + if (!ParseS3ErrorResponse(errorText, errorCode, message)) { + message = errorText; + } + Send(ComputeActorId, new TEvAsyncInputError(InputIndex, BuildIssues(httpCode, errorCode, message), NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); } } @@ -506,14 +512,19 @@ private: // Finish reading. Add error from server to issues } catch (const std::exception& err) { exceptIssue.Message = TStringBuilder() << "Error while reading file " << Path << ", details: " << err.what(); - fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST; + fatalCode = NYql::NDqProto::StatusIds::INTERNAL_ERROR; RetryStuff->Cancel(); } WaitFinish(); if (!ErrorText.empty()) { - Issues.AddIssues({TIssue(ParseS3ErrorResponse(HttpResponseCode, ErrorText))}); + TString errorCode; + TString message; + if (!ParseS3ErrorResponse(ErrorText, errorCode, message)) { + message = ErrorText; + } + Issues.AddIssues(BuildIssues(HttpResponseCode, errorCode, message)); } if (exceptIssue.Message) { diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index 1974a528cc8..f8e8e74b8ee 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -66,8 +66,41 @@ struct TEvPrivate { }; struct TEvUploadError : public TEventLocal<TEvUploadError, EvUploadError> { - explicit TEvUploadError(TIssues&& error) : Error(std::move(error)) {} - const TIssues Error; + + TEvUploadError(long httpCode, const TString& s3ErrorCode, const TString& message) + : StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(httpCode), S3ErrorCode(s3ErrorCode), Message(message) { + BuildIssues(); + } + + TEvUploadError(const TString& s3ErrorCode, const TString& message) + : StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(0), S3ErrorCode(s3ErrorCode), Message(message) { + BuildIssues(); + } + + TEvUploadError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& message) + : StatusCode(statusCode), HttpCode(0), Message(message) { + BuildIssues(); + } + + TEvUploadError(long httpCode, const TString& message) + : StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(httpCode), Message(message) { + BuildIssues(); + } + + TEvUploadError(TIssues&& issues) + : StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(0), Issues(issues) { + // don't build + } + + void BuildIssues() { + Issues = ::NYql::NDq::BuildIssues(HttpCode, S3ErrorCode, Message); + } + + NYql::NDqProto::StatusIds::StatusCode StatusCode; + long HttpCode; + TString S3ErrorCode; + TString Message; + TIssues Issues; }; struct TEvUploadStarted : public TEventLocal<TEvUploadStarted, EvUploadStarted> { @@ -175,23 +208,23 @@ private: if (const auto& root = xml.Root(); root.Name() == "Error") { const auto& code = root.Node("Code", true).Value<TString>(); const auto& message = root.Node("Message", true).Value<TString>(); - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << message << ", error: code: " << code)}))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(code, message))); } else if (root.Name() != "InitiateMultipartUploadResult") - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response '" << root.Name() << "' on create upload.")}))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected response on create upload: " << root.Name()))); else { const NXml::TNamespacesForXPath nss(1U, {"s3", "http://s3.amazonaws.com/doc/2006-03-01/"}); actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadStarted(root.Node("s3:UploadId", false, nss).Value<TString>()))); } break; } catch (const std::exception& ex) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Error '" << ex.what() << "' on parse create upload response.")}))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Error on parse create upload response: " << ex.what()))); break; } case 1U: actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(result))))); break; default: - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected variant index " << result.index() << " on create upload response.")}))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected variant index " << result.index() << " on create upload response."))); break; } } @@ -203,7 +236,7 @@ private: if (const NHttp::THeaders headers(str.substr(str.rfind("HTTP/"))); headers.Has("Etag")) actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadPartFinished(size, index, TString(headers.Get("Etag"))))); else - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response:" << Endl << str)}))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected response: " << str))); } break; case 1U: @@ -219,21 +252,21 @@ private: if (const auto& root = xml.Root(); root.Name() == "Error") { const auto& code = root.Node("Code", true).Value<TString>(); const auto& message = root.Node("Message", true).Value<TString>(); - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << message << ", error: code: " << code)}))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(code, message))); } else if (root.Name() != "CompleteMultipartUploadResult") - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response '" << root.Name() << "' on finish upload.")}))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected response on finish upload: " << root.Name()))); else actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url))); break; } catch (const std::exception& ex) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Error '" << ex.what() << "' on parse finish upload response.")}))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Error on parse finish upload response: " << ex.what()))); break; } case 1U: actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(result))))); break; default: - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected variant index " << result.index() << " on finish upload response.")}))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected variant index " << result.index() << " on finish upload response."))); break; } } @@ -242,7 +275,14 @@ private: switch (result.index()) { case 0U: if (auto content = std::get<IHTTPGateway::TContent>(std::move(result)); content.HttpResponseCode >= 300) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(ParseS3ErrorResponse(content.HttpResponseCode, content.Extract()))}))); + TString errorText = content.Extract(); + TString errorCode; + TString message; + if (ParseS3ErrorResponse(errorText, errorCode, message)) { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(content.HttpResponseCode, errorCode, message))); + } else { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(content.HttpResponseCode, errorText))); + } } else { actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url))); } @@ -251,7 +291,7 @@ private: actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(result))))); break; default: - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected variant index " << result.index() << " on finish upload response.")}))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected variant index " << result.index() << " on finish upload response."))); break; } } @@ -421,8 +461,21 @@ private: } void Handle(TEvPrivate::TEvUploadError::TPtr& result) { - LOG_W("TS3WriteActor", "TEvUploadError " << result->Get()->Error.ToOneLineString()); - Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Error, NYql::NDqProto::StatusIds::EXTERNAL_ERROR); + LOG_W("TS3WriteActor", "TEvUploadError " << result->Get()->Issues.ToOneLineString()); + + auto statusCode = result->Get()->StatusCode; + if (statusCode == NYql::NDqProto::StatusIds::UNSPECIFIED) { + + // add err code analysis here + + if (result->Get()->S3ErrorCode == "BucketMaxSizeExceeded") { + statusCode = NYql::NDqProto::StatusIds::LIMIT_EXCEEDED; + } else { + statusCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR; + } + } + + Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Issues, statusCode); } void FinishIfNeeded() { |