aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhcpp <hcpp@ydb.tech>2022-11-07 10:40:43 +0300
committerhcpp <hcpp@ydb.tech>2022-11-07 10:40:43 +0300
commit13cb76c7f5cc61057081f7c6c47721a33109a147 (patch)
treea185c62c5aeaf408807cb5ca3914669bd592a532
parenta8f6baed8b40e0ca1eda9692d9b3eeed727a3098 (diff)
downloadydb-13cb76c7f5cc61057081f7c6c47721a33109a147.tar.gz
path has been added to s3 errors
-rw-r--r--ydb/library/yql/providers/dq/actors/grouped_issues.cpp3
-rw-r--r--ydb/library/yql/providers/dq/actors/grouped_issues_ut.cpp14
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp34
3 files changed, 42 insertions, 9 deletions
diff --git a/ydb/library/yql/providers/dq/actors/grouped_issues.cpp b/ydb/library/yql/providers/dq/actors/grouped_issues.cpp
index 862821ae997..01fb247ec34 100644
--- a/ydb/library/yql/providers/dq/actors/grouped_issues.cpp
+++ b/ydb/library/yql/providers/dq/actors/grouped_issues.cpp
@@ -16,6 +16,9 @@ NYql::TIssues NYql::NDq::GroupedIssues::ToIssues() {
for (auto& [issue, meta]: issueVector) {
auto modified_issue_message = issue.GetMessage() + " " + meta.InfoString();
auto modified_issue = NYql::TIssue(issue.Position, issue.EndPosition, modified_issue_message);
+ for (const auto& subIssue: issue.GetSubIssues()) {
+ modified_issue.AddSubIssue(subIssue);
+ }
issues.AddIssue(modified_issue);
}
return issues;
diff --git a/ydb/library/yql/providers/dq/actors/grouped_issues_ut.cpp b/ydb/library/yql/providers/dq/actors/grouped_issues_ut.cpp
index dc148c5e100..ebf59a1cbc6 100644
--- a/ydb/library/yql/providers/dq/actors/grouped_issues_ut.cpp
+++ b/ydb/library/yql/providers/dq/actors/grouped_issues_ut.cpp
@@ -79,5 +79,19 @@ Y_UNIT_TEST_SUITE(TestIssuesGrouping) {
}
UNIT_ASSERT_C(!holder.Issues.contains(eldery), "the oldest issue is not removed");
}
+
+ Y_UNIT_TEST(ShouldSaveSubIssues) {
+ TIntrusivePtr<AgileTimeProvider> timeProvider = CreateAgileTimeProvider(1);
+ NDq::GroupedIssues holder(timeProvider);
+ holder.IssueExpiration = TDuration::Seconds(5);
+ TIssue issue("a");
+ issue.AddSubIssue(MakeIntrusive<TIssue>("sub_issue"));
+ holder.AddIssue(issue);
+ auto groupedIssues = holder.ToIssues();
+ UNIT_ASSERT_EQUAL(groupedIssues.Size(), 1);
+ UNIT_ASSERT_EQUAL(groupedIssues.back().GetSubIssues().size(), 1);
+ const auto& subIssue = *groupedIssues.back().GetSubIssues().back();
+ UNIT_ASSERT_STRING_CONTAINS(subIssue.ToString(true), "sub_issue");
+ }
}
} // namespace NYq
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 1da4a3809c1..08e1a3e40e3 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -190,6 +190,17 @@ struct TEvPrivate {
};
+TIssues AddParentIssue(const TStringBuilder& prefix, TIssues&& issues) {
+ if (!issues) {
+ return TIssues{};
+ }
+ TIssue result(prefix);
+ for (auto& issue: issues) {
+ result.AddSubIssue(MakeIntrusive<TIssue>(issue));
+ }
+ return TIssues{result};
+}
+
using namespace NKikimr::NMiniKQL;
class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeActorAsyncInput {
@@ -294,8 +305,9 @@ private:
void Handle(TEvPrivate::TEvReadResult::TPtr& result) {
++IsDoneCounter;
const auto id = result->Get()->PathIndex;
+ const auto path = std::get<TString>(Paths[id - StartPathIndex]);
const auto httpCode = result->Get()->Result.HttpResponseCode;
- LOG_D("TS3ReadActor", "ID: " << id << ", TEvReadResult size: " << result->Get()->Result.size() << ", HTTP response code: " << httpCode);
+ LOG_D("TS3ReadActor", "ID: " << id << ", Path: " << path << ", read size: " << result->Get()->Result.size() << ", HTTP response code: " << httpCode);
if (200 == httpCode || 206 == httpCode) {
Blocks.emplace(std::make_tuple(std::move(result->Get()->Result), id));
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
@@ -306,6 +318,7 @@ private:
if (!ParseS3ErrorResponse(errorText, errorCode, message)) {
message = errorText;
}
+ message = TStringBuilder{} << "Error while reading file " << path << ", details: " << message;
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, BuildIssues(httpCode, errorCode, message), NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
}
}
@@ -313,8 +326,10 @@ private:
void Handle(TEvPrivate::TEvReadError::TPtr& result) {
++IsDoneCounter;
auto id = result->Get()->PathIndex;
- LOG_W("TS3ReadActor", "ID: " << id << ", TEvReadError: " << result->Get()->Error.ToOneLineString());
- Send(ComputeActorId, new TEvAsyncInputError(InputIndex, result->Get()->Error, NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
+ const auto path = std::get<TString>(Paths[id - StartPathIndex]);
+ LOG_W("TS3ReadActor", "Error while reading file " << path << ", details: ID: " << id << ", TEvReadError: " << result->Get()->Error.ToOneLineString());
+ auto issues = AddParentIssue(TStringBuilder{} << "Error while reading file " << path, TIssues{result->Get()->Error});
+ Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
}
// IActor & IDqComputeActorAsyncInput
@@ -579,7 +594,7 @@ private:
switch (etype) {
case TEvPrivate::TEvReadFinished::EventType:
Issues = std::move(ev->Get<TEvPrivate::TEvReadFinished>()->Issues);
- LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished: " << Issues.ToOneLineString());
+ LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished: " << Path << " " << Issues.ToOneLineString());
break;
default:
continue;
@@ -638,7 +653,7 @@ private:
} catch (const TDtorException&) {
throw;
} catch (const std::exception& err) {
- exceptIssue.SetMessage(TStringBuilder() << "Error while reading file " << Path << ", details: " << err.what());
+ exceptIssue.SetMessage(err.what());
fatalCode = NYql::NDqProto::StatusIds::INTERNAL_ERROR;
RetryStuff->Cancel();
}
@@ -660,8 +675,9 @@ private:
Issues.AddIssue(exceptIssue);
}
- if (Issues)
- Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(Issues), fatalCode));
+ auto issues = AddParentIssue(TStringBuilder{} << "Error while reading file " << Path, std::move(Issues));
+ if (issues)
+ Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(issues), fatalCode));
else
Send(ParentActorId, new TEvPrivate::TEvReadFinished);
} catch (const TDtorException&) {
@@ -708,7 +724,8 @@ private:
void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) final {
TStringBuilder message;
- message << "S3 read. Unexpected message type " << Hex(ev->GetTypeRewrite());
+ message << "Error while reading file " << Path << ", details: "
+ << "S3 read. Unexpected message type " << Hex(ev->GetTypeRewrite());
if (auto* eventBase = ev->GetBase()) {
message << " (" << eventBase->ToStringHeader() << ")";
}
@@ -859,7 +876,6 @@ public:
LOG_D("TS3StreamReadActor", "Bootstrap");
Become(&TS3StreamReadActor::StateFunc);
for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) {
-
const TPath& path = Paths[pathInd];
auto stuff = std::make_shared<TRetryStuff>(Gateway, Url + std::get<TString>(path), Headers, std::get<std::size_t>(path), TxId, RetryPolicy);
RetryStuffForFile.push_back(stuff);