diff options
| author | hor911 <[email protected]> | 2022-09-20 13:38:27 +0300 | 
|---|---|---|
| committer | hor911 <[email protected]> | 2022-09-20 13:38:27 +0300 | 
| commit | 82d81702d4bd8e8e781c5f358c50e32013459779 (patch) | |
| tree | dc69637341faa3ec27016d63bfe0a931a6a44595 | |
| parent | c13f67fff632dd2ad5759df843bdb7f65676478d (diff) | |
Err specification in s3 read/write
4 files changed, 108 insertions, 29 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp index 24003b9ede2..6fb2891f3f7 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp @@ -9,24 +9,36 @@  namespace NYql::NDq { -TString ParseS3ErrorResponse(long httpCode, const TString& response) { -    TStringBuilder str; -    str << "HTTP response code: " << httpCode ; +bool ParseS3ErrorResponse(const TString& response, TString& errorCode, TString& message) {      try {          const NXml::TDocument xml(response, NXml::TDocument::String);          if (const auto& root = xml.Root(); root.Name() == "Error") { -            const auto& code = root.Node("Code", true).Value<TString>(); -            const auto& message = root.Node("Message", true).Value<TString>(); -            str << ", " << message << ", error code: " << code; -        } else { -            str << Endl << response; +            auto tmpMessage = root.Node("Message", true).Value<TString>(); +            errorCode = root.Node("Code", true).Value<TString>(); +            message = tmpMessage; +            return true;          }      }      catch (std::exception&) { -        str << Endl << response; +        // just suppress any possible errors +    } +    return false; +} + +TIssues BuildIssues(long httpCode, const TString& s3ErrorCode, const TString& message) { + +    TIssues issues; + +    if (httpCode) { +        issues.AddIssue(TStringBuilder() << "HTTP Code: " << httpCode); +    } +    if (s3ErrorCode) { +        issues.AddIssue(TStringBuilder() << "Object Storage Code: " << s3ErrorCode << ", " << message); +    } else { +        issues.AddIssue(message);      } -    return str; +    return issues;  }  } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h index e808eea1fdb..e30332a8799 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h @@ -1,9 +1,12 @@  #pragma once +#include <ydb/library/yql/public/issue/yql_issue.h> +  #include <util/generic/string.h>  namespace NYql::NDq { -TString ParseS3ErrorResponse(long httpCode, const TString& response); +bool ParseS3ErrorResponse(const TString& response, TString& errorCode, TString& message); +TIssues BuildIssues(long httpCode, const TString& s3ErrorCode, const TString& message);  } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 51c63e42b60..e73156e98ed 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -267,7 +267,13 @@ private:              Blocks.emplace(std::make_tuple(std::move(result->Get()->Result), id));              Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));          } else { -            Send(ComputeActorId, new TEvAsyncInputError(InputIndex, {TIssue(ParseS3ErrorResponse(httpCode, result->Get()->Result.Extract()))}, NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); +            TString errorText = result->Get()->Result.Extract(); +            TString errorCode; +            TString message; +            if (!ParseS3ErrorResponse(errorText, errorCode, message)) { +                message = errorText; +            } +            Send(ComputeActorId, new TEvAsyncInputError(InputIndex, BuildIssues(httpCode, errorCode, message), NYql::NDqProto::StatusIds::EXTERNAL_ERROR));          }      } @@ -506,14 +512,19 @@ private:              // Finish reading. Add error from server to issues          } catch (const std::exception& err) {              exceptIssue.Message = TStringBuilder() << "Error while reading file " << Path << ", details: " << err.what(); -            fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST; +            fatalCode = NYql::NDqProto::StatusIds::INTERNAL_ERROR;              RetryStuff->Cancel();          }          WaitFinish();          if (!ErrorText.empty()) { -            Issues.AddIssues({TIssue(ParseS3ErrorResponse(HttpResponseCode, ErrorText))}); +            TString errorCode; +            TString message; +            if (!ParseS3ErrorResponse(ErrorText, errorCode, message)) { +                message = ErrorText; +            } +            Issues.AddIssues(BuildIssues(HttpResponseCode, errorCode, message));          }          if (exceptIssue.Message) { diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index 1974a528cc8..f8e8e74b8ee 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -66,8 +66,41 @@ struct TEvPrivate {      };      struct TEvUploadError : public TEventLocal<TEvUploadError, EvUploadError> { -        explicit TEvUploadError(TIssues&& error) : Error(std::move(error)) {} -        const TIssues Error; + +        TEvUploadError(long httpCode, const TString& s3ErrorCode, const TString& message) +        : StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(httpCode), S3ErrorCode(s3ErrorCode), Message(message) { +            BuildIssues(); +        } + +        TEvUploadError(const TString& s3ErrorCode, const TString& message) +        : StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(0), S3ErrorCode(s3ErrorCode), Message(message) { +            BuildIssues(); +        } + +        TEvUploadError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& message) +        : StatusCode(statusCode), HttpCode(0), Message(message) { +            BuildIssues(); +        } + +        TEvUploadError(long httpCode, const TString& message) +        : StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(httpCode), Message(message) { +            BuildIssues(); +        } + +        TEvUploadError(TIssues&& issues) +        : StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(0), Issues(issues) { +            // don't build +        } + +        void BuildIssues() { +            Issues = ::NYql::NDq::BuildIssues(HttpCode, S3ErrorCode, Message); +        } + +        NYql::NDqProto::StatusIds::StatusCode StatusCode; +        long HttpCode; +        TString S3ErrorCode; +        TString Message; +        TIssues Issues;      };      struct TEvUploadStarted : public TEventLocal<TEvUploadStarted, EvUploadStarted> { @@ -175,23 +208,23 @@ private:              if (const auto& root = xml.Root(); root.Name() == "Error") {                  const auto& code = root.Node("Code", true).Value<TString>();                  const auto& message = root.Node("Message", true).Value<TString>(); -                actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << message << ", error: code: " << code)}))); +                actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(code, message)));              } else if (root.Name() != "InitiateMultipartUploadResult") -                actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response '" << root.Name() << "' on create upload.")}))); +                actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected response on create upload: " << root.Name())));              else {                  const NXml::TNamespacesForXPath nss(1U, {"s3", "http://s3.amazonaws.com/doc/2006-03-01/"});                  actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadStarted(root.Node("s3:UploadId", false, nss).Value<TString>())));              }              break;          } catch (const std::exception& ex) { -            actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Error '" << ex.what() << "' on parse create upload response.")}))); +            actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Error on parse create upload response: " << ex.what())));              break;          }          case 1U:              actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(result)))));              break;          default: -            actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected variant index " << result.index() << " on create upload response.")}))); +            actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected variant index " << result.index() << " on create upload response.")));              break;          }      } @@ -203,7 +236,7 @@ private:              if (const NHttp::THeaders headers(str.substr(str.rfind("HTTP/"))); headers.Has("Etag"))                  actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadPartFinished(size, index, TString(headers.Get("Etag")))));              else -                actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response:" << Endl << str)}))); +                actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected response: " << str)));              }              break;          case 1U: @@ -219,21 +252,21 @@ private:              if (const auto& root = xml.Root(); root.Name() == "Error") {                  const auto& code = root.Node("Code", true).Value<TString>();                  const auto& message = root.Node("Message", true).Value<TString>(); -                actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << message << ", error: code: " << code)}))); +                actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(code, message)));              } else if (root.Name() != "CompleteMultipartUploadResult") -                actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response '" << root.Name() << "' on finish upload.")}))); +                actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected response on finish upload: " << root.Name())));              else                  actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url)));              break;          } catch (const std::exception& ex) { -            actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Error '" << ex.what() << "' on parse finish upload response.")}))); +            actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Error on parse finish upload response: " << ex.what())));              break;          }          case 1U:              actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(result)))));              break;          default: -            actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected variant index " << result.index() << " on finish upload response.")}))); +            actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected variant index " << result.index() << " on finish upload response.")));              break;          }      } @@ -242,7 +275,14 @@ private:          switch (result.index()) {          case 0U:              if (auto content = std::get<IHTTPGateway::TContent>(std::move(result)); content.HttpResponseCode >= 300) { -                actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(ParseS3ErrorResponse(content.HttpResponseCode, content.Extract()))}))); +                TString errorText = content.Extract(); +                TString errorCode; +                TString message; +                if (ParseS3ErrorResponse(errorText, errorCode, message)) { +                    actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(content.HttpResponseCode, errorCode, message))); +                } else { +                    actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(content.HttpResponseCode, errorText))); +                }              } else {                  actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url)));              } @@ -251,7 +291,7 @@ private:              actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(result)))));              break;          default: -            actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected variant index " << result.index() << " on finish upload response.")}))); +            actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected variant index " << result.index() << " on finish upload response.")));              break;          }      } @@ -421,8 +461,21 @@ private:      }      void Handle(TEvPrivate::TEvUploadError::TPtr& result) { -        LOG_W("TS3WriteActor", "TEvUploadError " << result->Get()->Error.ToOneLineString()); -        Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Error, NYql::NDqProto::StatusIds::EXTERNAL_ERROR); +        LOG_W("TS3WriteActor", "TEvUploadError " << result->Get()->Issues.ToOneLineString()); + +        auto statusCode = result->Get()->StatusCode; +        if (statusCode == NYql::NDqProto::StatusIds::UNSPECIFIED) { +             +            // add err code analysis here + +            if (result->Get()->S3ErrorCode == "BucketMaxSizeExceeded") { +                statusCode = NYql::NDqProto::StatusIds::LIMIT_EXCEEDED; +            } else { +                statusCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR; +            } +        } + +        Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Issues, statusCode);      }      void FinishIfNeeded() {  | 
