diff options
author | stanly <[email protected]> | 2023-03-30 13:11:28 +0300 |
---|---|---|
committer | stanly <[email protected]> | 2023-03-30 13:11:28 +0300 |
commit | cb8ce3a0ebddeeb9bac836ee697b8db2b8f4e1a9 (patch) | |
tree | 778894fc6b0c6ed7ee3a3106781bf6b30b0a8a96 | |
parent | a208151261bfd48ab7ef4724c52079852d745bb6 (diff) |
support Uint16 -> Timestamp / Text -> Bytes
-rw-r--r-- | ydb/core/formats/arrow_helpers.cpp | 57 | ||||
-rw-r--r-- | ydb/core/formats/arrow_helpers.h | 34 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp | 20 |
3 files changed, 71 insertions, 40 deletions
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp index 386ecc149f4..306c6298a10 100644 --- a/ydb/core/formats/arrow_helpers.cpp +++ b/ydb/core/formats/arrow_helpers.cpp @@ -1171,15 +1171,24 @@ std::shared_ptr<arrow::RecordBatch> ConvertColumns(const std::shared_ptr<arrow:: static std::shared_ptr<arrow::Array> InplaceConvertColumn(const std::shared_ptr<arrow::Array>& column, NScheme::TTypeInfo colType) { switch (colType.GetTypeId()) { - case NScheme::NTypeIds::Timestamp: { - Y_VERIFY(arrow::is_primitive(column->type()->id())); - Y_VERIFY(arrow::bit_width(column->type()->id()) == 64); - return std::make_shared<arrow::TimestampArray>(column->data()); + case NScheme::NTypeIds::Bytes: { + Y_VERIFY(column->type()->id() == arrow::Type::STRING); + return std::make_shared<arrow::BinaryArray>(column->data()); } case NScheme::NTypeIds::Date: { Y_VERIFY(arrow::is_primitive(column->type()->id())); + Y_VERIFY(arrow::bit_width(column->type()->id()) == 16); + return std::make_shared<arrow::NumericArray<arrow::UInt16Type>>(column->data()); + } + case NScheme::NTypeIds::Datetime: { + Y_VERIFY(arrow::is_primitive(column->type()->id())); Y_VERIFY(arrow::bit_width(column->type()->id()) == 32); - return std::make_shared<arrow::Date32Array>(column->data()); + return std::make_shared<arrow::NumericArray<arrow::Int32Type>>(column->data()); + } + case NScheme::NTypeIds::Timestamp: { + Y_VERIFY(arrow::is_primitive(column->type()->id())); + Y_VERIFY(arrow::bit_width(column->type()->id()) == 64); + return std::make_shared<arrow::TimestampArray>(column->data()); } default: return {}; @@ -1205,6 +1214,44 @@ std::shared_ptr<arrow::RecordBatch> InplaceConvertColumns(const std::shared_ptr< return convertedBatch; } +bool TArrowToYdbConverter::NeedDataConversion(const NScheme::TTypeInfo& colType) { + switch (colType.GetTypeId()) { + case NScheme::NTypeIds::DyNumber: + case NScheme::NTypeIds::JsonDocument: + case NScheme::NTypeIds::Decimal: + return true; + default: + break; + } + return false; +} + +bool TArrowToYdbConverter::NeedInplaceConversion(const NScheme::TTypeInfo& typeInRequest, const NScheme::TTypeInfo& expectedType) { + switch (expectedType.GetTypeId()) { + case NScheme::NTypeIds::Bytes: + return typeInRequest.GetTypeId() == NScheme::NTypeIds::Utf8; + case NScheme::NTypeIds::Date: + return typeInRequest.GetTypeId() == NScheme::NTypeIds::Uint16; + case NScheme::NTypeIds::Datetime: + return typeInRequest.GetTypeId() == NScheme::NTypeIds::Int32; + case NScheme::NTypeIds::Timestamp: + return typeInRequest.GetTypeId() == NScheme::NTypeIds::Int64; + default: + break; + } + return false; +} + +bool TArrowToYdbConverter::NeedConversion(const NScheme::TTypeInfo& typeInRequest, const NScheme::TTypeInfo& expectedType) { + switch (expectedType.GetTypeId()) { + case NScheme::NTypeIds::JsonDocument: + return typeInRequest.GetTypeId() == NScheme::NTypeIds::Utf8; + default: + break; + } + return false; +} + bool TArrowToYdbConverter::Process(const arrow::RecordBatch& batch, TString& errorMessage) { std::vector<std::shared_ptr<arrow::Array>> allColumns; allColumns.reserve(YdbSchema.size()); diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h index c68260c53fe..69d551323fd 100644 --- a/ydb/core/formats/arrow_helpers.h +++ b/ydb/core/formats/arrow_helpers.h @@ -179,39 +179,11 @@ private: } public: - static bool NeedDataConversion(const NScheme::TTypeInfo& colType) { - switch (colType.GetTypeId()) { - case NScheme::NTypeIds::DyNumber: - case NScheme::NTypeIds::JsonDocument: - case NScheme::NTypeIds::Decimal: - return true; - default: - break; - } - return false; - } + static bool NeedDataConversion(const NScheme::TTypeInfo& colType); - static bool NeedInplaceConversion(const NScheme::TTypeInfo& typeInRequest, const NScheme::TTypeInfo& expectedType) { - switch (expectedType.GetTypeId()) { - case NScheme::NTypeIds::Timestamp: - return typeInRequest.GetTypeId() == NScheme::NTypeIds::Int64; - case NScheme::NTypeIds::Date: - return typeInRequest.GetTypeId() == NScheme::NTypeIds::Uint16; - default: - break; - } - return false; - } + static bool NeedInplaceConversion(const NScheme::TTypeInfo& typeInRequest, const NScheme::TTypeInfo& expectedType); - static bool NeedConversion(const NScheme::TTypeInfo& typeInRequest, const NScheme::TTypeInfo& expectedType) { - switch (expectedType.GetTypeId()) { - case NScheme::NTypeIds::JsonDocument: - return typeInRequest.GetTypeId() == NScheme::NTypeIds::Utf8; - default: - break; - } - return false; - } + static bool NeedConversion(const NScheme::TTypeInfo& typeInRequest, const NScheme::TTypeInfo& expectedType); TArrowToYdbConverter(const TVector<std::pair<TString, NScheme::TTypeInfo>>& ydbSchema, IRowWriter& rowWriter) : YdbSchema(ydbSchema) diff --git a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp index 63eb6fd97dc..3c940864f1d 100644 --- a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp +++ b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp @@ -211,7 +211,11 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { std::vector<std::pair<TString, NYdb::EPrimitiveType>> schema = { { "id", NYdb::EPrimitiveType::Uint32 }, { "timestamp", NYdb::EPrimitiveType::Timestamp }, - { "date", NYdb::EPrimitiveType::Date } + { "dateTimeS", NYdb::EPrimitiveType::Datetime }, + { "dateTimeU", NYdb::EPrimitiveType::Datetime }, + { "date", NYdb::EPrimitiveType::Date }, + { "utf8ToString", NYdb::EPrimitiveType::String }, + { "stringToString", NYdb::EPrimitiveType::String }, }; auto tableBuilder = client.GetTableBuilder(); @@ -232,16 +236,24 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { std::vector<std::shared_ptr<arrow::Field>>{ arrow::field("id", arrow::uint32()), arrow::field("timestamp", arrow::int64()), - arrow::field("date", arrow::uint16()) + arrow::field("dateTimeS", arrow::int32()), + arrow::field("dateTimeU", arrow::uint32()), + arrow::field("date", arrow::uint16()), + arrow::field("utf8ToString", arrow::utf8()), + arrow::field("stringToString", arrow::binary()), }); - + size_t rowsCount = 100; auto builders = NArrow::MakeBuilders(batchSchema, rowsCount); for (size_t i = 0; i < rowsCount; ++i) { Y_VERIFY(NArrow::Append<arrow::UInt32Type>(*builders[0], i)); Y_VERIFY(NArrow::Append<arrow::Int64Type>(*builders[1], i)); - Y_VERIFY(NArrow::Append<arrow::UInt16Type>(*builders[2], i)); + Y_VERIFY(NArrow::Append<arrow::Int32Type>(*builders[2], i)); + Y_VERIFY(NArrow::Append<arrow::Int32Type>(*builders[3], i)); + Y_VERIFY(NArrow::Append<arrow::UInt16Type>(*builders[4], i)); + Y_VERIFY(NArrow::Append<arrow::StringType>(*builders[5], std::to_string(i))); + Y_VERIFY(NArrow::Append<arrow::BinaryType>(*builders[6], std::to_string(i))); } auto srcBatch = arrow::RecordBatch::Make(batchSchema, rowsCount, NArrow::Finish(std::move(builders))); |