aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIvan Sukhov <evanevannnn@ydb.tech>2024-10-03 11:38:32 +0300
committerGitHub <noreply@github.com>2024-10-03 11:38:32 +0300
commit7c5cb13dfb3648a300f3f9facf810b15eef4bda1 (patch)
treeccd7834910665b6bdae8a35fd58baf80b7ef1a39
parent6e20d541b0fb351fcdd0b7f8d01076df39b40d38 (diff)
downloadydb-7c5cb13dfb3648a300f3f9facf810b15eef4bda1.tar.gz
replaced yexception with TCodeLineException in s3_read_actor (#9571)
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp37
1 files changed, 21 insertions, 16 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 175ebb6726..51933128d4 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -126,11 +126,11 @@
LOG_TRACE_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, "TS3ReadCoroImpl: " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId \
<< " [" << Path << "]. RETRY{ Offset: " << RetryStuff->Offset << ", Delay: " << RetryStuff->NextRetryDelay << ", RequestId: " << RetryStuff->RequestId << "}. " << stream)
-#define THROW_ARROW_NOT_OK(status) \
+#define THROW_ARROW_NOT_OK(code, status) \
do \
{ \
if (::arrow::Status _s = (status); !_s.ok()) \
- throw yexception() << _s.ToString(); \
+ ythrow TCodeLineException(code) << _s.ToString(); \
} while (false)
namespace NYql::NDq {
@@ -575,7 +575,7 @@ public:
void HandleEvent(TEvS3Provider::TEvReadResult2::THandle& event) {
if (event.Get()->Failure) {
- throw yexception() << event.Get()->Issues.ToOneLineString();
+ ythrow TCodeLineException(NYql::NDqProto::StatusIds::INTERNAL_ERROR) << event.Get()->Issues.ToOneLineString();
}
auto readyRange = event.Get()->ReadRange;
LOG_CORO_D("Download FINISHED [" << readyRange.Offset << "-" << readyRange.Length << "], cookie: " << event.Cookie);
@@ -773,9 +773,7 @@ public:
if (StopIfConsumedEnough(numRows)) {
isCancelled = true;
}
- if (!status.ok()) {
- throw yexception() << status.ToString();
- }
+ ThrowParquetNotOk(status);
SourceContext->UpdateProgress(downloadedBytes, decodedBytes, table->num_rows());
if (RawInflightSize) {
RawInflightSize->Sub(downloadedBytes);
@@ -862,9 +860,7 @@ public:
if (StopIfConsumedEnough(numRows)) {
isCancelled = true;
}
- if (!status.ok()) {
- throw yexception() << status.ToString();
- }
+ ThrowParquetNotOk(status);
SourceContext->UpdateProgress(downloadedBytes, decodedBytes, table->num_rows());
if (isCancelled) {
LOG_CORO_D("RunCoroBlockArrowParserOverFile - STOPPED ON SATURATION");
@@ -1177,6 +1173,11 @@ private:
// Stop any activity instantly
RetryStuff->Cancel();
return;
+ } catch (const TCodeLineException& err) {
+ LOG_CORO_E(err.what());
+ Issues.AddIssue(err.GetRawMessage());
+ FatalCode = static_cast<NYql::NDqProto::StatusIds::StatusCode>(err.Code);
+ RetryStuff->Cancel();
} catch (const std::exception& err) {
Issues.AddIssue(TIssue(err.what()));
FatalCode = NYql::NDqProto::StatusIds::INTERNAL_ERROR;
@@ -2029,11 +2030,11 @@ NDB::DataTypePtr MetaToClickHouse(const TType* type, NSerialization::TSerializat
return std::make_shared<const NDB::DataTypeDecimal<NDB::Decimal128>>(precision, scale);
}
default:
- throw yexception() << "Unsupported data slot in MetaToClickHouse: " << slot;
+ ythrow TCodeLineException(NYql::NDqProto::StatusIds::INTERNAL_ERROR) << "Unsupported data slot in MetaToClickHouse: " << slot;
}
}
default:
- throw yexception() << "Unsupported type kind in MetaToClickHouse: " << type->GetKindAsStr();
+ ythrow TCodeLineException(NYql::NDqProto::StatusIds::INTERNAL_ERROR) << "Unsupported type kind in MetaToClickHouse: " << type->GetKindAsStr();
}
return nullptr;
}
@@ -2104,17 +2105,18 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
if (hasDirectories) {
auto pathPatternValue = settings.find("pathpattern");
if (pathPatternValue == settings.cend()) {
- ythrow yexception() << "'pathpattern' must be configured for directory listing";
+ ythrow TCodeLineException(NYql::NDqProto::StatusIds::INTERNAL_ERROR)
+ << "'pathpattern' must be configured for directory listing";
}
pathPattern = pathPatternValue->second;
auto pathPatternVariantValue = settings.find("pathpatternvariant");
if (pathPatternVariantValue == settings.cend()) {
- ythrow yexception()
+ ythrow TCodeLineException(NYql::NDqProto::StatusIds::INTERNAL_ERROR)
<< "'pathpatternvariant' must be configured for directory listing";
}
if (!TryFromString(pathPatternVariantValue->second, pathPatternVariant)) {
- ythrow yexception()
+ ythrow TCodeLineException(NYql::NDqProto::StatusIds::INTERNAL_ERROR)
<< "Unknown 'pathpatternvariant': " << pathPatternVariantValue->second;
}
}
@@ -2197,13 +2199,16 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
std::shared_ptr<arrow::DataType> dataType;
YQL_ENSURE(ConvertArrowType(memberType, dataType), "Unsupported arrow type");
- THROW_ARROW_NOT_OK(builder.AddField(std::make_shared<arrow::Field>(std::string(memberName), dataType, memberType->IsOptional())));
+ THROW_ARROW_NOT_OK(
+ NYql::NDqProto::StatusIds::INTERNAL_ERROR,
+ builder.AddField(std::make_shared<arrow::Field>(std::string(memberName), dataType, memberType->IsOptional()))
+ );
readSpec->ColumnReorder.push_back(i);
readSpec->RowSpec.emplace(memberName, memberType);
}
auto res = builder.Finish();
- THROW_ARROW_NOT_OK(res.status());
+ THROW_ARROW_NOT_OK(NYql::NDqProto::StatusIds::INTERNAL_ERROR, res.status());
readSpec->ArrowSchema = std::move(res).ValueOrDie();
} else {
readSpec->CHColumns.resize(structType->GetMembersCount());