diff options
| author | a-romanov <[email protected]> | 2022-09-14 12:52:28 +0300 |
|---|---|---|
| committer | a-romanov <[email protected]> | 2022-09-14 12:52:28 +0300 |
| commit | a1b046d8ad132a2052989ac30585ab2c5a0e7f6f (patch) | |
| tree | b1102cae8046fa7223ed1b5f565c04500dbe4af2 | |
| parent | f2536f7a87466cc95830c84a398d89bd3bce6312 (diff) | |
fix.
5 files changed, 70 insertions, 41 deletions
diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.txt index fdeab19adc1..931067348e2 100644 --- a/ydb/library/yql/providers/s3/actors/CMakeLists.txt +++ b/ydb/library/yql/providers/s3/actors/CMakeLists.txt @@ -37,6 +37,7 @@ target_link_libraries(providers-s3-actors PUBLIC clickhouse_client_udf ) target_sources(providers-s3-actors PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp new file mode 100644 index 00000000000..24003b9ede2 --- /dev/null +++ b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.cpp @@ -0,0 +1,32 @@ +#include "yql_s3_actors_util.h" + +#include <util/string/builder.h> + +#ifdef THROW +#undef THROW +#endif +#include <library/cpp/xml/document/xml-document.h> + +namespace NYql::NDq { + +TString ParseS3ErrorResponse(long httpCode, const TString& response) { + TStringBuilder str; + str << "HTTP response code: " << httpCode ; + try { + const NXml::TDocument xml(response, NXml::TDocument::String); + if (const auto& root = xml.Root(); root.Name() == "Error") { + const auto& code = root.Node("Code", true).Value<TString>(); + const auto& message = root.Node("Message", true).Value<TString>(); + str << ", " << message << ", error code: " << code; + } else { + str << Endl << response; + } + } + catch (std::exception&) { + str << Endl << response; + } + + return str; +} + +} diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h new file mode 100644 index 00000000000..e808eea1fdb --- /dev/null +++ b/ydb/library/yql/providers/s3/actors/yql_s3_actors_util.h @@ -0,0 +1,9 @@ +#pragma once + +#include <util/generic/string.h> + +namespace NYql::NDq { + +TString ParseS3ErrorResponse(long httpCode, const TString& response); + +} diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 2773bcfc881..06847584c7b 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -21,6 +21,7 @@ #endif #include "yql_s3_read_actor.h" +#include "yql_s3_actors_util.h" #include <ydb/core/protos/services.pb.h> @@ -173,7 +174,7 @@ public: {} void Bootstrap() { - LOG_D("TS3ReadActor", "Bootstrapped, InputIndex: " << InputIndex); + LOG_D("TS3ReadActor", __func__ << ", InputIndex: " << InputIndex); Become(&TS3ReadActor::StateFunc); for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) { const TPath& path = Paths[pathInd]; @@ -242,13 +243,17 @@ private: return total; } - void Handle(TEvPrivate::TEvReadResult::TPtr& result) { ++IsDoneCounter; - auto id = result->Get()->PathIndex; - LOG_D("TS3ReadActor", "ID: " << id << ", TEvReadResult size: " << result->Get()->Result.size()); - Blocks.emplace(std::make_tuple(std::move(result->Get()->Result), id)); - Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); + const auto id = result->Get()->PathIndex; + const auto httpCode = result->Get()->Result.HttpResponseCode; + LOG_D("TS3ReadActor", "ID: " << id << ", TEvReadResult size: " << result->Get()->Result.size() << ", HTTP response code: " << httpCode); + if (200 == httpCode || 206 == httpCode) { + Blocks.emplace(std::make_tuple(std::move(result->Get()->Result), id)); + Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); + } else { + Send(ComputeActorId, new TEvAsyncInputError(InputIndex, {TIssue(ParseS3ErrorResponse(httpCode, result->Get()->Result.Extract()))}, NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); + } } void Handle(TEvPrivate::TEvReadError::TPtr& result) { @@ -260,7 +265,7 @@ private: // IActor & IDqComputeActorAsyncInput void PassAway() override { // Is called from Compute Actor - LOG_D("TS3ReadActor", "PassAway"); + LOG_D("TS3ReadActor", __func__); ContainerCache.Clear(); TActorBootstrapped<TS3ReadActor>::PassAway(); } @@ -409,7 +414,7 @@ public: } private: void WaitFinish() { - LOG_CORO_D("TS3ReadCoroImpl", "WaitFinish()"); + LOG_CORO_D("TS3ReadCoroImpl", __func__); if (InputFinished) return; @@ -438,7 +443,7 @@ private: void Run() final try { - LOG_CORO_D("TS3ReadCoroImpl", "Run, Path: " << Path); + LOG_CORO_D("TS3ReadCoroImpl", __func__ << ", Path: " << Path); NYql::NDqProto::StatusIds::StatusCode fatalCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR; @@ -460,23 +465,7 @@ private: WaitFinish(); if (!ErrorText.empty()) { - TStringBuilder str; - - if (HttpResponseCode) - str << "HTTP response code: " << HttpResponseCode << ", "; - - if (ErrorText.StartsWith("<?xml") && !ErrorText.EndsWith(TruncatedSuffix)) { - const NXml::TDocument xml(ErrorText, NXml::TDocument::String); - if (const auto& root = xml.Root(); root.Name() == "Error") { - const auto& code = root.Node("Code", true).Value<TString>(); - const auto& message = root.Node("Message", true).Value<TString>(); - str << message << ", error code: " << code; - } else - str << ErrorText; - } else - str << ErrorText; - - Issues.AddIssues({TIssue(str)}); + Issues.AddIssues({TIssue(ParseS3ErrorResponse(HttpResponseCode, ErrorText))}); } if (exceptIssue.Message) { @@ -635,7 +624,7 @@ public: {} void Bootstrap() { - LOG_D("TS3StreamReadActor", "Bootstrapped"); + LOG_D("TS3StreamReadActor", __func__); Become(&TS3StreamReadActor::StateFunc); for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) { const TPath& path = Paths[pathInd]; @@ -703,7 +692,7 @@ private: // IActor & IDqComputeActorAsyncInput void PassAway() override { // Is called from Compute Actor - LOG_D("TS3StreamReadActor", "PassAway"); + LOG_D("TS3StreamReadActor", __func__); for (auto stuff: RetryStuffForFile) { stuff->Cancel(); } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index 841b44d6f19..90fbc2870a1 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -1,4 +1,5 @@ #include "yql_s3_write_actor.h" +#include "yql_s3_actors_util.h" #include <ydb/core/protos/services.pb.h> @@ -105,7 +106,7 @@ public: void Bootstrap(const TActorId& parentId) { ParentId = parentId; - LOG_D("TS3FileWriteActor", "Bootstrapped by " << ParentId << " for Key: [" << Key << "], Url: [" << Url << "]"); + LOG_D("TS3FileWriteActor", __func__ << " by " << ParentId << " for Key: [" << Key << "], Url: [" << Url << "]"); if (Parts->IsSealed() && 1U == Parts->Size()) { const auto size = Parts->Volume(); InFlight += size; @@ -121,9 +122,9 @@ public: void PassAway() override { if (InFlight || !Parts->Empty()) { - LOG_W("TS3FileWriteActor", "PassAway but NOT finished, InFlight: " << InFlight << ", Parts: " << Parts->Size()); + LOG_W("TS3FileWriteActor", __func__ << " but NOT finished, InFlight: " << InFlight << ", Parts: " << Parts->Size()); } else { - LOG_D("TS3FileWriteActor", "PassAway"); + LOG_D("TS3FileWriteActor", __func__); } TActorBootstrapped<TS3FileWriteActor>::PassAway(); } @@ -241,13 +242,10 @@ private: static void OnUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& key, const TString& url, IHTTPGateway::TResult&& result) { switch (result.index()) { case 0U: - { - auto content = std::get<IHTTPGateway::TContent>(std::move(result)); - if (content.HttpResponseCode >= 300) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "HTTP error code: " << content.HttpResponseCode)}))); - } else { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url))); - } + if (auto content = std::get<IHTTPGateway::TContent>(std::move(result)); content.HttpResponseCode >= 300) { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError({TIssue(ParseS3ErrorResponse(content.HttpResponseCode, content.Extract()))}))); + } else { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url))); } break; case 1U: @@ -359,7 +357,7 @@ public: } void Bootstrap() { - LOG_D("TS3WriteActor", "Bootstrapped"); + LOG_D("TS3WriteActor", __func__); Become(&TS3WriteActor::StateFunc); } @@ -456,9 +454,9 @@ private: FileWriteActors.clear(); if (fileWriterCount) { - LOG_W("TS3WriteActor", "PassAway with " << fileWriterCount << " NOT finished FileWriter(s)"); + LOG_W("TS3WriteActor", __func__ << " with " << fileWriterCount << " NOT finished FileWriter(s)"); } else { - LOG_D("TS3WriteActor", "PassAway"); + LOG_D("TS3WriteActor", __func__); } TActorBootstrapped<TS3WriteActor>::PassAway(); |
