aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2022-09-20 13:38:27 +0300
committerhor911 <hor911@ydb.tech>2022-09-20 13:38:27 +0300
commit82d81702d4bd8e8e781c5f358c50e32013459779 (patch)
treedc69637341faa3ec27016d63bfe0a931a6a44595
parentc13f67fff632dd2ad5759df843bdb7f65676478d (diff)
downloadydb-82d81702d4bd8e8e781c5f358c50e32013459779.tar.gz
Err specification in s3 read/write
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp32
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h5
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp17
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp83
4 files changed, 108 insertions, 29 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp
index 24003b9ede2..6fb2891f3f7 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp
@@ -9,24 +9,36 @@
namespace NYql::NDq {
-TString ParseS3ErrorResponse(long httpCode, const TString& response) {
- TStringBuilder str;
- str << "HTTP response code: " << httpCode ;
+bool ParseS3ErrorResponse(const TString& response, TString& errorCode, TString& message) {
try {
const NXml::TDocument xml(response, NXml::TDocument::String);
if (const auto& root = xml.Root(); root.Name() == "Error") {
- const auto& code = root.Node("Code", true).Value<TString>();
- const auto& message = root.Node("Message", true).Value<TString>();
- str << ", " << message << ", error code: " << code;
- } else {
- str << Endl << response;
+ auto tmpMessage = root.Node("Message", true).Value<TString>();
+ errorCode = root.Node("Code", true).Value<TString>();
+ message = tmpMessage;
+ return true;
}
}
catch (std::exception&) {
- str << Endl << response;
+ // just suppress any possible errors
+ }
+ return false;
+}
+
+TIssues BuildIssues(long httpCode, const TString& s3ErrorCode, const TString& message) {
+
+ TIssues issues;
+
+ if (httpCode) {
+ issues.AddIssue(TStringBuilder() << "HTTP Code: " << httpCode);
+ }
+ if (s3ErrorCode) {
+ issues.AddIssue(TStringBuilder() << "Object Storage Code: " << s3ErrorCode << ", " << message);
+ } else {
+ issues.AddIssue(message);
}
- return str;
+ return issues;
}
}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h
index e808eea1fdb..e30332a8799 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h
@@ -1,9 +1,12 @@
#pragma once
+#include <ydb/library/yql/public/issue/yql_issue.h>
+
#include <util/generic/string.h>
namespace NYql::NDq {
-TString ParseS3ErrorResponse(long httpCode, const TString& response);
+bool ParseS3ErrorResponse(const TString& response, TString& errorCode, TString& message);
+TIssues BuildIssues(long httpCode, const TString& s3ErrorCode, const TString& message);
}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 51c63e42b60..e73156e98ed 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -267,7 +267,13 @@ private:
Blocks.emplace(std::make_tuple(std::move(result->Get()->Result), id));
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
} else {
- Send(ComputeActorId, new TEvAsyncInputError(InputIndex, {TIssue(ParseS3ErrorResponse(httpCode, result->Get()->Result.Extract()))}, NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
+ TString errorText = result->Get()->Result.Extract();
+ TString errorCode;
+ TString message;
+ if (!ParseS3ErrorResponse(errorText, errorCode, message)) {
+ message = errorText;
+ }
+ Send(ComputeActorId, new TEvAsyncInputError(InputIndex, BuildIssues(httpCode, errorCode, message), NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
}
}
@@ -506,14 +512,19 @@ private:
// Finish reading. Add error from server to issues
} catch (const std::exception& err) {
exceptIssue.Message = TStringBuilder() << "Error while reading file " << Path << ", details: " << err.what();
- fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST;
+ fatalCode = NYql::NDqProto::StatusIds::INTERNAL_ERROR;
RetryStuff->Cancel();
}
WaitFinish();
if (!ErrorText.empty()) {
- Issues.AddIssues({TIssue(ParseS3ErrorResponse(HttpResponseCode, ErrorText))});
+ TString errorCode;
+ TString message;
+ if (!ParseS3ErrorResponse(ErrorText, errorCode, message)) {
+ message = ErrorText;
+ }
+ Issues.AddIssues(BuildIssues(HttpResponseCode, errorCode, message));
}
if (exceptIssue.Message) {
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
index 1974a528cc8..f8e8e74b8ee 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
@@ -66,8 +66,41 @@ struct TEvPrivate {
};
struct TEvUploadError : public TEventLocal<TEvUploadError, EvUploadError> {
- explicit TEvUploadError(TIssues&& error) : Error(std::move(error)) {}
- const TIssues Error;
+
+ TEvUploadError(long httpCode, const TString& s3ErrorCode, const TString& message)
+ : StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(httpCode), S3ErrorCode(s3ErrorCode), Message(message) {
+ BuildIssues();
+ }
+
+ TEvUploadError(const TString& s3ErrorCode, const TString& message)
+ : StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(0), S3ErrorCode(s3ErrorCode), Message(message) {
+ BuildIssues();
+ }
+
+ TEvUploadError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& message)
+ : StatusCode(statusCode), HttpCode(0), Message(message) {
+ BuildIssues();
+ }
+
+ TEvUploadError(long httpCode, const TString& message)
+ : StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(httpCode), Message(message) {
+ BuildIssues();
+ }
+
+ TEvUploadError(TIssues&& issues)
+ : StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(0), Issues(issues) {
+ // don't build
+ }
+
+ void BuildIssues() {
+ Issues = ::NYql::NDq::BuildIssues(HttpCode, S3ErrorCode, Message);
+ }
+
+ NYql::NDqProto::StatusIds::StatusCode StatusCode;
+ long HttpCode;
+ TString S3ErrorCode;
+ TString Message;
+ TIssues Issues;
};
struct TEvUploadStarted : public TEventLocal<TEvUploadStarted, EvUploadStarted> {
@@ -175,23 +208,23 @@ private:
if (const auto& root = xml.Root(); root.Name() == "Error") {
const auto& code = root.Node("Code", true).Value<TString>();
const auto& message = root.Node("Message", true).Value<TString>();
- actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << message << ", error: code: " << code)})));
+ actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(code, message)));
} else if (root.Name() != "InitiateMultipartUploadResult")
- actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response '" << root.Name() << "' on create upload.")})));
+ actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected response on create upload: " << root.Name())));
else {
const NXml::TNamespacesForXPath nss(1U, {"s3", "http://s3.amazonaws.com/doc/2006-03-01/"});
actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadStarted(root.Node("s3:UploadId", false, nss).Value<TString>())));
}
break;
} catch (const std::exception& ex) {
- actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Error '" << ex.what() << "' on parse create upload response.")})));
+ actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Error on parse create upload response: " << ex.what())));
break;
}
case 1U:
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(result)))));
break;
default:
- actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected variant index " << result.index() << " on create upload response.")})));
+ actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected variant index " << result.index() << " on create upload response.")));
break;
}
}
@@ -203,7 +236,7 @@ private:
if (const NHttp::THeaders headers(str.substr(str.rfind("HTTP/"))); headers.Has("Etag"))
actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadPartFinished(size, index, TString(headers.Get("Etag")))));
else
- actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response:" << Endl << str)})));
+ actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected response: " << str)));
}
break;
case 1U:
@@ -219,21 +252,21 @@ private:
if (const auto& root = xml.Root(); root.Name() == "Error") {
const auto& code = root.Node("Code", true).Value<TString>();
const auto& message = root.Node("Message", true).Value<TString>();
- actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << message << ", error: code: " << code)})));
+ actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(code, message)));
} else if (root.Name() != "CompleteMultipartUploadResult")
- actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response '" << root.Name() << "' on finish upload.")})));
+ actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected response on finish upload: " << root.Name())));
else
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url)));
break;
} catch (const std::exception& ex) {
- actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Error '" << ex.what() << "' on parse finish upload response.")})));
+ actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Error on parse finish upload response: " << ex.what())));
break;
}
case 1U:
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(result)))));
break;
default:
- actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected variant index " << result.index() << " on finish upload response.")})));
+ actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected variant index " << result.index() << " on finish upload response.")));
break;
}
}
@@ -242,7 +275,14 @@ private:
switch (result.index()) {
case 0U:
if (auto content = std::get<IHTTPGateway::TContent>(std::move(result)); content.HttpResponseCode >= 300) {
- actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(ParseS3ErrorResponse(content.HttpResponseCode, content.Extract()))})));
+ TString errorText = content.Extract();
+ TString errorCode;
+ TString message;
+ if (ParseS3ErrorResponse(errorText, errorCode, message)) {
+ actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(content.HttpResponseCode, errorCode, message)));
+ } else {
+ actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(content.HttpResponseCode, errorText)));
+ }
} else {
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url)));
}
@@ -251,7 +291,7 @@ private:
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(result)))));
break;
default:
- actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected variant index " << result.index() << " on finish upload response.")})));
+ actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected variant index " << result.index() << " on finish upload response.")));
break;
}
}
@@ -421,8 +461,21 @@ private:
}
void Handle(TEvPrivate::TEvUploadError::TPtr& result) {
- LOG_W("TS3WriteActor", "TEvUploadError " << result->Get()->Error.ToOneLineString());
- Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Error, NYql::NDqProto::StatusIds::EXTERNAL_ERROR);
+ LOG_W("TS3WriteActor", "TEvUploadError " << result->Get()->Issues.ToOneLineString());
+
+ auto statusCode = result->Get()->StatusCode;
+ if (statusCode == NYql::NDqProto::StatusIds::UNSPECIFIED) {
+
+ // add err code analysis here
+
+ if (result->Get()->S3ErrorCode == "BucketMaxSizeExceeded") {
+ statusCode = NYql::NDqProto::StatusIds::LIMIT_EXCEEDED;
+ } else {
+ statusCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR;
+ }
+ }
+
+ Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Issues, statusCode);
}
void FinishIfNeeded() {