diff options
author | stepandrey <stepandrey@yandex-team.com> | 2023-03-14 11:33:38 +0300 |
---|---|---|
committer | stepandrey <stepandrey@yandex-team.com> | 2023-03-14 11:33:38 +0300 |
commit | 905e6db573bae8dca6b071c468c29b8d3cd7a242 (patch) | |
tree | 86af8dbd32d8f867d55ab585315eb8cb7530c397 | |
parent | 4dc924fece9830cb2b8c7f447c04b745561af4a6 (diff) | |
download | ydb-905e6db573bae8dca6b071c468c29b8d3cd7a242.tar.gz |
fix type mismatch in parquet import
implement SameOrConvertableDstType
-rw-r--r-- | ydb/core/formats/arrow_helpers.cpp | 37 | ||||
-rw-r--r-- | ydb/core/formats/arrow_helpers.h | 14 | ||||
-rw-r--r-- | ydb/core/tx/tx_proxy/upload_rows_common_impl.h | 20 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp | 70 |
4 files changed, 138 insertions, 3 deletions
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp index 89b9a1af915..7ff35617a5c 100644 --- a/ydb/core/formats/arrow_helpers.cpp +++ b/ydb/core/formats/arrow_helpers.cpp @@ -1141,6 +1141,43 @@ std::shared_ptr<arrow::RecordBatch> ConvertColumns(const std::shared_ptr<arrow:: return arrow::RecordBatch::Make(batch->schema(), batch->num_rows(), columns); } +static std::shared_ptr<arrow::Array> InplaceConvertColumn(const std::shared_ptr<arrow::Array>& column, + NScheme::TTypeInfo colType) { + switch (colType.GetTypeId()) { + case NScheme::NTypeIds::Timestamp: { + Y_VERIFY(arrow::is_primitive(column->type()->id())); + Y_VERIFY(arrow::bit_width(column->type()->id()) == 64); + return std::make_shared<arrow::TimestampArray>(column->data()); + } + case NScheme::NTypeIds::Date: { + Y_VERIFY(arrow::is_primitive(column->type()->id())); + Y_VERIFY(arrow::bit_width(column->type()->id()) == 32); + return std::make_shared<arrow::Date32Array>(column->data()); + } + default: + return {}; + } +} + +std::shared_ptr<arrow::RecordBatch> InplaceConvertColumns(const std::shared_ptr<arrow::RecordBatch>& batch, + const THashMap<TString, NScheme::TTypeInfo>& columnsToConvert) { + std::vector<std::shared_ptr<arrow::Array>> columns = batch->columns(); + std::vector<std::shared_ptr<arrow::Field>> fields; + fields.reserve(batch->num_columns()); + for (i32 i = 0; i < batch->num_columns(); ++i) { + auto& colName = batch->column_name(i); + auto it = columnsToConvert.find(TString(colName.data(), colName.size())); + if (it != columnsToConvert.end()) { + columns[i] = InplaceConvertColumn(columns[i], it->second); + } + fields.push_back(std::make_shared<arrow::Field>(colName, columns[i]->type())); + } + auto resultSchemaFixed = std::make_shared<arrow::Schema>(fields); + auto convertedBatch = arrow::RecordBatch::Make(resultSchemaFixed, batch->num_rows(), columns); + Y_VERIFY(convertedBatch->ValidateFull() == arrow::Status::OK()); + return convertedBatch; +} + bool TArrowToYdbConverter::Process(const arrow::RecordBatch& batch, TString& errorMessage) { std::vector<std::shared_ptr<arrow::Array>> allColumns; allColumns.reserve(YdbSchema.size()); diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h index 1736b93851f..cfd6c8c4a14 100644 --- a/ydb/core/formats/arrow_helpers.h +++ b/ydb/core/formats/arrow_helpers.h @@ -188,6 +188,18 @@ public: return false; } + static bool NeedInplaceConversion(const NScheme::TTypeInfo& typeInRequest, const NScheme::TTypeInfo& expectedType) { + switch (expectedType.GetTypeId()) { + case NScheme::NTypeIds::Timestamp: + return typeInRequest.GetTypeId() == NScheme::NTypeIds::Int64; + case NScheme::NTypeIds::Date: + return typeInRequest.GetTypeId() == NScheme::NTypeIds::Uint16; + default: + break; + } + return false; + } + TArrowToYdbConverter(const TVector<std::pair<TString, NScheme::TTypeInfo>>& ydbSchema, IRowWriter& rowWriter) : YdbSchema(ydbSchema) , RowWriter(rowWriter) @@ -207,6 +219,8 @@ public: std::shared_ptr<arrow::RecordBatch> ConvertColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const THashMap<TString, NScheme::TTypeInfo>& columnsToConvert); +std::shared_ptr<arrow::RecordBatch> InplaceConvertColumns(const std::shared_ptr<arrow::RecordBatch>& batch, + const THashMap<TString, NScheme::TTypeInfo>& columnsToConvert); inline bool HasNulls(const std::shared_ptr<arrow::Array>& column) { return column->null_bitmap_data(); diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h index 988cd9fff27..6f508342f72 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h @@ -150,6 +150,7 @@ protected: TVector<std::pair<TString, NScheme::TTypeInfo>> YdbSchema; THashMap<ui32, size_t> Id2Position; // columnId -> its position in YdbSchema THashMap<TString, NScheme::TTypeInfo> ColumnsToConvert; + THashMap<TString, NScheme::TTypeInfo> ColumnsToConvertInplace; bool WriteToTableShadow = false; bool AllowWriteToPrivateTable = false; @@ -284,6 +285,11 @@ private: return res; } + static bool SameOrConvertableDstType(NScheme::TTypeInfo type1, NScheme::TTypeInfo type2, bool allowConvert) { + bool ok = SameDstType(type1, type2, allowConvert); + return ok || NArrow::TArrowToYdbConverter::NeedInplaceConversion(type1, type2); + } + bool BuildSchema(const NActors::TActorContext& ctx, TString& errorMessage, bool makeYqbSchema) { Y_UNUSED(ctx); Y_VERIFY(ResolveNamesResult); @@ -365,13 +371,17 @@ private: if (typeInProto.type_id()) { auto typeInRequest = NScheme::TTypeInfo(typeInProto.type_id()); - bool ok = SameDstType(typeInRequest, ci.PType, GetSourceType() != EUploadSource::ProtoValues); - if (!ok) { - errorMessage = Sprintf("Type mismatch for column %s: expected %s, got %s", + bool sourceIsArrow = GetSourceType() != EUploadSource::ProtoValues; + bool ok = SameOrConvertableDstType(typeInRequest, ci.PType, sourceIsArrow); // TODO + if (!ok) { + errorMessage = Sprintf("Type mismatch for column %s: expected %s, got %s", name.c_str(), NScheme::TypeName(ci.PType).c_str(), NScheme::TypeName(typeInRequest).c_str()); return false; } + if (NArrow::TArrowToYdbConverter::NeedInplaceConversion(typeInRequest, ci.PType)) { + ColumnsToConvertInplace[name] = ci.PType; + } } else if (typeInProto.has_decimal_type() && ci.PType.GetTypeId() == NScheme::NTypeIds::Decimal) { int precision = typeInProto.decimal_type().precision(); int scale = typeInProto.decimal_type().scale(); @@ -572,6 +582,9 @@ private: if (!ExtractBatch(errorMessage)) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx); } + if (!ColumnsToConvertInplace.empty()) { + Batch = NArrow::InplaceConvertColumns(Batch, ColumnsToConvertInplace); + } // Explicit types conversion if (!ColumnsToConvert.empty()) { Batch = NArrow::ConvertColumns(Batch, ColumnsToConvert); @@ -608,6 +621,7 @@ private: if (TableKind == NSchemeCache::TSchemeCacheNavigate::KindTable) { ResolveShards(ctx); } else if (isColumnTable) { + // Batch is already converted WriteToColumnTable(ctx); } else { return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, diff --git a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp index f86823c13bd..63eb6fd97dc 100644 --- a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp +++ b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp @@ -6,6 +6,7 @@ #include <ydb/library/yql/public/issue/yql_issue.h> #include <ydb/library/yql/public/issue/yql_issue_message.h> +#include <ydb/core/formats/arrow_helpers.h> using namespace NYdb; @@ -194,6 +195,75 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { } } + Y_UNIT_TEST(ParquetImportBug) { + NKikimrConfig::TAppConfig appConfig; + TKikimrWithGrpcAndRootSchema server(appConfig); + server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); + + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location)); + + NYdb::NTable::TTableClient client(connection); + auto session = client.GetSession().ExtractValueSync().GetSession(); + TString tablePath = TTestOlap::TablePath; + + std::vector<std::pair<TString, NYdb::EPrimitiveType>> schema = { + { "id", NYdb::EPrimitiveType::Uint32 }, + { "timestamp", NYdb::EPrimitiveType::Timestamp }, + { "date", NYdb::EPrimitiveType::Date } + }; + + auto tableBuilder = client.GetTableBuilder(); + for (auto& [name, type] : schema) { + if (name == "id") { + tableBuilder.AddNonNullableColumn(name, type); + } else { + tableBuilder.AddNullableColumn(name, type); + } + } + tableBuilder.SetPrimaryKeyColumns({"id"}); + auto result = session.CreateTable(tablePath, tableBuilder.Build(), {}).ExtractValueSync(); + + UNIT_ASSERT_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + auto batchSchema = std::make_shared<arrow::Schema>( + std::vector<std::shared_ptr<arrow::Field>>{ + arrow::field("id", arrow::uint32()), + arrow::field("timestamp", arrow::int64()), + arrow::field("date", arrow::uint16()) + }); + + size_t rowsCount = 100; + auto builders = NArrow::MakeBuilders(batchSchema, rowsCount); + + for (size_t i = 0; i < rowsCount; ++i) { + Y_VERIFY(NArrow::Append<arrow::UInt32Type>(*builders[0], i)); + Y_VERIFY(NArrow::Append<arrow::Int64Type>(*builders[1], i)); + Y_VERIFY(NArrow::Append<arrow::UInt16Type>(*builders[2], i)); + } + + auto srcBatch = arrow::RecordBatch::Make(batchSchema, rowsCount, NArrow::Finish(std::move(builders))); + TString strSchema = NArrow::SerializeSchema(*batchSchema); + TString strBatch = NArrow::SerializeBatchNoCompression(srcBatch); + + TInstant start = TInstant::Now(); + { + auto res = client.BulkUpsert(tablePath, + NYdb::NTable::EDataFormat::ApacheArrow, strBatch, strSchema).GetValueSync(); + + Cerr << res.GetStatus() << Endl; + UNIT_ASSERT_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString()); + } + Cerr << "Upsert done: " << TInstant::Now() - start << Endl; + + { // Read all + auto rows = ScanQuerySelect(client, tablePath, schema); + UNIT_ASSERT_GT(rows.size(), 0); + } + } + Y_UNIT_TEST(UpsertCsvBug) { NKikimrConfig::TAppConfig appConfig; TKikimrWithGrpcAndRootSchema server(appConfig); |