diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-12-20 17:58:29 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-12-20 17:58:29 +0300 |
commit | 1e90d439ad65bba680872176ad4ad3b3f87224d1 (patch) | |
tree | 6c89a7b777546bf853fbd6920f4a205d72cb2b0a | |
parent | ec56091007b81e45277c49d5c88ef1fa85c25de2 (diff) | |
download | ydb-1e90d439ad65bba680872176ad4ad3b3f87224d1.tar.gz |
parse date/datetime in csv by array
-rw-r--r-- | ydb/core/formats/arrow_helpers.cpp | 14 | ||||
-rw-r--r-- | ydb/core/formats/arrow_helpers.h | 1 | ||||
-rw-r--r-- | ydb/core/io_formats/CMakeLists.darwin.txt | 1 | ||||
-rw-r--r-- | ydb/core/io_formats/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/core/io_formats/CMakeLists.linux.txt | 1 | ||||
-rw-r--r-- | ydb/core/io_formats/csv.h | 9 | ||||
-rw-r--r-- | ydb/core/io_formats/csv_arrow.cpp | 120 | ||||
-rw-r--r-- | ydb/core/io_formats/ut_csv.cpp | 54 |
8 files changed, 192 insertions, 9 deletions
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp index 7920eb0e39a..89b9a1af915 100644 --- a/ydb/core/formats/arrow_helpers.cpp +++ b/ydb/core/formats/arrow_helpers.cpp @@ -219,6 +219,20 @@ std::shared_ptr<arrow::DataType> GetArrowType(NScheme::TTypeInfo typeId) { return std::make_shared<arrow::NullType>(); } +std::shared_ptr<arrow::DataType> GetCSVArrowType(NScheme::TTypeInfo typeId) { + std::shared_ptr<arrow::DataType> result; + switch (typeId.GetTypeId()) { + case NScheme::NTypeIds::Datetime: + return std::make_shared<arrow::TimestampType>(arrow::TimeUnit::SECOND); + case NScheme::NTypeIds::Timestamp: + return std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO); + case NScheme::NTypeIds::Date: + return std::make_shared<arrow::TimestampType>(arrow::TimeUnit::SECOND); + default: + return GetArrowType(typeId); + } +} + std::vector<std::shared_ptr<arrow::Field>> MakeArrowFields(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns) { std::vector<std::shared_ptr<arrow::Field>> fields; fields.reserve(columns.size()); diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h index 4ec2bae4cee..1736b93851f 100644 --- a/ydb/core/formats/arrow_helpers.h +++ b/ydb/core/formats/arrow_helpers.h @@ -24,6 +24,7 @@ public: }; std::shared_ptr<arrow::DataType> GetArrowType(NScheme::TTypeInfo typeInfo); +std::shared_ptr<arrow::DataType> GetCSVArrowType(NScheme::TTypeInfo typeId); template <typename T> inline bool ArrayEqualValue(const std::shared_ptr<arrow::Array>& x, const std::shared_ptr<arrow::Array>& y) { diff --git a/ydb/core/io_formats/CMakeLists.darwin.txt b/ydb/core/io_formats/CMakeLists.darwin.txt index 6e8e51e9673..12ffede84bb 100644 --- a/ydb/core/io_formats/CMakeLists.darwin.txt +++ b/ydb/core/io_formats/CMakeLists.darwin.txt @@ -10,6 +10,7 @@ add_subdirectory(ut) add_library(ydb-core-io_formats) target_compile_options(ydb-core-io_formats PRIVATE + -Wno-unused-parameter -DUSE_CURRENT_UDF_ABI_VERSION ) target_link_libraries(ydb-core-io_formats PUBLIC diff --git a/ydb/core/io_formats/CMakeLists.linux-aarch64.txt b/ydb/core/io_formats/CMakeLists.linux-aarch64.txt index f1fb5a7597d..b8d428bc209 100644 --- a/ydb/core/io_formats/CMakeLists.linux-aarch64.txt +++ b/ydb/core/io_formats/CMakeLists.linux-aarch64.txt @@ -10,6 +10,7 @@ add_subdirectory(ut) add_library(ydb-core-io_formats) target_compile_options(ydb-core-io_formats PRIVATE + -Wno-unused-parameter -DUSE_CURRENT_UDF_ABI_VERSION ) target_link_libraries(ydb-core-io_formats PUBLIC diff --git a/ydb/core/io_formats/CMakeLists.linux.txt b/ydb/core/io_formats/CMakeLists.linux.txt index f1fb5a7597d..b8d428bc209 100644 --- a/ydb/core/io_formats/CMakeLists.linux.txt +++ b/ydb/core/io_formats/CMakeLists.linux.txt @@ -10,6 +10,7 @@ add_subdirectory(ut) add_library(ydb-core-io_formats) target_compile_options(ydb-core-io_formats PRIVATE + -Wno-unused-parameter -DUSE_CURRENT_UDF_ABI_VERSION ) target_link_libraries(ydb-core-io_formats PUBLIC diff --git a/ydb/core/io_formats/csv.h b/ydb/core/io_formats/csv.h index 8e9ef79e254..d2ac59c6ac0 100644 --- a/ydb/core/io_formats/csv.h +++ b/ydb/core/io_formats/csv.h @@ -57,11 +57,7 @@ public: ParseOptions.escape_char = escapeChar; } - void SetNullValue(const TString& null = "") { - ConvertOptions.null_values = { std::string(null.data(), null.size()) }; - ConvertOptions.strings_can_be_null = true; - ConvertOptions.quoted_strings_can_be_null = false; - } + void SetNullValue(const TString& null = ""); private: arrow::csv::ReadOptions ReadOptions; @@ -69,6 +65,9 @@ private: arrow::csv::ConvertOptions ConvertOptions; std::shared_ptr<arrow::csv::StreamingReader> Reader; std::vector<TString> ResultColumns; + std::unordered_map<std::string, std::shared_ptr<arrow::DataType>> OriginalColumnTypes; + + std::shared_ptr<arrow::RecordBatch> ConvertColumnTypes(std::shared_ptr<arrow::RecordBatch> parsedBatch) const; static TString ErrorPrefix() { return "Cannot read CSV: "; diff --git a/ydb/core/io_formats/csv_arrow.cpp b/ydb/core/io_formats/csv_arrow.cpp index 78a1a300ff1..40a560fb53a 100644 --- a/ydb/core/io_formats/csv_arrow.cpp +++ b/ydb/core/io_formats/csv_arrow.cpp @@ -1,14 +1,54 @@ #include "csv.h" #include <ydb/core/formats/arrow_helpers.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/util/value_parsing.h> namespace NKikimr::NFormats { +namespace { +class TimestampIntParser: public arrow::TimestampParser { +public: + TimestampIntParser() {} + + bool operator()(const char* s, size_t length, arrow::TimeUnit::type out_unit, + int64_t* out) const override { + int64_t unitsCount; + if (!TryFromString(TString(s, length), unitsCount)) { + return false; + } + *out = unitsCount; + switch (out_unit) { + case arrow::TimeUnit::NANO: + *out *= 1000000000; + break; + case arrow::TimeUnit::MICRO: + *out *= 1000000; + break; + case arrow::TimeUnit::MILLI: + *out *= 1000; + break; + case arrow::TimeUnit::SECOND: + *out *= 1; + break; + } + return true; + } + + const char* kind() const override { return "ts_int"; } +}; + +} + TArrowCSV::TArrowCSV(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, bool header) : ReadOptions(arrow::csv::ReadOptions::Defaults()) , ParseOptions(arrow::csv::ParseOptions::Defaults()) , ConvertOptions(arrow::csv::ConvertOptions::Defaults()) { ConvertOptions.check_utf8 = false; + ConvertOptions.timestamp_parsers.clear(); + ConvertOptions.timestamp_parsers.emplace_back(arrow::TimestampParser::MakeISO8601()); + ConvertOptions.timestamp_parsers.emplace_back(std::make_shared<TimestampIntParser>()); + ReadOptions.block_size = DEFAULT_BLOCK_SIZE; ReadOptions.use_threads = false; ReadOptions.autogenerate_column_names = false; @@ -19,7 +59,8 @@ TArrowCSV::TArrowCSV(const TVector<std::pair<TString, NScheme::TTypeInfo>>& colu for (auto& [name, type] : columns) { ResultColumns.push_back(name); std::string columnName(name.data(), name.size()); - ConvertOptions.column_types[columnName] = NArrow::GetArrowType(type); + ConvertOptions.column_types[columnName] = NArrow::GetCSVArrowType(type); + OriginalColumnTypes[columnName] = NArrow::GetArrowType(type); } } else if (!columns.empty()) { // !autogenerate + !column_names.empty() => specified columns @@ -28,7 +69,8 @@ TArrowCSV::TArrowCSV(const TVector<std::pair<TString, NScheme::TTypeInfo>>& colu for (auto& [name, type] : columns) { std::string columnName(name.data(), name.size()); ReadOptions.column_names.push_back(columnName); - ConvertOptions.column_types[columnName] = NArrow::GetArrowType(type); + ConvertOptions.column_types[columnName] = NArrow::GetCSVArrowType(type); + OriginalColumnTypes[columnName] = NArrow::GetArrowType(type); } #if 0 } else { @@ -39,6 +81,68 @@ TArrowCSV::TArrowCSV(const TVector<std::pair<TString, NScheme::TTypeInfo>>& colu SetNullValue(); // set default null value } +std::shared_ptr<arrow::RecordBatch> TArrowCSV::ConvertColumnTypes(std::shared_ptr<arrow::RecordBatch> parsedBatch) const { + if (!parsedBatch) { + return parsedBatch; + } + std::shared_ptr<arrow::Schema> schema; + { + arrow::SchemaBuilder sBuilder; + for (auto&& f : parsedBatch->schema()->fields()) { + Y_VERIFY(sBuilder.AddField(std::make_shared<arrow::Field>(f->name(), f->type())).ok()); + + } + auto resultSchema = sBuilder.Finish(); + Y_VERIFY(resultSchema.ok()); + schema = *resultSchema; + } + + std::vector<std::shared_ptr<arrow::Array>> resultColumns; + std::set<std::string> columnsFilter(ResultColumns.begin(), ResultColumns.end()); + for (auto&& f : schema->fields()) { + auto fArr = parsedBatch->GetColumnByName(f->name()); + std::shared_ptr<arrow::DataType> originalType; + if (columnsFilter.contains(f->name()) || columnsFilter.empty()) { + auto it = OriginalColumnTypes.find(f->name()); + Y_VERIFY(it != OriginalColumnTypes.end()); + originalType = it->second; + } else { + originalType = f->type(); + } + if (fArr->type()->Equals(originalType)) { + resultColumns.emplace_back(fArr); + } else if (fArr->type()->id() == arrow::TimestampType::type_id) { + arrow::Result<std::shared_ptr<arrow::Array>> arrResult; + { + std::shared_ptr<arrow::TimestampArray> i64Arr = std::make_shared<arrow::TimestampArray>(fArr->data()); + if (originalType->id() == arrow::UInt16Type::type_id) { + arrow::UInt16Builder aBuilder; + Y_VERIFY(aBuilder.Reserve(parsedBatch->num_rows()).ok()); + for (long i = 0; i < parsedBatch->num_rows(); ++i) { + aBuilder.UnsafeAppend(i64Arr->Value(i) / 86400ull); + } + arrResult = aBuilder.Finish(); + } else if (originalType->id() == arrow::UInt32Type::type_id) { + arrow::UInt32Builder aBuilder; + Y_VERIFY(aBuilder.Reserve(parsedBatch->num_rows()).ok()); + for (long i = 0; i < parsedBatch->num_rows(); ++i) { + aBuilder.UnsafeAppend(i64Arr->Value(i)); + } + arrResult = aBuilder.Finish(); + } else { + Y_VERIFY(false); + } + } + Y_VERIFY(arrResult.ok()); + resultColumns.emplace_back(*arrResult); + } else { + Y_VERIFY(false); + } + } + + return arrow::RecordBatch::Make(schema, parsedBatch->num_rows(), resultColumns); +} + std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadNext(const TString& csv, TString& errString) { if (!Reader) { if (ConvertOptions.column_types.empty()) { @@ -62,8 +166,10 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadNext(const TString& csv, TStr return {}; } - std::shared_ptr<arrow::RecordBatch> batch; - Reader->ReadNext(&batch).ok(); + std::shared_ptr<arrow::RecordBatch> batchParsed; + Reader->ReadNext(&batchParsed).ok(); + + std::shared_ptr<arrow::RecordBatch> batch = ConvertColumnTypes(batchParsed); if (batch && !ResultColumns.empty()) { batch = NArrow::ExtractColumns(batch, ResultColumns); @@ -74,6 +180,12 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadNext(const TString& csv, TStr return batch; } +void TArrowCSV::SetNullValue(const TString& null) { + ConvertOptions.null_values = { std::string(null.data(), null.size()) }; + ConvertOptions.strings_can_be_null = true; + ConvertOptions.quoted_strings_can_be_null = false; +} + std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadSingleBatch(const TString& csv, TString& errString) { auto batch = ReadNext(csv, errString); if (!batch) { diff --git a/ydb/core/io_formats/ut_csv.cpp b/ydb/core/io_formats/ut_csv.cpp index 38b16a67364..6f2c4428670 100644 --- a/ydb/core/io_formats/ut_csv.cpp +++ b/ydb/core/io_formats/ut_csv.cpp @@ -77,6 +77,60 @@ TestReadSingleBatch(const TVector<std::pair<TString, NScheme::TTypeInfo>>& colum } Y_UNIT_TEST_SUITE(FormatCSV) { + Y_UNIT_TEST(Instants) { + const TString dateTimeString = "2005-08-09T18:31:42"; + const TString data = "11,12,2013-07-15," + dateTimeString + "," + dateTimeString; + TVector<std::pair<TString, NScheme::TTypeInfo>> columns; + + { + columns = { + {"datetime_int", NScheme::TTypeInfo(NScheme::NTypeIds::Datetime)}, + {"timestamp_int", NScheme::TTypeInfo(NScheme::NTypeIds::Timestamp)}, + {"date", NScheme::TTypeInfo(NScheme::NTypeIds::Date) }, + {"datetime", NScheme::TTypeInfo(NScheme::NTypeIds::Datetime)}, + {"timestamp", NScheme::TTypeInfo(NScheme::NTypeIds::Timestamp)} + }; + TInstant dtInstant; + Y_VERIFY(TInstant::TryParseIso8601(dateTimeString, dtInstant)); + TArrowCSV reader(columns, false); + + TString errorMessage; + auto batch = reader.ReadNext(data, errorMessage); + Cerr << errorMessage << "\n"; + UNIT_ASSERT(!!batch); + UNIT_ASSERT(errorMessage.empty()); + + auto cDatetimeInt = batch->GetColumnByName("datetime_int"); + auto cTimestampInt = batch->GetColumnByName("timestamp_int"); + auto cDate = batch->GetColumnByName("date"); + auto cDatetime = batch->GetColumnByName("datetime"); + auto cTimestamp = batch->GetColumnByName("timestamp"); + + Y_VERIFY(cDate->type()->id() == arrow::UInt16Type::type_id); + Y_VERIFY(cDatetime->type()->id() == arrow::UInt32Type::type_id); + Y_VERIFY(cTimestamp->type()->id() == arrow::TimestampType::type_id); + Y_VERIFY(cDatetimeInt->type()->id() == arrow::UInt32Type::type_id); + Y_VERIFY(cTimestampInt->type()->id() == arrow::TimestampType::type_id); + Y_VERIFY(batch->num_rows() == 1); + + { + auto& ui32Column = static_cast<arrow::UInt32Array&>(*cDatetimeInt); + Y_VERIFY(ui32Column.Value(0) == 11, "%d", ui32Column.Value(0)); + } + { + auto& tsColumn = static_cast<arrow::TimestampArray&>(*cTimestampInt); + Cerr << tsColumn.Value(0) << Endl; + Y_VERIFY(tsColumn.Value(0) == 12 * 1000000); + } + auto& ui16Column = static_cast<arrow::UInt16Array&>(*cDate); + Y_VERIFY(ui16Column.Value(0) == 15901, "%d", ui16Column.Value(0)); + auto& ui32Column = static_cast<arrow::UInt32Array&>(*cDatetime); + Y_VERIFY(ui32Column.Value(0) == dtInstant.Seconds(), "%d", ui32Column.Value(0)); + auto& tsColumn = static_cast<arrow::TimestampArray&>(*cTimestamp); + Y_VERIFY(tsColumn.Value(0) == (i64)dtInstant.MicroSeconds()); + } + } + Y_UNIT_TEST(EmptyData) { TString data = ""; TVector<std::pair<TString, NScheme::TTypeInfo>> columns; |