aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author ivanmorozov <ivanmorozov@yandex-team.com> 2022-12-20 17:58:29 +0300
committer ivanmorozov <ivanmorozov@yandex-team.com> 2022-12-20 17:58:29 +0300
commit 1e90d439ad65bba680872176ad4ad3b3f87224d1 (patch)
tree 6c89a7b777546bf853fbd6920f4a205d72cb2b0a
parent ec56091007b81e45277c49d5c88ef1fa85c25de2 (diff)
download ydb-1e90d439ad65bba680872176ad4ad3b3f87224d1.tar.gz
parse date/datetime in csv by array
-rw-r--r-- ydb/core/formats/arrow_helpers.cpp 14
-rw-r--r-- ydb/core/formats/arrow_helpers.h 1
-rw-r--r-- ydb/core/io_formats/CMakeLists.darwin.txt 1
-rw-r--r-- ydb/core/io_formats/CMakeLists.linux-aarch64.txt 1
-rw-r--r-- ydb/core/io_formats/CMakeLists.linux.txt 1
-rw-r--r-- ydb/core/io_formats/csv.h 9
-rw-r--r-- ydb/core/io_formats/csv_arrow.cpp 120
-rw-r--r-- ydb/core/io_formats/ut_csv.cpp 54
8 files changed, 192 insertions, 9 deletions
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp
index 7920eb0e39a..89b9a1af915 100644
--- a/ydb/core/formats/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow_helpers.cpp
@@ -219,6 +219,20 @@ std::shared_ptr<arrow::DataType> GetArrowType(NScheme::TTypeInfo typeId) {
return std::make_shared<arrow::NullType>();
}
+std::shared_ptr<arrow::DataType> GetCSVArrowType(NScheme::TTypeInfo typeId) { // Returns the Arrow type to use while parsing a CSV column of the given scheme type.
+ std::shared_ptr<arrow::DataType> result; // NOTE(review): never read or returned; every branch below returns directly.
+ switch (typeId.GetTypeId()) {
+ case NScheme::NTypeIds::Datetime:
+ return std::make_shared<arrow::TimestampType>(arrow::TimeUnit::SECOND); // Datetime is parsed as a second-resolution timestamp.
+ case NScheme::NTypeIds::Timestamp:
+ return std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO); // Timestamp is parsed as a microsecond-resolution timestamp.
+ case NScheme::NTypeIds::Date:
+ return std::make_shared<arrow::TimestampType>(arrow::TimeUnit::SECOND); // Date is parsed as a second-resolution timestamp as well.
+ default:
+ return GetArrowType(typeId); // Every other type falls back to the regular GetArrowType mapping.
+ }
+}
+
std::vector<std::shared_ptr<arrow::Field>> MakeArrowFields(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns) {
std::vector<std::shared_ptr<arrow::Field>> fields;
fields.reserve(columns.size());
diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h
index 4ec2bae4cee..1736b93851f 100644
--- a/ydb/core/formats/arrow_helpers.h
+++ b/ydb/core/formats/arrow_helpers.h
@@ -24,6 +24,7 @@ public:
};
std::shared_ptr<arrow::DataType> GetArrowType(NScheme::TTypeInfo typeInfo);
+std::shared_ptr<arrow::DataType> GetCSVArrowType(NScheme::TTypeInfo typeId);
template <typename T>
inline bool ArrayEqualValue(const std::shared_ptr<arrow::Array>& x, const std::shared_ptr<arrow::Array>& y) {
diff --git a/ydb/core/io_formats/CMakeLists.darwin.txt b/ydb/core/io_formats/CMakeLists.darwin.txt
index 6e8e51e9673..12ffede84bb 100644
--- a/ydb/core/io_formats/CMakeLists.darwin.txt
+++ b/ydb/core/io_formats/CMakeLists.darwin.txt
@@ -10,6 +10,7 @@ add_subdirectory(ut)
add_library(ydb-core-io_formats)
target_compile_options(ydb-core-io_formats PRIVATE
+ -Wno-unused-parameter
-DUSE_CURRENT_UDF_ABI_VERSION
)
target_link_libraries(ydb-core-io_formats PUBLIC
diff --git a/ydb/core/io_formats/CMakeLists.linux-aarch64.txt b/ydb/core/io_formats/CMakeLists.linux-aarch64.txt
index f1fb5a7597d..b8d428bc209 100644
--- a/ydb/core/io_formats/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/io_formats/CMakeLists.linux-aarch64.txt
@@ -10,6 +10,7 @@ add_subdirectory(ut)
add_library(ydb-core-io_formats)
target_compile_options(ydb-core-io_formats PRIVATE
+ -Wno-unused-parameter
-DUSE_CURRENT_UDF_ABI_VERSION
)
target_link_libraries(ydb-core-io_formats PUBLIC
diff --git a/ydb/core/io_formats/CMakeLists.linux.txt b/ydb/core/io_formats/CMakeLists.linux.txt
index f1fb5a7597d..b8d428bc209 100644
--- a/ydb/core/io_formats/CMakeLists.linux.txt
+++ b/ydb/core/io_formats/CMakeLists.linux.txt
@@ -10,6 +10,7 @@ add_subdirectory(ut)
add_library(ydb-core-io_formats)
target_compile_options(ydb-core-io_formats PRIVATE
+ -Wno-unused-parameter
-DUSE_CURRENT_UDF_ABI_VERSION
)
target_link_libraries(ydb-core-io_formats PUBLIC
diff --git a/ydb/core/io_formats/csv.h b/ydb/core/io_formats/csv.h
index 8e9ef79e254..d2ac59c6ac0 100644
--- a/ydb/core/io_formats/csv.h
+++ b/ydb/core/io_formats/csv.h
@@ -57,11 +57,7 @@ public:
ParseOptions.escape_char = escapeChar;
}
- void SetNullValue(const TString& null = "") {
- ConvertOptions.null_values = { std::string(null.data(), null.size()) };
- ConvertOptions.strings_can_be_null = true;
- ConvertOptions.quoted_strings_can_be_null = false;
- }
+ void SetNullValue(const TString& null = "");
private:
arrow::csv::ReadOptions ReadOptions;
@@ -69,6 +65,9 @@ private:
arrow::csv::ConvertOptions ConvertOptions;
std::shared_ptr<arrow::csv::StreamingReader> Reader;
std::vector<TString> ResultColumns;
+ std::unordered_map<std::string, std::shared_ptr<arrow::DataType>> OriginalColumnTypes;
+
+ std::shared_ptr<arrow::RecordBatch> ConvertColumnTypes(std::shared_ptr<arrow::RecordBatch> parsedBatch) const;
static TString ErrorPrefix() {
return "Cannot read CSV: ";
diff --git a/ydb/core/io_formats/csv_arrow.cpp b/ydb/core/io_formats/csv_arrow.cpp
index 78a1a300ff1..40a560fb53a 100644
--- a/ydb/core/io_formats/csv_arrow.cpp
+++ b/ydb/core/io_formats/csv_arrow.cpp
@@ -1,14 +1,54 @@
#include "csv.h"
#include <ydb/core/formats/arrow_helpers.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/util/value_parsing.h>
namespace NKikimr::NFormats {
+namespace {
+class TimestampIntParser: public arrow::TimestampParser { // Timestamp parser that accepts a plain integer field and scales it to the requested time unit.
+public:
+ TimestampIntParser() {}
+
+ bool operator()(const char* s, size_t length, arrow::TimeUnit::type out_unit, // s/length: the raw field text; out_unit: the unit the result must be expressed in.
+ int64_t* out) const override { // out: receives the scaled value; returns false when the text is not accepted.
+ int64_t unitsCount; // Despite the name, the scaling below treats this value as a number of seconds.
+ if (!TryFromString(TString(s, length), unitsCount)) { // The field is copied into a TString for parsing; failure rejects the field.
+ return false;
+ }
+ *out = unitsCount;
+ switch (out_unit) { // Multiply by the number of out_unit ticks per second.
+ case arrow::TimeUnit::NANO:
+ *out *= 1000000000; // NOTE(review): none of these multiplications is checked for int64 overflow.
+ break;
+ case arrow::TimeUnit::MICRO:
+ *out *= 1000000;
+ break;
+ case arrow::TimeUnit::MILLI:
+ *out *= 1000;
+ break;
+ case arrow::TimeUnit::SECOND:
+ *out *= 1; // Seconds are stored unchanged.
+ break;
+ }
+ return true;
+ }
+
+ const char* kind() const override { return "ts_int"; } // Identifier string reported for this parser.
+};
+
+}
+
TArrowCSV::TArrowCSV(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, bool header)
: ReadOptions(arrow::csv::ReadOptions::Defaults())
, ParseOptions(arrow::csv::ParseOptions::Defaults())
, ConvertOptions(arrow::csv::ConvertOptions::Defaults())
{
ConvertOptions.check_utf8 = false;
+ ConvertOptions.timestamp_parsers.clear();
+ ConvertOptions.timestamp_parsers.emplace_back(arrow::TimestampParser::MakeISO8601());
+ ConvertOptions.timestamp_parsers.emplace_back(std::make_shared<TimestampIntParser>());
+
ReadOptions.block_size = DEFAULT_BLOCK_SIZE;
ReadOptions.use_threads = false;
ReadOptions.autogenerate_column_names = false;
@@ -19,7 +59,8 @@ TArrowCSV::TArrowCSV(const TVector<std::pair<TString, NScheme::TTypeInfo>>& colu
for (auto& [name, type] : columns) {
ResultColumns.push_back(name);
std::string columnName(name.data(), name.size());
- ConvertOptions.column_types[columnName] = NArrow::GetArrowType(type);
+ ConvertOptions.column_types[columnName] = NArrow::GetCSVArrowType(type);
+ OriginalColumnTypes[columnName] = NArrow::GetArrowType(type);
}
} else if (!columns.empty()) {
// !autogenerate + !column_names.empty() => specified columns
@@ -28,7 +69,8 @@ TArrowCSV::TArrowCSV(const TVector<std::pair<TString, NScheme::TTypeInfo>>& colu
for (auto& [name, type] : columns) {
std::string columnName(name.data(), name.size());
ReadOptions.column_names.push_back(columnName);
- ConvertOptions.column_types[columnName] = NArrow::GetArrowType(type);
+ ConvertOptions.column_types[columnName] = NArrow::GetCSVArrowType(type);
+ OriginalColumnTypes[columnName] = NArrow::GetArrowType(type);
}
#if 0
} else {
@@ -39,6 +81,68 @@ TArrowCSV::TArrowCSV(const TVector<std::pair<TString, NScheme::TTypeInfo>>& colu
SetNullValue(); // set default null value
}
+std::shared_ptr<arrow::RecordBatch> TArrowCSV::ConvertColumnTypes(std::shared_ptr<arrow::RecordBatch> parsedBatch) const { // Rebuilds a parsed batch, converting timestamp-typed columns to the UInt16/UInt32 types recorded in OriginalColumnTypes.
+ if (!parsedBatch) { // A null batch is passed through unchanged.
+ return parsedBatch;
+ }
+ std::shared_ptr<arrow::Schema> schema;
+ {
+ arrow::SchemaBuilder sBuilder;
+ for (auto&& f : parsedBatch->schema()->fields()) {
+ Y_VERIFY(sBuilder.AddField(std::make_shared<arrow::Field>(f->name(), f->type())).ok()); // NOTE(review): the field keeps the parsed type f->type(), so the schema does not reflect the converted column types produced below - confirm this is intended.
+
+ }
+ auto resultSchema = sBuilder.Finish();
+ Y_VERIFY(resultSchema.ok());
+ schema = *resultSchema;
+ }
+
+ std::vector<std::shared_ptr<arrow::Array>> resultColumns; // Output columns, in schema field order.
+ std::set<std::string> columnsFilter(ResultColumns.begin(), ResultColumns.end()); // NOTE(review): rebuilt from ResultColumns on every call.
+ for (auto&& f : schema->fields()) {
+ auto fArr = parsedBatch->GetColumnByName(f->name());
+ std::shared_ptr<arrow::DataType> originalType; // Type the column should have in the result.
+ if (columnsFilter.contains(f->name()) || columnsFilter.empty()) { // Requested column, or no filter set: the target type must be registered.
+ auto it = OriginalColumnTypes.find(f->name());
+ Y_VERIFY(it != OriginalColumnTypes.end());
+ originalType = it->second;
+ } else {
+ originalType = f->type(); // Columns outside the filter keep their parsed type.
+ }
+ if (fArr->type()->Equals(originalType)) {
+ resultColumns.emplace_back(fArr); // Already the target type: reuse the parsed array as is.
+ } else if (fArr->type()->id() == arrow::TimestampType::type_id) { // Only timestamp -> UInt16/UInt32 conversions are handled.
+ arrow::Result<std::shared_ptr<arrow::Array>> arrResult;
+ {
+ std::shared_ptr<arrow::TimestampArray> i64Arr = std::make_shared<arrow::TimestampArray>(fArr->data()); // Typed view over the parsed column's data.
+ if (originalType->id() == arrow::UInt16Type::type_id) { // UInt16 target: store the value divided by 86400 (seconds per day).
+ arrow::UInt16Builder aBuilder;
+ Y_VERIFY(aBuilder.Reserve(parsedBatch->num_rows()).ok()); // Reserve up front so UnsafeAppend can be used.
+ for (long i = 0; i < parsedBatch->num_rows(); ++i) {
+ aBuilder.UnsafeAppend(i64Arr->Value(i) / 86400ull); // NOTE(review): no IsNull(i) check, so null slots are appended as non-null values; the narrowing to 16 bits is also unchecked.
+ }
+ arrResult = aBuilder.Finish();
+ } else if (originalType->id() == arrow::UInt32Type::type_id) { // UInt32 target: store the value unchanged.
+ arrow::UInt32Builder aBuilder;
+ Y_VERIFY(aBuilder.Reserve(parsedBatch->num_rows()).ok());
+ for (long i = 0; i < parsedBatch->num_rows(); ++i) {
+ aBuilder.UnsafeAppend(i64Arr->Value(i)); // NOTE(review): same missing null check and unchecked narrowing as the UInt16 branch.
+ }
+ arrResult = aBuilder.Finish();
+ } else {
+ Y_VERIFY(false); // Any other target type for a timestamp column is treated as a hard failure.
+ }
+ }
+ Y_VERIFY(arrResult.ok());
+ resultColumns.emplace_back(*arrResult);
+ } else {
+ Y_VERIFY(false); // Type mismatch on a non-timestamp column is treated as a hard failure.
+ }
+ }
+
+ return arrow::RecordBatch::Make(schema, parsedBatch->num_rows(), resultColumns); // Same row count and field names as the input batch.
+}
+
std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadNext(const TString& csv, TString& errString) {
if (!Reader) {
if (ConvertOptions.column_types.empty()) {
@@ -62,8 +166,10 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadNext(const TString& csv, TStr
return {};
}
- std::shared_ptr<arrow::RecordBatch> batch;
- Reader->ReadNext(&batch).ok();
+ std::shared_ptr<arrow::RecordBatch> batchParsed;
+ Reader->ReadNext(&batchParsed).ok();
+
+ std::shared_ptr<arrow::RecordBatch> batch = ConvertColumnTypes(batchParsed);
if (batch && !ResultColumns.empty()) {
batch = NArrow::ExtractColumns(batch, ResultColumns);
@@ -74,6 +180,12 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadNext(const TString& csv, TStr
return batch;
}
+void TArrowCSV::SetNullValue(const TString& null) { // Replaces the null-value configuration in ConvertOptions with the single spelling `null`.
+ ConvertOptions.null_values = { std::string(null.data(), null.size()) }; // Overwrites any previously configured null spellings.
+ ConvertOptions.strings_can_be_null = true; // Enables null recognition for string columns as well.
+ ConvertOptions.quoted_strings_can_be_null = false; // Disables null recognition for quoted values.
+}
+
std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadSingleBatch(const TString& csv, TString& errString) {
auto batch = ReadNext(csv, errString);
if (!batch) {
diff --git a/ydb/core/io_formats/ut_csv.cpp b/ydb/core/io_formats/ut_csv.cpp
index 38b16a67364..6f2c4428670 100644
--- a/ydb/core/io_formats/ut_csv.cpp
+++ b/ydb/core/io_formats/ut_csv.cpp
@@ -77,6 +77,60 @@ TestReadSingleBatch(const TVector<std::pair<TString, NScheme::TTypeInfo>>& colum
}
Y_UNIT_TEST_SUITE(FormatCSV) {
+ Y_UNIT_TEST(Instants) { // Checks parsing of Date/Datetime/Timestamp columns given both as integers and as ISO-8601 text.
+ const TString dateTimeString = "2005-08-09T18:31:42";
+ const TString data = "11,12,2013-07-15," + dateTimeString + "," + dateTimeString; // One row: two integer fields, a date, then the same date-time twice.
+ TVector<std::pair<TString, NScheme::TTypeInfo>> columns;
+
+ {
+ columns = {
+ {"datetime_int", NScheme::TTypeInfo(NScheme::NTypeIds::Datetime)}, // Receives the integer field "11".
+ {"timestamp_int", NScheme::TTypeInfo(NScheme::NTypeIds::Timestamp)}, // Receives the integer field "12".
+ {"date", NScheme::TTypeInfo(NScheme::NTypeIds::Date) },
+ {"datetime", NScheme::TTypeInfo(NScheme::NTypeIds::Datetime)},
+ {"timestamp", NScheme::TTypeInfo(NScheme::NTypeIds::Timestamp)}
+ };
+ TInstant dtInstant; // Reference value for the textual date-time, parsed independently of the CSV reader.
+ Y_VERIFY(TInstant::TryParseIso8601(dateTimeString, dtInstant));
+ TArrowCSV reader(columns, false); // Second argument is the constructor's `header` flag.
+
+ TString errorMessage;
+ auto batch = reader.ReadNext(data, errorMessage);
+ Cerr << errorMessage << "\n";
+ UNIT_ASSERT(!!batch);
+ UNIT_ASSERT(errorMessage.empty());
+
+ auto cDatetimeInt = batch->GetColumnByName("datetime_int");
+ auto cTimestampInt = batch->GetColumnByName("timestamp_int");
+ auto cDate = batch->GetColumnByName("date");
+ auto cDatetime = batch->GetColumnByName("datetime");
+ auto cTimestamp = batch->GetColumnByName("timestamp");
+
+ Y_VERIFY(cDate->type()->id() == arrow::UInt16Type::type_id); // Date columns are expected as UInt16.
+ Y_VERIFY(cDatetime->type()->id() == arrow::UInt32Type::type_id); // Datetime columns are expected as UInt32.
+ Y_VERIFY(cTimestamp->type()->id() == arrow::TimestampType::type_id); // Timestamp columns are expected to stay timestamp-typed.
+ Y_VERIFY(cDatetimeInt->type()->id() == arrow::UInt32Type::type_id);
+ Y_VERIFY(cTimestampInt->type()->id() == arrow::TimestampType::type_id);
+ Y_VERIFY(batch->num_rows() == 1);
+
+ {
+ auto& ui32Column = static_cast<arrow::UInt32Array&>(*cDatetimeInt);
+ Y_VERIFY(ui32Column.Value(0) == 11, "%d", ui32Column.Value(0)); // Integer Datetime input is kept as is.
+ }
+ {
+ auto& tsColumn = static_cast<arrow::TimestampArray&>(*cTimestampInt);
+ Cerr << tsColumn.Value(0) << Endl;
+ Y_VERIFY(tsColumn.Value(0) == 12 * 1000000); // Integer Timestamp input is expected multiplied by 1000000.
+ }
+ auto& ui16Column = static_cast<arrow::UInt16Array&>(*cDate);
+ Y_VERIFY(ui16Column.Value(0) == 15901, "%d", ui16Column.Value(0)); // 15901 is the number of days from 1970-01-01 to 2013-07-15.
+ auto& ui32Column = static_cast<arrow::UInt32Array&>(*cDatetime);
+ Y_VERIFY(ui32Column.Value(0) == dtInstant.Seconds(), "%d", ui32Column.Value(0)); // Textual Datetime must equal the reference instant in seconds.
+ auto& tsColumn = static_cast<arrow::TimestampArray&>(*cTimestamp);
+ Y_VERIFY(tsColumn.Value(0) == (i64)dtInstant.MicroSeconds()); // Textual Timestamp must equal the reference instant in microseconds.
+ }
+ }
+
Y_UNIT_TEST(EmptyData) {
TString data = "";
TVector<std::pair<TString, NScheme::TTypeInfo>> columns;