author stepandrey <stepandrey@yandex-team.com> 2022-11-30 16:32:10 +0300
committer stepandrey <stepandrey@yandex-team.com> 2022-11-30 16:32:10 +0300
commit b6b02f6683bc476c07dc34badbd641aa5a902e49 (patch)
tree 15b63008edf61c5d29644a12fa5b14429524abee
parent a8f566e1db4827fefcd4a9003b9d7894aacd9a4c (diff)
download ydb-b6b02f6683bc476c07dc34badbd641aa5a902e49.tar.gz
support Parquet in ydb cli
parquet file import implementation
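For context, the new subcommand sits alongside the existing csv/tsv/json file imports; a usage sketch based on the functional test below (table path and file name are placeholders):

    ydb import file parquet -p /Root/mytable -i data.parquet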
-rw-r--r-- contrib/libs/apache/arrow/cpp/src/arrow/filesystem/api.h 27
-rw-r--r-- contrib/libs/apache/arrow/cpp/src/arrow/python/api.h 30
-rw-r--r-- contrib/libs/apache/arrow/cpp/src/parquet/api/io.h 20
-rw-r--r-- contrib/libs/apache/arrow/cpp/src/parquet/api/reader.h 35
-rw-r--r-- contrib/libs/apache/arrow/cpp/src/parquet/api/schema.h 21
-rw-r--r-- contrib/libs/apache/arrow/cpp/src/parquet/api/writer.h 25
-rw-r--r-- ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp 17
-rw-r--r-- ydb/public/lib/ydb_cli/commands/ydb_service_import.h 10
-rw-r--r-- ydb/public/lib/ydb_cli/common/formats.h 1
-rw-r--r-- ydb/public/lib/ydb_cli/import/CMakeLists.txt 2
-rw-r--r-- ydb/public/lib/ydb_cli/import/cli_arrow_helpers.cpp 183
-rw-r--r-- ydb/public/lib/ydb_cli/import/cli_arrow_helpers.h 26
-rw-r--r-- ydb/public/lib/ydb_cli/import/cli_switch_type.h 96
-rw-r--r-- ydb/public/lib/ydb_cli/import/import.cpp 126
-rw-r--r-- ydb/public/lib/ydb_cli/import/import.h 3
-rw-r--r-- ydb/tests/functional/ydb_cli/canondata/result.json 3
-rw-r--r-- ydb/tests/functional/ydb_cli/test_ydb_impex.py 21
17 files changed, 645 insertions, 1 deletion
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/api.h b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/api.h
new file mode 100644
index 0000000000..c0d772c697
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/filesystem/api.h
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/util/config.h" // IWYU pragma: export
+
+#include "arrow/filesystem/filesystem.h" // IWYU pragma: export
+#include "arrow/filesystem/localfs.h" // IWYU pragma: export
+#include "arrow/filesystem/mockfs.h" // IWYU pragma: export
+#ifdef ARROW_S3
+#error #include "arrow/filesystem/s3fs.h" // IWYU pragma: export
+#endif
diff --git a/contrib/libs/apache/arrow/cpp/src/arrow/python/api.h b/contrib/libs/apache/arrow/cpp/src/arrow/python/api.h
new file mode 100644
index 0000000000..a0b13d6d13
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/arrow/python/api.h
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/python/arrow_to_pandas.h"
+#include "arrow/python/common.h"
+#include "arrow/python/datetime.h"
+#include "arrow/python/deserialize.h"
+#include "arrow/python/helpers.h"
+#include "arrow/python/inference.h"
+#include "arrow/python/io.h"
+#include "arrow/python/numpy_convert.h"
+#include "arrow/python/numpy_to_arrow.h"
+#include "arrow/python/python_to_arrow.h"
+#include "arrow/python/serialize.h"
diff --git a/contrib/libs/apache/arrow/cpp/src/parquet/api/io.h b/contrib/libs/apache/arrow/cpp/src/parquet/api/io.h
new file mode 100644
index 0000000000..28a00f12a7
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/parquet/api/io.h
@@ -0,0 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "parquet/exception.h"
diff --git a/contrib/libs/apache/arrow/cpp/src/parquet/api/reader.h b/contrib/libs/apache/arrow/cpp/src/parquet/api/reader.h
new file mode 100644
index 0000000000..7e746e8c5b
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/parquet/api/reader.h
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+// Column reader API
+#include "parquet/column_reader.h"
+#include "parquet/column_scanner.h"
+#include "parquet/exception.h"
+#include "parquet/file_reader.h"
+#include "parquet/metadata.h"
+#include "parquet/platform.h"
+#include "parquet/printer.h"
+#include "parquet/properties.h"
+#include "parquet/statistics.h"
+
+// Schemas
+#include "parquet/api/schema.h"
+
+// IO
+#include "parquet/api/io.h"
diff --git a/contrib/libs/apache/arrow/cpp/src/parquet/api/schema.h b/contrib/libs/apache/arrow/cpp/src/parquet/api/schema.h
new file mode 100644
index 0000000000..7ca714f47b
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/parquet/api/schema.h
@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+// Schemas
+#include "parquet/schema.h"
diff --git a/contrib/libs/apache/arrow/cpp/src/parquet/api/writer.h b/contrib/libs/apache/arrow/cpp/src/parquet/api/writer.h
new file mode 100644
index 0000000000..b072dcf74d
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/parquet/api/writer.h
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "parquet/api/io.h"
+#include "parquet/api/schema.h"
+#include "parquet/column_writer.h"
+#include "parquet/exception.h"
+#include "parquet/file_writer.h"
+#include "parquet/statistics.h"
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
index 63420950a2..42747d12d8 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
@@ -140,6 +140,7 @@ TCommandImportFromFile::TCommandImportFromFile()
AddCommand(std::make_unique<TCommandImportFromCsv>());
AddCommand(std::make_unique<TCommandImportFromTsv>());
AddCommand(std::make_unique<TCommandImportFromJson>());
+ AddCommand(std::make_unique<TCommandImportFromParquet>());
}
/// Import File Shared Config
@@ -255,5 +256,21 @@ int TCommandImportFromJson::Run(TConfig& config) {
return EXIT_SUCCESS;
}
+/// Import Parquet
+void TCommandImportFromParquet::Config(TConfig& config) {
+ TCommandImportFileBase::Config(config);
+}
+
+int TCommandImportFromParquet::Run(TConfig& config) {
+ TImportFileSettings settings;
+ settings.Format(InputFormat);
+ settings.MaxInFlightRequests(MaxInFlightRequests);
+ settings.BytesPerRequest(NYdb::SizeFromString(BytesPerRequest));
+
+ TImportFileClient client(CreateDriver(config));
+ ThrowOnError(client.Import(FilePath, Path, settings));
+
+ return EXIT_SUCCESS;
+}
}
}
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_import.h b/ydb/public/lib/ydb_cli/commands/ydb_service_import.h
index af304b92a6..ddb2c2c272 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_import.h
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_import.h
@@ -101,5 +101,15 @@ public:
int Run(TConfig& config) override;
};
+class TCommandImportFromParquet : public TCommandImportFileBase {
+public:
+ TCommandImportFromParquet(const TString& cmd = "parquet", const TString& cmdDescription = "Import data from Parquet file")
+ : TCommandImportFileBase(cmd, cmdDescription)
+ {
+ InputFormat = EOutputFormat::Parquet;
+ }
+ void Config(TConfig& config) override;
+ int Run(TConfig& config) override;
+};
}
}
diff --git a/ydb/public/lib/ydb_cli/common/formats.h b/ydb/public/lib/ydb_cli/common/formats.h
index 09a893db58..4756529273 100644
--- a/ydb/public/lib/ydb_cli/common/formats.h
+++ b/ydb/public/lib/ydb_cli/common/formats.h
@@ -16,6 +16,7 @@ enum class EOutputFormat {
ProtoJsonBase64 /* "proto-json-base64" */,
Csv /* "csv" */,
Tsv /* "tsv" */,
+ Parquet /* "parquet" */,
};
// EMessagingFormat to be used in both input and output when working with files/pipes in operations related to messaging
diff --git a/ydb/public/lib/ydb_cli/import/CMakeLists.txt b/ydb/public/lib/ydb_cli/import/CMakeLists.txt
index d28016ad5b..7e7520d171 100644
--- a/ydb/public/lib/ydb_cli/import/CMakeLists.txt
+++ b/ydb/public/lib/ydb_cli/import/CMakeLists.txt
@@ -15,7 +15,9 @@ target_link_libraries(lib-ydb_cli-import PUBLIC
common
cpp-client-ydb_proto
public-lib-json_value
+ libs-apache-arrow
)
target_sources(lib-ydb_cli-import PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/import/import.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/import/cli_arrow_helpers.cpp
)
diff --git a/ydb/public/lib/ydb_cli/import/cli_arrow_helpers.cpp b/ydb/public/lib/ydb_cli/import/cli_arrow_helpers.cpp
new file mode 100644
index 0000000000..28a1c30087
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/import/cli_arrow_helpers.cpp
@@ -0,0 +1,183 @@
+/*
+ This file contains code copied from core/formats/arrow_helpers.cpp in order to cut client dependencies
+*/
+
+
+#include "cli_arrow_helpers.h"
+#include "cli_switch_type.h"
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/io/memory.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/reader.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_primitive.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_primitive.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/type_traits.h>
+#include <util/generic/vector.h>
+#include <util/stream/file.h>
+#include <util/string/builder.h>
+#include <util/folder/path.h>
+
+#define Y_VERIFY_OK(status) Y_VERIFY(status.ok(), "%s", status.ToString().c_str())
+
+namespace NYdb_cli::NArrow {
+ std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& table) {
+ std::vector<std::shared_ptr<arrow::Array>> columns;
+ columns.reserve(table->num_columns());
+ for (auto& col : table->columns()) {
+ Y_VERIFY(col->num_chunks() == 1);
+ columns.push_back(col->chunk(0));
+ }
+ return arrow::RecordBatch::Make(table->schema(), table->num_rows(), columns);
+ }
+ ui64 GetBatchDataSize(const std::shared_ptr<arrow::RecordBatch>& batch) {
+ if (!batch) {
+ return 0;
+ }
+ ui64 bytes = 0;
+ for (auto& column : batch->columns()) { // TODO: use column_data() instead of columns()
+ bytes += GetArrayDataSize(column);
+ }
+ return bytes;
+ }
+
+ template <typename TType>
+ ui64 GetArrayDataSizeImpl(const std::shared_ptr<arrow::Array>& column) {
+ return sizeof(typename TType::c_type) * column->length();
+ }
+
+ template <>
+ ui64 GetArrayDataSizeImpl<arrow::NullType>(const std::shared_ptr<arrow::Array>& column) {
+ return column->length() * 8; // Nominal 8 bytes per row for null-typed columns
+ }
+
+ template <>
+ ui64 GetArrayDataSizeImpl<arrow::StringType>(const std::shared_ptr<arrow::Array>& column) {
+ auto typedColumn = std::static_pointer_cast<arrow::StringArray>(column);
+ return typedColumn->total_values_length();
+ }
+
+ template <>
+ ui64 GetArrayDataSizeImpl<arrow::LargeStringType>(const std::shared_ptr<arrow::Array>& column) {
+ auto typedColumn = std::static_pointer_cast<arrow::LargeStringArray>(column);
+ return typedColumn->total_values_length();
+ }
+
+ template <>
+ ui64 GetArrayDataSizeImpl<arrow::BinaryType>(const std::shared_ptr<arrow::Array>& column) {
+ auto typedColumn = std::static_pointer_cast<arrow::BinaryArray>(column);
+ return typedColumn->total_values_length();
+ }
+
+ template <>
+ ui64 GetArrayDataSizeImpl<arrow::LargeBinaryType>(const std::shared_ptr<arrow::Array>& column) {
+ auto typedColumn = std::static_pointer_cast<arrow::LargeBinaryArray>(column);
+ return typedColumn->total_values_length();
+ }
+
+ template <>
+ ui64 GetArrayDataSizeImpl<arrow::FixedSizeBinaryType>(const std::shared_ptr<arrow::Array>& column) {
+ auto typedColumn = std::static_pointer_cast<arrow::FixedSizeBinaryArray>(column);
+ return typedColumn->byte_width() * typedColumn->length();
+ }
+
+ template <>
+ ui64 GetArrayDataSizeImpl<arrow::Decimal128Type>(const std::shared_ptr<arrow::Array>& column) {
+ return sizeof(ui64) * 2 * column->length();
+ }
+
+ ui64 GetArrayDataSize(const std::shared_ptr<arrow::Array>& column) {
+ auto type = column->type();
+ ui64 bytes = 0;
+ bool success = SwitchTypeWithNull(type->id(), [&]<typename TType>(TTypeWrapper<TType> typeHolder) {
+ Y_UNUSED(typeHolder);
+ bytes = GetArrayDataSizeImpl<TType>(column);
+ return true;
+ });
+
+ // Add null bit mask overhead if any.
+ if (HasNulls(column)) {
+ bytes += column->length() / 8 + 1;
+ }
+
+ Y_VERIFY_DEBUG(success, "Unsupported arrow type %s", type->ToString().data());
+ return bytes;
+ }
+
+ namespace {
+ class TFixedStringOutputStream final : public arrow::io::OutputStream {
+ public:
+ TFixedStringOutputStream(TString* out)
+ : Out(out)
+ , Position(0)
+ { }
+
+ arrow::Status Close() override {
+ Out = nullptr;
+ return arrow::Status::OK();
+ }
+
+ bool closed() const override {
+ return Out == nullptr;
+ }
+
+ arrow::Result<int64_t> Tell() const override {
+ return Position;
+ }
+
+ arrow::Status Write(const void* data, int64_t nbytes) override {
+ if (Y_LIKELY(nbytes > 0)) {
+ Y_VERIFY(Out && Out->size() - Position >= ui64(nbytes));
+ char* dst = &(*Out)[Position];
+ ::memcpy(dst, data, nbytes);
+ Position += nbytes;
+ }
+
+ return arrow::Status::OK();
+ }
+
+ size_t GetPosition() const {
+ return Position;
+ }
+
+ private:
+ TString* Out;
+ size_t Position;
+ };
+ }
+
+ TString SerializeSchema(const arrow::Schema& schema) {
+ auto buffer = arrow::ipc::SerializeSchema(schema);
+ if (!buffer.ok()) {
+ return {};
+ }
+ return TString((const char*)(*buffer)->data(), (*buffer)->size());
+ }
+
+ TString SerializeBatch(const std::shared_ptr<arrow::RecordBatch>& batch, const arrow::ipc::IpcWriteOptions& options) {
+ arrow::ipc::IpcPayload payload;
+ auto status = arrow::ipc::GetRecordBatchPayload(*batch, options, &payload);
+ Y_VERIFY_OK(status);
+
+ int32_t metadata_length = 0;
+ arrow::io::MockOutputStream mock;
+ status = arrow::ipc::WriteIpcPayload(payload, options, &mock, &metadata_length);
+ Y_VERIFY_OK(status);
+
+ TString str;
+ str.resize(mock.GetExtentBytesWritten());
+
+ TFixedStringOutputStream out(&str);
+ status = arrow::ipc::WriteIpcPayload(payload, options, &out, &metadata_length);
+ Y_VERIFY_OK(status);
+ Y_VERIFY(out.GetPosition() == str.size());
+
+ return str;
+ }
+
+ TString SerializeBatchNoCompression(const std::shared_ptr<arrow::RecordBatch>& batch) {
+ auto writeOptions = arrow::ipc::IpcWriteOptions::Defaults();
+ writeOptions.use_threads = false;
+ return SerializeBatch(batch, writeOptions);
+ }
+
+}
\ No newline at end of file
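The helpers above reproduce, client-side, the batch size accounting and Arrow IPC serialization that the import path needs. For intuition only (this is not the CLI's code path), pyarrow exposes rough equivalents:

    import pyarrow as pa

    batch = pa.RecordBatch.from_pydict(
        {"key": [1, 2, 3], "value": ["one", "two", "three"]},
        schema=pa.schema([("key", pa.uint32()), ("value", pa.string())]),
    )
    size_estimate = batch.nbytes           # analogue of GetBatchDataSize
    schema_ipc = batch.schema.serialize()  # analogue of SerializeSchema (IPC-encoded schema)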
diff --git a/ydb/public/lib/ydb_cli/import/cli_arrow_helpers.h b/ydb/public/lib/ydb_cli/import/cli_arrow_helpers.h
new file mode 100644
index 0000000000..cbc301c6f9
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/import/cli_arrow_helpers.h
@@ -0,0 +1,26 @@
+/*
+ This file contains code copied from core/formats/arrow_helpers.h in order to cut client dependencies
+*/
+
+#pragma once
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/type_traits.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/writer.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/util/compression.h>
+#include <util/generic/vector.h>
+#include <util/stream/file.h>
+#include <util/string/builder.h>
+#include <util/folder/path.h>
+
+namespace NYdb_cli::NArrow {
+ std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& combinedTable);
+ ui64 GetBatchDataSize(const std::shared_ptr<arrow::RecordBatch>& batch);
+ ui64 GetArrayDataSize(const std::shared_ptr<arrow::Array>& column);
+ TString SerializeSchema(const arrow::Schema& schema);
+ TString SerializeBatch(const std::shared_ptr<arrow::RecordBatch>& batch, const arrow::ipc::IpcWriteOptions& options);
+ TString SerializeBatchNoCompression(const std::shared_ptr<arrow::RecordBatch>& batch);
+ inline bool HasNulls(const std::shared_ptr<arrow::Array>& column) {
+ return column->null_bitmap_data();
+ }
+}
\ No newline at end of file
diff --git a/ydb/public/lib/ydb_cli/import/cli_switch_type.h b/ydb/public/lib/ydb_cli/import/cli_switch_type.h
new file mode 100644
index 0000000000..a97cccd914
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/import/cli_switch_type.h
@@ -0,0 +1,96 @@
+/*
+ This file contains code copied from core/formats/switch_type.h in order to cut client dependencies
+*/
+
+#pragma once
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>
+
+namespace NYdb_cli::NArrow {
+
+template <typename TType>
+struct TTypeWrapper
+{
+ using T = TType;
+};
+
+template <typename TFunc, bool EnableNull = false>
+bool SwitchType(arrow::Type::type typeId, TFunc&& f) {
+ switch (typeId) {
+ case arrow::Type::NA: {
+ if constexpr (EnableNull) {
+ return f(TTypeWrapper<arrow::NullType>());
+ }
+ break;
+ }
+ case arrow::Type::BOOL:
+ return f(TTypeWrapper<arrow::BooleanType>());
+ case arrow::Type::UINT8:
+ return f(TTypeWrapper<arrow::UInt8Type>());
+ case arrow::Type::INT8:
+ return f(TTypeWrapper<arrow::Int8Type>());
+ case arrow::Type::UINT16:
+ return f(TTypeWrapper<arrow::UInt16Type>());
+ case arrow::Type::INT16:
+ return f(TTypeWrapper<arrow::Int16Type>());
+ case arrow::Type::UINT32:
+ return f(TTypeWrapper<arrow::UInt32Type>());
+ case arrow::Type::INT32:
+ return f(TTypeWrapper<arrow::Int32Type>());
+ case arrow::Type::UINT64:
+ return f(TTypeWrapper<arrow::UInt64Type>());
+ case arrow::Type::INT64:
+ return f(TTypeWrapper<arrow::Int64Type>());
+ case arrow::Type::HALF_FLOAT:
+ return f(TTypeWrapper<arrow::HalfFloatType>());
+ case arrow::Type::FLOAT:
+ return f(TTypeWrapper<arrow::FloatType>());
+ case arrow::Type::DOUBLE:
+ return f(TTypeWrapper<arrow::DoubleType>());
+ case arrow::Type::STRING:
+ return f(TTypeWrapper<arrow::StringType>());
+ case arrow::Type::BINARY:
+ return f(TTypeWrapper<arrow::BinaryType>());
+ case arrow::Type::FIXED_SIZE_BINARY:
+ return f(TTypeWrapper<arrow::FixedSizeBinaryType>());
+ case arrow::Type::DATE32:
+ return f(TTypeWrapper<arrow::Date32Type>());
+ case arrow::Type::DATE64:
+ return f(TTypeWrapper<arrow::Date64Type>());
+ case arrow::Type::TIMESTAMP:
+ return f(TTypeWrapper<arrow::TimestampType>());
+ case arrow::Type::TIME32:
+ return f(TTypeWrapper<arrow::Time32Type>());
+ case arrow::Type::TIME64:
+ return f(TTypeWrapper<arrow::Time64Type>());
+ case arrow::Type::INTERVAL_MONTHS:
+ return f(TTypeWrapper<arrow::MonthIntervalType>());
+ case arrow::Type::DECIMAL:
+ return f(TTypeWrapper<arrow::Decimal128Type>());
+ case arrow::Type::DURATION:
+ return f(TTypeWrapper<arrow::DurationType>());
+ case arrow::Type::LARGE_STRING:
+ return f(TTypeWrapper<arrow::LargeStringType>());
+ case arrow::Type::LARGE_BINARY:
+ return f(TTypeWrapper<arrow::LargeBinaryType>());
+ case arrow::Type::DECIMAL256:
+ case arrow::Type::DENSE_UNION:
+ case arrow::Type::DICTIONARY:
+ case arrow::Type::EXTENSION:
+ case arrow::Type::FIXED_SIZE_LIST:
+ case arrow::Type::INTERVAL_DAY_TIME:
+ case arrow::Type::LARGE_LIST:
+ case arrow::Type::LIST:
+ case arrow::Type::MAP:
+ case arrow::Type::MAX_ID:
+ case arrow::Type::SPARSE_UNION:
+ case arrow::Type::STRUCT:
+ break;
+ }
+
+ return false;
+}
+
+template <typename TFunc>
+bool SwitchTypeWithNull(arrow::Type::type typeId, TFunc&& f) {
+ return SwitchType<TFunc, true>(typeId, std::move(f));
+}
+}
\ No newline at end of file
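SwitchType maps a runtime arrow::Type::type id onto a compile-time type through a generic lambda; GetArrayDataSize in cli_arrow_helpers.cpp shows the intended call pattern. As a loose analogue, pyarrow can do the fixed-width part of that dispatch at runtime (a sketch, not code from this change):

    import pyarrow as pa

    def fixed_width_data_size(arr: pa.Array) -> int:
        # Runtime counterpart of sizeof(TType::c_type) * length for primitive types.
        if pa.types.is_primitive(arr.type) and not pa.types.is_null(arr.type):
            return arr.type.bit_width * len(arr) // 8
        raise TypeError(f"non-fixed-width type: {arr.type}")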
diff --git a/ydb/public/lib/ydb_cli/import/import.cpp b/ydb/public/lib/ydb_cli/import/import.cpp
index 5843c8d0f5..6b61c6bcfd 100644
--- a/ydb/public/lib/ydb_cli/import/import.cpp
+++ b/ydb/public/lib/ydb_cli/import/import.cpp
@@ -19,6 +19,15 @@
#include <deque>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/io/api.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/api.h>
+#include <contrib/libs/apache/arrow/cpp/src/parquet/arrow/reader.h>
+#include <contrib/libs/apache/arrow/cpp/src/parquet/file_reader.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/result.h>
+#include <ydb/public/lib/ydb_cli/import/cli_arrow_helpers.h>
+
namespace NYdb {
namespace NConsoleClient {
@@ -87,6 +96,8 @@ TStatus TImportFileClient::Import(const TString& filePath, const TString& dbPath
case EOutputFormat::JsonUnicode:
case EOutputFormat::JsonBase64:
return UpsertJson(input, dbPath, settings);
+ case EOutputFormat::Parquet:
+ return UpsertParquet(filePath, dbPath, settings);
default: ;
}
return MakeStatus(EStatus::BAD_REQUEST,
@@ -254,6 +265,121 @@ TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath
return WaitForQueue(inFlightRequests, 0);
}
+TStatus TImportFileClient::UpsertParquet([[maybe_unused]] const TString& filename, [[maybe_unused]] const TString& dbPath, [[maybe_unused]] const TImportFileSettings& settings) {
+ #if defined (_WIN64) || defined (_WIN32) || defined (__WIN32__)
+ return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Not supported on Windows");
+ #else
+ arrow::Result<std::shared_ptr<arrow::io::ReadableFile>> fileResult = arrow::io::ReadableFile::Open(filename);
+ if (!fileResult.ok()) {
+ return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Unable to open parquet file:" << fileResult.status().ToString());
+ }
+ std::shared_ptr<arrow::io::ReadableFile> readableFile = fileResult.ValueOrDie();
+
+ std::unique_ptr<parquet::arrow::FileReader> fileReader;
+ arrow::MemoryPool *pool = arrow::default_memory_pool();
+
+ arrow::Status st;
+ st = parquet::arrow::OpenFile(readableFile, pool, &fileReader);
+ if (!st.ok()) {
+ return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while initializing arrow FileReader: " << st.ToString());
+ }
+
+ std::shared_ptr<parquet::FileMetaData> metaData = parquet::ReadMetaData(readableFile);
+
+ i64 numRowGroups = metaData->num_row_groups();
+
+ std::vector<int> row_group_indices(numRowGroups);
+ for (i64 i = 0; i < numRowGroups; i++) {
+ row_group_indices[i] = i;
+ }
+
+ std::shared_ptr<arrow::RecordBatchReader> reader;
+
+ st = fileReader->GetRecordBatchReader(row_group_indices, &reader);
+ if (!st.ok()) {
+ return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while getting RecordBatchReader: " << st.ToString());
+ }
+
+ std::deque<TAsyncStatus> inFlightRequests;
+
+ auto splitUpsertBatch = [this, &inFlightRequests, dbPath, settings](const std::shared_ptr<arrow::RecordBatch> &recordBatch){
+ std::vector<std::shared_ptr<arrow::RecordBatch>> slicedRecordBatches;
+ std::deque<std::shared_ptr<arrow::RecordBatch>> batchesDeque;
+ size_t totalSize = NYdb_cli::NArrow::GetBatchDataSize(recordBatch);
+
+ size_t sliceCnt = totalSize / (size_t)settings.BytesPerRequest_;
+ if (totalSize % settings.BytesPerRequest_ != 0) {
+ sliceCnt++;
+ }
+ sliceCnt = std::max<size_t>(sliceCnt, 1); // guard: an empty batch would otherwise divide by zero below
+ int64_t rowsInSlice = std::max<int64_t>(recordBatch->num_rows() / sliceCnt, 1); // guard: never produce zero-row slices
+
+ for (int64_t currentRow = 0; currentRow < recordBatch->num_rows(); currentRow += rowsInSlice) {
+ auto nextSlice = (currentRow + rowsInSlice < recordBatch->num_rows()) ? recordBatch->Slice(currentRow, rowsInSlice) : recordBatch->Slice(currentRow);
+ batchesDeque.push_back(nextSlice);
+ }
+
+ while (!batchesDeque.empty()) {
+ std::shared_ptr<arrow::RecordBatch> nextBatch = batchesDeque.front();
+ batchesDeque.pop_front();
+ if (NYdb_cli::NArrow::GetBatchDataSize(nextBatch) < settings.BytesPerRequest_) {
+ slicedRecordBatches.push_back(nextBatch);
+ }
+ else {
+ std::shared_ptr<arrow::RecordBatch> left = nextBatch->Slice(0, nextBatch->num_rows() / 2);
+ std::shared_ptr<arrow::RecordBatch> right = nextBatch->Slice(nextBatch->num_rows() / 2);
+ batchesDeque.push_front(right);
+ batchesDeque.push_front(left);
+ }
+ }
+ auto schema = recordBatch->schema();
+ TString strSchema = NYdb_cli::NArrow::SerializeSchema(*schema);
+ for (size_t i = 0; i < slicedRecordBatches.size(); i++) {
+ TString buffer = NYdb_cli::NArrow::SerializeBatchNoCompression(slicedRecordBatches[i]);
+ auto status = WaitForQueue(inFlightRequests, settings.MaxInFlightRequests_);
+ if (!status.IsSuccess()) {
+ return status;
+ }
+
+ inFlightRequests.push_back(UpsertParquetBuffer(dbPath, buffer, strSchema));
+ }
+
+ return MakeStatus(EStatus::SUCCESS);
+ };
+
+ std::shared_ptr<arrow::RecordBatch> currentBatch;
+ st = reader->ReadNext(&currentBatch);
+ if (!st.ok()) {
+ return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while reading next RecordBatch" << st.ToString());
+ }
+
+ while (currentBatch) {
+ TStatus upsertStatus = splitUpsertBatch(currentBatch);
+ if (!upsertStatus.IsSuccess()) {
+ return upsertStatus;
+ }
+ st = reader->ReadNext(&currentBatch);
+ if (!st.ok()) {
+ return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while reading next RecordBatch" << st.ToString());
+ }
+ }
+
+ return WaitForQueue(inFlightRequests, 0);
+ #endif
+}
+
+inline
+TAsyncStatus TImportFileClient::UpsertParquetBuffer(const TString& dbPath, const TString& buffer, const TString& strSchema) {
+ auto upsert = [this, dbPath, buffer, strSchema](NYdb::NTable::TTableClient& tableClient) -> TAsyncStatus {
+ return tableClient.BulkUpsert(dbPath, NTable::EDataFormat::ApacheArrow, buffer, strSchema, UpsertSettings)
+ .Apply([](const NYdb::NTable::TAsyncBulkUpsertResult& bulkUpsertResult) {
+ NYdb::TStatus status = bulkUpsertResult.GetValueSync();
+ return NThreading::MakeFuture(status);
+ });
+ };
+ return TableClient->RetryOperation(upsert, RetrySettings);
+}
+
TType TImportFileClient::GetTableType(const NTable::TTableDescription& tableDescription) {
TTypeBuilder typeBuilder;
typeBuilder.BeginStruct();
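For readers tracing UpsertParquet's splitting logic: each record batch is first cut into roughly ceil(totalSize / BytesPerRequest_) even slices by row count, and any slice that still exceeds the byte limit is halved until it fits. A sketch of the same strategy in Python (names are mine, not the CLI's; the single-row guard is added here so one oversized row cannot loop forever):

    import pyarrow as pa

    def split_batch(batch: pa.RecordBatch, bytes_per_request: int):
        slice_cnt = max(1, -(-batch.nbytes // bytes_per_request))  # ceil division
        rows = max(1, batch.num_rows // slice_cnt)
        pending = [batch.slice(i, rows) for i in range(0, batch.num_rows, rows)]
        out = []
        while pending:
            b = pending.pop(0)
            if b.nbytes < bytes_per_request or b.num_rows <= 1:
                out.append(b)           # small enough to send as one bulk upsert
            else:
                half = b.num_rows // 2  # still too big: halve by rows and re-check
                pending[:0] = [b.slice(0, half), b.slice(half)]
        return out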
diff --git a/ydb/public/lib/ydb_cli/import/import.h b/ydb/public/lib/ydb_cli/import/import.h
index 8b20d66517..d46ec954b6 100644
--- a/ydb/public/lib/ydb_cli/import/import.h
+++ b/ydb/public/lib/ydb_cli/import/import.h
@@ -69,6 +69,9 @@ private:
TStatus UpsertJson(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings);
TAsyncStatus UpsertJsonBuffer(const TString& dbPath, TValueBuilder& builder);
TType GetTableType(const NTable::TTableDescription& tableDescription);
+
+ TStatus UpsertParquet(const TString& filename, const TString& dbPath, const TImportFileSettings& settings);
+ TAsyncStatus UpsertParquetBuffer(const TString& dbPath, const TString& buffer, const TString& strSchema);
};
}
diff --git a/ydb/tests/functional/ydb_cli/canondata/result.json b/ydb/tests/functional/ydb_cli/canondata/result.json
index 57776dab14..3833e7f7fc 100644
--- a/ydb/tests/functional/ydb_cli/canondata/result.json
+++ b/ydb/tests/functional/ydb_cli/canondata/result.json
@@ -8,6 +8,9 @@
"test_ydb_impex.TestImpex.test_format_tsv": {
"uri": "file://test_ydb_impex.TestImpex.test_format_tsv/result.output"
},
+ "test_ydb_impex.TestImpex.test_format.parquet": {
+ "uri": "file://test_ydb_impex.TestImpex.test_format_parquet/result.output"
+ },
"test_ydb_scripting.TestExecuteScriptWithFormats.test_stream_yql_script_json_base64": {
"uri": "file://test_ydb_scripting.TestExecuteScriptWithFormats.test_stream_yql_script_json_base64/result.output"
},
diff --git a/ydb/tests/functional/ydb_cli/test_ydb_impex.py b/ydb/tests/functional/ydb_cli/test_ydb_impex.py
index e8c41f056a..ec257da01f 100644
--- a/ydb/tests/functional/ydb_cli/test_ydb_impex.py
+++ b/ydb/tests/functional/ydb_cli/test_ydb_impex.py
@@ -4,12 +4,15 @@ from ydb.tests.library.common import yatest_common
from ydb.tests.library.harness.kikimr_cluster import kikimr_cluster_factory
import ydb
import logging
+import pyarrow as pa
+import pyarrow.parquet as pq
+import pandas as pd
logger = logging.getLogger(__name__)
DATA_CSV = """key,id,value
1,1111,"one"
2,2222,"two"
3,3333,"three"
@@ -28,6 +31,11 @@ DATA_JSON = """{"key":1,"id":1111,"value":"one"}
"""
+DATAFRAME = pd.DataFrame({'key': [1, 2, 3, 5, 7], 'id': [1111, 2222, 3333, 5555, 7777], 'value': ["one", "two", "three", "five", "seven"]}).astype({'key': 'uint32', 'id': 'uint64', 'value': 'string'})
+SCHEMA = pa.schema([('key', pa.uint32()), ('id', pa.uint64()), ('value', pa.string())])
+DATA_PARQUET = pa.Table.from_pandas(DATAFRAME, schema=SCHEMA)
+
+
def ydb_bin():
return yatest_common.binary_path("ydb/apps/ydb/ydb")
@@ -111,6 +119,13 @@ class TestImpex(BaseTestTableService):
output = self.execute_ydb_cli_command(["import", "file", "json", "-p", self.table_path, "-i", "tempinput.dat"])
return self.canonical_result(output)
+ def run_import_parquet(self, data):
+ self.clear_table()
+ pq.write_table(data, "tempinput.dat")
+ output = self.execute_ydb_cli_command(["import", "file", "parquet", "-p", self.table_path, "-i", "tempinput.dat"])
+ return self.canonical_result(output)
+
def run_export(self, format):
query = "SELECT `key`, `id`, `value` FROM `{}` ORDER BY `key`".format(self.table_path)
output = self.execute_ydb_cli_command(["table", "query", "execute", "-q", query, "-t", "scan", "--format", format])
@@ -127,3 +142,7 @@ class TestImpex(BaseTestTableService):
def test_format_json(self):
self.run_import_json(DATA_JSON)
return self.run_export("json-unicode")
+
+ def test_format_parquet(self):
+ self.run_import_parquet(DATA_PARQUET)
+ return self.run_export("csv")
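The test prepares its Parquet input with pyarrow, which is also how an end user might build a file for import (file name is a placeholder):

    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq

    df = pd.DataFrame({"key": [1, 2], "id": [1111, 2222], "value": ["one", "two"]})
    schema = pa.schema([("key", pa.uint32()), ("id", pa.uint64()), ("value", pa.string())])
    pq.write_table(pa.Table.from_pandas(df, schema=schema), "data.parquet")
    # then: ydb import file parquet -p <table path> -i data.parquet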