summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <[email protected]>2022-09-14 12:52:28 +0300
committera-romanov <[email protected]>2022-09-14 12:52:28 +0300
commita1b046d8ad132a2052989ac30585ab2c5a0e7f6f (patch)
treeb1102cae8046fa7223ed1b5f565c04500dbe4af2
parentf2536f7a87466cc95830c84a398d89bd3bce6312 (diff)
fix.
-rw-r--r--ydb/library/yql/providers/s3/actors/CMakeLists.txt1
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp32
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h9
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp45
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp24
5 files changed, 70 insertions, 41 deletions
diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.txt
index fdeab19adc1..931067348e2 100644
--- a/ydb/library/yql/providers/s3/actors/CMakeLists.txt
+++ b/ydb/library/yql/providers/s3/actors/CMakeLists.txt
@@ -37,6 +37,7 @@ target_link_libraries(providers-s3-actors PUBLIC
clickhouse_client_udf
)
target_sources(providers-s3-actors PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp
new file mode 100644
index 00000000000..24003b9ede2
--- /dev/null
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp
@@ -0,0 +1,32 @@
+#include "yql_s3_actors_util.h"
+
+#include <util/string/builder.h>
+
+#ifdef THROW
+#undef THROW
+#endif
+#include <library/cpp/xml/document/xml-document.h>
+
+namespace NYql::NDq {
+
+TString ParseS3ErrorResponse(long httpCode, const TString& response) {
+ TStringBuilder str;
+ str << "HTTP response code: " << httpCode ;
+ try {
+ const NXml::TDocument xml(response, NXml::TDocument::String);
+ if (const auto& root = xml.Root(); root.Name() == "Error") {
+ const auto& code = root.Node("Code", true).Value<TString>();
+ const auto& message = root.Node("Message", true).Value<TString>();
+ str << ", " << message << ", error code: " << code;
+ } else {
+ str << Endl << response;
+ }
+ }
+ catch (std::exception&) {
+ str << Endl << response;
+ }
+
+ return str;
+}
+
+}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h
new file mode 100644
index 00000000000..e808eea1fdb
--- /dev/null
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h
@@ -0,0 +1,9 @@
+#pragma once
+
+#include <util/generic/string.h>
+
+namespace NYql::NDq {
+
+TString ParseS3ErrorResponse(long httpCode, const TString& response);
+
+}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 2773bcfc881..06847584c7b 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -21,6 +21,7 @@
#endif
#include "yql_s3_read_actor.h"
+#include "yql_s3_actors_util.h"
#include <ydb/core/protos/services.pb.h>
@@ -173,7 +174,7 @@ public:
{}
void Bootstrap() {
- LOG_D("TS3ReadActor", "Bootstrapped, InputIndex: " << InputIndex);
+ LOG_D("TS3ReadActor", __func__ << ", InputIndex: " << InputIndex);
Become(&TS3ReadActor::StateFunc);
for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) {
const TPath& path = Paths[pathInd];
@@ -242,13 +243,17 @@ private:
return total;
}
-
void Handle(TEvPrivate::TEvReadResult::TPtr& result) {
++IsDoneCounter;
- auto id = result->Get()->PathIndex;
- LOG_D("TS3ReadActor", "ID: " << id << ", TEvReadResult size: " << result->Get()->Result.size());
- Blocks.emplace(std::make_tuple(std::move(result->Get()->Result), id));
- Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
+ const auto id = result->Get()->PathIndex;
+ const auto httpCode = result->Get()->Result.HttpResponseCode;
+ LOG_D("TS3ReadActor", "ID: " << id << ", TEvReadResult size: " << result->Get()->Result.size() << ", HTTP response code: " << httpCode);
+ if (200 == httpCode || 206 == httpCode) {
+ Blocks.emplace(std::make_tuple(std::move(result->Get()->Result), id));
+ Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
+ } else {
+ Send(ComputeActorId, new TEvAsyncInputError(InputIndex, {TIssue(ParseS3ErrorResponse(httpCode, result->Get()->Result.Extract()))}, NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
+ }
}
void Handle(TEvPrivate::TEvReadError::TPtr& result) {
@@ -260,7 +265,7 @@ private:
// IActor & IDqComputeActorAsyncInput
void PassAway() override { // Is called from Compute Actor
- LOG_D("TS3ReadActor", "PassAway");
+ LOG_D("TS3ReadActor", __func__);
ContainerCache.Clear();
TActorBootstrapped<TS3ReadActor>::PassAway();
}
@@ -409,7 +414,7 @@ public:
}
private:
void WaitFinish() {
- LOG_CORO_D("TS3ReadCoroImpl", "WaitFinish()");
+ LOG_CORO_D("TS3ReadCoroImpl", __func__);
if (InputFinished)
return;
@@ -438,7 +443,7 @@ private:
void Run() final try {
- LOG_CORO_D("TS3ReadCoroImpl", "Run, Path: " << Path);
+ LOG_CORO_D("TS3ReadCoroImpl", __func__ << ", Path: " << Path);
NYql::NDqProto::StatusIds::StatusCode fatalCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR;
@@ -460,23 +465,7 @@ private:
WaitFinish();
if (!ErrorText.empty()) {
- TStringBuilder str;
-
- if (HttpResponseCode)
- str << "HTTP response code: " << HttpResponseCode << ", ";
-
- if (ErrorText.StartsWith("<?xml") && !ErrorText.EndsWith(TruncatedSuffix)) {
- const NXml::TDocument xml(ErrorText, NXml::TDocument::String);
- if (const auto& root = xml.Root(); root.Name() == "Error") {
- const auto& code = root.Node("Code", true).Value<TString>();
- const auto& message = root.Node("Message", true).Value<TString>();
- str << message << ", error code: " << code;
- } else
- str << ErrorText;
- } else
- str << ErrorText;
-
- Issues.AddIssues({TIssue(str)});
+ Issues.AddIssues({TIssue(ParseS3ErrorResponse(HttpResponseCode, ErrorText))});
}
if (exceptIssue.Message) {
@@ -635,7 +624,7 @@ public:
{}
void Bootstrap() {
- LOG_D("TS3StreamReadActor", "Bootstrapped");
+ LOG_D("TS3StreamReadActor", __func__);
Become(&TS3StreamReadActor::StateFunc);
for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) {
const TPath& path = Paths[pathInd];
@@ -703,7 +692,7 @@ private:
// IActor & IDqComputeActorAsyncInput
void PassAway() override { // Is called from Compute Actor
- LOG_D("TS3StreamReadActor", "PassAway");
+ LOG_D("TS3StreamReadActor", __func__);
for (auto stuff: RetryStuffForFile) {
stuff->Cancel();
}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
index 841b44d6f19..90fbc2870a1 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
@@ -1,4 +1,5 @@
#include "yql_s3_write_actor.h"
+#include "yql_s3_actors_util.h"
#include <ydb/core/protos/services.pb.h>
@@ -105,7 +106,7 @@ public:
void Bootstrap(const TActorId& parentId) {
ParentId = parentId;
- LOG_D("TS3FileWriteActor", "Bootstrapped by " << ParentId << " for Key: [" << Key << "], Url: [" << Url << "]");
+ LOG_D("TS3FileWriteActor", __func__ << " by " << ParentId << " for Key: [" << Key << "], Url: [" << Url << "]");
if (Parts->IsSealed() && 1U == Parts->Size()) {
const auto size = Parts->Volume();
InFlight += size;
@@ -121,9 +122,9 @@ public:
void PassAway() override {
if (InFlight || !Parts->Empty()) {
- LOG_W("TS3FileWriteActor", "PassAway but NOT finished, InFlight: " << InFlight << ", Parts: " << Parts->Size());
+ LOG_W("TS3FileWriteActor", __func__ << " but NOT finished, InFlight: " << InFlight << ", Parts: " << Parts->Size());
} else {
- LOG_D("TS3FileWriteActor", "PassAway");
+ LOG_D("TS3FileWriteActor", __func__);
}
TActorBootstrapped<TS3FileWriteActor>::PassAway();
}
@@ -241,13 +242,10 @@ private:
static void OnUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& key, const TString& url, IHTTPGateway::TResult&& result) {
switch (result.index()) {
case 0U:
- {
- auto content = std::get<IHTTPGateway::TContent>(std::move(result));
- if (content.HttpResponseCode >= 300) {
- actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "HTTP error code: " << content.HttpResponseCode)})));
- } else {
- actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url)));
- }
+ if (auto content = std::get<IHTTPGateway::TContent>(std::move(result)); content.HttpResponseCode >= 300) {
+ actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(ParseS3ErrorResponse(content.HttpResponseCode, content.Extract()))})));
+ } else {
+ actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url)));
}
break;
case 1U:
@@ -359,7 +357,7 @@ public:
}
void Bootstrap() {
- LOG_D("TS3WriteActor", "Bootstrapped");
+ LOG_D("TS3WriteActor", __func__);
Become(&TS3WriteActor::StateFunc);
}
@@ -456,9 +454,9 @@ private:
FileWriteActors.clear();
if (fileWriterCount) {
- LOG_W("TS3WriteActor", "PassAway with " << fileWriterCount << " NOT finished FileWriter(s)");
+ LOG_W("TS3WriteActor", __func__ << " with " << fileWriterCount << " NOT finished FileWriter(s)");
} else {
- LOG_D("TS3WriteActor", "PassAway");
+ LOG_D("TS3WriteActor", __func__);
}
TActorBootstrapped<TS3WriteActor>::PassAway();