about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author ivanmorozov <ivanmorozov@yandex-team.com> 2023-02-09 21:19:10 +0300
committer ivanmorozov <ivanmorozov@yandex-team.com> 2023-02-09 21:19:10 +0300
commit af6e46c00c6c8b81ccafc0df6b675a6808ae7f5d (patch)
tree eaf5eea912e2db3cf0b0737f0a971afe3cce46e1
parent 3b01241beefaef786877f62b95fea71e68fc6554 (diff)
download ydb-af6e46c00c6c8b81ccafc0df6b675a6808ae7f5d.tar.gz
fix converted schema
fix for correct validation
-rw-r--r-- ydb/core/io_formats/csv_arrow.cpp 8
-rw-r--r-- ydb/core/tx/tx_proxy/upload_rows_common_impl.h 5
2 files changed, 9 insertions, 4 deletions
diff --git a/ydb/core/io_formats/csv_arrow.cpp b/ydb/core/io_formats/csv_arrow.cpp
index 7028c62b000..480456a6f10 100644
--- a/ydb/core/io_formats/csv_arrow.cpp
+++ b/ydb/core/io_formats/csv_arrow.cpp
@@ -99,6 +99,7 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ConvertColumnTypes(std::shared_pt
std::vector<std::shared_ptr<arrow::Array>> resultColumns;
std::set<std::string> columnsFilter(ResultColumns.begin(), ResultColumns.end());
+ arrow::SchemaBuilder sBuilderFixed;
for (auto&& f : schema->fields()) {
auto fArr = parsedBatch->GetColumnByName(f->name());
std::shared_ptr<arrow::DataType> originalType;
@@ -106,8 +107,9 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ConvertColumnTypes(std::shared_pt
auto it = OriginalColumnTypes.find(f->name());
Y_VERIFY(it != OriginalColumnTypes.end());
originalType = it->second;
+ Y_VERIFY(sBuilderFixed.AddField(std::make_shared<arrow::Field>(f->name(), originalType)).ok());
} else {
- originalType = f->type();
+ continue;
}
if (fArr->type()->Equals(originalType)) {
resultColumns.emplace_back(fArr);
@@ -148,7 +150,9 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ConvertColumnTypes(std::shared_pt
}
}
- return arrow::RecordBatch::Make(schema, parsedBatch->num_rows(), resultColumns);
+ auto resultSchemaFixed = sBuilderFixed.Finish();
+ Y_VERIFY(resultSchemaFixed.ok());
+ return arrow::RecordBatch::Make(*resultSchemaFixed, parsedBatch->num_rows(), resultColumns);
}
std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadNext(const TString& csv, TString& errString) {
diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
index d6ea07490ee..19260691b0e 100644
--- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
+++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
@@ -661,8 +661,9 @@ private:
Y_VERIFY(batch);
#if 1 // TODO: check we call ValidateFull() once over pipeline (upsert -> long tx -> shard insert)
- if (!batch->ValidateFull().ok()) {
- return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, "Bad batch in bulk upsert data", ctx);
+ auto validationInfo = batch->ValidateFull();
+ if (!validationInfo.ok()) {
+ return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, "Bad batch in bulk upsert data: " + validationInfo.message() + "; order:" + JoinSeq(", ", outputColumns), ctx);
}
#endif