aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbbiff <bbiff@yandex-team.com>2022-08-31 12:16:03 +0300
committerbbiff <bbiff@yandex-team.com>2022-08-31 12:16:03 +0300
commitfc719135cdcb2b6ffa82987c27cf2371858107c5 (patch)
tree7833fdbf015786480eabdb563fb00c81bd16a699
parent230709925f39039fc3437757c416856e4b018ab1 (diff)
downloadydb-fc719135cdcb2b6ffa82987c27cf2371858107c5.tar.gz
added error handling
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp4
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp76
2 files changed, 53 insertions, 27 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
index e31a8d96866..6e230f85919 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
@@ -113,8 +113,8 @@ public:
byteRange << Offset + expectedSize - 1;
}
curl_easy_setopt(Handle, CURLOPT_RANGE, byteRange.c_str());
- curl_easy_setopt(Handle, EMethod::PUT == method ? CURLOPT_HEADERFUNCTION : CURLOPT_WRITEFUNCTION, &WriteMemoryCallback);
- curl_easy_setopt(Handle, EMethod::PUT == method ? CURLOPT_HEADERDATA :CURLOPT_WRITEDATA, static_cast<void*>(this));
+ curl_easy_setopt(Handle, CURLOPT_WRITEFUNCTION, &WriteMemoryCallback);
+ curl_easy_setopt(Handle, CURLOPT_WRITEDATA, static_cast<void*>(this));
if (method == EMethod::POST) {
curl_easy_setopt(Handle, CURLOPT_POSTFIELDSIZE, bodySize);
}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
index 641a7c4f6b5..4db191fa854 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
@@ -137,24 +137,46 @@ private:
hFunc(TEvPrivate::TEvUploadPartFinished, Handle);
)
+ static std::optional<NXml::TConstNode> TryParseXmlResponse(const TString& response, TActorId parentId, TActorId selfId, TActorSystem* actorSystem, const TString& errorAvantMessage = "") {
+ try {
+ const NXml::TDocument xml(response, NXml::TDocument::String);
+ const auto& root = xml.Root();
+ if (root.Name() == "Error") {
+ auto errorCode = root.Node("Code", true).Value<TString>();
+ auto errorText = root.Node("Message", true).Value<TString>();
+ actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << errorAvantMessage << "Error code: '" << errorCode << ". Description: " << errorText)})));
+ }
+ return root;
+ } catch(const std::exception& ex) {
+ actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << errorAvantMessage << "Error while parsing xml: '" << ex.what())})));
+ }
+ return std::nullopt;
+ }
+
+ static std::optional<NXml::TConstNode> TryParseXmlResponse(IHTTPGateway::TResult response, TActorId parentId, TActorId selfId, TActorSystem* actorSystem, const TString& errorAvantMessage = "") {
+ return TryParseXmlResponse(std::get<IHTTPGateway::TContent>(response).Extract(), parentId, selfId, actorSystem, errorAvantMessage);
+ }
+
static void OnUploadsCreated(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, IHTTPGateway::TResult&& result) {
switch (result.index()) {
- case 0U: try {
- const NXml::TDocument xml(std::get<IHTTPGateway::TContent>(std::move(result)).Extract(), NXml::TDocument::String);
- if (const auto& root = xml.Root(); root.Name() == "Error") {
- const auto& code = root.Node("Code", true).Value<TString>();
- const auto& message = root.Node("Message", true).Value<TString>();
- actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << message << ", error: code: " << code)})));
- } else if (root.Name() != "InitiateMultipartUploadResult")
+ case 0U: {
+ auto maybeRoot = TryParseXmlResponse(std::move(result), parentId, selfId, actorSystem, "create upload error: ");
+ if (!maybeRoot || maybeRoot->Name() == "Error") {
+ break;
+ }
+ auto root = *maybeRoot;
+ if (root.Name() != "InitiateMultipartUploadResult")
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response '" << root.Name() << "' on create upload.")})));
else {
const NXml::TNamespacesForXPath nss(1U, {"s3", "http://s3.amazonaws.com/doc/2006-03-01/"});
- actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadStarted(root.Node("s3:UploadId", false, nss).Value<TString>())));
+ try {
+ actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadStarted(root.Node("s3:UploadId", false, nss).Value<TString>())));
+ } catch (const std::exception& ex) {
+ actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Error '" << ex.what() << "' on parse create upload response.")})));
+ break;
+ }
}
break;
- } catch (const std::exception& ex) {
- actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Error '" << ex.what() << "' on parse create upload response.")})));
- break;
}
case 1U:
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(result)))));
@@ -176,7 +198,11 @@ private:
break;
}
}
- actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response:" << Endl << str)})));
+
+ if (!TryParseXmlResponse(str, parentId, selfId, actorSystem, "upload part error: ")) {
+ actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response: " << str)})));
+ }
+
break;
}
case 1U:
@@ -187,19 +213,18 @@ private:
static void OnMultipartUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& key, const TString& url, IHTTPGateway::TResult&& result) {
switch (result.index()) {
- case 0U: try {
- const NXml::TDocument xml(std::get<IHTTPGateway::TContent>(std::move(result)).Extract(), NXml::TDocument::String);
- if (const auto& root = xml.Root(); root.Name() == "Error") {
- const auto& code = root.Node("Code", true).Value<TString>();
- const auto& message = root.Node("Message", true).Value<TString>();
- actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << message << ", error: code: " << code)})));
- } else if (root.Name() != "CompleteMultipartUploadResult")
+ case 0U: {
+
+ auto maybeRoot = TryParseXmlResponse(std::move(result), parentId, selfId, actorSystem, "upload error: ");
+ if (!maybeRoot || maybeRoot->Name() == "Error") {
+ break;
+ }
+ auto root = *maybeRoot;
+ if (root.Name() != "CompleteMultipartUploadResult") {
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response '" << root.Name() << "' on finish upload.")})));
- else
+ } else {
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url)));
- break;
- } catch (const std::exception& ex) {
- actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Error '" << ex.what() << "' on parse finish upload response.")})));
+ }
break;
}
case 1U:
@@ -217,7 +242,9 @@ private:
{
auto content = std::get<IHTTPGateway::TContent>(std::move(result));
if (content.HttpResponseCode >= 300) {
- actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "HTTP error code: " << content.HttpResponseCode)})));
+ if (!TryParseXmlResponse(content.Extract(), parentId, selfId, actorSystem, "upload error: ")) {
+ actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "HTTP error code: " << content.HttpResponseCode << ". Response: " << content.data())})));
+ }
} else {
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url)));
}
@@ -375,7 +402,6 @@ private:
ins.first->second.emplace_back(fileWrite.get());
RegisterWithSameMailbox(fileWrite.release());
}
-
ins.first->second.back()->SendData(TString((Keys.empty() ? v : *v.GetElements()).AsStringRef()));
}