aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2023-08-24 14:06:53 +0300
committerchertus <azuikov@ydb.tech>2023-08-24 14:38:41 +0300
commit0d5dd26da4012de80c0af860d7b244ca2be85cd3 (patch)
tree5bc65bcbb1819c317782df89c177588f41d01c94
parente78d174cac68037472bbfc1832105152848786c1 (diff)
downloadydb-0d5dd26da4012de80c0af860d7b244ca2be85cd3.tar.gz
KIKIMR-18717 fix inplace types conversion in arrow bulk upsert
-rw-r--r--ydb/core/formats/arrow/converter.cpp24
-rw-r--r--ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp4
2 files changed, 21 insertions, 7 deletions
diff --git a/ydb/core/formats/arrow/converter.cpp b/ydb/core/formats/arrow/converter.cpp
index b55c98f8c1..e72e36ec96 100644
--- a/ydb/core/formats/arrow/converter.cpp
+++ b/ydb/core/formats/arrow/converter.cpp
@@ -127,22 +127,33 @@ static std::shared_ptr<arrow::Array> InplaceConvertColumn(const std::shared_ptr<
switch (colType.GetTypeId()) {
case NScheme::NTypeIds::Bytes: {
Y_VERIFY(column->type()->id() == arrow::Type::STRING);
- return std::make_shared<arrow::BinaryArray>(arrow::ArrayData::Make(arrow::binary(), column->data()->length, column->data()->buffers, column->data()->null_count, column->data()->offset));
+ return std::make_shared<arrow::BinaryArray>(
+ arrow::ArrayData::Make(arrow::binary(), column->data()->length,
+ column->data()->buffers, column->data()->null_count, column->data()->offset));
}
case NScheme::NTypeIds::Date: {
Y_VERIFY(arrow::is_primitive(column->type()->id()));
Y_VERIFY(arrow::bit_width(column->type()->id()) == 16);
- return std::make_shared<arrow::NumericArray<arrow::UInt16Type>>(column->data());
+
+ auto newData = column->data()->Copy();
+ newData->type = arrow::uint16();
+ return std::make_shared<arrow::NumericArray<arrow::UInt16Type>>(newData);
}
case NScheme::NTypeIds::Datetime: {
Y_VERIFY(arrow::is_primitive(column->type()->id()));
Y_VERIFY(arrow::bit_width(column->type()->id()) == 32);
- return std::make_shared<arrow::NumericArray<arrow::Int32Type>>(column->data());
+
+ auto newData = column->data()->Copy();
+ newData->type = arrow::uint32();
+ return std::make_shared<arrow::NumericArray<arrow::UInt32Type>>(newData);
}
case NScheme::NTypeIds::Timestamp: {
Y_VERIFY(arrow::is_primitive(column->type()->id()));
Y_VERIFY(arrow::bit_width(column->type()->id()) == 64);
- return std::make_shared<arrow::TimestampArray>(column->data());
+
+ auto newData = column->data()->Copy();
+ newData->type = arrow::timestamp(arrow::TimeUnit::MICRO);
+ return std::make_shared<arrow::TimestampArray>(newData);
}
default:
return {};
@@ -154,6 +165,7 @@ std::shared_ptr<arrow::RecordBatch> InplaceConvertColumns(const std::shared_ptr<
std::vector<std::shared_ptr<arrow::Array>> columns = batch->columns();
std::vector<std::shared_ptr<arrow::Field>> fields;
fields.reserve(batch->num_columns());
+
for (i32 i = 0; i < batch->num_columns(); ++i) {
auto& colName = batch->column_name(i);
auto it = columnsToConvert.find(TString(colName.data(), colName.size()));
@@ -164,7 +176,9 @@ std::shared_ptr<arrow::RecordBatch> InplaceConvertColumns(const std::shared_ptr<
}
auto resultSchemaFixed = std::make_shared<arrow::Schema>(std::move(fields));
auto convertedBatch = arrow::RecordBatch::Make(resultSchemaFixed, batch->num_rows(), std::move(columns));
- Y_VERIFY(convertedBatch->ValidateFull() == arrow::Status::OK());
+
+ Y_VERIFY(convertedBatch->Validate().ok());
+ Y_VERIFY_DEBUG(convertedBatch->ValidateFull().ok());
return convertedBatch;
}
diff --git a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp
index 10867cf5ec..37b477ea13 100644
--- a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp
+++ b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp
@@ -359,11 +359,11 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) {
UNIT_ASSERT_GT(rows.size(), 0);
}
}
-#if 0 // TODO: KIKIMR-18717
+
Y_UNIT_TEST(ParquetImportBug) {
ParquetImportBug(true);
}
-#endif
+
Y_UNIT_TEST(ParquetImportBug_Datashard) {
ParquetImportBug(false);
}