diff options
author | hcpp <hcpp@ydb.tech> | 2022-11-07 10:40:43 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2022-11-07 10:40:43 +0300 |
commit | 13cb76c7f5cc61057081f7c6c47721a33109a147 (patch) | |
tree | a185c62c5aeaf408807cb5ca3914669bd592a532 | |
parent | a8f6baed8b40e0ca1eda9692d9b3eeed727a3098 (diff) | |
download | ydb-13cb76c7f5cc61057081f7c6c47721a33109a147.tar.gz |
path has been added to s3 errors
3 files changed, 42 insertions, 9 deletions
diff --git a/ydb/library/yql/providers/dq/actors/grouped_issues.cpp b/ydb/library/yql/providers/dq/actors/grouped_issues.cpp index 862821ae997..01fb247ec34 100644 --- a/ydb/library/yql/providers/dq/actors/grouped_issues.cpp +++ b/ydb/library/yql/providers/dq/actors/grouped_issues.cpp @@ -16,6 +16,9 @@ NYql::TIssues NYql::NDq::GroupedIssues::ToIssues() { for (auto& [issue, meta]: issueVector) { auto modified_issue_message = issue.GetMessage() + " " + meta.InfoString(); auto modified_issue = NYql::TIssue(issue.Position, issue.EndPosition, modified_issue_message); + for (const auto& subIssue: issue.GetSubIssues()) { + modified_issue.AddSubIssue(subIssue); + } issues.AddIssue(modified_issue); } return issues; diff --git a/ydb/library/yql/providers/dq/actors/grouped_issues_ut.cpp b/ydb/library/yql/providers/dq/actors/grouped_issues_ut.cpp index dc148c5e100..ebf59a1cbc6 100644 --- a/ydb/library/yql/providers/dq/actors/grouped_issues_ut.cpp +++ b/ydb/library/yql/providers/dq/actors/grouped_issues_ut.cpp @@ -79,5 +79,19 @@ Y_UNIT_TEST_SUITE(TestIssuesGrouping) { } UNIT_ASSERT_C(!holder.Issues.contains(eldery), "the oldest issue is not removed"); } + + Y_UNIT_TEST(ShouldSaveSubIssues) { + TIntrusivePtr<AgileTimeProvider> timeProvider = CreateAgileTimeProvider(1); + NDq::GroupedIssues holder(timeProvider); + holder.IssueExpiration = TDuration::Seconds(5); + TIssue issue("a"); + issue.AddSubIssue(MakeIntrusive<TIssue>("sub_issue")); + holder.AddIssue(issue); + auto groupedIssues = holder.ToIssues(); + UNIT_ASSERT_EQUAL(groupedIssues.Size(), 1); + UNIT_ASSERT_EQUAL(groupedIssues.back().GetSubIssues().size(), 1); + const auto& subIssue = *groupedIssues.back().GetSubIssues().back(); + UNIT_ASSERT_STRING_CONTAINS(subIssue.ToString(true), "sub_issue"); + } } } // namespace NYq diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 1da4a3809c1..08e1a3e40e3 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -190,6 +190,17 @@ struct TEvPrivate { }; +TIssues AddParentIssue(const TStringBuilder& prefix, TIssues&& issues) { + if (!issues) { + return TIssues{}; + } + TIssue result(prefix); + for (auto& issue: issues) { + result.AddSubIssue(MakeIntrusive<TIssue>(issue)); + } + return TIssues{result}; +} + using namespace NKikimr::NMiniKQL; class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeActorAsyncInput { @@ -294,8 +305,9 @@ private: void Handle(TEvPrivate::TEvReadResult::TPtr& result) { ++IsDoneCounter; const auto id = result->Get()->PathIndex; + const auto path = std::get<TString>(Paths[id - StartPathIndex]); const auto httpCode = result->Get()->Result.HttpResponseCode; - LOG_D("TS3ReadActor", "ID: " << id << ", TEvReadResult size: " << result->Get()->Result.size() << ", HTTP response code: " << httpCode); + LOG_D("TS3ReadActor", "ID: " << id << ", Path: " << path << ", read size: " << result->Get()->Result.size() << ", HTTP response code: " << httpCode); if (200 == httpCode || 206 == httpCode) { Blocks.emplace(std::make_tuple(std::move(result->Get()->Result), id)); Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); @@ -306,6 +318,7 @@ private: if (!ParseS3ErrorResponse(errorText, errorCode, message)) { message = errorText; } + message = TStringBuilder{} << "Error while reading file " << path << ", details: " << message; Send(ComputeActorId, new TEvAsyncInputError(InputIndex, BuildIssues(httpCode, errorCode, message), NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); } } @@ -313,8 +326,10 @@ private: void Handle(TEvPrivate::TEvReadError::TPtr& result) { ++IsDoneCounter; auto id = result->Get()->PathIndex; - LOG_W("TS3ReadActor", "ID: " << id << ", TEvReadError: " << result->Get()->Error.ToOneLineString()); - Send(ComputeActorId, new TEvAsyncInputError(InputIndex, result->Get()->Error, NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); + const auto path = std::get<TString>(Paths[id - StartPathIndex]); + LOG_W("TS3ReadActor", "Error while reading file " << path << ", details: ID: " << id << ", TEvReadError: " << result->Get()->Error.ToOneLineString()); + auto issues = AddParentIssue(TStringBuilder{} << "Error while reading file " << path, TIssues{result->Get()->Error}); + Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); } // IActor & IDqComputeActorAsyncInput @@ -579,7 +594,7 @@ private: switch (etype) { case TEvPrivate::TEvReadFinished::EventType: Issues = std::move(ev->Get<TEvPrivate::TEvReadFinished>()->Issues); - LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished: " << Issues.ToOneLineString()); + LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished: " << Path << " " << Issues.ToOneLineString()); break; default: continue; @@ -638,7 +653,7 @@ private: } catch (const TDtorException&) { throw; } catch (const std::exception& err) { - exceptIssue.SetMessage(TStringBuilder() << "Error while reading file " << Path << ", details: " << err.what()); + exceptIssue.SetMessage(err.what()); fatalCode = NYql::NDqProto::StatusIds::INTERNAL_ERROR; RetryStuff->Cancel(); } @@ -660,8 +675,9 @@ private: Issues.AddIssue(exceptIssue); } - if (Issues) - Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(Issues), fatalCode)); + auto issues = AddParentIssue(TStringBuilder{} << "Error while reading file " << Path, std::move(Issues)); + if (issues) + Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(issues), fatalCode)); else Send(ParentActorId, new TEvPrivate::TEvReadFinished); } catch (const TDtorException&) { @@ -708,7 +724,8 @@ private: void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) final { TStringBuilder message; - message << "S3 read. Unexpected message type " << Hex(ev->GetTypeRewrite()); + message << "Error while reading file " << Path << ", details: " + << "S3 read. Unexpected message type " << Hex(ev->GetTypeRewrite()); if (auto* eventBase = ev->GetBase()) { message << " (" << eventBase->ToStringHeader() << ")"; } @@ -859,7 +876,6 @@ public: LOG_D("TS3StreamReadActor", "Bootstrap"); Become(&TS3StreamReadActor::StateFunc); for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) { - const TPath& path = Paths[pathInd]; auto stuff = std::make_shared<TRetryStuff>(Gateway, Url + std::get<TString>(path), Headers, std::get<std::size_t>(path), TxId, RetryPolicy); RetryStuffForFile.push_back(stuff); |