author | yumkam <yumkam7@ydb.tech> | 2025-04-15 18:15:21 +0300
committer | GitHub <noreply@github.com> | 2025-04-15 18:15:21 +0300
commit | ecc1044834b45587d2a7fea2ddfa265aa9671eb2 (patch)
tree | 9fc056cb2a3fc05a2c291a1f3fec33cd6d1140e3
parent | 0dac9309ca7605774ab3373660d73cd00578c59a (diff)
download | ydb-ecc1044834b45587d2a7fea2ddfa265aa9671eb2.tar.gz
fix inferring error handling and more tests (#15581)
3 files changed, 141 insertions, 11 deletions
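In short, the inference error paths now share one errorHeader prefix per file format, and empty (or oversized) files are rejected before an Arrow reader is constructed. The sketch below is a minimal, standalone illustration of that shape, using plain std::string/std::variant instead of YDB's TString/TStringBuilder; the function name InferTypesSketch, the Field/Fields stand-ins, and the readerOk parameter are hypothetical, not the actual YDB code.

```cpp
#include <cstdint>
#include <string>
#include <string_view>
#include <variant>
#include <vector>

// Hypothetical stand-ins for the Arrow/YDB types used in arrow_inferencinator.cpp.
struct Field { std::string Name; };
using Fields = std::vector<Field>;

// Mirrors the post-patch shape of InferCsvTypes/InferJsonTypes: one errorHeader
// reused on every failure path, plus an explicit size guard so that empty or
// oversized inputs fail with a clear message before a reader is even built.
std::variant<Fields, std::string> InferTypesSketch(int64_t fileSize, bool readerOk) {
    constexpr std::string_view errorHeader =
        "couldn't open csv/tsv file, check format and compression parameters: ";

    if (fileSize <= 0 || fileSize > INT32_MAX) {
        return std::string{errorHeader} + "empty file";
    }
    if (!readerOk) {
        return std::string{errorHeader} + "reader construction failed";
    }
    return Fields{{"A"}, {"B"}, {"C"}};
}
```

Keeping the prefix in a single constant is what lets every branch, including the new size check, produce messages that point the user at format and compression settings instead of a bare Arrow status string.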
diff --git a/ydb/core/external_sources/object_storage/inference/arrow_fetcher.cpp b/ydb/core/external_sources/object_storage/inference/arrow_fetcher.cpp
index e642007bcd4..2684e2bebc8 100644
--- a/ydb/core/external_sources/object_storage/inference/arrow_fetcher.cpp
+++ b/ydb/core/external_sources/object_storage/inference/arrow_fetcher.cpp
@@ -217,7 +217,7 @@ private:
             decompressedData << decompressedChunk;
         }
         return std::move(decompressedData);
-    } catch (const yexception& error) {
+    } catch (const std::exception& error) {
         auto errorEv = MakeError(
             request.Path,
             NFq::TIssuesIds::INTERNAL_ERROR,
diff --git a/ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp b/ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp
index ab162b61552..dadee457221 100644
--- a/ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp
+++ b/ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp
@@ -169,8 +169,12 @@ using ArrowFields = std::vector<ArrowField>;
 
 std::variant<ArrowFields, TString> InferCsvTypes(std::shared_ptr<arrow::io::RandomAccessFile> file, std::shared_ptr<CsvConfig> config) {
     int64_t fileSize;
+    constexpr auto errorHeader = "couldn't open csv/tsv file, check format and compression parameters: "sv;
     if (auto sizeStatus = file->GetSize().Value(&fileSize); !sizeStatus.ok()) {
-        return TStringBuilder{} << "coudn't get file size: " << sizeStatus.ToString();
+        return TStringBuilder{} << errorHeader << "coudn't get file size: " << sizeStatus.ToString();
+    }
+    if (fileSize <= 0 || fileSize > INT32_MAX) {
+        return TStringBuilder{} << errorHeader << "empty file";
     }
 
     std::shared_ptr<arrow::csv::TableReader> reader;
@@ -184,48 +188,52 @@ std::variant<ArrowFields, TString> InferCsvTypes(std::shared_ptr<arrow::io::Rand
     .Value(&reader);
 
     if (!readerStatus.ok()) {
-        return TString{TStringBuilder{} << "couldn't open csv/tsv file, check format and compression parameters: " << readerStatus.ToString()};
+        return TString{TStringBuilder{} << errorHeader << readerStatus.ToString()};
     }
 
     std::shared_ptr<arrow::Table> table;
     auto tableRes = reader->Read().Value(&table);
     if (!tableRes.ok()) {
-        return TStringBuilder{} << "couldn't parse csv/tsv file, check format and compression parameters: " << tableRes.ToString();
+        return TStringBuilder{} << errorHeader << tableRes.ToString();
     }
 
     return table->fields();
 }
 
 std::variant<ArrowFields, TString> InferParquetTypes(std::shared_ptr<arrow::io::RandomAccessFile> file) {
+    constexpr auto errorHeader = "couldn't open parquet file, check format parameters: "sv;
     parquet::arrow::FileReaderBuilder builder;
     builder.properties(parquet::ArrowReaderProperties(false));
     auto openStatus = builder.Open(std::move(file));
     if (!openStatus.ok()) {
-        return TStringBuilder{} << "couldn't open parquet file, check format parameters: " << openStatus.ToString();
+        return TStringBuilder{} << errorHeader << openStatus.ToString();
     }
 
     std::unique_ptr<parquet::arrow::FileReader> reader;
     auto readerStatus = builder.Build(&reader);
     if (!readerStatus.ok()) {
-        return TStringBuilder{} << "couldn't read parquet file, check format parameters: " << readerStatus.ToString();
+        return TStringBuilder{} << errorHeader << readerStatus.ToString();
     }
 
     std::shared_ptr<arrow::Schema> schema;
     auto schemaRes = reader->GetSchema(&schema);
     if (!schemaRes.ok()) {
-        return TStringBuilder{} << "couldn't parse parquet file, check format parameters: " << schemaRes.ToString();
+        return TStringBuilder{} << errorHeader << schemaRes.ToString();
     }
 
     return schema->fields();
 }
 
 std::variant<ArrowFields, TString> InferJsonTypes(std::shared_ptr<arrow::io::RandomAccessFile> file, std::shared_ptr<JsonConfig> config) {
+    constexpr auto errorHeader = "couldn't open json file, check format and compression parameters: "sv;
     int64_t fileSize;
     if (auto sizeStatus = file->GetSize().Value(&fileSize); !sizeStatus.ok()) {
-        return TStringBuilder{} << "coudn't get file size: " << sizeStatus.ToString();
+        return TStringBuilder{} << errorHeader << "coudn't get file size: " << sizeStatus.ToString();
+    }
+    if (fileSize <= 0 || fileSize > INT32_MAX) {
+        return TStringBuilder{} << errorHeader << "empty file";
     }
-
     std::shared_ptr<arrow::json::TableReader> reader;
     auto readerStatus = arrow::json::TableReader::Make(
         arrow::default_memory_pool(),
@@ -235,14 +243,14 @@ std::variant<ArrowFields, TString> InferJsonTypes(std::shared_ptr<arrow::io::Ran
     ).Value(&reader);
 
     if (!readerStatus.ok()) {
-        return TString{TStringBuilder{} << "couldn't open json file, check format and compression parameters: " << readerStatus.ToString()};
+        return TString{TStringBuilder{} << errorHeader << readerStatus.ToString()};
     }
 
     std::shared_ptr<arrow::Table> table;
     auto tableRes = reader->Read().Value(&table);
     if (!tableRes.ok()) {
-        return TString{TStringBuilder{} << "couldn't parse json file, check format and compression parameters: " << tableRes.ToString()};
+        return TString{TStringBuilder{} << errorHeader << tableRes.ToString()};
     }
 
     return table->fields();
diff --git a/ydb/core/external_sources/object_storage/inference/ut/arrow_inference_ut.cpp b/ydb/core/external_sources/object_storage/inference/ut/arrow_inference_ut.cpp
index a196d3c26cb..67749470585 100644
--- a/ydb/core/external_sources/object_storage/inference/ut/arrow_inference_ut.cpp
+++ b/ydb/core/external_sources/object_storage/inference/ut/arrow_inference_ut.cpp
@@ -93,6 +93,7 @@ TEST_F(ArrowInferenceTest, csv_simple) {
 
     auto response = event->CastAsLocal<TEvInferredFileSchema>();
     ASSERT_NE(response, nullptr);
+    ASSERT_TRUE(response->Status.IsSuccess());
     auto& fields = response->Fields;
     ASSERT_TRUE(fields[0].type().optional_type().item().has_type_id());
     ASSERT_EQ(fields[0].type().optional_type().item().type_id(), Ydb::Type::INT64);
@@ -143,4 +144,125 @@ TEST_F(ArrowInferenceTest, tsv_simple) {
     ASSERT_EQ(fields[2].name(), "C");
 }
 
+TEST_F(ArrowInferenceTest, tsv_empty) {
+    TString s3Data =
+        "this part should not matter because it will be omitted as a partial row,,";
+
+    Gateway->AddDefaultResponse([=, this](TString url, NYql::IHTTPGateway::THeaders, TString data) -> NYql::IHTTPGateway::TResult {
+        EXPECT_EQ(url, BaseUrl + Path);
+        EXPECT_EQ(data, "");
+
+        NYql::IHTTPGateway::TResult result(NYql::IHTTPGateway::TContent(s3Data, 200));;
+        return result;
+    });
+
+    auto inferencinatorId = RegisterInferencinator("tsv_with_names");
+    ActorSystem.WrapInActorContext(EdgeActorId, [this, inferencinatorId] {
+        NActors::TActivationContext::AsActorContext().Send(inferencinatorId, new TEvInferFileSchema(TString{Path}, 0));
+    });
+
+    std::unique_ptr<NActors::IEventHandle> event = ActorSystem.WaitForEdgeActorEvent({EdgeActorId});
+    auto response = event->CastAsLocal<TEvInferredFileSchema>();
+    ASSERT_NE(response, nullptr);
+    ASSERT_FALSE(response->Status.IsSuccess());
+    Cerr << response->Status.GetIssues().ToOneLineString() << Endl;
+}
+
+TEST_F(ArrowInferenceTest, broken_json) {
+    TString s3Data = "A,B,C\n";
+
+    Gateway->AddDefaultResponse([=, this](TString url, NYql::IHTTPGateway::THeaders, TString data) -> NYql::IHTTPGateway::TResult {
+        EXPECT_EQ(url, BaseUrl + Path);
+        EXPECT_EQ(data, "");
+
+        NYql::IHTTPGateway::TResult result(NYql::IHTTPGateway::TContent(s3Data, 200));;
+        return result;
+    });
+
+    auto inferencinatorId = RegisterInferencinator("json_each_row");
+    ActorSystem.WrapInActorContext(EdgeActorId, [this, inferencinatorId] {
+        NActors::TActivationContext::AsActorContext().Send(inferencinatorId, new TEvInferFileSchema(TString{Path}, 0));
+    });
+
+    std::unique_ptr<NActors::IEventHandle> event = ActorSystem.WaitForEdgeActorEvent({EdgeActorId});
+    auto response = event->CastAsLocal<TEvInferredFileSchema>();
+    ASSERT_NE(response, nullptr);
+    ASSERT_FALSE(response->Status.IsSuccess());
+    Cerr << response->Status.GetIssues().ToOneLineString() << Endl;
+    // ASSERT_EQ(...)
+}
+
+TEST_F(ArrowInferenceTest, empty_json_each_row) {
+    TString s3Data = "";
+
+    Gateway->AddDefaultResponse([=, this](TString url, NYql::IHTTPGateway::THeaders, TString data) -> NYql::IHTTPGateway::TResult {
+        EXPECT_EQ(url, BaseUrl + Path);
+        EXPECT_EQ(data, "");
+
+        NYql::IHTTPGateway::TResult result(NYql::IHTTPGateway::TContent(s3Data, 200));;
+        return result;
+    });
+
+    auto inferencinatorId = RegisterInferencinator("json_each_row");
+    ActorSystem.WrapInActorContext(EdgeActorId, [this, inferencinatorId] {
+        NActors::TActivationContext::AsActorContext().Send(inferencinatorId, new TEvInferFileSchema(TString{Path}, 0));
+    });
+
+    std::unique_ptr<NActors::IEventHandle> event = ActorSystem.WaitForEdgeActorEvent({EdgeActorId});
+    auto response = event->CastAsLocal<TEvInferredFileSchema>();
+    ASSERT_NE(response, nullptr);
+    ASSERT_FALSE(response->Status.IsSuccess());
+    Cerr << response->Status.GetIssues().ToOneLineString() << Endl;
+    // ASSERT_EQ(...)
+}
+
+TEST_F(ArrowInferenceTest, empty_json_list) {
+    TString s3Data = "";
+
+    Gateway->AddDefaultResponse([=, this](TString url, NYql::IHTTPGateway::THeaders, TString data) -> NYql::IHTTPGateway::TResult {
+        EXPECT_EQ(url, BaseUrl + Path);
+        EXPECT_EQ(data, "");
+
+        NYql::IHTTPGateway::TResult result(NYql::IHTTPGateway::TContent(s3Data, 200));;
+        return result;
+    });
+
+    auto inferencinatorId = RegisterInferencinator("json_list");
+    ActorSystem.WrapInActorContext(EdgeActorId, [this, inferencinatorId] {
+        NActors::TActivationContext::AsActorContext().Send(inferencinatorId, new TEvInferFileSchema(TString{Path}, 0));
+    });
+
+    std::unique_ptr<NActors::IEventHandle> event = ActorSystem.WaitForEdgeActorEvent({EdgeActorId});
+    auto response = event->CastAsLocal<TEvInferredFileSchema>();
+    ASSERT_NE(response, nullptr);
+    ASSERT_FALSE(response->Status.IsSuccess());
+    Cerr << response->Status.GetIssues().ToOneLineString() << Endl;
+    // ASSERT_EQ(...)
+}
+
+TEST_F(ArrowInferenceTest, broken_json_list) {
+    TString s3Data = "\nfoobar\n";
+
+    Gateway->AddDefaultResponse([=, this](TString url, NYql::IHTTPGateway::THeaders, TString data) -> NYql::IHTTPGateway::TResult {
+        EXPECT_EQ(url, BaseUrl + Path);
+        EXPECT_EQ(data, "");
+
+        NYql::IHTTPGateway::TResult result(NYql::IHTTPGateway::TContent(s3Data, 200));;
+        return result;
+    });
+
+    auto inferencinatorId = RegisterInferencinator("json_list");
+    ActorSystem.WrapInActorContext(EdgeActorId, [this, inferencinatorId] {
+        NActors::TActivationContext::AsActorContext().Send(inferencinatorId, new TEvInferFileSchema(TString{Path}, 0));
+    });
+
+    std::unique_ptr<NActors::IEventHandle> event = ActorSystem.WaitForEdgeActorEvent({EdgeActorId});
+    auto response = event->CastAsLocal<TEvInferredFileSchema>();
+    ASSERT_NE(response, nullptr);
+    ASSERT_FALSE(response->Status.IsSuccess());
+    Cerr << response->Status.GetIssues().ToOneLineString() << Endl;
+    // ASSERT_EQ(...)
+}
+// TODO: broken_compression, unrecognized_compression, broken_csv, broken_tsv (is there?), mock errors inside arrow::BufferBuilder,...
+
 } // namespace
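The one-line change in arrow_fetcher.cpp is also behavioral: the decompression path now catches std::exception rather than only yexception, presumably so that failures surfacing as other std::exception subclasses are still converted into an error event instead of escaping the actor. A minimal, self-contained illustration of why the broader catch matters follows; TYexceptionLike and DecompressChunk are hypothetical stand-ins, not the real YDB/Arrow types.

```cpp
#include <exception>
#include <iostream>
#include <stdexcept>

// Hypothetical stand-in playing the role of yexception.
struct TYexceptionLike : std::exception {
    const char* what() const noexcept override { return "yexception-style error"; }
};

// Hypothetical decompression step: some failures surface as std::exception
// subclasses that are not TYexceptionLike (e.g. a library-side runtime_error).
void DecompressChunk(bool libraryFails) {
    if (libraryFails) {
        throw std::runtime_error("invalid compressed stream");
    }
    throw TYexceptionLike{};
}

int main() {
    try {
        DecompressChunk(/*libraryFails=*/true);
    } catch (const std::exception& error) {
        // Catching only TYexceptionLike would miss the runtime_error and let it
        // propagate; catching the std::exception base handles both cases.
        std::cerr << "handled: " << error.what() << '\n';
    }
    return 0;
}
```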