diff options
author | Олег <150132506+iddqdex@users.noreply.github.com> | 2024-12-09 14:01:01 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-09 14:01:01 +0300 |
commit | 018b4410367a298fead3a986db6f8e0b121312d0 (patch) | |
tree | 5b50574bc5b8a53b21795e924634aeac190b9f8d | |
parent | e54c9ee613934b681c257c191d7ef2a868cbd8aa (diff) | |
download | ydb-018b4410367a298fead3a986db6f8e0b121312d0.tar.gz |
Cli parquet (#12372)
18 files changed, 407 insertions, 178 deletions
diff --git a/ydb/apps/ydb/CHANGELOG.md b/ydb/apps/ydb/CHANGELOG.md index 4d0d17a61b..386d6022ea 100644 --- a/ydb/apps/ydb/CHANGELOG.md +++ b/ydb/apps/ydb/CHANGELOG.md @@ -1,3 +1,4 @@ +* Use parquet format instead of CSV to fill tables in `ydb workload` benchmarks * Made `--consumer` flag in `ydb topic read` command optional. Now if this flag is not specified, reading is performed in no-consumer mode. In this mode partition IDs should be specified with `--partition-ids` option. * Fixed a bug in `ydb import file csv` where multiple columns with escaped quotes in the same row were parsed incorrectly * Truncate query results output in benchmarks diff --git a/ydb/core/grpc_services/rpc_load_rows.cpp b/ydb/core/grpc_services/rpc_load_rows.cpp index 2faf34222e..5f6173a446 100644 --- a/ydb/core/grpc_services/rpc_load_rows.cpp +++ b/ydb/core/grpc_services/rpc_load_rows.cpp @@ -460,46 +460,13 @@ private: case EUploadSource::CSV: { auto& data = GetSourceData(); - auto& cvsSettings = GetCsvSettings(); - ui32 skipRows = cvsSettings.skip_rows(); - auto& delimiter = cvsSettings.delimiter(); - auto& nullValue = cvsSettings.null_value(); - bool withHeader = cvsSettings.header(); - - auto reader = NFormats::TArrowCSV::Create(SrcColumns, withHeader, NotNullColumns); + auto& csvSettings = GetCsvSettings(); + auto reader = NFormats::TArrowCSVScheme::Create(SrcColumns, csvSettings.header(), NotNullColumns); if (!reader.ok()) { errorMessage = reader.status().ToString(); return false; } - const auto& quoting = cvsSettings.quoting(); - if (quoting.quote_char().length() > 1) { - errorMessage = TStringBuilder() << "Wrong quote char '" << quoting.quote_char() << "'"; - return false; - } - const char qchar = quoting.quote_char().empty() ? '"' : quoting.quote_char().front(); - reader->SetQuoting(!quoting.disabled(), qchar, !quoting.double_quote_disabled()); - reader->SetSkipRows(skipRows); - - if (!delimiter.empty()) { - if (delimiter.size() != 1) { - errorMessage = TStringBuilder() << "Wrong delimiter '" << delimiter << "'"; - return false; - } - - reader->SetDelimiter(delimiter[0]); - } - - if (!nullValue.empty()) { - reader->SetNullValue(nullValue); - } - - if (data.size() > NFormats::TArrowCSV::DEFAULT_BLOCK_SIZE) { - ui32 blockSize = NFormats::TArrowCSV::DEFAULT_BLOCK_SIZE; - blockSize *= data.size() / blockSize + 1; - reader->SetBlockSize(blockSize); - } - - Batch = reader->ReadSingleBatch(data, errorMessage); + Batch = reader->ReadSingleBatch(data, csvSettings, errorMessage); if (!Batch) { return false; } diff --git a/ydb/core/io_formats/arrow/csv_arrow.cpp b/ydb/core/io_formats/arrow/csv_arrow/csv_arrow.cpp index a3877733b7..7a756691d8 100644 --- a/ydb/core/io_formats/arrow/csv_arrow.cpp +++ b/ydb/core/io_formats/arrow/csv_arrow/csv_arrow.cpp @@ -1,8 +1,7 @@ #include "csv_arrow.h" -#include <ydb/core/formats/arrow/arrow_helpers.h> -#include <ydb/core/formats/arrow/serializer/stream.h> - +#include <contrib/libs/apache/arrow/cpp/src/arrow/array.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_primitive.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/util/value_parsing.h> #include <util/string/join.h> @@ -43,29 +42,6 @@ public: } -arrow::Result<TArrowCSV> TArrowCSV::Create(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, bool header, const std::set<std::string>& notNullColumns) { - TVector<TString> errors; - TColummns convertedColumns; - convertedColumns.reserve(columns.size()); - for (auto& [name, type] : columns) { - const auto arrowType = NArrow::GetArrowType(type); - if (!arrowType.ok()) { - errors.emplace_back("column " + name + ": " + arrowType.status().ToString()); - continue; - } - const auto csvArrowType = NArrow::GetCSVArrowType(type); - if (!csvArrowType.ok()) { - errors.emplace_back("column " + name + ": " + csvArrowType.status().ToString()); - continue; - } - convertedColumns.emplace_back(TColumnInfo{name, *arrowType, *csvArrowType}); - } - if (!errors.empty()) { - return arrow::Status::TypeError(ErrorPrefix() + "columns errors: " + JoinSeq("; ", errors)); - } - return TArrowCSV(convertedColumns, header, notNullColumns); -} - TArrowCSV::TArrowCSV(const TColummns& columns, bool header, const std::set<std::string>& notNullColumns) : ReadOptions(arrow::csv::ReadOptions::Defaults()) , ParseOptions(arrow::csv::ParseOptions::Defaults()) @@ -107,6 +83,27 @@ TArrowCSV::TArrowCSV(const TColummns& columns, bool header, const std::set<std:: SetNullValue(); // set default null value } +namespace { + + template<class TBuilder, class TOriginalArray> + std::shared_ptr<arrow::Array> ConvertArray(std::shared_ptr<arrow::ArrayData> data, ui64 dev) { + auto originalArr = std::make_shared<TOriginalArray>(data); + TBuilder aBuilder; + Y_ABORT_UNLESS(aBuilder.Reserve(originalArr->length()).ok()); + for (long i = 0; i < originalArr->length(); ++i) { + if (originalArr->IsNull(i)) { + Y_ABORT_UNLESS(aBuilder.AppendNull().ok()); + } else { + aBuilder.UnsafeAppend(originalArr->Value(i) / dev); + } + } + auto res = aBuilder.Finish(); + Y_ABORT_UNLESS(res.ok()); + return *res; + } + +} + std::shared_ptr<arrow::RecordBatch> TArrowCSV::ConvertColumnTypes(std::shared_ptr<arrow::RecordBatch> parsedBatch) const { if (!parsedBatch) { return nullptr; @@ -134,59 +131,20 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ConvertColumnTypes(std::shared_pt if (fArr->type()->Equals(originalType)) { resultColumns.emplace_back(fArr); } else if (fArr->type()->id() == arrow::TimestampType::type_id) { - arrow::Result<std::shared_ptr<arrow::Array>> arrResult; - { - std::shared_ptr<arrow::TimestampArray> i64Arr = std::make_shared<arrow::TimestampArray>(fArr->data()); - if (originalType->id() == arrow::UInt16Type::type_id) { - arrow::UInt16Builder aBuilder; - Y_ABORT_UNLESS(aBuilder.Reserve(parsedBatch->num_rows()).ok()); - for (long i = 0; i < parsedBatch->num_rows(); ++i) { - if (i64Arr->IsNull(i)) { - Y_ABORT_UNLESS(aBuilder.AppendNull().ok()); - } else { - aBuilder.UnsafeAppend(i64Arr->Value(i) / 86400ull); - } - } - arrResult = aBuilder.Finish(); - } else if (originalType->id() == arrow::UInt32Type::type_id) { - arrow::UInt32Builder aBuilder; - Y_ABORT_UNLESS(aBuilder.Reserve(parsedBatch->num_rows()).ok()); - for (long i = 0; i < parsedBatch->num_rows(); ++i) { - if (i64Arr->IsNull(i)) { - Y_ABORT_UNLESS(aBuilder.AppendNull().ok()); - } else { - aBuilder.UnsafeAppend(i64Arr->Value(i)); - } - } - arrResult = aBuilder.Finish(); - } else if (originalType->id() == arrow::Int32Type::type_id) { - arrow::Int32Builder aBuilder; - Y_ABORT_UNLESS(aBuilder.Reserve(parsedBatch->num_rows()).ok()); - for (long i = 0; i < parsedBatch->num_rows(); ++i) { - if (i64Arr->IsNull(i)) { - Y_ABORT_UNLESS(aBuilder.AppendNull().ok()); - } else { - aBuilder.UnsafeAppend(i64Arr->Value(i) / 86400); - } - } - arrResult = aBuilder.Finish(); - } else if (originalType->id() == arrow::Int64Type::type_id) { - arrow::Int64Builder aBuilder; - Y_ABORT_UNLESS(aBuilder.Reserve(parsedBatch->num_rows()).ok()); - for (long i = 0; i < parsedBatch->num_rows(); ++i) { - if (i64Arr->IsNull(i)) { - Y_ABORT_UNLESS(aBuilder.AppendNull().ok()); - } else { - aBuilder.UnsafeAppend(i64Arr->Value(i)); - } - } - arrResult = aBuilder.Finish(); - } else { + resultColumns.emplace_back([originalType, fArr]() { + switch (originalType->id()) { + case arrow::UInt16Type::type_id: // Date + return ConvertArray<arrow::UInt16Builder, arrow::TimestampArray>(fArr->data(), 86400); + case arrow::UInt32Type::type_id: // Datetime + return ConvertArray<arrow::UInt32Builder, arrow::TimestampArray>(fArr->data(), 1); + case arrow::Int32Type::type_id: // Date32 + return ConvertArray<arrow::Int32Builder, arrow::TimestampArray>(fArr->data(), 86400); + case arrow::Int64Type::type_id:// Datetime64, Timestamp64 + return ConvertArray<arrow::Int64Builder, arrow::TimestampArray>(fArr->data(), 1); + default: Y_ABORT_UNLESS(false); } - } - Y_ABORT_UNLESS(arrResult.ok()); - resultColumns.emplace_back(*arrResult); + }()); } else { Y_ABORT_UNLESS(false); } @@ -204,7 +162,7 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadNext(const TString& csv, TStr return {}; } - auto buffer = std::make_shared<NArrow::NSerialization::TBufferOverString>(csv); + auto buffer = std::make_shared<arrow::Buffer>(arrow::util::string_view(csv.c_str(), csv.length())); auto input = std::make_shared<arrow::io::BufferReader>(buffer); auto res = arrow::csv::StreamingReader::Make(arrow::io::default_io_context(), input, ReadOptions, ParseOptions, ConvertOptions); @@ -249,11 +207,9 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadNext(const TString& csv, TStr return {}; } - if (batch && ResultColumns.size()) { - batch = NArrow::TColumnOperator().ErrorIfAbsent().Extract(batch, ResultColumns); - if (!batch) { - errString = ErrorPrefix() + "not all result columns present"; - } + if (batch && ResultColumns.size() && batch->schema()->fields().size() != ResultColumns.size()) { + errString = ErrorPrefix() + "not all result columns present"; + batch.reset(); } return batch; } @@ -279,5 +235,34 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadSingleBatch(const TString& cs } return batch; } +std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadSingleBatch(const TString& csv, const Ydb::Formats::CsvSettings& csvSettings, TString& errString) { + const auto& quoting = csvSettings.quoting(); + if (quoting.quote_char().length() > 1) { + errString = ErrorPrefix() + "Wrong quote char '" + quoting.quote_char() + "'"; + return {}; + } + + const char qchar = quoting.quote_char().empty() ? '"' : quoting.quote_char().front(); + SetQuoting(!quoting.disabled(), qchar, !quoting.double_quote_disabled()); + if (csvSettings.delimiter()) { + if (csvSettings.delimiter().size() != 1) { + errString = ErrorPrefix() + "Invalid delimitr in csv: " + csvSettings.delimiter(); + return {}; + } + SetDelimiter(csvSettings.delimiter().front()); + } + SetSkipRows(csvSettings.skip_rows()); + + if (csvSettings.null_value()) { + SetNullValue(csvSettings.null_value()); + } + + if (csv.size() > NKikimr::NFormats::TArrowCSV::DEFAULT_BLOCK_SIZE) { + ui32 blockSize = NKikimr::NFormats::TArrowCSV::DEFAULT_BLOCK_SIZE; + blockSize *= csv.size() / blockSize + 1; + SetBlockSize(blockSize); + } + return ReadSingleBatch(csv, errString); +} } diff --git a/ydb/core/io_formats/arrow/csv_arrow.h b/ydb/core/io_formats/arrow/csv_arrow/csv_arrow.h index 49fd0cd84c..86ba35af40 100644 --- a/ydb/core/io_formats/arrow/csv_arrow.h +++ b/ydb/core/io_formats/arrow/csv_arrow/csv_arrow.h @@ -1,9 +1,13 @@ #pragma once -#include <ydb/core/scheme_types/scheme_type_info.h> - +#include <ydb/public/api/protos/ydb_formats.pb.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/csv/api.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/io/api.h> +#include <util/generic/string.h> +#include <util/generic/vector.h> +#include <set> +#include <vector> +#include <unordered_map> namespace NKikimr::NFormats { @@ -11,12 +15,9 @@ class TArrowCSV { public: static constexpr ui32 DEFAULT_BLOCK_SIZE = 1024 * 1024; - /// If header is true read column names from first line after skipRows. Parse columns as strings in this case. - /// @note It's possible to skip header with skipRows and use typed columns instead. - static arrow::Result<TArrowCSV> Create(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, bool header = false, const std::set<std::string>& notNullColumns = {}); - std::shared_ptr<arrow::RecordBatch> ReadNext(const TString& csv, TString& errString); std::shared_ptr<arrow::RecordBatch> ReadSingleBatch(const TString& csv, TString& errString); + std::shared_ptr<arrow::RecordBatch> ReadSingleBatch(const TString& csv, const Ydb::Formats::CsvSettings& csvSettings, TString& errString); void Reset() { Reader = {}; @@ -49,7 +50,7 @@ public: void SetNullValue(const TString& null = ""); -private: +protected: struct TColumnInfo { TString Name; std::shared_ptr<arrow::DataType> ArrowType; @@ -57,6 +58,12 @@ private: }; using TColummns = TVector<TColumnInfo>; TArrowCSV(const TColummns& columns, bool header, const std::set<std::string>& notNullColumns); + + static TString ErrorPrefix() { + return "Cannot read CSV: "; + } + +private: arrow::csv::ReadOptions ReadOptions; arrow::csv::ParseOptions ParseOptions; arrow::csv::ConvertOptions ConvertOptions; @@ -66,10 +73,6 @@ private: std::set<std::string> NotNullColumns; std::shared_ptr<arrow::RecordBatch> ConvertColumnTypes(std::shared_ptr<arrow::RecordBatch> parsedBatch) const; - - static TString ErrorPrefix() { - return "Cannot read CSV: "; - } }; } diff --git a/ydb/core/io_formats/arrow/csv_arrow/ya.make b/ydb/core/io_formats/arrow/csv_arrow/ya.make new file mode 100644 index 0000000000..5daf419b6f --- /dev/null +++ b/ydb/core/io_formats/arrow/csv_arrow/ya.make @@ -0,0 +1,12 @@ +LIBRARY() + +SRCS( + csv_arrow.cpp +) + +PEERDIR( + contrib/libs/apache/arrow + ydb/public/api/protos +) + +END() diff --git a/ydb/core/io_formats/arrow/csv_arrow_ut.cpp b/ydb/core/io_formats/arrow/csv_arrow_ut.cpp index aa37ebbdbc..736fcb32c3 100644 --- a/ydb/core/io_formats/arrow/csv_arrow_ut.cpp +++ b/ydb/core/io_formats/arrow/csv_arrow_ut.cpp @@ -1,4 +1,4 @@ -#include "csv_arrow.h" +#include <ydb/core/io_formats/arrow/scheme/scheme.h> #include <ydb/core/formats/arrow/arrow_helpers.h> #include <library/cpp/testing/unittest/registar.h> @@ -64,7 +64,7 @@ TestReadSingleBatch(TArrowCSV& reader, std::shared_ptr<arrow::RecordBatch> TestReadSingleBatch(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, const TString& data, char delimiter, bool header, ui32 numRows, ui32 skipRows = 0, std::optional<char> escape = {}) { - auto reader = TArrowCSV::Create(columns, header); + auto reader = TArrowCSVScheme::Create(columns, header); UNIT_ASSERT_C(reader.ok(), reader.status().ToString()); reader->SetDelimiter(delimiter); if (skipRows) { @@ -98,7 +98,7 @@ Y_UNIT_TEST_SUITE(FormatCSV) { }; TInstant dtInstant; Y_ABORT_UNLESS(TInstant::TryParseIso8601(dateTimeString, dtInstant)); - auto reader = TArrowCSV::Create(columns, false); + auto reader = TArrowCSVScheme::Create(columns, false); UNIT_ASSERT_C(reader.ok(), reader.status().ToString()); TString errorMessage; @@ -159,7 +159,7 @@ Y_UNIT_TEST_SUITE(FormatCSV) { TVector<std::pair<TString, NScheme::TTypeInfo>> columns; { - auto reader = TArrowCSV::Create(columns, false); + auto reader = TArrowCSVScheme::Create(columns, false); UNIT_ASSERT_C(reader.ok(), reader.status().ToString()); TString errorMessage; @@ -175,7 +175,7 @@ Y_UNIT_TEST_SUITE(FormatCSV) { {"i64", NScheme::TTypeInfo(NScheme::NTypeIds::Int64)} }; - auto reader = TArrowCSV::Create(columns, false); + auto reader = TArrowCSVScheme::Create(columns, false); UNIT_ASSERT_C(reader.ok(), reader.status().ToString()); TString errorMessage; @@ -297,7 +297,7 @@ Y_UNIT_TEST_SUITE(FormatCSV) { csv += TString() + null + delimiter + q + null + q + delimiter + q + null + q + endLine; csv += TString() + null + delimiter + null + delimiter + null + endLine; - auto reader = TArrowCSV::Create(columns, false); + auto reader = TArrowCSVScheme::Create(columns, false); UNIT_ASSERT_C(reader.ok(), reader.status().ToString()); if (!nulls.empty() || !defaultNull) { reader->SetNullValue(null); diff --git a/ydb/core/io_formats/arrow/scheme/scheme.cpp b/ydb/core/io_formats/arrow/scheme/scheme.cpp new file mode 100644 index 0000000000..c5281943fe --- /dev/null +++ b/ydb/core/io_formats/arrow/scheme/scheme.cpp @@ -0,0 +1,30 @@ +#include "scheme.h" +#include <ydb/core/formats/arrow/arrow_helpers.h> +#include <util/string/join.h> + +namespace NKikimr::NFormats { + +arrow::Result<TArrowCSV> TArrowCSVScheme::Create(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, bool header, const std::set<std::string>& notNullColumns) { + TVector<TString> errors; + TColummns convertedColumns; + convertedColumns.reserve(columns.size()); + for (auto& [name, type] : columns) { + const auto arrowType = NArrow::GetArrowType(type); + if (!arrowType.ok()) { + errors.emplace_back("column " + name + ": " + arrowType.status().ToString()); + continue; + } + const auto csvArrowType = NArrow::GetCSVArrowType(type); + if (!csvArrowType.ok()) { + errors.emplace_back("column " + name + ": " + csvArrowType.status().ToString()); + continue; + } + convertedColumns.emplace_back(TColumnInfo{name, *arrowType, *csvArrowType}); + } + if (!errors.empty()) { + return arrow::Status::TypeError(ErrorPrefix() + "columns errors: " + JoinSeq("; ", errors)); + } + return TArrowCSVScheme(convertedColumns, header, notNullColumns); +} + +}
\ No newline at end of file diff --git a/ydb/core/io_formats/arrow/scheme/scheme.h b/ydb/core/io_formats/arrow/scheme/scheme.h new file mode 100644 index 0000000000..3f6c47cf84 --- /dev/null +++ b/ydb/core/io_formats/arrow/scheme/scheme.h @@ -0,0 +1,17 @@ +#pragma once + +#include <ydb/core/io_formats/arrow/csv_arrow/csv_arrow.h> +#include <ydb/core/scheme_types/scheme_type_info.h> + +namespace NKikimr::NFormats { + +class TArrowCSVScheme: public TArrowCSV { + using TArrowCSV::TArrowCSV; +public: + /// If header is true read column names from first line after skipRows. Parse columns as strings in this case. + /// @note It's possible to skip header with skipRows and use typed columns instead. + static arrow::Result<TArrowCSV> Create(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, bool header = false, const std::set<std::string>& notNullColumns = {}); + +}; + +}
\ No newline at end of file diff --git a/ydb/core/io_formats/arrow/scheme/ya.make b/ydb/core/io_formats/arrow/scheme/ya.make new file mode 100644 index 0000000000..d11eb59149 --- /dev/null +++ b/ydb/core/io_formats/arrow/scheme/ya.make @@ -0,0 +1,15 @@ +LIBRARY() + +SRCS( + scheme.cpp +) + +PEERDIR( + ydb/core/formats/arrow + ydb/core/io_formats/arrow/csv_arrow + ydb/core/scheme_types +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/io_formats/arrow/table/table.cpp b/ydb/core/io_formats/arrow/table/table.cpp new file mode 100644 index 0000000000..0364acfc70 --- /dev/null +++ b/ydb/core/io_formats/arrow/table/table.cpp @@ -0,0 +1,120 @@ +#include "table.h" +#include <util/string/join.h> + +namespace NKikimr::NFormats { + +arrow::Result<TArrowCSV> TArrowCSVTable::Create(const TVector<NYdb::NTable::TTableColumn>& columns, bool header) { + TVector<TString> errors; + TColummns convertedColumns; + convertedColumns.reserve(columns.size()); + std::set<std::string> notNullColumns; + for (auto& column : columns) { + const auto arrowType = GetArrowType(column.Type); + if (!arrowType.ok()) { + errors.emplace_back("column " + column.Name + ": " + arrowType.status().ToString()); + continue; + } + const auto csvArrowType = GetCSVArrowType(column.Type); + if (!csvArrowType.ok()) { + errors.emplace_back("column " + column.Name + ": " + csvArrowType.status().ToString()); + continue; + } + convertedColumns.emplace_back(TColumnInfo{column.Name, *arrowType, *csvArrowType}); + if (NYdb::TTypeParser(column.Type).GetKind() != NYdb::TTypeParser::ETypeKind::Optional || column.NotNull.value_or(false)) { + notNullColumns.emplace(column.Name); + } + } + if (!errors.empty()) { + return arrow::Status::TypeError(ErrorPrefix() + "columns errors: " + JoinSeq("; ", errors)); + } + return TArrowCSVTable(convertedColumns, header, notNullColumns); +} + +NYdb::TTypeParser TArrowCSVTable::ExtractType(const NYdb::TType& type) { + NYdb::TTypeParser tp(type); + if (tp.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) { + tp.OpenOptional(); + } + return std::move(tp); +} + +arrow::Result<std::shared_ptr<arrow::DataType>> TArrowCSVTable::GetArrowType(const NYdb::TType& type) { + auto tp = ExtractType(type); + switch (tp.GetKind()) { + case NYdb::TTypeParser::ETypeKind::Decimal: + return arrow::decimal(tp.GetDecimal().Precision, tp.GetDecimal().Scale); + case NYdb::TTypeParser::ETypeKind::Primitive: + switch (tp.GetPrimitive()) { + case NYdb::EPrimitiveType::Bool: + return arrow::boolean(); + case NYdb::EPrimitiveType::Int8: + return arrow::int8(); + case NYdb::EPrimitiveType::Uint8: + return arrow::uint8(); + case NYdb::EPrimitiveType::Int16: + return arrow::int16(); + case NYdb::EPrimitiveType::Date: + return arrow::uint16(); + case NYdb::EPrimitiveType::Date32: + return arrow::int32(); + case NYdb::EPrimitiveType::Datetime: + return arrow::uint32(); + case NYdb::EPrimitiveType::Uint16: + return arrow::uint16(); + case NYdb::EPrimitiveType::Int32: + return arrow::int32(); + case NYdb::EPrimitiveType::Uint32: + return arrow::uint32(); + case NYdb::EPrimitiveType::Int64: + return arrow::int64(); + case NYdb::EPrimitiveType::Uint64: + return arrow::uint64(); + case NYdb::EPrimitiveType::Float: + return arrow::float32(); + case NYdb::EPrimitiveType::Double: + return arrow::float64(); + case NYdb::EPrimitiveType::Utf8: + case NYdb::EPrimitiveType::Json: + return arrow::utf8(); + case NYdb::EPrimitiveType::String: + case NYdb::EPrimitiveType::Yson: + case NYdb::EPrimitiveType::DyNumber: + case NYdb::EPrimitiveType::JsonDocument: + return arrow::binary(); + case NYdb::EPrimitiveType::Timestamp: + return arrow::timestamp(arrow::TimeUnit::MICRO); + case NYdb::EPrimitiveType::Interval: + return arrow::duration(arrow::TimeUnit::MILLI); + case NYdb::EPrimitiveType::Datetime64: + case NYdb::EPrimitiveType::Interval64: + case NYdb::EPrimitiveType::Timestamp64: + return arrow::int64(); + default: + return arrow::Status::TypeError(ErrorPrefix() + "Not supported type " + ToString(tp.GetPrimitive())); + } + default: + return arrow::Status::TypeError(ErrorPrefix() + "Not supported type kind " + ToString(tp.GetKind())); + } +} + +arrow::Result<std::shared_ptr<arrow::DataType>> TArrowCSVTable::GetCSVArrowType(const NYdb::TType& type) { + auto tp = ExtractType(type); + if (tp.GetKind() == NYdb::TTypeParser::ETypeKind::Primitive) { + switch (tp.GetPrimitive()) { + case NYdb::EPrimitiveType::Datetime: + case NYdb::EPrimitiveType::Datetime64: + return arrow::timestamp(arrow::TimeUnit::SECOND); + case NYdb::EPrimitiveType::Timestamp: + case NYdb::EPrimitiveType::Timestamp64: + return arrow::timestamp(arrow::TimeUnit::MICRO); + case NYdb::EPrimitiveType::Date: + case NYdb::EPrimitiveType::Date32: + return arrow::timestamp(arrow::TimeUnit::SECOND); + default: + break; + } + } + return GetArrowType(type); +} + +}
\ No newline at end of file diff --git a/ydb/core/io_formats/arrow/table/table.h b/ydb/core/io_formats/arrow/table/table.h new file mode 100644 index 0000000000..7e686e9e97 --- /dev/null +++ b/ydb/core/io_formats/arrow/table/table.h @@ -0,0 +1,21 @@ +#pragma once + +#include <ydb/core/io_formats/arrow/csv_arrow/csv_arrow.h> +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> + +namespace NKikimr::NFormats { + +class TArrowCSVTable: public TArrowCSV { + using TArrowCSV::TArrowCSV; +public: + /// If header is true read column names from first line after skipRows. Parse columns as strings in this case. + /// @note It's possible to skip header with skipRows and use typed columns instead. + static arrow::Result<TArrowCSV> Create(const TVector<NYdb::NTable::TTableColumn>& columns, bool header = false); + +private: + static NYdb::TTypeParser ExtractType(const NYdb::TType& type); + static arrow::Result<std::shared_ptr<arrow::DataType>> GetArrowType(const NYdb::TType& type); + static arrow::Result<std::shared_ptr<arrow::DataType>> GetCSVArrowType(const NYdb::TType& type); +}; + +}
\ No newline at end of file diff --git a/ydb/core/io_formats/arrow/table/ya.make b/ydb/core/io_formats/arrow/table/ya.make new file mode 100644 index 0000000000..273fea3bf7 --- /dev/null +++ b/ydb/core/io_formats/arrow/table/ya.make @@ -0,0 +1,12 @@ +LIBRARY() + +SRCS( + table.cpp +) + +PEERDIR( + ydb/core/io_formats/arrow/csv_arrow + ydb/public/sdk/cpp/client/ydb_table +) + +END() diff --git a/ydb/core/io_formats/arrow/ut/ya.make b/ydb/core/io_formats/arrow/ut/ya.make index f957632820..04f06d111f 100644 --- a/ydb/core/io_formats/arrow/ut/ya.make +++ b/ydb/core/io_formats/arrow/ut/ya.make @@ -3,7 +3,7 @@ UNITTEST_FOR(ydb/core/io_formats/arrow) SIZE(SMALL) PEERDIR( - ydb/core/io_formats/arrow + ydb/core/io_formats/arrow/scheme # for NYql::NUdf alloc stuff used in binary_json yql/essentials/public/udf/service/exception_policy diff --git a/ydb/core/io_formats/arrow/ya.make b/ydb/core/io_formats/arrow/ya.make index 58ba9c23ba..e8969661e3 100644 --- a/ydb/core/io_formats/arrow/ya.make +++ b/ydb/core/io_formats/arrow/ya.make @@ -1,20 +1,6 @@ -RECURSE_FOR_TESTS(ut) - -LIBRARY() - -SRCS( - csv_arrow.cpp -) - -CFLAGS( - -Wno-unused-parameter +RECURSE( + csv_arrow + scheme ) -PEERDIR( - ydb/core/scheme_types - ydb/core/formats/arrow -) - -YQL_LAST_ABI_VERSION() - -END() +RECURSE_FOR_TESTS(ut) diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h index d92f7a0503..36fcb2d9cc 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h @@ -6,7 +6,7 @@ #include <ydb/core/grpc_services/local_rpc/local_rpc.h> #include <ydb/core/formats/arrow/arrow_batch_builder.h> #include <ydb/core/formats/arrow/converter.h> -#include <ydb/core/io_formats/arrow/csv_arrow.h> +#include <ydb/core/io_formats/arrow/scheme/scheme.h> #include <ydb/core/base/tablet_pipecache.h> #include <ydb/core/base/path.h> #include <ydb/core/base/feature_flags.h> diff --git a/ydb/core/tx/tx_proxy/ya.make b/ydb/core/tx/tx_proxy/ya.make index a5042ac58b..911b835d3a 100644 --- a/ydb/core/tx/tx_proxy/ya.make +++ b/ydb/core/tx/tx_proxy/ya.make @@ -33,7 +33,7 @@ PEERDIR( ydb/core/engine ydb/core/formats ydb/core/grpc_services/local_rpc - ydb/core/io_formats/arrow + ydb/core/io_formats/arrow/scheme ydb/core/protos ydb/core/scheme ydb/core/sys_view/common diff --git a/ydb/public/lib/ydb_cli/commands/ya.make b/ydb/public/lib/ydb_cli/commands/ya.make index ca8cb98765..6fe15ccccb 100644 --- a/ydb/public/lib/ydb_cli/commands/ya.make +++ b/ydb/public/lib/ydb_cli/commands/ya.make @@ -66,6 +66,7 @@ PEERDIR( ydb/public/sdk/cpp/client/ydb_table ydb/public/sdk/cpp/client/ydb_topic ydb/public/sdk/cpp/client/ydb_types/credentials/login + ydb/core/io_formats/arrow/table ) GENERATE_ENUM_SERIALIZATION(ydb_ping.h) diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp index 504715a83e..5fc4a77ec9 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp @@ -1,5 +1,9 @@ #include "ydb_workload_import.h" +#include <ydb/core/io_formats/arrow/table/table.h> +#include <ydb/public/api/protos/ydb_formats.pb.h> #include <ydb/public/lib/ydb_cli/commands/ydb_common.h> +#include <ydb/public/lib/ydb_cli/common/recursive_list.h> +#include <ydb/public/lib/ydb_cli/import/cli_arrow_helpers.h> #include <ydb/library/yverify_stream/yverify_stream.h> #include <library/cpp/threading/future/async.h> #include <util/generic/deque.h> @@ -43,14 +47,14 @@ TWorkloadCommandImport::TUploadCommand::TUploadCommand(NYdbWorkload::TWorkloadPa , Initializer(initializer) {} -int TWorkloadCommandImport::TUploadCommand::DoRun(NYdbWorkload::IWorkloadQueryGenerator& /*workloadGen*/, TConfig& /*config*/) { +int TWorkloadCommandImport::TUploadCommand::DoRun(NYdbWorkload::IWorkloadQueryGenerator& workloadGen, TConfig& config) { auto dataGeneratorList = Initializer->GetBulkInitialData(); AtomicSet(ErrorsCount, 0); InFlightSemaphore = MakeHolder<TFastSemaphore>(UploadParams.MaxInFlight); if (UploadParams.FileOutputPath.IsDefined()) { Writer = MakeHolder<TFileWriter>(*this); } else { - Writer = MakeHolder<TDbWriter>(*this); + Writer = MakeHolder<TDbWriter>(*this, workloadGen, config); } for (auto dataGen : dataGeneratorList) { TThreadPoolParams params; @@ -75,40 +79,92 @@ int TWorkloadCommandImport::TUploadCommand::DoRun(NYdbWorkload::IWorkloadQueryGe } return AtomicGet(ErrorsCount) ? EXIT_FAILURE : EXIT_SUCCESS; } - class TWorkloadCommandImport::TUploadCommand::TDbWriter: public IWriter { public: - using IWriter::IWriter; + TDbWriter(TWorkloadCommandImport::TUploadCommand& owner, NYdbWorkload::IWorkloadQueryGenerator& workloadGen, TConfig& config) + : IWriter(owner) + { + RetrySettings.RetryUndefined(true); + RetrySettings.MaxRetries(30); + for(const auto& path: workloadGen.GetCleanPaths()) { + const auto list = NConsoleClient::RecursiveList(*owner.SchemeClient, config.Database + "/" + path.c_str()); + for (const auto& entry : list.Entries) { + if (entry.Type == NScheme::ESchemeEntryType::ColumnTable || entry.Type == NScheme::ESchemeEntryType::Table) { + const auto tableDescr = owner.TableClient->GetSession(NTable::TCreateSessionSettings()).GetValueSync().GetSession().DescribeTable(entry.Name).ExtractValueSync().GetTableDescription(); + auto& params = ArrowCsvParams[entry.Name]; + params.Columns = tableDescr.GetTableColumns(); + } + } + } + } TAsyncStatus WriteDataPortion(NYdbWorkload::IBulkDataGenerator::TDataPortionPtr portion) override { - auto convertResult = [](const NTable::TAsyncBulkUpsertResult& result) { - return TStatus(result.GetValueSync()); - }; if (std::holds_alternative<NYdbWorkload::IBulkDataGenerator::TDataPortion::TSkip>(portion->MutableData())) { return NThreading::MakeFuture(TStatus(EStatus::SUCCESS, NYql::TIssues())); } if (auto* value = std::get_if<TValue>(&portion->MutableData())) { - return Owner.TableClient->BulkUpsert(portion->GetTable(), std::move(*value)).Apply(convertResult); + return Owner.TableClient->BulkUpsert(portion->GetTable(), std::move(*value)).Apply(ConvertResult); } - NRetry::TRetryOperationSettings retrySettings; - retrySettings.RetryUndefined(true); - retrySettings.MaxRetries(30); if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TCsv>(&portion->MutableData())) { - return Owner.TableClient->RetryOperation([value, portion, convertResult](NTable::TTableClient& client) { - NTable::TBulkUpsertSettings settings; - settings.FormatSettings(value->FormatString); - return client.BulkUpsert(portion->GetTable(), NTable::EDataFormat::CSV, value->Data, TString(), settings) - .Apply(convertResult); - }, retrySettings); + return WriteCsv(portion); } if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TArrow>(&portion->MutableData())) { - return Owner.TableClient->RetryOperation([value, portion, convertResult](NTable::TTableClient& client) { + return Owner.TableClient->RetryOperation([value, portion](NTable::TTableClient& client) { return client.BulkUpsert(portion->GetTable(), NTable::EDataFormat::ApacheArrow, value->Data, value->Schema) - .Apply(convertResult); - }, retrySettings); + .Apply(ConvertResult); + }, RetrySettings); } Y_FAIL_S("Invalid data portion"); } + +private: + TAsyncStatus WriteCsv(NYdbWorkload::IBulkDataGenerator::TDataPortionPtr portion) { + const auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TCsv>(&portion->MutableData()); + const auto* param = MapFindPtr(ArrowCsvParams, portion->GetTable()); + if (!param) { + return NThreading::MakeFuture(TStatus(EStatus::INTERNAL_ERROR, NYql::TIssues({NYql::TIssue("Table does not exist: " + portion->GetTable())}))); + } + auto arrowCsv = NKikimr::NFormats::TArrowCSVTable::Create(param->Columns, true); + if (!arrowCsv.ok()) { + return NThreading::MakeFuture(TStatus(EStatus::INTERNAL_ERROR, NYql::TIssues({NYql::TIssue(arrowCsv.status().ToString())}))); + } + Ydb::Formats::CsvSettings csvSettings; + if (!csvSettings.ParseFromString(value->FormatString)) { + return NThreading::MakeFuture(TStatus(EStatus::INTERNAL_ERROR, NYql::TIssues({NYql::TIssue("Invalid format string")}))); + }; + + auto writeOptions = arrow::ipc::IpcWriteOptions::Defaults(); + constexpr auto codecType = arrow::Compression::type::ZSTD; + writeOptions.codec = *arrow::util::Codec::Create(codecType); + TString error; + if (auto batch = arrowCsv->ReadSingleBatch(value->Data, csvSettings, error)) { + if (error) { + return NThreading::MakeFuture(TStatus(EStatus::INTERNAL_ERROR, NYql::TIssues({NYql::TIssue(error)}))); + } + return Owner.TableClient->RetryOperation([ + parquet = NYdb_cli::NArrow::SerializeBatch(batch, writeOptions), + schema = NYdb_cli::NArrow::SerializeSchema(*batch->schema()), + portion](NTable::TTableClient& client) { + return client.BulkUpsert(portion->GetTable(), NTable::EDataFormat::ApacheArrow, parquet, schema) + .Apply(ConvertResult); + }, RetrySettings); + } + if (error) { + return NThreading::MakeFuture(TStatus(EStatus::INTERNAL_ERROR, NYql::TIssues({NYql::TIssue(error)}))); + } + return NThreading::MakeFuture(TStatus(EStatus::SUCCESS, NYql::TIssues())); + } + + static TStatus ConvertResult(const NTable::TAsyncBulkUpsertResult& result) { + return TStatus(result.GetValueSync()); + } + + struct TArrowCSVParams { + TVector<NYdb::NTable::TTableColumn> Columns; + }; + + TMap<TString, TArrowCSVParams> ArrowCsvParams; + NRetry::TRetryOperationSettings RetrySettings; }; class TWorkloadCommandImport::TUploadCommand::TFileWriter: public IWriter { @@ -139,7 +195,10 @@ public: return NThreading::MakeFuture(TStatus(EStatus::SUCCESS, NYql::TIssues())); } if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TArrow>(&portion->MutableData())) { - return NThreading::MakeErrorFuture<TStatus>(std::make_exception_ptr(yexception() << "Not implemented")); + auto g = Guard(Lock); + auto [out, created] = GetOutput(portion->GetTable()); + out->Write(value->Data); + return NThreading::MakeFuture(TStatus(EStatus::SUCCESS, NYql::TIssues())); } Y_FAIL_S("Invalid data portion"); } |