author | stepandrey <stepandrey@yandex-team.com> | 2022-11-30 16:32:10 +0300
---|---|---
committer | stepandrey <stepandrey@yandex-team.com> | 2022-11-30 16:32:10 +0300
commit | b6b02f6683bc476c07dc34badbd641aa5a902e49 (patch) |
tree | 15b63008edf61c5d29644a12fa5b14429524abee |
parent | a8f566e1db4827fefcd4a9003b9d7894aacd9a4c (diff) |
download | ydb-b6b02f6683bc476c07dc34badbd641aa5a902e49.tar.gz |
support Parquet in ydb cli
parquet file import implementation
17 files changed, 645 insertions, 1 deletion
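For context, the new subcommand can be exercised end to end roughly as follows. This is a minimal sketch based on the functional test added in this patch (test_ydb_impex.py); the endpoint, database, and destination table path are placeholder values, and the `-e`/`-d` connection flags are assumed from the ydb CLI's standard global options rather than from anything in this commit:

    # Build a small Parquet file with the same schema the new test uses,
    # then feed it to the new `ydb import file parquet` subcommand.
    # grpc://localhost:2136, /Root and /Root/test_table are placeholders.
    import subprocess

    import pyarrow as pa
    import pyarrow.parquet as pq

    schema = pa.schema([('key', pa.uint32()), ('id', pa.uint64()), ('value', pa.string())])
    table = pa.table(
        {'key': [1, 2, 3], 'id': [1111, 2222, 3333], 'value': ['one', 'two', 'three']},
        schema=schema,
    )
    pq.write_table(table, 'input.parquet')

    # -p (destination table) and -i (input file) match the flags used in test_ydb_impex.py.
    subprocess.run(
        ['ydb', '-e', 'grpc://localhost:2136', '-d', '/Root',
         'import', 'file', 'parquet', '-p', '/Root/test_table', '-i', 'input.parquet'],
        check=True,
    )

Internally (see import.cpp below), the importer opens the file with parquet::arrow::FileReader, slices each record batch to fit the BytesPerRequest_ budget, and ships each slice to the server via BulkUpsert in Apache Arrow format.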
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/api.h b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/api.h
new file mode 100644
index 0000000000..c0d772c697
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/api.h
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/util/config.h" // IWYU pragma: export
+
+#include "arrow/filesystem/filesystem.h" // IWYU pragma: export
+#include "arrow/filesystem/localfs.h" // IWYU pragma: export
+#include "arrow/filesystem/mockfs.h" // IWYU pragma: export
+#ifdef ARROW_S3
+#error #include "arrow/filesystem/s3fs.h" // IWYU pragma: export
+#endif
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/python/api.h b/contrib/libs/apache/arrow/cpp/src/arrow/python/api.h
new file mode 100644
index 0000000000..a0b13d6d13
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/python/api.h
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/python/arrow_to_pandas.h"
+#include "arrow/python/common.h"
+#include "arrow/python/datetime.h"
+#include "arrow/python/deserialize.h"
+#include "arrow/python/helpers.h"
+#include "arrow/python/inference.h"
+#include "arrow/python/io.h"
+#include "arrow/python/numpy_convert.h"
+#include "arrow/python/numpy_to_arrow.h"
+#include "arrow/python/python_to_arrow.h"
+#include "arrow/python/serialize.h"
diff --git a/contrib/libs/apache/arrow/cpp/src/parquet/api/io.h b/contrib/libs/apache/arrow/cpp/src/parquet/api/io.h
new file mode 100644
index 0000000000..28a00f12a7
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/parquet/api/io.h
@@ -0,0 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "parquet/exception.h"
diff --git a/contrib/libs/apache/arrow/cpp/src/parquet/api/reader.h b/contrib/libs/apache/arrow/cpp/src/parquet/api/reader.h
new file mode 100644
index 0000000000..7e746e8c5b
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/parquet/api/reader.h
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+// Column reader API
+#include "parquet/column_reader.h"
+#include "parquet/column_scanner.h"
+#include "parquet/exception.h"
+#include "parquet/file_reader.h"
+#include "parquet/metadata.h"
+#include "parquet/platform.h"
+#include "parquet/printer.h"
+#include "parquet/properties.h"
+#include "parquet/statistics.h"
+
+// Schemas
+#include "parquet/api/schema.h"
+
+// IO
+#include "parquet/api/io.h"
diff --git a/contrib/libs/apache/arrow/cpp/src/parquet/api/schema.h b/contrib/libs/apache/arrow/cpp/src/parquet/api/schema.h
new file mode 100644
index 0000000000..7ca714f47b
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/parquet/api/schema.h
@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+// Schemas
+#include "parquet/schema.h"
diff --git a/contrib/libs/apache/arrow/cpp/src/parquet/api/writer.h b/contrib/libs/apache/arrow/cpp/src/parquet/api/writer.h
new file mode 100644
index 0000000000..b072dcf74d
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/parquet/api/writer.h
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "parquet/api/io.h"
+#include "parquet/api/schema.h"
+#include "parquet/column_writer.h"
+#include "parquet/exception.h"
+#include "parquet/file_writer.h"
+#include "parquet/statistics.h"
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
index 63420950a2..42747d12d8 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
@@ -140,6 +140,7 @@ TCommandImportFromFile::TCommandImportFromFile()
     AddCommand(std::make_unique<TCommandImportFromCsv>());
     AddCommand(std::make_unique<TCommandImportFromTsv>());
     AddCommand(std::make_unique<TCommandImportFromJson>());
+    AddCommand(std::make_unique<TCommandImportFromParquet>());
 }
 
 /// Import File Shared Config
@@ -255,5 +256,21 @@ int TCommandImportFromJson::Run(TConfig& config) {
     return EXIT_SUCCESS;
 }
 
+/// Import Parquet
+void TCommandImportFromParquet::Config(TConfig& config) {
+    TCommandImportFileBase::Config(config);
+}
+
+int TCommandImportFromParquet::Run(TConfig& config) {
+    TImportFileSettings settings;
+    settings.Format(InputFormat);
+    settings.MaxInFlightRequests(MaxInFlightRequests);
+    settings.BytesPerRequest(NYdb::SizeFromString(BytesPerRequest));
+
+    TImportFileClient client(CreateDriver(config));
+    ThrowOnError(client.Import(FilePath, Path, settings));
+
+    return EXIT_SUCCESS;
+}
 }
 }
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_import.h b/ydb/public/lib/ydb_cli/commands/ydb_service_import.h
index af304b92a6..ddb2c2c272 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_import.h
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_import.h
@@ -101,5 +101,15 @@ public:
     int Run(TConfig& config) override;
 };
 
+class TCommandImportFromParquet : public TCommandImportFileBase {
+public:
+    TCommandImportFromParquet(const TString& cmd = "parquet", const TString& cmdDescription = "Import data from Parquet file")
+        : TCommandImportFileBase(cmd, cmdDescription)
+    {
+        InputFormat = EOutputFormat::Parquet;
+    }
+    void Config(TConfig& config) override;
+    int Run(TConfig& config) override;
+};
 }
 }
diff --git a/ydb/public/lib/ydb_cli/common/formats.h b/ydb/public/lib/ydb_cli/common/formats.h
index 09a893db58..4756529273 100644
--- a/ydb/public/lib/ydb_cli/common/formats.h
+++ b/ydb/public/lib/ydb_cli/common/formats.h
@@ -16,6 +16,7 @@ enum class EOutputFormat {
     ProtoJsonBase64 /* "proto-json-base64" */,
     Csv /* "csv" */,
     Tsv /* "tsv" */,
+    Parquet /* "parquet" */,
 };
 
 // EMessagingFormat to be used in both input and output when working with files/pipes in operations related to messaging
diff --git a/ydb/public/lib/ydb_cli/import/CMakeLists.txt b/ydb/public/lib/ydb_cli/import/CMakeLists.txt
index d28016ad5b..7e7520d171 100644
--- a/ydb/public/lib/ydb_cli/import/CMakeLists.txt
+++ b/ydb/public/lib/ydb_cli/import/CMakeLists.txt
@@ -15,7 +15,9 @@ target_link_libraries(lib-ydb_cli-import PUBLIC
   common
   cpp-client-ydb_proto
   public-lib-json_value
+  libs-apache-arrow
 )
 target_sources(lib-ydb_cli-import PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/import/import.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/import/cli_arrow_helpers.cpp
 )
diff --git a/ydb/public/lib/ydb_cli/import/cli_arrow_helpers.cpp b/ydb/public/lib/ydb_cli/import/cli_arrow_helpers.cpp
new file mode 100644
index 0000000000..28a1c30087
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/import/cli_arrow_helpers.cpp
@@ -0,0 +1,183 @@
+/*
+    This file contains code copied from core/formats/arrow_helpers.cpp in order to cut client dependencies
+*/
+
+
+#include "cli_arrow_helpers.h"
+#include "cli_switch_type.h"
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/io/memory.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/reader.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_primitive.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_primitive.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/type_traits.h>
+#include <util/generic/vector.h>
+#include <util/stream/file.h>
+#include <util/string/builder.h>
+#include <util/folder/path.h>
+
+#define Y_VERIFY_OK(status) Y_VERIFY(status.ok(), "%s", status.ToString().c_str())
+
+namespace NYdb_cli::NArrow {
+    std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& table) {
+        std::vector<std::shared_ptr<arrow::Array>> columns;
+        columns.reserve(table->num_columns());
+        for (auto& col : table->columns()) {
+            Y_VERIFY(col->num_chunks() == 1);
+            columns.push_back(col->chunk(0));
+        }
+        return arrow::RecordBatch::Make(table->schema(), table->num_rows(), columns);
+    }
+    ui64 GetBatchDataSize(const std::shared_ptr<arrow::RecordBatch>& batch) {
+        if (!batch) {
+            return 0;
+        }
+        ui64 bytes = 0;
+        for (auto& column : batch->columns()) { // TODO: use column_data() instead of columns()
+            bytes += GetArrayDataSize(column);
+        }
+        return bytes;
+    }
+
+    template <typename TType>
+    ui64 GetArrayDataSizeImpl(const std::shared_ptr<arrow::Array>& column) {
+        return sizeof(typename TType::c_type) * column->length();
+    }
+
+    template <>
+    ui64 GetArrayDataSizeImpl<arrow::NullType>(const std::shared_ptr<arrow::Array>& column) {
+        return column->length() * 8; // Special value for empty lines
+    }
+
+    template <>
+    ui64 GetArrayDataSizeImpl<arrow::StringType>(const std::shared_ptr<arrow::Array>& column) {
+        auto typedColumn = std::static_pointer_cast<arrow::StringArray>(column);
+        return typedColumn->total_values_length();
+    }
+
+    template <>
+    ui64 GetArrayDataSizeImpl<arrow::LargeStringType>(const std::shared_ptr<arrow::Array>& column) {
+        auto typedColumn = std::static_pointer_cast<arrow::StringArray>(column);
+        return typedColumn->total_values_length();
+    }
+
+    template <>
+    ui64 GetArrayDataSizeImpl<arrow::BinaryType>(const std::shared_ptr<arrow::Array>& column) {
+        auto typedColumn = std::static_pointer_cast<arrow::BinaryArray>(column);
+        return typedColumn->total_values_length();
+    }
+
+    template <>
+    ui64 GetArrayDataSizeImpl<arrow::LargeBinaryType>(const std::shared_ptr<arrow::Array>& column) {
+        auto typedColumn = std::static_pointer_cast<arrow::BinaryArray>(column);
+        return typedColumn->total_values_length();
+    }
+
+    template <>
+    ui64 GetArrayDataSizeImpl<arrow::FixedSizeBinaryType>(const std::shared_ptr<arrow::Array>& column) {
+        auto typedColumn = std::static_pointer_cast<arrow::FixedSizeBinaryArray>(column);
+        return typedColumn->byte_width() * typedColumn->length();
+    }
+
+    template <>
+    ui64 GetArrayDataSizeImpl<arrow::Decimal128Type>(const std::shared_ptr<arrow::Array>& column) {
+        return sizeof(ui64) * 2 * column->length();
+    }
+
+    ui64 GetArrayDataSize(const std::shared_ptr<arrow::Array>& column) {
+        auto type = column->type();
+        ui64 bytes = 0;
+        bool success = SwitchTypeWithNull(type->id(), [&]<typename TType>(TTypeWrapper<TType> typeHolder) {
+            Y_UNUSED(typeHolder);
+            bytes = GetArrayDataSizeImpl<TType>(column);
+            return true;
+        });
+
+        // Add null bit mask overhead if any.
+        if (HasNulls(column)) {
+            bytes += column->length() / 8 + 1;
+        }
+
+        Y_VERIFY_DEBUG(success, "Unsupported arrow type %s", type->ToString().data());
+        return bytes;
+    }
+
+    namespace {
+        class TFixedStringOutputStream final : public arrow::io::OutputStream {
+        public:
+            TFixedStringOutputStream(TString* out)
+                : Out(out)
+                , Position(0)
+            { }
+
+            arrow::Status Close() override {
+                Out = nullptr;
+                return arrow::Status::OK();
+            }
+
+            bool closed() const override {
+                return Out == nullptr;
+            }
+
+            arrow::Result<int64_t> Tell() const override {
+                return Position;
+            }
+
+            arrow::Status Write(const void* data, int64_t nbytes) override {
+                if (Y_LIKELY(nbytes > 0)) {
+                    Y_VERIFY(Out && Out->size() - Position >= ui64(nbytes));
+                    char* dst = &(*Out)[Position];
+                    ::memcpy(dst, data, nbytes);
+                    Position += nbytes;
+                }
+
+                return arrow::Status::OK();
+            }
+
+            size_t GetPosition() const {
+                return Position;
+            }
+
+        private:
+            TString* Out;
+            size_t Position;
+        };
+    }
+
+    TString SerializeSchema(const arrow::Schema& schema) {
+        auto buffer = arrow::ipc::SerializeSchema(schema);
+        if (!buffer.ok()) {
+            return {};
+        }
+        return TString((const char*)(*buffer)->data(), (*buffer)->size());
+    }
+
+    TString SerializeBatch(const std::shared_ptr<arrow::RecordBatch>& batch, const arrow::ipc::IpcWriteOptions& options) {
+        arrow::ipc::IpcPayload payload;
+        auto status = arrow::ipc::GetRecordBatchPayload(*batch, options, &payload);
+        Y_VERIFY_OK(status);
+
+        int32_t metadata_length = 0;
+        arrow::io::MockOutputStream mock;
+        status = arrow::ipc::WriteIpcPayload(payload, options, &mock, &metadata_length);
+        Y_VERIFY_OK(status);
+
+        TString str;
+        str.resize(mock.GetExtentBytesWritten());
+
+        TFixedStringOutputStream out(&str);
+        status = arrow::ipc::WriteIpcPayload(payload, options, &out, &metadata_length);
+        Y_VERIFY_OK(status);
+        Y_VERIFY(out.GetPosition() == str.size());
+
+        return str;
+    }
+
+    TString SerializeBatchNoCompression(const std::shared_ptr<arrow::RecordBatch>& batch) {
+        auto writeOptions = arrow::ipc::IpcWriteOptions::Defaults();
+        writeOptions.use_threads = false;
+        return SerializeBatch(batch, writeOptions);
+    }
+
+}
\ No newline at end of file
diff --git a/ydb/public/lib/ydb_cli/import/cli_arrow_helpers.h b/ydb/public/lib/ydb_cli/import/cli_arrow_helpers.h
new file mode 100644
index 0000000000..cbc301c6f9
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/import/cli_arrow_helpers.h
@@ -0,0 +1,26 @@
+/*
+    This file contains code copied from core/formats/arrow_helpers.h in order to cut client dependencies
+*/
+
+#pragma once
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/type_traits.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/writer.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/util/compression.h>
+#include <util/generic/vector.h>
+#include <util/stream/file.h>
+#include <util/string/builder.h>
+#include <util/folder/path.h>
+
+namespace NYdb_cli::NArrow {
+    std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& combinedTable);
+    ui64 GetBatchDataSize(const std::shared_ptr<arrow::RecordBatch>& batch);
+    ui64 GetArrayDataSize(const std::shared_ptr<arrow::Array>& column);
+    TString SerializeSchema(const arrow::Schema& schema);
+    TString SerializeBatch(const std::shared_ptr<arrow::RecordBatch>& batch, const arrow::ipc::IpcWriteOptions& options);
+    TString SerializeBatchNoCompression(const std::shared_ptr<arrow::RecordBatch>& batch);
+    inline bool HasNulls(const std::shared_ptr<arrow::Array>& column) {
+        return column->null_bitmap_data();
+    }
+}
\ No newline at end of file
diff --git a/ydb/public/lib/ydb_cli/import/cli_switch_type.h b/ydb/public/lib/ydb_cli/import/cli_switch_type.h
new file mode 100644
index 0000000000..a97cccd914
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/import/cli_switch_type.h
@@ -0,0 +1,96 @@
+/*
+    This file contains code copied from core/formats/switch_type.h in order to cut client dependencies
+*/
+
+#pragma once
+
+namespace NYdb_cli::NArrow {
+
+template <typename TType>
+struct TTypeWrapper
+{
+    using T = TType;
+};
+
+template <typename TFunc, bool EnableNull = false>
+bool SwitchType(arrow::Type::type typeId, TFunc&& f) {
+    switch (typeId) {
+        case arrow::Type::NA: {
+            if constexpr (EnableNull) {
+                return f(TTypeWrapper<arrow::NullType>());
+            }
+            break;
+        }
+        case arrow::Type::BOOL:
+            return f(TTypeWrapper<arrow::BooleanType>());
+        case arrow::Type::UINT8:
+            return f(TTypeWrapper<arrow::UInt8Type>());
+        case arrow::Type::INT8:
+            return f(TTypeWrapper<arrow::Int8Type>());
+        case arrow::Type::UINT16:
+            return f(TTypeWrapper<arrow::UInt16Type>());
+        case arrow::Type::INT16:
+            return f(TTypeWrapper<arrow::Int16Type>());
+        case arrow::Type::UINT32:
+            return f(TTypeWrapper<arrow::UInt32Type>());
+        case arrow::Type::INT32:
+            return f(TTypeWrapper<arrow::Int32Type>());
+        case arrow::Type::UINT64:
+            return f(TTypeWrapper<arrow::UInt64Type>());
+        case arrow::Type::INT64:
+            return f(TTypeWrapper<arrow::Int64Type>());
+        case arrow::Type::HALF_FLOAT:
+            return f(TTypeWrapper<arrow::HalfFloatType>());
+        case arrow::Type::FLOAT:
+            return f(TTypeWrapper<arrow::FloatType>());
+        case arrow::Type::DOUBLE:
+            return f(TTypeWrapper<arrow::DoubleType>());
+        case arrow::Type::STRING:
+            return f(TTypeWrapper<arrow::StringType>());
+        case arrow::Type::BINARY:
+            return f(TTypeWrapper<arrow::BinaryType>());
+        case arrow::Type::FIXED_SIZE_BINARY:
+            return f(TTypeWrapper<arrow::FixedSizeBinaryType>());
+        case arrow::Type::DATE32:
+            return f(TTypeWrapper<arrow::Date32Type>());
+        case arrow::Type::DATE64:
+            return f(TTypeWrapper<arrow::Date64Type>());
+        case arrow::Type::TIMESTAMP:
+            return f(TTypeWrapper<arrow::TimestampType>());
+        case arrow::Type::TIME32:
+            return f(TTypeWrapper<arrow::Time32Type>());
+        case arrow::Type::TIME64:
+            return f(TTypeWrapper<arrow::Time64Type>());
+        case arrow::Type::INTERVAL_MONTHS:
+            return f(TTypeWrapper<arrow::MonthIntervalType>());
+        case arrow::Type::DECIMAL:
+            return f(TTypeWrapper<arrow::Decimal128Type>());
+        case arrow::Type::DURATION:
+            return f(TTypeWrapper<arrow::DurationType>());
+        case arrow::Type::LARGE_STRING:
+            return f(TTypeWrapper<arrow::LargeStringType>());
+        case arrow::Type::LARGE_BINARY:
+            return f(TTypeWrapper<arrow::LargeBinaryType>());
+        case arrow::Type::DECIMAL256:
+        case arrow::Type::DENSE_UNION:
+        case arrow::Type::DICTIONARY:
+        case arrow::Type::EXTENSION:
+        case arrow::Type::FIXED_SIZE_LIST:
+        case arrow::Type::INTERVAL_DAY_TIME:
+        case arrow::Type::LARGE_LIST:
+        case arrow::Type::LIST:
+        case arrow::Type::MAP:
+        case arrow::Type::MAX_ID:
+        case arrow::Type::SPARSE_UNION:
+        case arrow::Type::STRUCT:
+            break;
+    }
+
+    return false;
+}
+
+template <typename TFunc>
+bool SwitchTypeWithNull(arrow::Type::type typeId, TFunc&& f) {
+    return SwitchType<TFunc, true>(typeId, std::move(f));
+}
+}
\ No newline at end of file
diff --git a/ydb/public/lib/ydb_cli/import/import.cpp b/ydb/public/lib/ydb_cli/import/import.cpp
index 5843c8d0f5..6b61c6bcfd 100644
--- a/ydb/public/lib/ydb_cli/import/import.cpp
+++ b/ydb/public/lib/ydb_cli/import/import.cpp
@@ -19,6 +19,15 @@
 
 #include <deque>
 
+#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/io/api.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/api.h>
+#include <contrib/libs/apache/arrow/cpp/src/parquet/arrow/reader.h>
+#include <contrib/libs/apache/arrow/cpp/src/parquet/file_reader.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/result.h>
+#include <contrib/libs/apache/arrow/cpp/src/parquet/arrow/reader.h>
+#include <ydb/public/lib/ydb_cli/import/cli_arrow_helpers.h>
+
 namespace NYdb {
 namespace NConsoleClient {
 
@@ -87,6 +96,8 @@ TStatus TImportFileClient::Import(const TString& filePath, const TString& dbPath
         case EOutputFormat::JsonUnicode:
         case EOutputFormat::JsonBase64:
             return UpsertJson(input, dbPath, settings);
+        case EOutputFormat::Parquet:
+            return UpsertParquet(filePath, dbPath, settings);
         default: ;
     }
     return MakeStatus(EStatus::BAD_REQUEST,
@@ -254,6 +265,121 @@ TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath
     return WaitForQueue(inFlightRequests, 0);
 }
 
+TStatus TImportFileClient::UpsertParquet([[maybe_unused]]const TString& filename, [[maybe_unused]]const TString& dbPath, [[maybe_unused]]const TImportFileSettings& settings) {
+    #if defined (_WIN64) || defined (_WIN32) || defined (__WIN32__)
+    return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Not supported on Windows");
+    #else
+    std::shared_ptr<arrow::io::ReadableFile> infile;
+    arrow::Result<std::shared_ptr<arrow::io::ReadableFile>> fileResult = arrow::io::ReadableFile::Open(filename);
+    if (!fileResult.ok()) {
+        return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Unable to open parquet file:" << fileResult.status().ToString());
+    }
+    std::shared_ptr<arrow::io::ReadableFile> readableFile = fileResult.ValueOrDie();
+
+    std::unique_ptr<parquet::arrow::FileReader> fileReader;
+    arrow::MemoryPool* pool = arrow::default_memory_pool();
+
+    arrow::Status st;
+    st = parquet::arrow::OpenFile(readableFile, pool, &fileReader);
+    if (!st.ok()) {
+        return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while initializing arrow FileReader: " << st.ToString());
+    }
+
+    std::shared_ptr<parquet::FileMetaData> metaData = parquet::ReadMetaData(readableFile);
+
+    i64 numRowGroups = metaData->num_row_groups();
+
+    std::vector<int> row_group_indices(numRowGroups);
+    for (i64 i = 0; i < numRowGroups; i++) {
+        row_group_indices[i] = i;
+    }
+
+    std::shared_ptr<arrow::RecordBatchReader> reader;
+
+    st = fileReader->GetRecordBatchReader(row_group_indices, &reader);
+    if (!st.ok()) {
+        return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while getting RecordBatchReader: " << st.ToString());
+    }
+
+    std::deque<TAsyncStatus> inFlightRequests;
+
+    auto splitUpsertBatch = [this, &inFlightRequests, dbPath, settings](const std::shared_ptr<arrow::RecordBatch>& recordBatch) {
+        std::vector<std::shared_ptr<arrow::RecordBatch>> slicedRecordBatches;
+        std::deque<std::shared_ptr<arrow::RecordBatch>> batchesDeque;
+        size_t totalSize = NYdb_cli::NArrow::GetBatchDataSize(recordBatch);
+
+        size_t sliceCnt = totalSize / (size_t)settings.BytesPerRequest_;
+        if (totalSize % settings.BytesPerRequest_ != 0) {
+            sliceCnt++;
+        }
+        int64_t rowsInSlice = recordBatch->num_rows() / sliceCnt;
+
+        for (int64_t currentRow = 0; currentRow < recordBatch->num_rows(); currentRow += rowsInSlice) {
+            auto nextSlice = (currentRow + rowsInSlice < recordBatch->num_rows()) ? recordBatch->Slice(currentRow, rowsInSlice) : recordBatch->Slice(currentRow);
+            batchesDeque.push_back(nextSlice);
+        }
+
+        while (!batchesDeque.empty()) {
+            std::shared_ptr<arrow::RecordBatch> nextBatch = batchesDeque.front();
+            batchesDeque.pop_front();
+            if (NYdb_cli::NArrow::GetBatchDataSize(nextBatch) < settings.BytesPerRequest_) {
+                slicedRecordBatches.push_back(nextBatch);
+            }
+            else {
+                std::shared_ptr<arrow::RecordBatch> left = nextBatch->Slice(0, nextBatch->num_rows() / 2);
+                std::shared_ptr<arrow::RecordBatch> right = nextBatch->Slice(nextBatch->num_rows() / 2);
+                batchesDeque.push_front(right);
+                batchesDeque.push_front(left);
+            }
+        }
+        auto schema = recordBatch->schema();
+        TString strSchema = NYdb_cli::NArrow::SerializeSchema(*schema);
+        for (size_t i = 0; i < slicedRecordBatches.size(); i++) {
+            TString buffer = NYdb_cli::NArrow::SerializeBatchNoCompression(slicedRecordBatches[i]);
+            auto status = WaitForQueue(inFlightRequests, settings.MaxInFlightRequests_);
+            if (!status.IsSuccess()) {
+                return status;
+            }
+
+            inFlightRequests.push_back(UpsertParquetBuffer(dbPath, buffer, strSchema));
+        }
+
+        return MakeStatus(EStatus::SUCCESS);
+    };
+
+    std::shared_ptr<arrow::RecordBatch> currentBatch;
+    st = reader->ReadNext(&currentBatch);
+    if (!st.ok()) {
+        return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while reading next RecordBatch" << st.ToString());
+    }
+
+    while (currentBatch) {
+        TStatus upsertStatus = splitUpsertBatch(currentBatch);
+        if (!upsertStatus.IsSuccess()) {
+            return upsertStatus;
+        }
+        st = reader->ReadNext(&currentBatch);
+        if (!st.ok()) {
+            return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while reading next RecordBatch" << st.ToString());
+        }
+    }
+
+    return WaitForQueue(inFlightRequests, 0);
+    #endif
+}
+
+inline
+TAsyncStatus TImportFileClient::UpsertParquetBuffer(const TString& dbPath, const TString& buffer, const TString& strSchema) {
+    auto upsert = [this, dbPath, buffer, strSchema](NYdb::NTable::TTableClient& tableClient) -> TAsyncStatus {
+        return tableClient.BulkUpsert(dbPath, NTable::EDataFormat::ApacheArrow, buffer, strSchema, UpsertSettings)
+            .Apply([](const NYdb::NTable::TAsyncBulkUpsertResult& bulkUpsertResult) {
+                NYdb::TStatus status = bulkUpsertResult.GetValueSync();
+                return NThreading::MakeFuture(status);
+            });
+    };
+    return TableClient->RetryOperation(upsert, RetrySettings);
+}
+
 TType TImportFileClient::GetTableType(const NTable::TTableDescription& tableDescription) {
     TTypeBuilder typeBuilder;
     typeBuilder.BeginStruct();
diff --git a/ydb/public/lib/ydb_cli/import/import.h b/ydb/public/lib/ydb_cli/import/import.h
index 8b20d66517..d46ec954b6 100644
--- a/ydb/public/lib/ydb_cli/import/import.h
+++ b/ydb/public/lib/ydb_cli/import/import.h
@@ -69,6 +69,9 @@ private:
     TStatus UpsertJson(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings);
     TAsyncStatus UpsertJsonBuffer(const TString& dbPath, TValueBuilder& builder);
     TType GetTableType(const NTable::TTableDescription& tableDescription);
+
+    TStatus UpsertParquet(const TString& filename, const TString& dbPath, const TImportFileSettings& settings);
+    TAsyncStatus UpsertParquetBuffer(const TString& dbPath, const TString& buffer, const TString& strSchema);
 };
 
 }
diff --git a/ydb/tests/functional/ydb_cli/canondata/result.json b/ydb/tests/functional/ydb_cli/canondata/result.json
index 57776dab14..3833e7f7fc 100644
--- a/ydb/tests/functional/ydb_cli/canondata/result.json
+++ b/ydb/tests/functional/ydb_cli/canondata/result.json
@@ -8,6 +8,9 @@
     "test_ydb_impex.TestImpex.test_format_tsv": {
         "uri": "file://test_ydb_impex.TestImpex.test_format_tsv/result.output"
     },
+    "test_ydb_impex.TestImpex.test_format_parquet": {
+        "uri": "file://test_ydb_impex.TestImpex.test_format_parquet/result.output"
+    },
     "test_ydb_scripting.TestExecuteScriptWithFormats.test_stream_yql_script_json_base64": {
         "uri": "file://test_ydb_scripting.TestExecuteScriptWithFormats.test_stream_yql_script_json_base64/result.output"
     },
diff --git a/ydb/tests/functional/ydb_cli/test_ydb_impex.py b/ydb/tests/functional/ydb_cli/test_ydb_impex.py
index e8c41f056a..ec257da01f 100644
--- a/ydb/tests/functional/ydb_cli/test_ydb_impex.py
+++ b/ydb/tests/functional/ydb_cli/test_ydb_impex.py
@@ -4,12 +4,15 @@ from ydb.tests.library.common import yatest_common
 from ydb.tests.library.harness.kikimr_cluster import kikimr_cluster_factory
 import ydb
 import logging
+import pyarrow as pa
+import pyarrow.parquet as pq
+import pandas as pd
 
 
 logger = logging.getLogger(__name__)
 
 
 DATA_CSV = """key,id,value
 1,1111,"one"
 2,2222,"two"
 3,3333,"three"
@@ -28,6 +31,11 @@ DATA_JSON = """{"key":1,"id":1111,"value":"one"}
 """
 
 
+DATAFRAME = pd.DataFrame({'key': [1, 2, 3, 5, 7], 'id': [1111, 2222, 3333, 5555, 7777], 'value': ["one", "two", "three", "five", "seven"]}).astype({'key': 'uint32', 'id': 'uint64', 'value': 'string'})
+SCHEMA = pa.schema([('key', pa.uint32()), ('id', pa.uint64()), ('value', pa.string())])
+DATA_PARQUET = pa.Table.from_pandas(DATAFRAME, schema=SCHEMA)
+
+
 def ydb_bin():
     return yatest_common.binary_path("ydb/apps/ydb/ydb")
 
@@ -111,6 +119,13 @@
         output = self.execute_ydb_cli_command(["import", "file", "json", "-p", self.table_path, "-i", "tempinput.dat"])
         return self.canonical_result(output)
 
+    def run_import_parquet(self, data):
+        self.clear_table()
+        with open("tempinput.dat", "w"):
+            pq.write_table(data, "tempinput.dat")
+        output = self.execute_ydb_cli_command(["import", "file", "parquet", "-p", self.table_path, "-i", "tempinput.dat"])
+        return self.canonical_result(output)
+
     def run_export(self, format):
         query = "SELECT `key`, `id`, `value` FROM `{}` ORDER BY `key`".format(self.table_path)
         output = self.execute_ydb_cli_command(["table", "query", "execute", "-q", query, "-t", "scan", "--format", format])
@@ -127,3 +142,7 @@
     def test_format_json(self):
        self.run_import_json(DATA_JSON)
        return self.run_export("json-unicode")
+
+    def test_format_parquet(self):
+        self.run_import_parquet(DATA_PARQUET)
+        return self.run_export("csv")