author     yumkam <yumkam7@ydb.tech>      2025-04-15 18:15:21 +0300
committer  GitHub <noreply@github.com>    2025-04-15 18:15:21 +0300
commit     ecc1044834b45587d2a7fea2ddfa265aa9671eb2 (patch)
tree       9fc056cb2a3fc05a2c291a1f3fec33cd6d1140e3
parent     0dac9309ca7605774ab3373660d73cd00578c59a (diff)
download   ydb-ecc1044834b45587d2a7fea2ddfa265aa9671eb2.tar.gz
fix inference error handling and add more tests (#15581)
-rw-r--r--  ydb/core/external_sources/object_storage/inference/arrow_fetcher.cpp            2
-rw-r--r--  ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp    28
-rw-r--r--  ydb/core/external_sources/object_storage/inference/ut/arrow_inference_ut.cpp  122
3 files changed, 141 insertions, 11 deletions
diff --git a/ydb/core/external_sources/object_storage/inference/arrow_fetcher.cpp b/ydb/core/external_sources/object_storage/inference/arrow_fetcher.cpp
index e642007bcd4..2684e2bebc8 100644
--- a/ydb/core/external_sources/object_storage/inference/arrow_fetcher.cpp
+++ b/ydb/core/external_sources/object_storage/inference/arrow_fetcher.cpp
@@ -217,7 +217,7 @@ private:
decompressedData << decompressedChunk;
}
return std::move(decompressedData);
- } catch (const yexception& error) {
+ } catch (const std::exception& error) {
auto errorEv = MakeError(
request.Path,
NFq::TIssuesIds::INTERNAL_ERROR,
diff --git a/ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp b/ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp
index ab162b61552..dadee457221 100644
--- a/ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp
+++ b/ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp
@@ -169,8 +169,12 @@ using ArrowFields = std::vector<ArrowField>;
std::variant<ArrowFields, TString> InferCsvTypes(std::shared_ptr<arrow::io::RandomAccessFile> file, std::shared_ptr<CsvConfig> config) {
int64_t fileSize;
+ constexpr auto errorHeader = "couldn't open csv/tsv file, check format and compression parameters: "sv;
if (auto sizeStatus = file->GetSize().Value(&fileSize); !sizeStatus.ok()) {
- return TStringBuilder{} << "coudn't get file size: " << sizeStatus.ToString();
+ return TStringBuilder{} << errorHeader << "couldn't get file size: " << sizeStatus.ToString();
+ }
+ if (fileSize <= 0 || fileSize > INT32_MAX) {
+ return TStringBuilder{} << errorHeader << "file is empty or too large";
}
std::shared_ptr<arrow::csv::TableReader> reader;
@@ -184,48 +188,52 @@ std::variant<ArrowFields, TString> InferCsvTypes(std::shared_ptr<arrow::io::Rand
.Value(&reader);
if (!readerStatus.ok()) {
- return TString{TStringBuilder{} << "couldn't open csv/tsv file, check format and compression parameters: " << readerStatus.ToString()};
+ return TString{TStringBuilder{} << errorHeader << readerStatus.ToString()};
}
std::shared_ptr<arrow::Table> table;
auto tableRes = reader->Read().Value(&table);
if (!tableRes.ok()) {
- return TStringBuilder{} << "couldn't parse csv/tsv file, check format and compression parameters: " << tableRes.ToString();
+ return TStringBuilder{} << errorHeader << tableRes.ToString();
}
return table->fields();
}
std::variant<ArrowFields, TString> InferParquetTypes(std::shared_ptr<arrow::io::RandomAccessFile> file) {
+ constexpr auto errorHeader = "couldn't open parquet file, check format parameters: "sv;
parquet::arrow::FileReaderBuilder builder;
builder.properties(parquet::ArrowReaderProperties(false));
auto openStatus = builder.Open(std::move(file));
if (!openStatus.ok()) {
- return TStringBuilder{} << "couldn't open parquet file, check format parameters: " << openStatus.ToString();
+ return TStringBuilder{} << errorHeader << openStatus.ToString();
}
std::unique_ptr<parquet::arrow::FileReader> reader;
auto readerStatus = builder.Build(&reader);
if (!readerStatus.ok()) {
- return TStringBuilder{} << "couldn't read parquet file, check format parameters: " << readerStatus.ToString();
+ return TStringBuilder{} << errorHeader << readerStatus.ToString();
}
std::shared_ptr<arrow::Schema> schema;
auto schemaRes = reader->GetSchema(&schema);
if (!schemaRes.ok()) {
- return TStringBuilder{} << "couldn't parse parquet file, check format parameters: " << schemaRes.ToString();
+ return TStringBuilder{} << errorHeader << schemaRes.ToString();
}
return schema->fields();
}
std::variant<ArrowFields, TString> InferJsonTypes(std::shared_ptr<arrow::io::RandomAccessFile> file, std::shared_ptr<JsonConfig> config) {
+ constexpr auto errorHeader = "couldn't open json file, check format and compression parameters: "sv;
int64_t fileSize;
if (auto sizeStatus = file->GetSize().Value(&fileSize); !sizeStatus.ok()) {
- return TStringBuilder{} << "coudn't get file size: " << sizeStatus.ToString();
+ return TStringBuilder{} << errorHeader << "couldn't get file size: " << sizeStatus.ToString();
+ }
+ if (fileSize <= 0 || fileSize > INT32_MAX) {
+ return TStringBuilder{} << errorHeader << "file is empty or too large";
}
-
std::shared_ptr<arrow::json::TableReader> reader;
auto readerStatus = arrow::json::TableReader::Make(
arrow::default_memory_pool(),
@@ -235,14 +243,14 @@ std::variant<ArrowFields, TString> InferJsonTypes(std::shared_ptr<arrow::io::Ran
).Value(&reader);
if (!readerStatus.ok()) {
- return TString{TStringBuilder{} << "couldn't open json file, check format and compression parameters: " << readerStatus.ToString()};
+ return TString{TStringBuilder{} << errorHeader << readerStatus.ToString()};
}
std::shared_ptr<arrow::Table> table;
auto tableRes = reader->Read().Value(&table);
if (!tableRes.ok()) {
- return TString{TStringBuilder{} << "couldn't parse json file, check format and compression parameters: " << tableRes.ToString()};
+ return TString{TStringBuilder{} << errorHeader << tableRes.ToString()};
}
return table->fields();
diff --git a/ydb/core/external_sources/object_storage/inference/ut/arrow_inference_ut.cpp b/ydb/core/external_sources/object_storage/inference/ut/arrow_inference_ut.cpp
index a196d3c26cb..67749470585 100644
--- a/ydb/core/external_sources/object_storage/inference/ut/arrow_inference_ut.cpp
+++ b/ydb/core/external_sources/object_storage/inference/ut/arrow_inference_ut.cpp
@@ -93,6 +93,7 @@ TEST_F(ArrowInferenceTest, csv_simple) {
auto response = event->CastAsLocal<TEvInferredFileSchema>();
ASSERT_NE(response, nullptr);
+ ASSERT_TRUE(response->Status.IsSuccess());
auto& fields = response->Fields;
ASSERT_TRUE(fields[0].type().optional_type().item().has_type_id());
ASSERT_EQ(fields[0].type().optional_type().item().type_id(), Ydb::Type::INT64);
@@ -143,4 +144,125 @@ TEST_F(ArrowInferenceTest, tsv_simple) {
ASSERT_EQ(fields[2].name(), "C");
}
+TEST_F(ArrowInferenceTest, tsv_empty) {
+ TString s3Data =
+ "this part should not matter because it will be omitted as a partial row,,";
+
+ Gateway->AddDefaultResponse([=, this](TString url, NYql::IHTTPGateway::THeaders, TString data) -> NYql::IHTTPGateway::TResult {
+ EXPECT_EQ(url, BaseUrl + Path);
+ EXPECT_EQ(data, "");
+
+ NYql::IHTTPGateway::TResult result(NYql::IHTTPGateway::TContent(s3Data, 200));
+ return result;
+ });
+
+ auto inferencinatorId = RegisterInferencinator("tsv_with_names");
+ ActorSystem.WrapInActorContext(EdgeActorId, [this, inferencinatorId] {
+ NActors::TActivationContext::AsActorContext().Send(inferencinatorId, new TEvInferFileSchema(TString{Path}, 0));
+ });
+
+ std::unique_ptr<NActors::IEventHandle> event = ActorSystem.WaitForEdgeActorEvent({EdgeActorId});
+ auto response = event->CastAsLocal<TEvInferredFileSchema>();
+ ASSERT_NE(response, nullptr);
+ ASSERT_FALSE(response->Status.IsSuccess());
+ Cerr << response->Status.GetIssues().ToOneLineString() << Endl;
+}
+
+TEST_F(ArrowInferenceTest, broken_json) {
+ TString s3Data = "A,B,C\n";
+
+ Gateway->AddDefaultResponse([=, this](TString url, NYql::IHTTPGateway::THeaders, TString data) -> NYql::IHTTPGateway::TResult {
+ EXPECT_EQ(url, BaseUrl + Path);
+ EXPECT_EQ(data, "");
+
+ NYql::IHTTPGateway::TResult result(NYql::IHTTPGateway::TContent(s3Data, 200));
+ return result;
+ });
+
+ auto inferencinatorId = RegisterInferencinator("json_each_row");
+ ActorSystem.WrapInActorContext(EdgeActorId, [this, inferencinatorId] {
+ NActors::TActivationContext::AsActorContext().Send(inferencinatorId, new TEvInferFileSchema(TString{Path}, 0));
+ });
+
+ std::unique_ptr<NActors::IEventHandle> event = ActorSystem.WaitForEdgeActorEvent({EdgeActorId});
+ auto response = event->CastAsLocal<TEvInferredFileSchema>();
+ ASSERT_NE(response, nullptr);
+ ASSERT_FALSE(response->Status.IsSuccess());
+ Cerr << response->Status.GetIssues().ToOneLineString() << Endl;
+ // ASSERT_EQ(...)
+}
+
+TEST_F(ArrowInferenceTest, empty_json_each_row) {
+ TString s3Data = "";
+
+ Gateway->AddDefaultResponse([=, this](TString url, NYql::IHTTPGateway::THeaders, TString data) -> NYql::IHTTPGateway::TResult {
+ EXPECT_EQ(url, BaseUrl + Path);
+ EXPECT_EQ(data, "");
+
+ NYql::IHTTPGateway::TResult result(NYql::IHTTPGateway::TContent(s3Data, 200));
+ return result;
+ });
+
+ auto inferencinatorId = RegisterInferencinator("json_each_row");
+ ActorSystem.WrapInActorContext(EdgeActorId, [this, inferencinatorId] {
+ NActors::TActivationContext::AsActorContext().Send(inferencinatorId, new TEvInferFileSchema(TString{Path}, 0));
+ });
+
+ std::unique_ptr<NActors::IEventHandle> event = ActorSystem.WaitForEdgeActorEvent({EdgeActorId});
+ auto response = event->CastAsLocal<TEvInferredFileSchema>();
+ ASSERT_NE(response, nullptr);
+ ASSERT_FALSE(response->Status.IsSuccess());
+ Cerr << response->Status.GetIssues().ToOneLineString() << Endl;
+ // ASSERT_EQ(...)
+}
+
+TEST_F(ArrowInferenceTest, empty_json_list) {
+ TString s3Data = "";
+
+ Gateway->AddDefaultResponse([=, this](TString url, NYql::IHTTPGateway::THeaders, TString data) -> NYql::IHTTPGateway::TResult {
+ EXPECT_EQ(url, BaseUrl + Path);
+ EXPECT_EQ(data, "");
+
+ NYql::IHTTPGateway::TResult result(NYql::IHTTPGateway::TContent(s3Data, 200));
+ return result;
+ });
+
+ auto inferencinatorId = RegisterInferencinator("json_list");
+ ActorSystem.WrapInActorContext(EdgeActorId, [this, inferencinatorId] {
+ NActors::TActivationContext::AsActorContext().Send(inferencinatorId, new TEvInferFileSchema(TString{Path}, 0));
+ });
+
+ std::unique_ptr<NActors::IEventHandle> event = ActorSystem.WaitForEdgeActorEvent({EdgeActorId});
+ auto response = event->CastAsLocal<TEvInferredFileSchema>();
+ ASSERT_NE(response, nullptr);
+ ASSERT_FALSE(response->Status.IsSuccess());
+ Cerr << response->Status.GetIssues().ToOneLineString() << Endl;
+ // ASSERT_EQ(...)
+}
+
+TEST_F(ArrowInferenceTest, broken_json_list) {
+ TString s3Data = "\nfoobar\n";
+
+ Gateway->AddDefaultResponse([=, this](TString url, NYql::IHTTPGateway::THeaders, TString data) -> NYql::IHTTPGateway::TResult {
+ EXPECT_EQ(url, BaseUrl + Path);
+ EXPECT_EQ(data, "");
+
+ NYql::IHTTPGateway::TResult result(NYql::IHTTPGateway::TContent(s3Data, 200));
+ return result;
+ });
+
+ auto inferencinatorId = RegisterInferencinator("json_list");
+ ActorSystem.WrapInActorContext(EdgeActorId, [this, inferencinatorId] {
+ NActors::TActivationContext::AsActorContext().Send(inferencinatorId, new TEvInferFileSchema(TString{Path}, 0));
+ });
+
+ std::unique_ptr<NActors::IEventHandle> event = ActorSystem.WaitForEdgeActorEvent({EdgeActorId});
+ auto response = event->CastAsLocal<TEvInferredFileSchema>();
+ ASSERT_NE(response, nullptr);
+ ASSERT_FALSE(response->Status.IsSuccess());
+ Cerr << response->Status.GetIssues().ToOneLineString() << Endl;
+ // ASSERT_EQ(...)
+}
+// TODO: broken_compression, unrecognized_compression, broken_csv, broken_tsv (is there?), mock errors inside arrow::BufferBuilder,...
+
} // namespace