diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-02-09 21:19:10 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-02-09 21:19:10 +0300 |
commit | af6e46c00c6c8b81ccafc0df6b675a6808ae7f5d (patch) | |
tree | eaf5eea912e2db3cf0b0737f0a971afe3cce46e1 | |
parent | 3b01241beefaef786877f62b95fea71e68fc6554 (diff) | |
download | ydb-af6e46c00c6c8b81ccafc0df6b675a6808ae7f5d.tar.gz |
fix converted schema
fix for correct validation
-rw-r--r-- | ydb/core/io_formats/csv_arrow.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tx/tx_proxy/upload_rows_common_impl.h | 5 |
2 files changed, 9 insertions, 4 deletions
diff --git a/ydb/core/io_formats/csv_arrow.cpp b/ydb/core/io_formats/csv_arrow.cpp index 7028c62b000..480456a6f10 100644 --- a/ydb/core/io_formats/csv_arrow.cpp +++ b/ydb/core/io_formats/csv_arrow.cpp @@ -99,6 +99,7 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ConvertColumnTypes(std::shared_pt std::vector<std::shared_ptr<arrow::Array>> resultColumns; std::set<std::string> columnsFilter(ResultColumns.begin(), ResultColumns.end()); + arrow::SchemaBuilder sBuilderFixed; for (auto&& f : schema->fields()) { auto fArr = parsedBatch->GetColumnByName(f->name()); std::shared_ptr<arrow::DataType> originalType; @@ -106,8 +107,9 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ConvertColumnTypes(std::shared_pt auto it = OriginalColumnTypes.find(f->name()); Y_VERIFY(it != OriginalColumnTypes.end()); originalType = it->second; + Y_VERIFY(sBuilderFixed.AddField(std::make_shared<arrow::Field>(f->name(), originalType)).ok()); } else { - originalType = f->type(); + continue; } if (fArr->type()->Equals(originalType)) { resultColumns.emplace_back(fArr); @@ -148,7 +150,9 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ConvertColumnTypes(std::shared_pt } } - return arrow::RecordBatch::Make(schema, parsedBatch->num_rows(), resultColumns); + auto resultSchemaFixed = sBuilderFixed.Finish(); + Y_VERIFY(resultSchemaFixed.ok()); + return arrow::RecordBatch::Make(*resultSchemaFixed, parsedBatch->num_rows(), resultColumns); } std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadNext(const TString& csv, TString& errString) { diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h index d6ea07490ee..19260691b0e 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h @@ -661,8 +661,9 @@ private: Y_VERIFY(batch); #if 1 // TODO: check we call ValidateFull() once over pipeline (upsert -> long tx -> shard insert) - if (!batch->ValidateFull().ok()) { - return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, "Bad batch in bulk upsert data", ctx); + auto validationInfo = batch->ValidateFull(); + if (!validationInfo.ok()) { + return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, "Bad batch in bulk upsert data: " + validationInfo.message() + "; order:" + JoinSeq(", ", outputColumns), ctx); } #endif |