aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorstepandrey <stepandrey@yandex-team.com>2023-03-14 11:33:38 +0300
committerstepandrey <stepandrey@yandex-team.com>2023-03-14 11:33:38 +0300
commit905e6db573bae8dca6b071c468c29b8d3cd7a242 (patch)
tree86af8dbd32d8f867d55ab585315eb8cb7530c397
parent4dc924fece9830cb2b8c7f447c04b745561af4a6 (diff)
downloadydb-905e6db573bae8dca6b071c468c29b8d3cd7a242.tar.gz
fix type mismatch in parquet import
implement SameOrConvertableDstType
-rw-r--r--ydb/core/formats/arrow_helpers.cpp37
-rw-r--r--ydb/core/formats/arrow_helpers.h14
-rw-r--r--ydb/core/tx/tx_proxy/upload_rows_common_impl.h20
-rw-r--r--ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp70
4 files changed, 138 insertions, 3 deletions
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp
index 89b9a1af915..7ff35617a5c 100644
--- a/ydb/core/formats/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow_helpers.cpp
@@ -1141,6 +1141,43 @@ std::shared_ptr<arrow::RecordBatch> ConvertColumns(const std::shared_ptr<arrow::
return arrow::RecordBatch::Make(batch->schema(), batch->num_rows(), columns);
}
+static std::shared_ptr<arrow::Array> InplaceConvertColumn(const std::shared_ptr<arrow::Array>& column,
+ NScheme::TTypeInfo colType) {
+ switch (colType.GetTypeId()) {
+ case NScheme::NTypeIds::Timestamp: {
+ Y_VERIFY(arrow::is_primitive(column->type()->id()));
+ Y_VERIFY(arrow::bit_width(column->type()->id()) == 64);
+ return std::make_shared<arrow::TimestampArray>(column->data());
+ }
+ case NScheme::NTypeIds::Date: {
+ Y_VERIFY(arrow::is_primitive(column->type()->id()));
+ Y_VERIFY(arrow::bit_width(column->type()->id()) == 32);
+ return std::make_shared<arrow::Date32Array>(column->data());
+ }
+ default:
+ return {};
+ }
+}
+
+std::shared_ptr<arrow::RecordBatch> InplaceConvertColumns(const std::shared_ptr<arrow::RecordBatch>& batch,
+ const THashMap<TString, NScheme::TTypeInfo>& columnsToConvert) {
+ std::vector<std::shared_ptr<arrow::Array>> columns = batch->columns();
+ std::vector<std::shared_ptr<arrow::Field>> fields;
+ fields.reserve(batch->num_columns());
+ for (i32 i = 0; i < batch->num_columns(); ++i) {
+ auto& colName = batch->column_name(i);
+ auto it = columnsToConvert.find(TString(colName.data(), colName.size()));
+ if (it != columnsToConvert.end()) {
+ columns[i] = InplaceConvertColumn(columns[i], it->second);
+ }
+ fields.push_back(std::make_shared<arrow::Field>(colName, columns[i]->type()));
+ }
+ auto resultSchemaFixed = std::make_shared<arrow::Schema>(fields);
+ auto convertedBatch = arrow::RecordBatch::Make(resultSchemaFixed, batch->num_rows(), columns);
+ Y_VERIFY(convertedBatch->ValidateFull() == arrow::Status::OK());
+ return convertedBatch;
+}
+
bool TArrowToYdbConverter::Process(const arrow::RecordBatch& batch, TString& errorMessage) {
std::vector<std::shared_ptr<arrow::Array>> allColumns;
allColumns.reserve(YdbSchema.size());
diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h
index 1736b93851f..cfd6c8c4a14 100644
--- a/ydb/core/formats/arrow_helpers.h
+++ b/ydb/core/formats/arrow_helpers.h
@@ -188,6 +188,18 @@ public:
return false;
}
+ static bool NeedInplaceConversion(const NScheme::TTypeInfo& typeInRequest, const NScheme::TTypeInfo& expectedType) {
+ switch (expectedType.GetTypeId()) {
+ case NScheme::NTypeIds::Timestamp:
+ return typeInRequest.GetTypeId() == NScheme::NTypeIds::Int64;
+ case NScheme::NTypeIds::Date:
+ return typeInRequest.GetTypeId() == NScheme::NTypeIds::Uint16;
+ default:
+ break;
+ }
+ return false;
+ }
+
TArrowToYdbConverter(const TVector<std::pair<TString, NScheme::TTypeInfo>>& ydbSchema, IRowWriter& rowWriter)
: YdbSchema(ydbSchema)
, RowWriter(rowWriter)
@@ -207,6 +219,8 @@ public:
std::shared_ptr<arrow::RecordBatch> ConvertColumns(const std::shared_ptr<arrow::RecordBatch>& batch,
const THashMap<TString, NScheme::TTypeInfo>& columnsToConvert);
+std::shared_ptr<arrow::RecordBatch> InplaceConvertColumns(const std::shared_ptr<arrow::RecordBatch>& batch,
+ const THashMap<TString, NScheme::TTypeInfo>& columnsToConvert);
inline bool HasNulls(const std::shared_ptr<arrow::Array>& column) {
return column->null_bitmap_data();
diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
index 988cd9fff27..6f508342f72 100644
--- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
+++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
@@ -150,6 +150,7 @@ protected:
TVector<std::pair<TString, NScheme::TTypeInfo>> YdbSchema;
THashMap<ui32, size_t> Id2Position; // columnId -> its position in YdbSchema
THashMap<TString, NScheme::TTypeInfo> ColumnsToConvert;
+ THashMap<TString, NScheme::TTypeInfo> ColumnsToConvertInplace;
bool WriteToTableShadow = false;
bool AllowWriteToPrivateTable = false;
@@ -284,6 +285,11 @@ private:
return res;
}
+ static bool SameOrConvertableDstType(NScheme::TTypeInfo type1, NScheme::TTypeInfo type2, bool allowConvert) {
+ bool ok = SameDstType(type1, type2, allowConvert);
+ return ok || NArrow::TArrowToYdbConverter::NeedInplaceConversion(type1, type2);
+ }
+
bool BuildSchema(const NActors::TActorContext& ctx, TString& errorMessage, bool makeYqbSchema) {
Y_UNUSED(ctx);
Y_VERIFY(ResolveNamesResult);
@@ -365,13 +371,17 @@ private:
if (typeInProto.type_id()) {
auto typeInRequest = NScheme::TTypeInfo(typeInProto.type_id());
- bool ok = SameDstType(typeInRequest, ci.PType, GetSourceType() != EUploadSource::ProtoValues);
- if (!ok) {
- errorMessage = Sprintf("Type mismatch for column %s: expected %s, got %s",
+ bool sourceIsArrow = GetSourceType() != EUploadSource::ProtoValues;
+ bool ok = SameOrConvertableDstType(typeInRequest, ci.PType, sourceIsArrow); // TODO
+ if (!ok) {
+ errorMessage = Sprintf("Type mismatch for column %s: expected %s, got %s",
name.c_str(), NScheme::TypeName(ci.PType).c_str(),
NScheme::TypeName(typeInRequest).c_str());
return false;
}
+ if (NArrow::TArrowToYdbConverter::NeedInplaceConversion(typeInRequest, ci.PType)) {
+ ColumnsToConvertInplace[name] = ci.PType;
+ }
} else if (typeInProto.has_decimal_type() && ci.PType.GetTypeId() == NScheme::NTypeIds::Decimal) {
int precision = typeInProto.decimal_type().precision();
int scale = typeInProto.decimal_type().scale();
@@ -572,6 +582,9 @@ private:
if (!ExtractBatch(errorMessage)) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx);
}
+ if (!ColumnsToConvertInplace.empty()) {
+ Batch = NArrow::InplaceConvertColumns(Batch, ColumnsToConvertInplace);
+ }
// Explicit types conversion
if (!ColumnsToConvert.empty()) {
Batch = NArrow::ConvertColumns(Batch, ColumnsToConvert);
@@ -608,6 +621,7 @@ private:
if (TableKind == NSchemeCache::TSchemeCacheNavigate::KindTable) {
ResolveShards(ctx);
} else if (isColumnTable) {
+ // Batch is already converted
WriteToColumnTable(ctx);
} else {
return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR,
diff --git a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp
index f86823c13bd..63eb6fd97dc 100644
--- a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp
+++ b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp
@@ -6,6 +6,7 @@
#include <ydb/library/yql/public/issue/yql_issue.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
+#include <ydb/core/formats/arrow_helpers.h>
using namespace NYdb;
@@ -194,6 +195,75 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) {
}
}
+ Y_UNIT_TEST(ParquetImportBug) {
+ NKikimrConfig::TAppConfig appConfig;
+ TKikimrWithGrpcAndRootSchema server(appConfig);
+ server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG);
+
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location));
+
+ NYdb::NTable::TTableClient client(connection);
+ auto session = client.GetSession().ExtractValueSync().GetSession();
+ TString tablePath = TTestOlap::TablePath;
+
+ std::vector<std::pair<TString, NYdb::EPrimitiveType>> schema = {
+ { "id", NYdb::EPrimitiveType::Uint32 },
+ { "timestamp", NYdb::EPrimitiveType::Timestamp },
+ { "date", NYdb::EPrimitiveType::Date }
+ };
+
+ auto tableBuilder = client.GetTableBuilder();
+ for (auto& [name, type] : schema) {
+ if (name == "id") {
+ tableBuilder.AddNonNullableColumn(name, type);
+ } else {
+ tableBuilder.AddNullableColumn(name, type);
+ }
+ }
+ tableBuilder.SetPrimaryKeyColumns({"id"});
+ auto result = session.CreateTable(tablePath, tableBuilder.Build(), {}).ExtractValueSync();
+
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+
+ auto batchSchema = std::make_shared<arrow::Schema>(
+ std::vector<std::shared_ptr<arrow::Field>>{
+ arrow::field("id", arrow::uint32()),
+ arrow::field("timestamp", arrow::int64()),
+ arrow::field("date", arrow::uint16())
+ });
+
+ size_t rowsCount = 100;
+ auto builders = NArrow::MakeBuilders(batchSchema, rowsCount);
+
+ for (size_t i = 0; i < rowsCount; ++i) {
+ Y_VERIFY(NArrow::Append<arrow::UInt32Type>(*builders[0], i));
+ Y_VERIFY(NArrow::Append<arrow::Int64Type>(*builders[1], i));
+ Y_VERIFY(NArrow::Append<arrow::UInt16Type>(*builders[2], i));
+ }
+
+ auto srcBatch = arrow::RecordBatch::Make(batchSchema, rowsCount, NArrow::Finish(std::move(builders)));
+ TString strSchema = NArrow::SerializeSchema(*batchSchema);
+ TString strBatch = NArrow::SerializeBatchNoCompression(srcBatch);
+
+ TInstant start = TInstant::Now();
+ {
+ auto res = client.BulkUpsert(tablePath,
+ NYdb::NTable::EDataFormat::ApacheArrow, strBatch, strSchema).GetValueSync();
+
+ Cerr << res.GetStatus() << Endl;
+ UNIT_ASSERT_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
+ }
+ Cerr << "Upsert done: " << TInstant::Now() - start << Endl;
+
+ { // Read all
+ auto rows = ScanQuerySelect(client, tablePath, schema);
+ UNIT_ASSERT_GT(rows.size(), 0);
+ }
+ }
+
Y_UNIT_TEST(UpsertCsvBug) {
NKikimrConfig::TAppConfig appConfig;
TKikimrWithGrpcAndRootSchema server(appConfig);