summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorstanly <[email protected]>2023-03-30 13:11:28 +0300
committerstanly <[email protected]>2023-03-30 13:11:28 +0300
commitcb8ce3a0ebddeeb9bac836ee697b8db2b8f4e1a9 (patch)
tree778894fc6b0c6ed7ee3a3106781bf6b30b0a8a96
parenta208151261bfd48ab7ef4724c52079852d745bb6 (diff)
support Uint16 -> Timestamp / Text -> Bytes
-rw-r--r--ydb/core/formats/arrow_helpers.cpp57
-rw-r--r--ydb/core/formats/arrow_helpers.h34
-rw-r--r--ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp20
3 files changed, 71 insertions, 40 deletions
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp
index 386ecc149f4..306c6298a10 100644
--- a/ydb/core/formats/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow_helpers.cpp
@@ -1171,15 +1171,24 @@ std::shared_ptr<arrow::RecordBatch> ConvertColumns(const std::shared_ptr<arrow::
static std::shared_ptr<arrow::Array> InplaceConvertColumn(const std::shared_ptr<arrow::Array>& column,
NScheme::TTypeInfo colType) {
switch (colType.GetTypeId()) {
- case NScheme::NTypeIds::Timestamp: {
- Y_VERIFY(arrow::is_primitive(column->type()->id()));
- Y_VERIFY(arrow::bit_width(column->type()->id()) == 64);
- return std::make_shared<arrow::TimestampArray>(column->data());
+ case NScheme::NTypeIds::Bytes: {
+ Y_VERIFY(column->type()->id() == arrow::Type::STRING);
+ return std::make_shared<arrow::BinaryArray>(column->data());
}
case NScheme::NTypeIds::Date: {
Y_VERIFY(arrow::is_primitive(column->type()->id()));
+ Y_VERIFY(arrow::bit_width(column->type()->id()) == 16);
+ return std::make_shared<arrow::NumericArray<arrow::UInt16Type>>(column->data());
+ }
+ case NScheme::NTypeIds::Datetime: {
+ Y_VERIFY(arrow::is_primitive(column->type()->id()));
Y_VERIFY(arrow::bit_width(column->type()->id()) == 32);
- return std::make_shared<arrow::Date32Array>(column->data());
+ return std::make_shared<arrow::NumericArray<arrow::Int32Type>>(column->data());
+ }
+ case NScheme::NTypeIds::Timestamp: {
+ Y_VERIFY(arrow::is_primitive(column->type()->id()));
+ Y_VERIFY(arrow::bit_width(column->type()->id()) == 64);
+ return std::make_shared<arrow::TimestampArray>(column->data());
}
default:
return {};
@@ -1205,6 +1214,44 @@ std::shared_ptr<arrow::RecordBatch> InplaceConvertColumns(const std::shared_ptr<
return convertedBatch;
}
+bool TArrowToYdbConverter::NeedDataConversion(const NScheme::TTypeInfo& colType) {
+ switch (colType.GetTypeId()) {
+ case NScheme::NTypeIds::DyNumber:
+ case NScheme::NTypeIds::JsonDocument:
+ case NScheme::NTypeIds::Decimal:
+ return true;
+ default:
+ break;
+ }
+ return false;
+}
+
+bool TArrowToYdbConverter::NeedInplaceConversion(const NScheme::TTypeInfo& typeInRequest, const NScheme::TTypeInfo& expectedType) {
+ switch (expectedType.GetTypeId()) {
+ case NScheme::NTypeIds::Bytes:
+ return typeInRequest.GetTypeId() == NScheme::NTypeIds::Utf8;
+ case NScheme::NTypeIds::Date:
+ return typeInRequest.GetTypeId() == NScheme::NTypeIds::Uint16;
+ case NScheme::NTypeIds::Datetime:
+ return typeInRequest.GetTypeId() == NScheme::NTypeIds::Int32;
+ case NScheme::NTypeIds::Timestamp:
+ return typeInRequest.GetTypeId() == NScheme::NTypeIds::Int64;
+ default:
+ break;
+ }
+ return false;
+}
+
+bool TArrowToYdbConverter::NeedConversion(const NScheme::TTypeInfo& typeInRequest, const NScheme::TTypeInfo& expectedType) {
+ switch (expectedType.GetTypeId()) {
+ case NScheme::NTypeIds::JsonDocument:
+ return typeInRequest.GetTypeId() == NScheme::NTypeIds::Utf8;
+ default:
+ break;
+ }
+ return false;
+}
+
bool TArrowToYdbConverter::Process(const arrow::RecordBatch& batch, TString& errorMessage) {
std::vector<std::shared_ptr<arrow::Array>> allColumns;
allColumns.reserve(YdbSchema.size());
diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h
index c68260c53fe..69d551323fd 100644
--- a/ydb/core/formats/arrow_helpers.h
+++ b/ydb/core/formats/arrow_helpers.h
@@ -179,39 +179,11 @@ private:
}
public:
- static bool NeedDataConversion(const NScheme::TTypeInfo& colType) {
- switch (colType.GetTypeId()) {
- case NScheme::NTypeIds::DyNumber:
- case NScheme::NTypeIds::JsonDocument:
- case NScheme::NTypeIds::Decimal:
- return true;
- default:
- break;
- }
- return false;
- }
+ static bool NeedDataConversion(const NScheme::TTypeInfo& colType);
- static bool NeedInplaceConversion(const NScheme::TTypeInfo& typeInRequest, const NScheme::TTypeInfo& expectedType) {
- switch (expectedType.GetTypeId()) {
- case NScheme::NTypeIds::Timestamp:
- return typeInRequest.GetTypeId() == NScheme::NTypeIds::Int64;
- case NScheme::NTypeIds::Date:
- return typeInRequest.GetTypeId() == NScheme::NTypeIds::Uint16;
- default:
- break;
- }
- return false;
- }
+ static bool NeedInplaceConversion(const NScheme::TTypeInfo& typeInRequest, const NScheme::TTypeInfo& expectedType);
- static bool NeedConversion(const NScheme::TTypeInfo& typeInRequest, const NScheme::TTypeInfo& expectedType) {
- switch (expectedType.GetTypeId()) {
- case NScheme::NTypeIds::JsonDocument:
- return typeInRequest.GetTypeId() == NScheme::NTypeIds::Utf8;
- default:
- break;
- }
- return false;
- }
+ static bool NeedConversion(const NScheme::TTypeInfo& typeInRequest, const NScheme::TTypeInfo& expectedType);
TArrowToYdbConverter(const TVector<std::pair<TString, NScheme::TTypeInfo>>& ydbSchema, IRowWriter& rowWriter)
: YdbSchema(ydbSchema)
diff --git a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp
index 63eb6fd97dc..3c940864f1d 100644
--- a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp
+++ b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp
@@ -211,7 +211,11 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) {
std::vector<std::pair<TString, NYdb::EPrimitiveType>> schema = {
{ "id", NYdb::EPrimitiveType::Uint32 },
{ "timestamp", NYdb::EPrimitiveType::Timestamp },
- { "date", NYdb::EPrimitiveType::Date }
+ { "dateTimeS", NYdb::EPrimitiveType::Datetime },
+ { "dateTimeU", NYdb::EPrimitiveType::Datetime },
+ { "date", NYdb::EPrimitiveType::Date },
+ { "utf8ToString", NYdb::EPrimitiveType::String },
+ { "stringToString", NYdb::EPrimitiveType::String },
};
auto tableBuilder = client.GetTableBuilder();
@@ -232,16 +236,24 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) {
std::vector<std::shared_ptr<arrow::Field>>{
arrow::field("id", arrow::uint32()),
arrow::field("timestamp", arrow::int64()),
- arrow::field("date", arrow::uint16())
+ arrow::field("dateTimeS", arrow::int32()),
+ arrow::field("dateTimeU", arrow::uint32()),
+ arrow::field("date", arrow::uint16()),
+ arrow::field("utf8ToString", arrow::utf8()),
+ arrow::field("stringToString", arrow::binary()),
});
-
+
size_t rowsCount = 100;
auto builders = NArrow::MakeBuilders(batchSchema, rowsCount);
for (size_t i = 0; i < rowsCount; ++i) {
Y_VERIFY(NArrow::Append<arrow::UInt32Type>(*builders[0], i));
Y_VERIFY(NArrow::Append<arrow::Int64Type>(*builders[1], i));
- Y_VERIFY(NArrow::Append<arrow::UInt16Type>(*builders[2], i));
+ Y_VERIFY(NArrow::Append<arrow::Int32Type>(*builders[2], i));
+ Y_VERIFY(NArrow::Append<arrow::Int32Type>(*builders[3], i));
+ Y_VERIFY(NArrow::Append<arrow::UInt16Type>(*builders[4], i));
+ Y_VERIFY(NArrow::Append<arrow::StringType>(*builders[5], std::to_string(i)));
+ Y_VERIFY(NArrow::Append<arrow::BinaryType>(*builders[6], std::to_string(i)));
}
auto srcBatch = arrow::RecordBatch::Make(batchSchema, rowsCount, NArrow::Finish(std::move(builders)));