diff options
author | chertus <azuikov@ydb.tech> | 2023-08-24 14:06:53 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2023-08-24 14:38:41 +0300 |
commit | 0d5dd26da4012de80c0af860d7b244ca2be85cd3 (patch) | |
tree | 5bc65bcbb1819c317782df89c177588f41d01c94 | |
parent | e78d174cac68037472bbfc1832105152848786c1 (diff) | |
download | ydb-0d5dd26da4012de80c0af860d7b244ca2be85cd3.tar.gz |
KIKIMR-18717 Fix in-place type conversion in Arrow bulk upsert
-rw-r--r-- | ydb/core/formats/arrow/converter.cpp | 24 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp | 4 |
2 files changed, 21 insertions, 7 deletions
diff --git a/ydb/core/formats/arrow/converter.cpp b/ydb/core/formats/arrow/converter.cpp index b55c98f8c1..e72e36ec96 100644 --- a/ydb/core/formats/arrow/converter.cpp +++ b/ydb/core/formats/arrow/converter.cpp @@ -127,22 +127,33 @@ static std::shared_ptr<arrow::Array> InplaceConvertColumn(const std::shared_ptr< switch (colType.GetTypeId()) { case NScheme::NTypeIds::Bytes: { Y_VERIFY(column->type()->id() == arrow::Type::STRING); - return std::make_shared<arrow::BinaryArray>(arrow::ArrayData::Make(arrow::binary(), column->data()->length, column->data()->buffers, column->data()->null_count, column->data()->offset)); + return std::make_shared<arrow::BinaryArray>( + arrow::ArrayData::Make(arrow::binary(), column->data()->length, + column->data()->buffers, column->data()->null_count, column->data()->offset)); } case NScheme::NTypeIds::Date: { Y_VERIFY(arrow::is_primitive(column->type()->id())); Y_VERIFY(arrow::bit_width(column->type()->id()) == 16); - return std::make_shared<arrow::NumericArray<arrow::UInt16Type>>(column->data()); + + auto newData = column->data()->Copy(); + newData->type = arrow::uint16(); + return std::make_shared<arrow::NumericArray<arrow::UInt16Type>>(newData); } case NScheme::NTypeIds::Datetime: { Y_VERIFY(arrow::is_primitive(column->type()->id())); Y_VERIFY(arrow::bit_width(column->type()->id()) == 32); - return std::make_shared<arrow::NumericArray<arrow::Int32Type>>(column->data()); + + auto newData = column->data()->Copy(); + newData->type = arrow::uint32(); + return std::make_shared<arrow::NumericArray<arrow::UInt32Type>>(newData); } case NScheme::NTypeIds::Timestamp: { Y_VERIFY(arrow::is_primitive(column->type()->id())); Y_VERIFY(arrow::bit_width(column->type()->id()) == 64); - return std::make_shared<arrow::TimestampArray>(column->data()); + + auto newData = column->data()->Copy(); + newData->type = arrow::timestamp(arrow::TimeUnit::MICRO); + return std::make_shared<arrow::TimestampArray>(newData); } default: return {}; @@ -154,6 +165,7 @@ 
std::shared_ptr<arrow::RecordBatch> InplaceConvertColumns(const std::shared_ptr< std::vector<std::shared_ptr<arrow::Array>> columns = batch->columns(); std::vector<std::shared_ptr<arrow::Field>> fields; fields.reserve(batch->num_columns()); + for (i32 i = 0; i < batch->num_columns(); ++i) { auto& colName = batch->column_name(i); auto it = columnsToConvert.find(TString(colName.data(), colName.size())); @@ -164,7 +176,9 @@ std::shared_ptr<arrow::RecordBatch> InplaceConvertColumns(const std::shared_ptr< } auto resultSchemaFixed = std::make_shared<arrow::Schema>(std::move(fields)); auto convertedBatch = arrow::RecordBatch::Make(resultSchemaFixed, batch->num_rows(), std::move(columns)); - Y_VERIFY(convertedBatch->ValidateFull() == arrow::Status::OK()); + + Y_VERIFY(convertedBatch->Validate().ok()); + Y_VERIFY_DEBUG(convertedBatch->ValidateFull().ok()); return convertedBatch; } diff --git a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp index 10867cf5ec..37b477ea13 100644 --- a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp +++ b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp @@ -359,11 +359,11 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { UNIT_ASSERT_GT(rows.size(), 0); } } -#if 0 // TODO: KIKIMR-18717 + Y_UNIT_TEST(ParquetImportBug) { ParquetImportBug(true); } -#endif + Y_UNIT_TEST(ParquetImportBug_Datashard) { ParquetImportBug(false); } |