diff options
author | Ivan Sukhov <evanevannnn@ydb.tech> | 2024-08-09 19:55:38 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-08-09 16:55:38 +0000 |
commit | ee6b772c5f61986bc4b03006fb56680cd91a3a30 (patch) | |
tree | 96898bdfdfac53d2e9831f0032c05f1f214d6d61 | |
parent | 1269ae5d3762666dcbb7fedda30029035a4b285c (diff) | |
download | ydb-ee6b772c5f61986bc4b03006fb56680cd91a3a30.tar.gz |
handle EVFileError event in TObjectStorageExternalSource (#7569)
-rw-r--r-- | ydb/core/external_sources/object_storage.cpp | 6 | ||||
-rw-r--r-- | ydb/core/external_sources/object_storage/events.h | 7 | ||||
-rw-r--r-- | ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp | 18 | ||||
-rw-r--r-- | ydb/tests/fq/s3/test_s3_0.py | 33 |
4 files changed, 58 insertions, 6 deletions
diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp index b1947d828f..cdad28cd34 100644 --- a/ydb/core/external_sources/object_storage.cpp +++ b/ydb/core/external_sources/object_storage.cpp @@ -355,13 +355,17 @@ struct TObjectStorageExternalSource : public IExternalSource { return afterListing.Apply([arrowInferencinatorId, meta, actorSystem = ActorSystem](const NThreading::TFuture<TString>& pathFut) { auto promise = NThreading::NewPromise<TMetadataResult>(); auto schemaToMetadata = [meta](NThreading::TPromise<TMetadataResult> metaPromise, NObjectStorage::TEvInferredFileSchema&& response) { + if (!response.Status.IsSuccess()) { + metaPromise.SetValue(NYql::NCommon::ResultFromError<TMetadataResult>(response.Status.GetIssues())); + return; + } + TMetadataResult result; meta->Changed = true; meta->Schema.clear_column(); for (const auto& column : response.Fields) { auto& destColumn = *meta->Schema.add_column(); destColumn = column; } - TMetadataResult result; result.SetSuccess(); result.Metadata = meta; metaPromise.SetValue(std::move(result)); diff --git a/ydb/core/external_sources/object_storage/events.h b/ydb/core/external_sources/object_storage/events.h index 1e1d0b03a7..e10dcaced5 100644 --- a/ydb/core/external_sources/object_storage/events.h +++ b/ydb/core/external_sources/object_storage/events.h @@ -10,6 +10,7 @@ #include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> #include <ydb/core/fq/libs/config/protos/issue_id.pb.h> #include <ydb/public/api/protos/ydb_value.pb.h> +#include <ydb/public/sdk/cpp/client/ydb_types/status/status.h> namespace NKikimr::NExternalSource::NObjectStorage { @@ -128,10 +129,16 @@ struct TEvInferFileSchema : public NActors::TEventLocal<TEvInferFileSchema, EvIn struct TEvInferredFileSchema : public NActors::TEventLocal<TEvInferredFileSchema, EvInferredFileSchema> { TEvInferredFileSchema(TString path, std::vector<Ydb::Column>&& fields) : Path{std::move(path)} + , Status{NYdb::EStatus::SUCCESS, {}} , Fields{std::move(fields)} {} + TEvInferredFileSchema(TString path, NYql::TIssues&& issues) + : Path{std::move(path)} + , Status{NYdb::EStatus::INTERNAL_ERROR, std::move(issues)} + {} TString Path; + NYdb::TStatus Status; std::vector<Ydb::Column> Fields; }; diff --git a/ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp b/ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp index 43e692cb5b..ce72a93fb0 100644 --- a/ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp +++ b/ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp @@ -153,6 +153,14 @@ bool ArrowToYdbType(Ydb::Type& maybeOptionalType, const arrow::DataType& type) { } return false; } + +TEvInferredFileSchema* MakeErrorSchema(TString path, NFq::TIssuesIds::EIssueCode code, TString message) { + NYql::TIssues issues; + issues.AddIssue(std::move(message)); + issues.back().SetCode(code, NYql::TSeverityIds::S_ERROR); + return new TEvInferredFileSchema{std::move(path), std::move(issues)}; +} + } struct FormatConfig { @@ -181,14 +189,14 @@ std::variant<ArrowFields, TString> InferCsvTypes(std::shared_ptr<arrow::io::Rand .Value(&reader); if (!readerStatus.ok()) { - return TString{TStringBuilder{} << "couldn't make table from data: " << readerStatus.ToString()}; + return TString{TStringBuilder{} << "couldn't parse csv/tsv file, check format and compression params: " << readerStatus.ToString()}; } std::shared_ptr<arrow::Table> table; auto tableRes = reader->Read().Value(&table); if (!tableRes.ok()) { - return TStringBuilder{} << "couldn't read table from data: " << readerStatus.ToString(); + return TStringBuilder{} << "couldn't parse csv/tsv file, check format and compression params: " << readerStatus.ToString(); } return table->fields(); @@ -259,7 +267,7 @@ public: auto& file = *ev->Get(); auto mbArrowFields = InferType(Format_, file.File, *Config_); if (std::holds_alternative<TString>(mbArrowFields)) { - ctx.Send(RequesterId_, MakeError(file.Path, NFq::TIssuesIds::INTERNAL_ERROR, std::get<TString>(mbArrowFields))); + ctx.Send(RequesterId_, MakeErrorSchema(file.Path, NFq::TIssuesIds::INTERNAL_ERROR, std::get<TString>(mbArrowFields))); return; } @@ -269,7 +277,7 @@ public: ydbFields.emplace_back(); auto& ydbField = ydbFields.back(); if (!ArrowToYdbType(*ydbField.mutable_type(), *field->type())) { - ctx.Send(RequesterId_, MakeError(file.Path, NFq::TIssuesIds::UNSUPPORTED, TStringBuilder{} << "couldn't convert arrow type to ydb: " << field->ToString())); + ctx.Send(RequesterId_, MakeErrorSchema(file.Path, NFq::TIssuesIds::UNSUPPORTED, TStringBuilder{} << "couldn't convert arrow type to ydb: " << field->ToString())); return; } ydbField.mutable_name()->assign(field->name()); @@ -279,7 +287,7 @@ public: void HandleFileError(TEvFileError::TPtr& ev, const NActors::TActorContext& ctx) { Cout << "TArrowInferencinator::HandleFileError" << Endl; - ctx.Send(RequesterId_, ev->Release()); + ctx.Send(RequesterId_, new TEvInferredFileSchema(ev->Get()->Path, std::move(ev->Get()->Issues))); } private: diff --git a/ydb/tests/fq/s3/test_s3_0.py b/ydb/tests/fq/s3/test_s3_0.py index 1c3ce29e64..31324e2943 100644 --- a/ydb/tests/fq/s3/test_s3_0.py +++ b/ydb/tests/fq/s3/test_s3_0.py @@ -310,6 +310,39 @@ Apple,2,22, assert result_set.rows[2].items[2].int64_value == 3 assert sum(kikimr.control_plane.get_metering(1)) == 10 + @yq_v2 + @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) + def test_inference_file_error(self, kikimr, s3, client, unique_prefix): + resource = boto3.resource( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + bucket = resource.Bucket("fbucket") + bucket.create(ACL='public-read') + bucket.objects.all().delete() + + s3_client = boto3.client( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + read_data = '''{"a" : [10, 20, 30]}''' + s3_client.put_object(Body=read_data, Bucket='fbucket', Key='data.json', ContentType='text/plain') + kikimr.control_plane.wait_bootstrap(1) + storage_connection_name = unique_prefix + "json_bucket" + client.create_storage_connection(storage_connection_name, "fbucket") + + sql = f''' + SELECT * + FROM `{storage_connection_name}`.`data.json` + WITH (format=csv_with_names, with_infer='true'); + ''' + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.FAILED) + assert "couldn\\'t parse csv/tsv file, check format and compression params:" in str( + client.describe_query(query_id).result + ) + @yq_all @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) def test_csv_with_hopping(self, kikimr, s3, client, unique_prefix): |