aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorОлег <150132506+iddqdex@users.noreply.github.com>2024-12-09 14:01:01 +0300
committerGitHub <noreply@github.com>2024-12-09 14:01:01 +0300
commit018b4410367a298fead3a986db6f8e0b121312d0 (patch)
tree5b50574bc5b8a53b21795e924634aeac190b9f8d
parente54c9ee613934b681c257c191d7ef2a868cbd8aa (diff)
downloadydb-018b4410367a298fead3a986db6f8e0b121312d0.tar.gz
Cli parquet (#12372)
-rw-r--r--ydb/apps/ydb/CHANGELOG.md1
-rw-r--r--ydb/core/grpc_services/rpc_load_rows.cpp39
-rw-r--r--ydb/core/io_formats/arrow/csv_arrow/csv_arrow.cpp (renamed from ydb/core/io_formats/arrow/csv_arrow.cpp)151
-rw-r--r--ydb/core/io_formats/arrow/csv_arrow/csv_arrow.h (renamed from ydb/core/io_formats/arrow/csv_arrow.h)25
-rw-r--r--ydb/core/io_formats/arrow/csv_arrow/ya.make12
-rw-r--r--ydb/core/io_formats/arrow/csv_arrow_ut.cpp12
-rw-r--r--ydb/core/io_formats/arrow/scheme/scheme.cpp30
-rw-r--r--ydb/core/io_formats/arrow/scheme/scheme.h17
-rw-r--r--ydb/core/io_formats/arrow/scheme/ya.make15
-rw-r--r--ydb/core/io_formats/arrow/table/table.cpp120
-rw-r--r--ydb/core/io_formats/arrow/table/table.h21
-rw-r--r--ydb/core/io_formats/arrow/table/ya.make12
-rw-r--r--ydb/core/io_formats/arrow/ut/ya.make2
-rw-r--r--ydb/core/io_formats/arrow/ya.make22
-rw-r--r--ydb/core/tx/tx_proxy/upload_rows_common_impl.h2
-rw-r--r--ydb/core/tx/tx_proxy/ya.make2
-rw-r--r--ydb/public/lib/ydb_cli/commands/ya.make1
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp101
18 files changed, 407 insertions, 178 deletions
diff --git a/ydb/apps/ydb/CHANGELOG.md b/ydb/apps/ydb/CHANGELOG.md
index 4d0d17a61b..386d6022ea 100644
--- a/ydb/apps/ydb/CHANGELOG.md
+++ b/ydb/apps/ydb/CHANGELOG.md
@@ -1,3 +1,4 @@
+* Use parquet format instead of CSV to fill tables in `ydb workload` benchmarks
* Made `--consumer` flag in `ydb topic read` command optional. Now if this flag is not specified, reading is performed in no-consumer mode. In this mode partition IDs should be specified with `--partition-ids` option.
* Fixed a bug in `ydb import file csv` where multiple columns with escaped quotes in the same row were parsed incorrectly
* Truncate query results output in benchmarks
diff --git a/ydb/core/grpc_services/rpc_load_rows.cpp b/ydb/core/grpc_services/rpc_load_rows.cpp
index 2faf34222e..5f6173a446 100644
--- a/ydb/core/grpc_services/rpc_load_rows.cpp
+++ b/ydb/core/grpc_services/rpc_load_rows.cpp
@@ -460,46 +460,13 @@ private:
case EUploadSource::CSV:
{
auto& data = GetSourceData();
- auto& cvsSettings = GetCsvSettings();
- ui32 skipRows = cvsSettings.skip_rows();
- auto& delimiter = cvsSettings.delimiter();
- auto& nullValue = cvsSettings.null_value();
- bool withHeader = cvsSettings.header();
-
- auto reader = NFormats::TArrowCSV::Create(SrcColumns, withHeader, NotNullColumns);
+ auto& csvSettings = GetCsvSettings();
+ auto reader = NFormats::TArrowCSVScheme::Create(SrcColumns, csvSettings.header(), NotNullColumns);
if (!reader.ok()) {
errorMessage = reader.status().ToString();
return false;
}
- const auto& quoting = cvsSettings.quoting();
- if (quoting.quote_char().length() > 1) {
- errorMessage = TStringBuilder() << "Wrong quote char '" << quoting.quote_char() << "'";
- return false;
- }
- const char qchar = quoting.quote_char().empty() ? '"' : quoting.quote_char().front();
- reader->SetQuoting(!quoting.disabled(), qchar, !quoting.double_quote_disabled());
- reader->SetSkipRows(skipRows);
-
- if (!delimiter.empty()) {
- if (delimiter.size() != 1) {
- errorMessage = TStringBuilder() << "Wrong delimiter '" << delimiter << "'";
- return false;
- }
-
- reader->SetDelimiter(delimiter[0]);
- }
-
- if (!nullValue.empty()) {
- reader->SetNullValue(nullValue);
- }
-
- if (data.size() > NFormats::TArrowCSV::DEFAULT_BLOCK_SIZE) {
- ui32 blockSize = NFormats::TArrowCSV::DEFAULT_BLOCK_SIZE;
- blockSize *= data.size() / blockSize + 1;
- reader->SetBlockSize(blockSize);
- }
-
- Batch = reader->ReadSingleBatch(data, errorMessage);
+ Batch = reader->ReadSingleBatch(data, csvSettings, errorMessage);
if (!Batch) {
return false;
}
diff --git a/ydb/core/io_formats/arrow/csv_arrow.cpp b/ydb/core/io_formats/arrow/csv_arrow/csv_arrow.cpp
index a3877733b7..7a756691d8 100644
--- a/ydb/core/io_formats/arrow/csv_arrow.cpp
+++ b/ydb/core/io_formats/arrow/csv_arrow/csv_arrow.cpp
@@ -1,8 +1,7 @@
#include "csv_arrow.h"
-#include <ydb/core/formats/arrow/arrow_helpers.h>
-#include <ydb/core/formats/arrow/serializer/stream.h>
-
+#include <contrib/libs/apache/arrow/cpp/src/arrow/array.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_primitive.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/util/value_parsing.h>
#include <util/string/join.h>
@@ -43,29 +42,6 @@ public:
}
-arrow::Result<TArrowCSV> TArrowCSV::Create(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, bool header, const std::set<std::string>& notNullColumns) {
- TVector<TString> errors;
- TColummns convertedColumns;
- convertedColumns.reserve(columns.size());
- for (auto& [name, type] : columns) {
- const auto arrowType = NArrow::GetArrowType(type);
- if (!arrowType.ok()) {
- errors.emplace_back("column " + name + ": " + arrowType.status().ToString());
- continue;
- }
- const auto csvArrowType = NArrow::GetCSVArrowType(type);
- if (!csvArrowType.ok()) {
- errors.emplace_back("column " + name + ": " + csvArrowType.status().ToString());
- continue;
- }
- convertedColumns.emplace_back(TColumnInfo{name, *arrowType, *csvArrowType});
- }
- if (!errors.empty()) {
- return arrow::Status::TypeError(ErrorPrefix() + "columns errors: " + JoinSeq("; ", errors));
- }
- return TArrowCSV(convertedColumns, header, notNullColumns);
-}
-
TArrowCSV::TArrowCSV(const TColummns& columns, bool header, const std::set<std::string>& notNullColumns)
: ReadOptions(arrow::csv::ReadOptions::Defaults())
, ParseOptions(arrow::csv::ParseOptions::Defaults())
@@ -107,6 +83,27 @@ TArrowCSV::TArrowCSV(const TColummns& columns, bool header, const std::set<std::
SetNullValue(); // set default null value
}
+namespace {
+
+ template<class TBuilder, class TOriginalArray>
+ std::shared_ptr<arrow::Array> ConvertArray(std::shared_ptr<arrow::ArrayData> data, ui64 dev) {
+ auto originalArr = std::make_shared<TOriginalArray>(data);
+ TBuilder aBuilder;
+ Y_ABORT_UNLESS(aBuilder.Reserve(originalArr->length()).ok());
+ for (long i = 0; i < originalArr->length(); ++i) {
+ if (originalArr->IsNull(i)) {
+ Y_ABORT_UNLESS(aBuilder.AppendNull().ok());
+ } else {
+ aBuilder.UnsafeAppend(originalArr->Value(i) / dev);
+ }
+ }
+ auto res = aBuilder.Finish();
+ Y_ABORT_UNLESS(res.ok());
+ return *res;
+ }
+
+}
+
std::shared_ptr<arrow::RecordBatch> TArrowCSV::ConvertColumnTypes(std::shared_ptr<arrow::RecordBatch> parsedBatch) const {
if (!parsedBatch) {
return nullptr;
@@ -134,59 +131,20 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ConvertColumnTypes(std::shared_pt
if (fArr->type()->Equals(originalType)) {
resultColumns.emplace_back(fArr);
} else if (fArr->type()->id() == arrow::TimestampType::type_id) {
- arrow::Result<std::shared_ptr<arrow::Array>> arrResult;
- {
- std::shared_ptr<arrow::TimestampArray> i64Arr = std::make_shared<arrow::TimestampArray>(fArr->data());
- if (originalType->id() == arrow::UInt16Type::type_id) {
- arrow::UInt16Builder aBuilder;
- Y_ABORT_UNLESS(aBuilder.Reserve(parsedBatch->num_rows()).ok());
- for (long i = 0; i < parsedBatch->num_rows(); ++i) {
- if (i64Arr->IsNull(i)) {
- Y_ABORT_UNLESS(aBuilder.AppendNull().ok());
- } else {
- aBuilder.UnsafeAppend(i64Arr->Value(i) / 86400ull);
- }
- }
- arrResult = aBuilder.Finish();
- } else if (originalType->id() == arrow::UInt32Type::type_id) {
- arrow::UInt32Builder aBuilder;
- Y_ABORT_UNLESS(aBuilder.Reserve(parsedBatch->num_rows()).ok());
- for (long i = 0; i < parsedBatch->num_rows(); ++i) {
- if (i64Arr->IsNull(i)) {
- Y_ABORT_UNLESS(aBuilder.AppendNull().ok());
- } else {
- aBuilder.UnsafeAppend(i64Arr->Value(i));
- }
- }
- arrResult = aBuilder.Finish();
- } else if (originalType->id() == arrow::Int32Type::type_id) {
- arrow::Int32Builder aBuilder;
- Y_ABORT_UNLESS(aBuilder.Reserve(parsedBatch->num_rows()).ok());
- for (long i = 0; i < parsedBatch->num_rows(); ++i) {
- if (i64Arr->IsNull(i)) {
- Y_ABORT_UNLESS(aBuilder.AppendNull().ok());
- } else {
- aBuilder.UnsafeAppend(i64Arr->Value(i) / 86400);
- }
- }
- arrResult = aBuilder.Finish();
- } else if (originalType->id() == arrow::Int64Type::type_id) {
- arrow::Int64Builder aBuilder;
- Y_ABORT_UNLESS(aBuilder.Reserve(parsedBatch->num_rows()).ok());
- for (long i = 0; i < parsedBatch->num_rows(); ++i) {
- if (i64Arr->IsNull(i)) {
- Y_ABORT_UNLESS(aBuilder.AppendNull().ok());
- } else {
- aBuilder.UnsafeAppend(i64Arr->Value(i));
- }
- }
- arrResult = aBuilder.Finish();
- } else {
+ resultColumns.emplace_back([originalType, fArr]() {
+ switch (originalType->id()) {
+ case arrow::UInt16Type::type_id: // Date
+ return ConvertArray<arrow::UInt16Builder, arrow::TimestampArray>(fArr->data(), 86400);
+ case arrow::UInt32Type::type_id: // Datetime
+ return ConvertArray<arrow::UInt32Builder, arrow::TimestampArray>(fArr->data(), 1);
+ case arrow::Int32Type::type_id: // Date32
+ return ConvertArray<arrow::Int32Builder, arrow::TimestampArray>(fArr->data(), 86400);
+                    case arrow::Int64Type::type_id: // Datetime64, Timestamp64
+ return ConvertArray<arrow::Int64Builder, arrow::TimestampArray>(fArr->data(), 1);
+ default:
Y_ABORT_UNLESS(false);
}
- }
- Y_ABORT_UNLESS(arrResult.ok());
- resultColumns.emplace_back(*arrResult);
+ }());
} else {
Y_ABORT_UNLESS(false);
}
@@ -204,7 +162,7 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadNext(const TString& csv, TStr
return {};
}
- auto buffer = std::make_shared<NArrow::NSerialization::TBufferOverString>(csv);
+ auto buffer = std::make_shared<arrow::Buffer>(arrow::util::string_view(csv.c_str(), csv.length()));
auto input = std::make_shared<arrow::io::BufferReader>(buffer);
auto res = arrow::csv::StreamingReader::Make(arrow::io::default_io_context(), input,
ReadOptions, ParseOptions, ConvertOptions);
@@ -249,11 +207,9 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadNext(const TString& csv, TStr
return {};
}
- if (batch && ResultColumns.size()) {
- batch = NArrow::TColumnOperator().ErrorIfAbsent().Extract(batch, ResultColumns);
- if (!batch) {
- errString = ErrorPrefix() + "not all result columns present";
- }
+ if (batch && ResultColumns.size() && batch->schema()->fields().size() != ResultColumns.size()) {
+ errString = ErrorPrefix() + "not all result columns present";
+ batch.reset();
}
return batch;
}
@@ -279,5 +235,34 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadSingleBatch(const TString& cs
}
return batch;
}
+std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadSingleBatch(const TString& csv, const Ydb::Formats::CsvSettings& csvSettings, TString& errString) {
+ const auto& quoting = csvSettings.quoting();
+ if (quoting.quote_char().length() > 1) {
+ errString = ErrorPrefix() + "Wrong quote char '" + quoting.quote_char() + "'";
+ return {};
+ }
+
+ const char qchar = quoting.quote_char().empty() ? '"' : quoting.quote_char().front();
+ SetQuoting(!quoting.disabled(), qchar, !quoting.double_quote_disabled());
+ if (csvSettings.delimiter()) {
+ if (csvSettings.delimiter().size() != 1) {
+            errString = ErrorPrefix() + "Invalid delimiter in csv: " + csvSettings.delimiter();
+ return {};
+ }
+ SetDelimiter(csvSettings.delimiter().front());
+ }
+ SetSkipRows(csvSettings.skip_rows());
+
+ if (csvSettings.null_value()) {
+ SetNullValue(csvSettings.null_value());
+ }
+
+ if (csv.size() > NKikimr::NFormats::TArrowCSV::DEFAULT_BLOCK_SIZE) {
+ ui32 blockSize = NKikimr::NFormats::TArrowCSV::DEFAULT_BLOCK_SIZE;
+ blockSize *= csv.size() / blockSize + 1;
+ SetBlockSize(blockSize);
+ }
+ return ReadSingleBatch(csv, errString);
+}
}
diff --git a/ydb/core/io_formats/arrow/csv_arrow.h b/ydb/core/io_formats/arrow/csv_arrow/csv_arrow.h
index 49fd0cd84c..86ba35af40 100644
--- a/ydb/core/io_formats/arrow/csv_arrow.h
+++ b/ydb/core/io_formats/arrow/csv_arrow/csv_arrow.h
@@ -1,9 +1,13 @@
#pragma once
-#include <ydb/core/scheme_types/scheme_type_info.h>
-
+#include <ydb/public/api/protos/ydb_formats.pb.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/csv/api.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/io/api.h>
+#include <util/generic/string.h>
+#include <util/generic/vector.h>
+#include <set>
+#include <vector>
+#include <unordered_map>
namespace NKikimr::NFormats {
@@ -11,12 +15,9 @@ class TArrowCSV {
public:
static constexpr ui32 DEFAULT_BLOCK_SIZE = 1024 * 1024;
- /// If header is true read column names from first line after skipRows. Parse columns as strings in this case.
- /// @note It's possible to skip header with skipRows and use typed columns instead.
- static arrow::Result<TArrowCSV> Create(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, bool header = false, const std::set<std::string>& notNullColumns = {});
-
std::shared_ptr<arrow::RecordBatch> ReadNext(const TString& csv, TString& errString);
std::shared_ptr<arrow::RecordBatch> ReadSingleBatch(const TString& csv, TString& errString);
+ std::shared_ptr<arrow::RecordBatch> ReadSingleBatch(const TString& csv, const Ydb::Formats::CsvSettings& csvSettings, TString& errString);
void Reset() {
Reader = {};
@@ -49,7 +50,7 @@ public:
void SetNullValue(const TString& null = "");
-private:
+protected:
struct TColumnInfo {
TString Name;
std::shared_ptr<arrow::DataType> ArrowType;
@@ -57,6 +58,12 @@ private:
};
using TColummns = TVector<TColumnInfo>;
TArrowCSV(const TColummns& columns, bool header, const std::set<std::string>& notNullColumns);
+
+ static TString ErrorPrefix() {
+ return "Cannot read CSV: ";
+ }
+
+private:
arrow::csv::ReadOptions ReadOptions;
arrow::csv::ParseOptions ParseOptions;
arrow::csv::ConvertOptions ConvertOptions;
@@ -66,10 +73,6 @@ private:
std::set<std::string> NotNullColumns;
std::shared_ptr<arrow::RecordBatch> ConvertColumnTypes(std::shared_ptr<arrow::RecordBatch> parsedBatch) const;
-
- static TString ErrorPrefix() {
- return "Cannot read CSV: ";
- }
};
}
diff --git a/ydb/core/io_formats/arrow/csv_arrow/ya.make b/ydb/core/io_formats/arrow/csv_arrow/ya.make
new file mode 100644
index 0000000000..5daf419b6f
--- /dev/null
+++ b/ydb/core/io_formats/arrow/csv_arrow/ya.make
@@ -0,0 +1,12 @@
+LIBRARY()
+
+SRCS(
+ csv_arrow.cpp
+)
+
+PEERDIR(
+ contrib/libs/apache/arrow
+ ydb/public/api/protos
+)
+
+END()
diff --git a/ydb/core/io_formats/arrow/csv_arrow_ut.cpp b/ydb/core/io_formats/arrow/csv_arrow_ut.cpp
index aa37ebbdbc..736fcb32c3 100644
--- a/ydb/core/io_formats/arrow/csv_arrow_ut.cpp
+++ b/ydb/core/io_formats/arrow/csv_arrow_ut.cpp
@@ -1,4 +1,4 @@
-#include "csv_arrow.h"
+#include <ydb/core/io_formats/arrow/scheme/scheme.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <library/cpp/testing/unittest/registar.h>
@@ -64,7 +64,7 @@ TestReadSingleBatch(TArrowCSV& reader,
std::shared_ptr<arrow::RecordBatch>
TestReadSingleBatch(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, const TString& data,
char delimiter, bool header, ui32 numRows, ui32 skipRows = 0, std::optional<char> escape = {}) {
- auto reader = TArrowCSV::Create(columns, header);
+ auto reader = TArrowCSVScheme::Create(columns, header);
UNIT_ASSERT_C(reader.ok(), reader.status().ToString());
reader->SetDelimiter(delimiter);
if (skipRows) {
@@ -98,7 +98,7 @@ Y_UNIT_TEST_SUITE(FormatCSV) {
};
TInstant dtInstant;
Y_ABORT_UNLESS(TInstant::TryParseIso8601(dateTimeString, dtInstant));
- auto reader = TArrowCSV::Create(columns, false);
+ auto reader = TArrowCSVScheme::Create(columns, false);
UNIT_ASSERT_C(reader.ok(), reader.status().ToString());
TString errorMessage;
@@ -159,7 +159,7 @@ Y_UNIT_TEST_SUITE(FormatCSV) {
TVector<std::pair<TString, NScheme::TTypeInfo>> columns;
{
- auto reader = TArrowCSV::Create(columns, false);
+ auto reader = TArrowCSVScheme::Create(columns, false);
UNIT_ASSERT_C(reader.ok(), reader.status().ToString());
TString errorMessage;
@@ -175,7 +175,7 @@ Y_UNIT_TEST_SUITE(FormatCSV) {
{"i64", NScheme::TTypeInfo(NScheme::NTypeIds::Int64)}
};
- auto reader = TArrowCSV::Create(columns, false);
+ auto reader = TArrowCSVScheme::Create(columns, false);
UNIT_ASSERT_C(reader.ok(), reader.status().ToString());
TString errorMessage;
@@ -297,7 +297,7 @@ Y_UNIT_TEST_SUITE(FormatCSV) {
csv += TString() + null + delimiter + q + null + q + delimiter + q + null + q + endLine;
csv += TString() + null + delimiter + null + delimiter + null + endLine;
- auto reader = TArrowCSV::Create(columns, false);
+ auto reader = TArrowCSVScheme::Create(columns, false);
UNIT_ASSERT_C(reader.ok(), reader.status().ToString());
if (!nulls.empty() || !defaultNull) {
reader->SetNullValue(null);
diff --git a/ydb/core/io_formats/arrow/scheme/scheme.cpp b/ydb/core/io_formats/arrow/scheme/scheme.cpp
new file mode 100644
index 0000000000..c5281943fe
--- /dev/null
+++ b/ydb/core/io_formats/arrow/scheme/scheme.cpp
@@ -0,0 +1,30 @@
+#include "scheme.h"
+#include <ydb/core/formats/arrow/arrow_helpers.h>
+#include <util/string/join.h>
+
+namespace NKikimr::NFormats {
+
+arrow::Result<TArrowCSV> TArrowCSVScheme::Create(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, bool header, const std::set<std::string>& notNullColumns) {
+ TVector<TString> errors;
+ TColummns convertedColumns;
+ convertedColumns.reserve(columns.size());
+ for (auto& [name, type] : columns) {
+ const auto arrowType = NArrow::GetArrowType(type);
+ if (!arrowType.ok()) {
+ errors.emplace_back("column " + name + ": " + arrowType.status().ToString());
+ continue;
+ }
+ const auto csvArrowType = NArrow::GetCSVArrowType(type);
+ if (!csvArrowType.ok()) {
+ errors.emplace_back("column " + name + ": " + csvArrowType.status().ToString());
+ continue;
+ }
+ convertedColumns.emplace_back(TColumnInfo{name, *arrowType, *csvArrowType});
+ }
+ if (!errors.empty()) {
+ return arrow::Status::TypeError(ErrorPrefix() + "columns errors: " + JoinSeq("; ", errors));
+ }
+ return TArrowCSVScheme(convertedColumns, header, notNullColumns);
+}
+
+} \ No newline at end of file
diff --git a/ydb/core/io_formats/arrow/scheme/scheme.h b/ydb/core/io_formats/arrow/scheme/scheme.h
new file mode 100644
index 0000000000..3f6c47cf84
--- /dev/null
+++ b/ydb/core/io_formats/arrow/scheme/scheme.h
@@ -0,0 +1,17 @@
+#pragma once
+
+#include <ydb/core/io_formats/arrow/csv_arrow/csv_arrow.h>
+#include <ydb/core/scheme_types/scheme_type_info.h>
+
+namespace NKikimr::NFormats {
+
+class TArrowCSVScheme: public TArrowCSV {
+ using TArrowCSV::TArrowCSV;
+public:
+ /// If header is true read column names from first line after skipRows. Parse columns as strings in this case.
+ /// @note It's possible to skip header with skipRows and use typed columns instead.
+ static arrow::Result<TArrowCSV> Create(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, bool header = false, const std::set<std::string>& notNullColumns = {});
+
+};
+
+} \ No newline at end of file
diff --git a/ydb/core/io_formats/arrow/scheme/ya.make b/ydb/core/io_formats/arrow/scheme/ya.make
new file mode 100644
index 0000000000..d11eb59149
--- /dev/null
+++ b/ydb/core/io_formats/arrow/scheme/ya.make
@@ -0,0 +1,15 @@
+LIBRARY()
+
+SRCS(
+ scheme.cpp
+)
+
+PEERDIR(
+ ydb/core/formats/arrow
+ ydb/core/io_formats/arrow/csv_arrow
+ ydb/core/scheme_types
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/core/io_formats/arrow/table/table.cpp b/ydb/core/io_formats/arrow/table/table.cpp
new file mode 100644
index 0000000000..0364acfc70
--- /dev/null
+++ b/ydb/core/io_formats/arrow/table/table.cpp
@@ -0,0 +1,120 @@
+#include "table.h"
+#include <util/string/join.h>
+
+namespace NKikimr::NFormats {
+
+arrow::Result<TArrowCSV> TArrowCSVTable::Create(const TVector<NYdb::NTable::TTableColumn>& columns, bool header) {
+ TVector<TString> errors;
+ TColummns convertedColumns;
+ convertedColumns.reserve(columns.size());
+ std::set<std::string> notNullColumns;
+ for (auto& column : columns) {
+ const auto arrowType = GetArrowType(column.Type);
+ if (!arrowType.ok()) {
+ errors.emplace_back("column " + column.Name + ": " + arrowType.status().ToString());
+ continue;
+ }
+ const auto csvArrowType = GetCSVArrowType(column.Type);
+ if (!csvArrowType.ok()) {
+ errors.emplace_back("column " + column.Name + ": " + csvArrowType.status().ToString());
+ continue;
+ }
+ convertedColumns.emplace_back(TColumnInfo{column.Name, *arrowType, *csvArrowType});
+ if (NYdb::TTypeParser(column.Type).GetKind() != NYdb::TTypeParser::ETypeKind::Optional || column.NotNull.value_or(false)) {
+ notNullColumns.emplace(column.Name);
+ }
+ }
+ if (!errors.empty()) {
+ return arrow::Status::TypeError(ErrorPrefix() + "columns errors: " + JoinSeq("; ", errors));
+ }
+ return TArrowCSVTable(convertedColumns, header, notNullColumns);
+}
+
+NYdb::TTypeParser TArrowCSVTable::ExtractType(const NYdb::TType& type) {
+ NYdb::TTypeParser tp(type);
+ if (tp.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) {
+ tp.OpenOptional();
+ }
+ return std::move(tp);
+}
+
+arrow::Result<std::shared_ptr<arrow::DataType>> TArrowCSVTable::GetArrowType(const NYdb::TType& type) {
+ auto tp = ExtractType(type);
+ switch (tp.GetKind()) {
+ case NYdb::TTypeParser::ETypeKind::Decimal:
+ return arrow::decimal(tp.GetDecimal().Precision, tp.GetDecimal().Scale);
+ case NYdb::TTypeParser::ETypeKind::Primitive:
+ switch (tp.GetPrimitive()) {
+ case NYdb::EPrimitiveType::Bool:
+ return arrow::boolean();
+ case NYdb::EPrimitiveType::Int8:
+ return arrow::int8();
+ case NYdb::EPrimitiveType::Uint8:
+ return arrow::uint8();
+ case NYdb::EPrimitiveType::Int16:
+ return arrow::int16();
+ case NYdb::EPrimitiveType::Date:
+ return arrow::uint16();
+ case NYdb::EPrimitiveType::Date32:
+ return arrow::int32();
+ case NYdb::EPrimitiveType::Datetime:
+ return arrow::uint32();
+ case NYdb::EPrimitiveType::Uint16:
+ return arrow::uint16();
+ case NYdb::EPrimitiveType::Int32:
+ return arrow::int32();
+ case NYdb::EPrimitiveType::Uint32:
+ return arrow::uint32();
+ case NYdb::EPrimitiveType::Int64:
+ return arrow::int64();
+ case NYdb::EPrimitiveType::Uint64:
+ return arrow::uint64();
+ case NYdb::EPrimitiveType::Float:
+ return arrow::float32();
+ case NYdb::EPrimitiveType::Double:
+ return arrow::float64();
+ case NYdb::EPrimitiveType::Utf8:
+ case NYdb::EPrimitiveType::Json:
+ return arrow::utf8();
+ case NYdb::EPrimitiveType::String:
+ case NYdb::EPrimitiveType::Yson:
+ case NYdb::EPrimitiveType::DyNumber:
+ case NYdb::EPrimitiveType::JsonDocument:
+ return arrow::binary();
+ case NYdb::EPrimitiveType::Timestamp:
+ return arrow::timestamp(arrow::TimeUnit::MICRO);
+ case NYdb::EPrimitiveType::Interval:
+ return arrow::duration(arrow::TimeUnit::MILLI);
+ case NYdb::EPrimitiveType::Datetime64:
+ case NYdb::EPrimitiveType::Interval64:
+ case NYdb::EPrimitiveType::Timestamp64:
+ return arrow::int64();
+ default:
+ return arrow::Status::TypeError(ErrorPrefix() + "Not supported type " + ToString(tp.GetPrimitive()));
+ }
+ default:
+ return arrow::Status::TypeError(ErrorPrefix() + "Not supported type kind " + ToString(tp.GetKind()));
+ }
+}
+
+arrow::Result<std::shared_ptr<arrow::DataType>> TArrowCSVTable::GetCSVArrowType(const NYdb::TType& type) {
+ auto tp = ExtractType(type);
+ if (tp.GetKind() == NYdb::TTypeParser::ETypeKind::Primitive) {
+ switch (tp.GetPrimitive()) {
+ case NYdb::EPrimitiveType::Datetime:
+ case NYdb::EPrimitiveType::Datetime64:
+ return arrow::timestamp(arrow::TimeUnit::SECOND);
+ case NYdb::EPrimitiveType::Timestamp:
+ case NYdb::EPrimitiveType::Timestamp64:
+ return arrow::timestamp(arrow::TimeUnit::MICRO);
+ case NYdb::EPrimitiveType::Date:
+ case NYdb::EPrimitiveType::Date32:
+ return arrow::timestamp(arrow::TimeUnit::SECOND);
+ default:
+ break;
+ }
+ }
+ return GetArrowType(type);
+}
+
+} \ No newline at end of file
diff --git a/ydb/core/io_formats/arrow/table/table.h b/ydb/core/io_formats/arrow/table/table.h
new file mode 100644
index 0000000000..7e686e9e97
--- /dev/null
+++ b/ydb/core/io_formats/arrow/table/table.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include <ydb/core/io_formats/arrow/csv_arrow/csv_arrow.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+
+namespace NKikimr::NFormats {
+
+class TArrowCSVTable: public TArrowCSV {
+ using TArrowCSV::TArrowCSV;
+public:
+ /// If header is true read column names from first line after skipRows. Parse columns as strings in this case.
+ /// @note It's possible to skip header with skipRows and use typed columns instead.
+ static arrow::Result<TArrowCSV> Create(const TVector<NYdb::NTable::TTableColumn>& columns, bool header = false);
+
+private:
+ static NYdb::TTypeParser ExtractType(const NYdb::TType& type);
+ static arrow::Result<std::shared_ptr<arrow::DataType>> GetArrowType(const NYdb::TType& type);
+ static arrow::Result<std::shared_ptr<arrow::DataType>> GetCSVArrowType(const NYdb::TType& type);
+};
+
+} \ No newline at end of file
diff --git a/ydb/core/io_formats/arrow/table/ya.make b/ydb/core/io_formats/arrow/table/ya.make
new file mode 100644
index 0000000000..273fea3bf7
--- /dev/null
+++ b/ydb/core/io_formats/arrow/table/ya.make
@@ -0,0 +1,12 @@
+LIBRARY()
+
+SRCS(
+ table.cpp
+)
+
+PEERDIR(
+ ydb/core/io_formats/arrow/csv_arrow
+ ydb/public/sdk/cpp/client/ydb_table
+)
+
+END()
diff --git a/ydb/core/io_formats/arrow/ut/ya.make b/ydb/core/io_formats/arrow/ut/ya.make
index f957632820..04f06d111f 100644
--- a/ydb/core/io_formats/arrow/ut/ya.make
+++ b/ydb/core/io_formats/arrow/ut/ya.make
@@ -3,7 +3,7 @@ UNITTEST_FOR(ydb/core/io_formats/arrow)
SIZE(SMALL)
PEERDIR(
- ydb/core/io_formats/arrow
+ ydb/core/io_formats/arrow/scheme
# for NYql::NUdf alloc stuff used in binary_json
yql/essentials/public/udf/service/exception_policy
diff --git a/ydb/core/io_formats/arrow/ya.make b/ydb/core/io_formats/arrow/ya.make
index 58ba9c23ba..e8969661e3 100644
--- a/ydb/core/io_formats/arrow/ya.make
+++ b/ydb/core/io_formats/arrow/ya.make
@@ -1,20 +1,6 @@
-RECURSE_FOR_TESTS(ut)
-
-LIBRARY()
-
-SRCS(
- csv_arrow.cpp
-)
-
-CFLAGS(
- -Wno-unused-parameter
+RECURSE(
+ csv_arrow
+ scheme
)
-PEERDIR(
- ydb/core/scheme_types
- ydb/core/formats/arrow
-)
-
-YQL_LAST_ABI_VERSION()
-
-END()
+RECURSE_FOR_TESTS(ut)
diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
index d92f7a0503..36fcb2d9cc 100644
--- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
+++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
@@ -6,7 +6,7 @@
#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
#include <ydb/core/formats/arrow/arrow_batch_builder.h>
#include <ydb/core/formats/arrow/converter.h>
-#include <ydb/core/io_formats/arrow/csv_arrow.h>
+#include <ydb/core/io_formats/arrow/scheme/scheme.h>
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/base/path.h>
#include <ydb/core/base/feature_flags.h>
diff --git a/ydb/core/tx/tx_proxy/ya.make b/ydb/core/tx/tx_proxy/ya.make
index a5042ac58b..911b835d3a 100644
--- a/ydb/core/tx/tx_proxy/ya.make
+++ b/ydb/core/tx/tx_proxy/ya.make
@@ -33,7 +33,7 @@ PEERDIR(
ydb/core/engine
ydb/core/formats
ydb/core/grpc_services/local_rpc
- ydb/core/io_formats/arrow
+ ydb/core/io_formats/arrow/scheme
ydb/core/protos
ydb/core/scheme
ydb/core/sys_view/common
diff --git a/ydb/public/lib/ydb_cli/commands/ya.make b/ydb/public/lib/ydb_cli/commands/ya.make
index ca8cb98765..6fe15ccccb 100644
--- a/ydb/public/lib/ydb_cli/commands/ya.make
+++ b/ydb/public/lib/ydb_cli/commands/ya.make
@@ -66,6 +66,7 @@ PEERDIR(
ydb/public/sdk/cpp/client/ydb_table
ydb/public/sdk/cpp/client/ydb_topic
ydb/public/sdk/cpp/client/ydb_types/credentials/login
+ ydb/core/io_formats/arrow/table
)
GENERATE_ENUM_SERIALIZATION(ydb_ping.h)
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp
index 504715a83e..5fc4a77ec9 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp
@@ -1,5 +1,9 @@
#include "ydb_workload_import.h"
+#include <ydb/core/io_formats/arrow/table/table.h>
+#include <ydb/public/api/protos/ydb_formats.pb.h>
#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
+#include <ydb/public/lib/ydb_cli/common/recursive_list.h>
+#include <ydb/public/lib/ydb_cli/import/cli_arrow_helpers.h>
#include <ydb/library/yverify_stream/yverify_stream.h>
#include <library/cpp/threading/future/async.h>
#include <util/generic/deque.h>
@@ -43,14 +47,14 @@ TWorkloadCommandImport::TUploadCommand::TUploadCommand(NYdbWorkload::TWorkloadPa
, Initializer(initializer)
{}
-int TWorkloadCommandImport::TUploadCommand::DoRun(NYdbWorkload::IWorkloadQueryGenerator& /*workloadGen*/, TConfig& /*config*/) {
+int TWorkloadCommandImport::TUploadCommand::DoRun(NYdbWorkload::IWorkloadQueryGenerator& workloadGen, TConfig& config) {
auto dataGeneratorList = Initializer->GetBulkInitialData();
AtomicSet(ErrorsCount, 0);
InFlightSemaphore = MakeHolder<TFastSemaphore>(UploadParams.MaxInFlight);
if (UploadParams.FileOutputPath.IsDefined()) {
Writer = MakeHolder<TFileWriter>(*this);
} else {
- Writer = MakeHolder<TDbWriter>(*this);
+ Writer = MakeHolder<TDbWriter>(*this, workloadGen, config);
}
for (auto dataGen : dataGeneratorList) {
TThreadPoolParams params;
@@ -75,40 +79,92 @@ int TWorkloadCommandImport::TUploadCommand::DoRun(NYdbWorkload::IWorkloadQueryGe
}
return AtomicGet(ErrorsCount) ? EXIT_FAILURE : EXIT_SUCCESS;
}
-
class TWorkloadCommandImport::TUploadCommand::TDbWriter: public IWriter {
public:
- using IWriter::IWriter;
+ TDbWriter(TWorkloadCommandImport::TUploadCommand& owner, NYdbWorkload::IWorkloadQueryGenerator& workloadGen, TConfig& config)
+ : IWriter(owner)
+ {
+ RetrySettings.RetryUndefined(true);
+ RetrySettings.MaxRetries(30);
+ for(const auto& path: workloadGen.GetCleanPaths()) {
+ const auto list = NConsoleClient::RecursiveList(*owner.SchemeClient, config.Database + "/" + path.c_str());
+ for (const auto& entry : list.Entries) {
+ if (entry.Type == NScheme::ESchemeEntryType::ColumnTable || entry.Type == NScheme::ESchemeEntryType::Table) {
+ const auto tableDescr = owner.TableClient->GetSession(NTable::TCreateSessionSettings()).GetValueSync().GetSession().DescribeTable(entry.Name).ExtractValueSync().GetTableDescription();
+ auto& params = ArrowCsvParams[entry.Name];
+ params.Columns = tableDescr.GetTableColumns();
+ }
+ }
+ }
+ }
TAsyncStatus WriteDataPortion(NYdbWorkload::IBulkDataGenerator::TDataPortionPtr portion) override {
- auto convertResult = [](const NTable::TAsyncBulkUpsertResult& result) {
- return TStatus(result.GetValueSync());
- };
if (std::holds_alternative<NYdbWorkload::IBulkDataGenerator::TDataPortion::TSkip>(portion->MutableData())) {
return NThreading::MakeFuture(TStatus(EStatus::SUCCESS, NYql::TIssues()));
}
if (auto* value = std::get_if<TValue>(&portion->MutableData())) {
- return Owner.TableClient->BulkUpsert(portion->GetTable(), std::move(*value)).Apply(convertResult);
+ return Owner.TableClient->BulkUpsert(portion->GetTable(), std::move(*value)).Apply(ConvertResult);
}
- NRetry::TRetryOperationSettings retrySettings;
- retrySettings.RetryUndefined(true);
- retrySettings.MaxRetries(30);
if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TCsv>(&portion->MutableData())) {
- return Owner.TableClient->RetryOperation([value, portion, convertResult](NTable::TTableClient& client) {
- NTable::TBulkUpsertSettings settings;
- settings.FormatSettings(value->FormatString);
- return client.BulkUpsert(portion->GetTable(), NTable::EDataFormat::CSV, value->Data, TString(), settings)
- .Apply(convertResult);
- }, retrySettings);
+ return WriteCsv(portion);
}
if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TArrow>(&portion->MutableData())) {
- return Owner.TableClient->RetryOperation([value, portion, convertResult](NTable::TTableClient& client) {
+ return Owner.TableClient->RetryOperation([value, portion](NTable::TTableClient& client) {
return client.BulkUpsert(portion->GetTable(), NTable::EDataFormat::ApacheArrow, value->Data, value->Schema)
- .Apply(convertResult);
- }, retrySettings);
+ .Apply(ConvertResult);
+ }, RetrySettings);
}
Y_FAIL_S("Invalid data portion");
}
+
+private:
+ TAsyncStatus WriteCsv(NYdbWorkload::IBulkDataGenerator::TDataPortionPtr portion) {
+ const auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TCsv>(&portion->MutableData());
+ const auto* param = MapFindPtr(ArrowCsvParams, portion->GetTable());
+ if (!param) {
+ return NThreading::MakeFuture(TStatus(EStatus::INTERNAL_ERROR, NYql::TIssues({NYql::TIssue("Table does not exist: " + portion->GetTable())})));
+ }
+ auto arrowCsv = NKikimr::NFormats::TArrowCSVTable::Create(param->Columns, true);
+ if (!arrowCsv.ok()) {
+ return NThreading::MakeFuture(TStatus(EStatus::INTERNAL_ERROR, NYql::TIssues({NYql::TIssue(arrowCsv.status().ToString())})));
+ }
+ Ydb::Formats::CsvSettings csvSettings;
+ if (!csvSettings.ParseFromString(value->FormatString)) {
+ return NThreading::MakeFuture(TStatus(EStatus::INTERNAL_ERROR, NYql::TIssues({NYql::TIssue("Invalid format string")})));
+ };
+
+ auto writeOptions = arrow::ipc::IpcWriteOptions::Defaults();
+ constexpr auto codecType = arrow::Compression::type::ZSTD;
+ writeOptions.codec = *arrow::util::Codec::Create(codecType);
+ TString error;
+ if (auto batch = arrowCsv->ReadSingleBatch(value->Data, csvSettings, error)) {
+ if (error) {
+ return NThreading::MakeFuture(TStatus(EStatus::INTERNAL_ERROR, NYql::TIssues({NYql::TIssue(error)})));
+ }
+ return Owner.TableClient->RetryOperation([
+ parquet = NYdb_cli::NArrow::SerializeBatch(batch, writeOptions),
+ schema = NYdb_cli::NArrow::SerializeSchema(*batch->schema()),
+ portion](NTable::TTableClient& client) {
+ return client.BulkUpsert(portion->GetTable(), NTable::EDataFormat::ApacheArrow, parquet, schema)
+ .Apply(ConvertResult);
+ }, RetrySettings);
+ }
+ if (error) {
+ return NThreading::MakeFuture(TStatus(EStatus::INTERNAL_ERROR, NYql::TIssues({NYql::TIssue(error)})));
+ }
+ return NThreading::MakeFuture(TStatus(EStatus::SUCCESS, NYql::TIssues()));
+ }
+
+ static TStatus ConvertResult(const NTable::TAsyncBulkUpsertResult& result) {
+ return TStatus(result.GetValueSync());
+ }
+
+ struct TArrowCSVParams {
+ TVector<NYdb::NTable::TTableColumn> Columns;
+ };
+
+ TMap<TString, TArrowCSVParams> ArrowCsvParams;
+ NRetry::TRetryOperationSettings RetrySettings;
};
class TWorkloadCommandImport::TUploadCommand::TFileWriter: public IWriter {
@@ -139,7 +195,10 @@ public:
return NThreading::MakeFuture(TStatus(EStatus::SUCCESS, NYql::TIssues()));
}
if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TArrow>(&portion->MutableData())) {
- return NThreading::MakeErrorFuture<TStatus>(std::make_exception_ptr(yexception() << "Not implemented"));
+ auto g = Guard(Lock);
+ auto [out, created] = GetOutput(portion->GetTable());
+ out->Write(value->Data);
+ return NThreading::MakeFuture(TStatus(EStatus::SUCCESS, NYql::TIssues()));
}
Y_FAIL_S("Invalid data portion");
}