aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIvan Sukhov <evanevannnn@ydb.tech>2024-08-09 19:55:38 +0300
committerGitHub <noreply@github.com>2024-08-09 16:55:38 +0000
commitee6b772c5f61986bc4b03006fb56680cd91a3a30 (patch)
tree96898bdfdfac53d2e9831f0032c05f1f214d6d61
parent1269ae5d3762666dcbb7fedda30029035a4b285c (diff)
downloadydb-ee6b772c5f61986bc4b03006fb56680cd91a3a30.tar.gz
handle EVFileError event in TObjectStorageExternalSource (#7569)
-rw-r--r--ydb/core/external_sources/object_storage.cpp6
-rw-r--r--ydb/core/external_sources/object_storage/events.h7
-rw-r--r--ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp18
-rw-r--r--ydb/tests/fq/s3/test_s3_0.py33
4 files changed, 58 insertions, 6 deletions
diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp
index b1947d828f..cdad28cd34 100644
--- a/ydb/core/external_sources/object_storage.cpp
+++ b/ydb/core/external_sources/object_storage.cpp
@@ -355,13 +355,17 @@ struct TObjectStorageExternalSource : public IExternalSource {
return afterListing.Apply([arrowInferencinatorId, meta, actorSystem = ActorSystem](const NThreading::TFuture<TString>& pathFut) {
auto promise = NThreading::NewPromise<TMetadataResult>();
auto schemaToMetadata = [meta](NThreading::TPromise<TMetadataResult> metaPromise, NObjectStorage::TEvInferredFileSchema&& response) {
+ if (!response.Status.IsSuccess()) {
+ metaPromise.SetValue(NYql::NCommon::ResultFromError<TMetadataResult>(response.Status.GetIssues()));
+ return;
+ }
+ TMetadataResult result;
meta->Changed = true;
meta->Schema.clear_column();
for (const auto& column : response.Fields) {
auto& destColumn = *meta->Schema.add_column();
destColumn = column;
}
- TMetadataResult result;
result.SetSuccess();
result.Metadata = meta;
metaPromise.SetValue(std::move(result));
diff --git a/ydb/core/external_sources/object_storage/events.h b/ydb/core/external_sources/object_storage/events.h
index 1e1d0b03a7..e10dcaced5 100644
--- a/ydb/core/external_sources/object_storage/events.h
+++ b/ydb/core/external_sources/object_storage/events.h
@@ -10,6 +10,7 @@
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/core/fq/libs/config/protos/issue_id.pb.h>
#include <ydb/public/api/protos/ydb_value.pb.h>
+#include <ydb/public/sdk/cpp/client/ydb_types/status/status.h>
namespace NKikimr::NExternalSource::NObjectStorage {
@@ -128,10 +129,16 @@ struct TEvInferFileSchema : public NActors::TEventLocal<TEvInferFileSchema, EvIn
struct TEvInferredFileSchema : public NActors::TEventLocal<TEvInferredFileSchema, EvInferredFileSchema> {
TEvInferredFileSchema(TString path, std::vector<Ydb::Column>&& fields)
: Path{std::move(path)}
+ , Status{NYdb::EStatus::SUCCESS, {}}
, Fields{std::move(fields)}
{}
+ TEvInferredFileSchema(TString path, NYql::TIssues&& issues)
+ : Path{std::move(path)}
+ , Status{NYdb::EStatus::INTERNAL_ERROR, std::move(issues)}
+ {}
TString Path;
+ NYdb::TStatus Status;
std::vector<Ydb::Column> Fields;
};
diff --git a/ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp b/ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp
index 43e692cb5b..ce72a93fb0 100644
--- a/ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp
+++ b/ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp
@@ -153,6 +153,14 @@ bool ArrowToYdbType(Ydb::Type& maybeOptionalType, const arrow::DataType& type) {
}
return false;
}
+
+TEvInferredFileSchema* MakeErrorSchema(TString path, NFq::TIssuesIds::EIssueCode code, TString message) {
+ NYql::TIssues issues;
+ issues.AddIssue(std::move(message));
+ issues.back().SetCode(code, NYql::TSeverityIds::S_ERROR);
+ return new TEvInferredFileSchema{std::move(path), std::move(issues)};
+}
+
}
struct FormatConfig {
@@ -181,14 +189,14 @@ std::variant<ArrowFields, TString> InferCsvTypes(std::shared_ptr<arrow::io::Rand
.Value(&reader);
if (!readerStatus.ok()) {
- return TString{TStringBuilder{} << "couldn't make table from data: " << readerStatus.ToString()};
+ return TString{TStringBuilder{} << "couldn't parse csv/tsv file, check format and compression params: " << readerStatus.ToString()};
}
std::shared_ptr<arrow::Table> table;
auto tableRes = reader->Read().Value(&table);
if (!tableRes.ok()) {
- return TStringBuilder{} << "couldn't read table from data: " << readerStatus.ToString();
+ return TStringBuilder{} << "couldn't parse csv/tsv file, check format and compression params: " << readerStatus.ToString();
}
return table->fields();
@@ -259,7 +267,7 @@ public:
auto& file = *ev->Get();
auto mbArrowFields = InferType(Format_, file.File, *Config_);
if (std::holds_alternative<TString>(mbArrowFields)) {
- ctx.Send(RequesterId_, MakeError(file.Path, NFq::TIssuesIds::INTERNAL_ERROR, std::get<TString>(mbArrowFields)));
+ ctx.Send(RequesterId_, MakeErrorSchema(file.Path, NFq::TIssuesIds::INTERNAL_ERROR, std::get<TString>(mbArrowFields)));
return;
}
@@ -269,7 +277,7 @@ public:
ydbFields.emplace_back();
auto& ydbField = ydbFields.back();
if (!ArrowToYdbType(*ydbField.mutable_type(), *field->type())) {
- ctx.Send(RequesterId_, MakeError(file.Path, NFq::TIssuesIds::UNSUPPORTED, TStringBuilder{} << "couldn't convert arrow type to ydb: " << field->ToString()));
+ ctx.Send(RequesterId_, MakeErrorSchema(file.Path, NFq::TIssuesIds::UNSUPPORTED, TStringBuilder{} << "couldn't convert arrow type to ydb: " << field->ToString()));
return;
}
ydbField.mutable_name()->assign(field->name());
@@ -279,7 +287,7 @@ public:
void HandleFileError(TEvFileError::TPtr& ev, const NActors::TActorContext& ctx) {
Cout << "TArrowInferencinator::HandleFileError" << Endl;
- ctx.Send(RequesterId_, ev->Release());
+ ctx.Send(RequesterId_, new TEvInferredFileSchema(ev->Get()->Path, std::move(ev->Get()->Issues)));
}
private:
diff --git a/ydb/tests/fq/s3/test_s3_0.py b/ydb/tests/fq/s3/test_s3_0.py
index 1c3ce29e64..31324e2943 100644
--- a/ydb/tests/fq/s3/test_s3_0.py
+++ b/ydb/tests/fq/s3/test_s3_0.py
@@ -310,6 +310,39 @@ Apple,2,22,
assert result_set.rows[2].items[2].int64_value == 3
assert sum(kikimr.control_plane.get_metering(1)) == 10
+ @yq_v2
+ @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
+ def test_inference_file_error(self, kikimr, s3, client, unique_prefix):
+ resource = boto3.resource(
+ "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
+ )
+
+ bucket = resource.Bucket("fbucket")
+ bucket.create(ACL='public-read')
+ bucket.objects.all().delete()
+
+ s3_client = boto3.client(
+ "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
+ )
+
+ read_data = '''{"a" : [10, 20, 30]}'''
+ s3_client.put_object(Body=read_data, Bucket='fbucket', Key='data.json', ContentType='text/plain')
+ kikimr.control_plane.wait_bootstrap(1)
+ storage_connection_name = unique_prefix + "json_bucket"
+ client.create_storage_connection(storage_connection_name, "fbucket")
+
+ sql = f'''
+ SELECT *
+ FROM `{storage_connection_name}`.`data.json`
+ WITH (format=csv_with_names, with_infer='true');
+ '''
+
+ query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+ client.wait_query_status(query_id, fq.QueryMeta.FAILED)
+ assert "couldn\\'t parse csv/tsv file, check format and compression params:" in str(
+ client.describe_query(query_id).result
+ )
+
@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_csv_with_hopping(self, kikimr, s3, client, unique_prefix):