| author | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 09:58:56 +0300 |
| --- | --- | --- |
| committer | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 10:20:20 +0300 |
| commit | c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch) | |
| tree | cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/Storages/DataLakes | |
| parent | d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff) | |
| download | ydb-c2b2dfd9827a400a8495e172a56343462e3ceb82.tar.gz | |
YQ Connector: move tests from yql to ydb (OSS)
Move the directory with the Connector tests from the yql folder to the ydb folder (kept in sync with github).
Diffstat (limited to 'contrib/clickhouse/src/Storages/DataLakes')
13 files changed, 1243 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/Storages/DataLakes/DeltaLakeMetadataParser.cpp b/contrib/clickhouse/src/Storages/DataLakes/DeltaLakeMetadataParser.cpp new file mode 100644 index 0000000000..9fca42bfdc --- /dev/null +++ b/contrib/clickhouse/src/Storages/DataLakes/DeltaLakeMetadataParser.cpp @@ -0,0 +1,341 @@ +#include <Storages/DataLakes/DeltaLakeMetadataParser.h> +#include <base/JSON.h> +#include "clickhouse_config.h" +#include <set> + +#if USE_AWS_S3 && USE_PARQUET +#include <Storages/DataLakes/S3MetadataReader.h> +#include <Storages/StorageS3.h> +#error #include <parquet/file_reader.h> +#include <Processors/Formats/Impl/ArrowBufferedStreams.h> +#include <Processors/Formats/Impl/ParquetBlockInputFormat.h> +#include <Processors/Formats/Impl/ArrowColumnToCHColumn.h> +#include <Formats/FormatFactory.h> +#include <Columns/ColumnString.h> +#include <Columns/ColumnNullable.h> +#include <IO/ReadHelpers.h> +#include <boost/algorithm/string/case_conv.hpp> +#error #include <parquet/arrow/reader.h> +#include <ranges> + +namespace fs = std::filesystem; + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int INCORRECT_DATA; + extern const int BAD_ARGUMENTS; +} + +template <typename Configuration, typename MetadataReadHelper> +struct DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::Impl +{ + /** + * Useful links: + * - https://github.com/delta-io/delta/blob/master/PROTOCOL.md#data-files + */ + + /** + * DeltaLake tables store metadata files and data files. + * Metadata files are stored as JSON in a directory at the root of the table named _delta_log, + * and together with checkpoints make up the log of all changes that have occurred to a table. + * + * Delta files are the unit of atomicity for a table, + * and are named using the next available version number, zero-padded to 20 digits. + * For example: + * ./_delta_log/00000000000000000000.json + */ + static constexpr auto deltalake_metadata_directory = "_delta_log"; + static constexpr auto metadata_file_suffix = ".json"; + + std::string withPadding(size_t version) + { + /// File names are zero-padded to 20 digits. + static constexpr auto padding = 20; + + const auto version_str = toString(version); + return std::string(padding - version_str.size(), '0') + version_str; + } + + /** + * A delta file, n.json, contains an atomic set of actions that should be applied to the + * previous table state (n-1.json) in order to the construct nth snapshot of the table. + * An action changes one aspect of the table's state, for example, adding or removing a file. + * Note: it is not a valid json, but a list of json's, so we read it in a while cycle. 
+ */ + std::set<String> processMetadataFiles(const Configuration & configuration, ContextPtr context) + { + std::set<String> result_files; + const auto checkpoint_version = getCheckpointIfExists(result_files, configuration, context); + + if (checkpoint_version) + { + auto current_version = checkpoint_version; + while (true) + { + const auto filename = withPadding(++current_version) + metadata_file_suffix; + const auto file_path = fs::path(configuration.getPath()) / deltalake_metadata_directory / filename; + + if (!MetadataReadHelper::exists(file_path, configuration)) + break; + + processMetadataFile(file_path, result_files, configuration, context); + } + + LOG_TRACE( + log, "Processed metadata files from checkpoint {} to {}", + checkpoint_version, current_version - 1); + } + else + { + const auto keys = MetadataReadHelper::listFiles( + configuration, deltalake_metadata_directory, metadata_file_suffix); + + for (const String & key : keys) + processMetadataFile(key, result_files, configuration, context); + } + + return result_files; + } + + /** + * Example of content of a single .json metadata file: + * " + * {"commitInfo":{ + * "timestamp":1679424650713, + * "operation":"WRITE", + * "operationMetrics":{"numFiles":"1","numOutputRows":"100","numOutputBytes":"2560"}, + * ...} + * {"protocol":{"minReaderVersion":2,"minWriterVersion":5}} + * {"metaData":{ + * "id":"bd11ad96-bc2c-40b0-be1f-6fdd90d04459", + * "format":{"provider":"parquet","options":{}}, + * "schemaString":"{...}", + * "partitionColumns":[], + * "configuration":{...}, + * "createdTime":1679424648640}} + * {"add":{ + * "path":"part-00000-ecf8ed08-d04a-4a71-a5ec-57d8bb2ab4ee-c000.parquet", + * "partitionValues":{}, + * "size":2560, + * "modificationTime":1679424649568, + * "dataChange":true, + * "stats":"{ + * \"numRecords\":100, + * \"minValues\":{\"col-6c990940-59bb-4709-8f2e-17083a82c01a\":0}, + * \"maxValues\":{\"col-6c990940-59bb-4709-8f2e-17083a82c01a\":99}, + * \"nullCount\":{\"col-6c990940-59bb-4709-8f2e-17083a82c01a\":0,\"col-763cd7e2-7627-4d8e-9fb7-9e85d0c8845b\":0}}"}} + * " + */ + void processMetadataFile( + const String & key, + std::set<String> & result, + const Configuration & configuration, + ContextPtr context) + { + auto buf = MetadataReadHelper::createReadBuffer(key, context, configuration); + + char c; + while (!buf->eof()) + { + /// May be some invalid characters before json. + while (buf->peek(c) && c != '{') + buf->ignore(); + + if (buf->eof()) + break; + + String json_str; + readJSONObjectPossiblyInvalid(json_str, *buf); + + if (json_str.empty()) + continue; + + const JSON json(json_str); + if (json.has("add")) + { + const auto path = json["add"]["path"].getString(); + const auto [_, inserted] = result.insert(fs::path(configuration.getPath()) / path); + if (!inserted) + throw Exception(ErrorCodes::INCORRECT_DATA, "File already exists {}", path); + } + else if (json.has("remove")) + { + const auto path = json["remove"]["path"].getString(); + const bool erase = result.erase(fs::path(configuration.getPath()) / path); + if (!erase) + throw Exception(ErrorCodes::INCORRECT_DATA, "File doesn't exist {}", path); + } + } + } + + /** + * Checkpoints in delta-lake are created each 10 commits by default. 
+ * Latest checkpoint is written in _last_checkpoint file: _delta_log/_last_checkpoint + * + * _last_checkpoint contains the following: + * {"version":20, + * "size":23, + * "sizeInBytes":14057, + * "numOfAddFiles":21, + * "checkpointSchema":{...}} + * + * We need to get "version", which is the version of the checkpoint we need to read. + */ + size_t readLastCheckpointIfExists(const Configuration & configuration, ContextPtr context) + { + const auto last_checkpoint_file = fs::path(configuration.getPath()) / deltalake_metadata_directory / "_last_checkpoint"; + if (!MetadataReadHelper::exists(last_checkpoint_file, configuration)) + return 0; + + String json_str; + auto buf = MetadataReadHelper::createReadBuffer(last_checkpoint_file, context, configuration); + readJSONObjectPossiblyInvalid(json_str, *buf); + + const JSON json(json_str); + const auto version = json["version"].getUInt(); + + LOG_TRACE(log, "Last checkpoint file version: {}", version); + return version; + } + + /** + * The format of the checkpoint file name can take one of two forms: + * 1. A single checkpoint file for version n of the table will be named n.checkpoint.parquet. + * For example: + * 00000000000000000010.checkpoint.parquet + * 2. A multi-part checkpoint for version n can be fragmented into p files. Fragment o of p is + * named n.checkpoint.o.p.parquet. For example: + * 00000000000000000010.checkpoint.0000000001.0000000003.parquet + * 00000000000000000010.checkpoint.0000000002.0000000003.parquet + * 00000000000000000010.checkpoint.0000000003.0000000003.parquet + * TODO: Only (1) is supported, need to support (2). + * + * Such checkpoint files parquet contain data with the following contents: + * + * Row 1: + * ────── + * txn: (NULL,NULL,NULL) + * add: ('part-00000-1e9cd0c1-57b5-43b4-9ed8-39854287b83a-c000.parquet',{},1070,1680614340485,false,{},'{"numRecords":1,"minValues":{"col-360dade5-6d0e-4831-8467-a25d64695975":13,"col-e27b0253-569a-4fe1-8f02-f3342c54d08b":"14"},"maxValues":{"col-360dade5-6d0e-4831-8467-a25d64695975":13,"col-e27b0253-569a-4fe1-8f02-f3342c54d08b":"14"},"nullCount":{"col-360dade5-6d0e-4831-8467-a25d64695975":0,"col-e27b0253-569a-4fe1-8f02-f3342c54d08b":0}}') + * remove: (NULL,NULL,NULL,NULL,{},NULL,{}) + * metaData: (NULL,NULL,NULL,(NULL,{}),NULL,[],{},NULL) + * protocol: (NULL,NULL) + * + * Row 2: + * ────── + * txn: (NULL,NULL,NULL) + * add: ('part-00000-8887e898-91dd-4951-a367-48f7eb7bd5fd-c000.parquet',{},1063,1680614318485,false,{},'{"numRecords":1,"minValues":{"col-360dade5-6d0e-4831-8467-a25d64695975":2,"col-e27b0253-569a-4fe1-8f02-f3342c54d08b":"3"},"maxValues":{"col-360dade5-6d0e-4831-8467-a25d64695975":2,"col-e27b0253-569a-4fe1-8f02-f3342c54d08b":"3"},"nullCount":{"col-360dade5-6d0e-4831-8467-a25d64695975":0,"col-e27b0253-569a-4fe1-8f02-f3342c54d08b":0}}') + * remove: (NULL,NULL,NULL,NULL,{},NULL,{}) + * metaData: (NULL,NULL,NULL,(NULL,{}),NULL,[],{},NULL) + * protocol: (NULL,NULL) + * + * We need to check only `add` column, `remove` column does not have intersections with `add` column. + * ... 
+ */ + #define THROW_ARROW_NOT_OK(status) \ + do \ + { \ + if (const ::arrow::Status & _s = (status); !_s.ok()) \ + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Arrow error: {}", _s.ToString()); \ + } while (false) + + size_t getCheckpointIfExists(std::set<String> & result, const Configuration & configuration, ContextPtr context) + { + const auto version = readLastCheckpointIfExists(configuration, context); + if (!version) + return 0; + + const auto checkpoint_filename = withPadding(version) + ".checkpoint.parquet"; + const auto checkpoint_path = fs::path(configuration.getPath()) / deltalake_metadata_directory / checkpoint_filename; + + LOG_TRACE(log, "Using checkpoint file: {}", checkpoint_path.string()); + + auto buf = MetadataReadHelper::createReadBuffer(checkpoint_path, context, configuration); + auto format_settings = getFormatSettings(context); + + /// Force nullable, because this parquet file for some reason does not have nullable + /// in parquet file metadata while the type are in fact nullable. + format_settings.schema_inference_make_columns_nullable = true; + auto columns = ParquetSchemaReader(*buf, format_settings).readSchema(); + + /// Read only columns that we need. + columns.filterColumns(NameSet{"add", "remove"}); + Block header; + for (const auto & column : columns) + header.insert({column.type->createColumn(), column.type, column.name}); + + std::atomic<int> is_stopped{0}; + auto arrow_file = asArrowFile(*buf, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES); + + std::unique_ptr<parquet::arrow::FileReader> reader; + THROW_ARROW_NOT_OK( + parquet::arrow::OpenFile( + asArrowFile(*buf, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES), + arrow::default_memory_pool(), + &reader)); + + std::shared_ptr<arrow::Schema> schema; + THROW_ARROW_NOT_OK(reader->GetSchema(&schema)); + + ArrowColumnToCHColumn column_reader( + header, "Parquet", + format_settings.parquet.allow_missing_columns, + /* null_as_default */true, + /* case_insensitive_column_matching */false); + + Chunk res; + std::shared_ptr<arrow::Table> table; + THROW_ARROW_NOT_OK(reader->ReadTable(&table)); + + column_reader.arrowTableToCHChunk(res, table, reader->parquet_reader()->metadata()->num_rows()); + const auto & res_columns = res.getColumns(); + + if (res_columns.size() != 2) + { + throw Exception( + ErrorCodes::INCORRECT_DATA, + "Unexpected number of columns: {} (having: {}, expected: {})", + res_columns.size(), res.dumpStructure(), header.dumpStructure()); + } + + const auto * tuple_column = assert_cast<const ColumnTuple *>(res_columns[0].get()); + const auto & nullable_column = assert_cast<const ColumnNullable &>(tuple_column->getColumn(0)); + const auto & path_column = assert_cast<const ColumnString &>(nullable_column.getNestedColumn()); + for (size_t i = 0; i < path_column.size(); ++i) + { + const auto filename = String(path_column.getDataAt(i)); + if (filename.empty()) + continue; + LOG_TEST(log, "Adding {}", filename); + const auto [_, inserted] = result.insert(fs::path(configuration.getPath()) / filename); + if (!inserted) + throw Exception(ErrorCodes::INCORRECT_DATA, "File already exists {}", filename); + } + + return version; + } + + Poco::Logger * log = &Poco::Logger::get("DeltaLakeMetadataParser"); +}; + + +template <typename Configuration, typename MetadataReadHelper> +DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::DeltaLakeMetadataParser() : impl(std::make_unique<Impl>()) +{ +} + +template <typename Configuration, typename MetadataReadHelper> +Strings 
DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::getFiles(const Configuration & configuration, ContextPtr context) +{ + auto result = impl->processMetadataFiles(configuration, context); + return Strings(result.begin(), result.end()); +} + +template DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::DeltaLakeMetadataParser(); +template Strings DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getFiles( + const StorageS3::Configuration & configuration, ContextPtr); +} + +#endif diff --git a/contrib/clickhouse/src/Storages/DataLakes/DeltaLakeMetadataParser.h b/contrib/clickhouse/src/Storages/DataLakes/DeltaLakeMetadataParser.h new file mode 100644 index 0000000000..df7276b90b --- /dev/null +++ b/contrib/clickhouse/src/Storages/DataLakes/DeltaLakeMetadataParser.h @@ -0,0 +1,22 @@ +#pragma once + +#include <Interpreters/Context_fwd.h> +#include <Core/Types.h> + +namespace DB +{ + +template <typename Configuration, typename MetadataReadHelper> +struct DeltaLakeMetadataParser +{ +public: + DeltaLakeMetadataParser<Configuration, MetadataReadHelper>(); + + Strings getFiles(const Configuration & configuration, ContextPtr context); + +private: + struct Impl; + std::shared_ptr<Impl> impl; +}; + +} diff --git a/contrib/clickhouse/src/Storages/DataLakes/HudiMetadataParser.cpp b/contrib/clickhouse/src/Storages/DataLakes/HudiMetadataParser.cpp new file mode 100644 index 0000000000..6857bb3500 --- /dev/null +++ b/contrib/clickhouse/src/Storages/DataLakes/HudiMetadataParser.cpp @@ -0,0 +1,116 @@ +#include <Storages/DataLakes/HudiMetadataParser.h> +#include <Common/logger_useful.h> +#include <ranges> +#include <base/find_symbols.h> +#include <Poco/String.h> +#include "clickhouse_config.h" +#include <filesystem> +#include <IO/ReadHelpers.h> + +#if USE_AWS_S3 +#include <Storages/DataLakes/S3MetadataReader.h> +#include <Storages/StorageS3.h> + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +template <typename Configuration, typename MetadataReadHelper> +struct HudiMetadataParser<Configuration, MetadataReadHelper>::Impl +{ + /** + * Useful links: + * - https://hudi.apache.org/tech-specs/ + * - https://hudi.apache.org/docs/file_layouts/ + */ + + /** + * Hudi tables store metadata files and data files. + * Metadata files are stored in .hoodie/metadata directory. Though unlike DeltaLake and Iceberg, + * metadata is not required in order to understand which files we need to read, moreover, + * for Hudi metadata does not always exist. + * + * There can be two types of data files + * 1. base files (columnar file formats like Apache Parquet/Orc) + * 2. log files + * Currently we support reading only `base files`. + * Data file name format: + * [File Id]_[File Write Token]_[Transaction timestamp].[File Extension] + * + * To find needed parts we need to find out latest part file for every file group for every partition. + * Explanation why: + * Hudi reads in and overwrites the entire table/partition with each update. + * Hudi controls the number of file groups under a single partition according to the + * hoodie.parquet.max.file.size option. Once a single Parquet file is too large, Hudi creates a second file group. + * Each file group is identified by File Id. 
+ */ + Strings processMetadataFiles(const Configuration & configuration) + { + auto * log = &Poco::Logger::get("HudiMetadataParser"); + + const auto keys = MetadataReadHelper::listFiles(configuration, "", Poco::toLower(configuration.format)); + + using Partition = std::string; + using FileID = std::string; + struct FileInfo + { + String key; + UInt64 timestamp = 0; + }; + std::unordered_map<Partition, std::unordered_map<FileID, FileInfo>> data_files; + + for (const auto & key : keys) + { + auto key_file = std::filesystem::path(key); + Strings file_parts; + const String stem = key_file.stem(); + splitInto<'_'>(file_parts, stem); + if (file_parts.size() != 3) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected format for file: {}", key); + + const auto partition = key_file.parent_path().stem(); + const auto & file_id = file_parts[0]; + const auto timestamp = parse<UInt64>(file_parts[2]); + + auto & file_info = data_files[partition][file_id]; + if (file_info.timestamp == 0 || file_info.timestamp < timestamp) + { + file_info.key = std::move(key); + file_info.timestamp = timestamp; + } + } + + Strings result; + for (auto & [partition, partition_data] : data_files) + { + LOG_TRACE(log, "Adding {} data files from partition {}", partition, partition_data.size()); + for (auto & [file_id, file_data] : partition_data) + result.push_back(std::move(file_data.key)); + } + return result; + } +}; + + +template <typename Configuration, typename MetadataReadHelper> +HudiMetadataParser<Configuration, MetadataReadHelper>::HudiMetadataParser() : impl(std::make_unique<Impl>()) +{ +} + +template <typename Configuration, typename MetadataReadHelper> +Strings HudiMetadataParser<Configuration, MetadataReadHelper>::getFiles(const Configuration & configuration, ContextPtr) +{ + return impl->processMetadataFiles(configuration); +} + +template HudiMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::HudiMetadataParser(); +template Strings HudiMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getFiles( + const StorageS3::Configuration & configuration, ContextPtr); + +} + +#endif diff --git a/contrib/clickhouse/src/Storages/DataLakes/HudiMetadataParser.h b/contrib/clickhouse/src/Storages/DataLakes/HudiMetadataParser.h new file mode 100644 index 0000000000..6727ba2f71 --- /dev/null +++ b/contrib/clickhouse/src/Storages/DataLakes/HudiMetadataParser.h @@ -0,0 +1,22 @@ +#pragma once + +#include <Interpreters/Context_fwd.h> +#include <Core/Types.h> + +namespace DB +{ + +template <typename Configuration, typename MetadataReadHelper> +struct HudiMetadataParser +{ +public: + HudiMetadataParser<Configuration, MetadataReadHelper>(); + + Strings getFiles(const Configuration & configuration, ContextPtr context); + +private: + struct Impl; + std::shared_ptr<Impl> impl; +}; + +} diff --git a/contrib/clickhouse/src/Storages/DataLakes/IStorageDataLake.h b/contrib/clickhouse/src/Storages/DataLakes/IStorageDataLake.h new file mode 100644 index 0000000000..d732ff928d --- /dev/null +++ b/contrib/clickhouse/src/Storages/DataLakes/IStorageDataLake.h @@ -0,0 +1,119 @@ +#pragma once + +#include "clickhouse_config.h" + +#if USE_AWS_S3 + +#include <Storages/IStorage.h> +#include <Common/logger_useful.h> +#include <Storages/StorageFactory.h> +#include <Formats/FormatFactory.h> +#include <filesystem> + + +namespace DB +{ + +template <typename Storage, typename Name, typename MetadataParser> +class IStorageDataLake : public Storage +{ +public: + static constexpr auto name = Name::name; + using 
Configuration = typename Storage::Configuration; + + template <class ...Args> + explicit IStorageDataLake(const Configuration & configuration_, ContextPtr context_, Args && ...args) + : Storage(getConfigurationForDataRead(configuration_, context_), context_, std::forward<Args>(args)...) + , base_configuration(configuration_) + , log(&Poco::Logger::get(getName())) {} + + String getName() const override { return name; } + + static ColumnsDescription getTableStructureFromData( + Configuration & base_configuration, + const std::optional<FormatSettings> & format_settings, + ContextPtr local_context) + { + auto configuration = getConfigurationForDataRead(base_configuration, local_context); + return Storage::getTableStructureFromData(configuration, format_settings, local_context); + } + + static Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context) + { + return Storage::getConfiguration(engine_args, local_context, /* get_format_from_file */false); + } + + Configuration updateConfigurationAndGetCopy(ContextPtr local_context) override + { + std::lock_guard lock(configuration_update_mutex); + updateConfigurationImpl(local_context); + return Storage::getConfiguration(); + } + + void updateConfiguration(ContextPtr local_context) override + { + std::lock_guard lock(configuration_update_mutex); + updateConfigurationImpl(local_context); + } + +private: + static Configuration getConfigurationForDataRead( + const Configuration & base_configuration, ContextPtr local_context, const Strings & keys = {}) + { + auto configuration{base_configuration}; + configuration.update(local_context); + configuration.static_configuration = true; + + if (keys.empty()) + configuration.keys = getDataFiles(configuration, local_context); + else + configuration.keys = keys; + + LOG_TRACE( + &Poco::Logger::get("DataLake"), + "New configuration path: {}, keys: {}", + configuration.getPath(), fmt::join(configuration.keys, ", ")); + + configuration.connect(local_context); + return configuration; + } + + static Strings getDataFiles(const Configuration & configuration, ContextPtr local_context) + { + return MetadataParser().getFiles(configuration, local_context); + } + + void updateConfigurationImpl(ContextPtr local_context) + { + const bool updated = base_configuration.update(local_context); + auto new_keys = getDataFiles(base_configuration, local_context); + + if (!updated && new_keys == Storage::getConfiguration().keys) + return; + + Storage::useConfiguration(getConfigurationForDataRead(base_configuration, local_context, new_keys)); + } + + Configuration base_configuration; + std::mutex configuration_update_mutex; + Poco::Logger * log; +}; + + +template <typename DataLake> +static StoragePtr createDataLakeStorage(const StorageFactory::Arguments & args) +{ + auto configuration = DataLake::getConfiguration(args.engine_args, args.getLocalContext()); + + /// Data lakes use parquet format, no need for schema inference. 
+ if (configuration.format == "auto") + configuration.format = "Parquet"; + + return std::make_shared<DataLake>( + configuration, args.getContext(), args.table_id, args.columns, args.constraints, + args.comment, getFormatSettings(args.getContext())); +} + +} + +#endif diff --git a/contrib/clickhouse/src/Storages/DataLakes/IcebergMetadataParser.cpp b/contrib/clickhouse/src/Storages/DataLakes/IcebergMetadataParser.cpp new file mode 100644 index 0000000000..a5c0dcc434 --- /dev/null +++ b/contrib/clickhouse/src/Storages/DataLakes/IcebergMetadataParser.cpp @@ -0,0 +1,361 @@ +#include "clickhouse_config.h" + +#if USE_AWS_S3 && USE_AVRO + +#include <Common/logger_useful.h> + +#include <Columns/ColumnString.h> +#include <Columns/ColumnTuple.h> +#include <Columns/IColumn.h> +#include <Storages/DataLakes/IcebergMetadataParser.h> +#include <Storages/DataLakes/S3MetadataReader.h> +#include <Storages/StorageS3.h> +#include <Processors/Formats/Impl/AvroRowInputFormat.h> +#include <Formats/FormatFactory.h> +#include <IO/ReadHelpers.h> + +#include <Poco/JSON/Array.h> +#include <Poco/JSON/Object.h> +#include <Poco/JSON/Parser.h> + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int FILE_DOESNT_EXIST; + extern const int ILLEGAL_COLUMN; + extern const int BAD_ARGUMENTS; +} + +template <typename Configuration, typename MetadataReadHelper> +struct IcebergMetadataParser<Configuration, MetadataReadHelper>::Impl +{ + Poco::Logger * log = &Poco::Logger::get("IcebergMetadataParser"); + + /** + * Useful links: + * - https://iceberg.apache.org/spec/ + */ + + /** + * Iceberg has two format versions, currently we support only format V1. + * + * Unlike DeltaLake, Iceberg has several metadata layers: `table metadata`, `manifest list` and `manifest_files`. + * Metadata file - json file. + * Manifest list – a file that lists manifest files; one per snapshot. + * Manifest file – a file that lists data or delete files; a subset of a snapshot. + * All changes to table state create a new metadata file and replace the old metadata with an atomic swap. + */ + + static constexpr auto metadata_directory = "metadata"; + + /** + * Each version of table metadata is stored in a `metadata` directory and + * has format: v<V>.metadata.json, where V - metadata version. + */ + String getMetadataFile(const Configuration & configuration) + { + static constexpr auto metadata_file_suffix = ".metadata.json"; + + const auto metadata_files = MetadataReadHelper::listFiles(configuration, metadata_directory, metadata_file_suffix); + if (metadata_files.empty()) + { + throw Exception( + ErrorCodes::FILE_DOESNT_EXIST, + "The metadata file for Iceberg table with path {} doesn't exist", + configuration.url.key); + } + + /// Get the latest version of metadata file: v<V>.metadata.json + return *std::max_element(metadata_files.begin(), metadata_files.end()); + } + + /** + * In order to find out which data files to read, we need to find the `manifest list` + * which corresponds to the latest snapshot. We find it by checking a list of snapshots + * in metadata's "snapshots" section. + * + * Example of metadata.json file. + * + * { + * "format-version" : 1, + * "table-uuid" : "ca2965ad-aae2-4813-8cf7-2c394e0c10f5", + * "location" : "/iceberg_data/db/table_name", + * "last-updated-ms" : 1680206743150, + * "last-column-id" : 2, + * "schema" : { "type" : "struct", "schema-id" : 0, "fields" : [ {<field1_info>}, {<field2_info>}, ... ] }, + * "current-schema-id" : 0, + * "schemas" : [ ], + * ... 
+ * "current-snapshot-id" : 2819310504515118887, + * "refs" : { "main" : { "snapshot-id" : 2819310504515118887, "type" : "branch" } }, + * "snapshots" : [ { + * "snapshot-id" : 2819310504515118887, + * "timestamp-ms" : 1680206743150, + * "summary" : { + * "operation" : "append", "spark.app.id" : "local-1680206733239", + * "added-data-files" : "1", "added-records" : "100", + * "added-files-size" : "1070", "changed-partition-count" : "1", + * "total-records" : "100", "total-files-size" : "1070", "total-data-files" : "1", "total-delete-files" : "0", + * "total-position-deletes" : "0", "total-equality-deletes" : "0" + * }, + * "manifest-list" : "/iceberg_data/db/table_name/metadata/snap-2819310504515118887-1-c87bfec7-d36c-4075-ad04-600b6b0f2020.avro", + * "schema-id" : 0 + * } ], + * "statistics" : [ ], + * "snapshot-log" : [ ... ], + * "metadata-log" : [ ] + * } + */ + struct Metadata + { + int format_version; + String manifest_list; + Strings manifest_files; + }; + Metadata processMetadataFile(const Configuration & configuration, ContextPtr context) + { + const auto metadata_file_path = getMetadataFile(configuration); + auto buf = MetadataReadHelper::createReadBuffer(metadata_file_path, context, configuration); + String json_str; + readJSONObjectPossiblyInvalid(json_str, *buf); + + Poco::JSON::Parser parser; /// For some reason base/base/JSON.h can not parse this json file + Poco::Dynamic::Var json = parser.parse(json_str); + Poco::JSON::Object::Ptr object = json.extract<Poco::JSON::Object::Ptr>(); + + Metadata result; + result.format_version = object->getValue<int>("format-version"); + + auto current_snapshot_id = object->getValue<Int64>("current-snapshot-id"); + auto snapshots = object->get("snapshots").extract<Poco::JSON::Array::Ptr>(); + + for (size_t i = 0; i < snapshots->size(); ++i) + { + const auto snapshot = snapshots->getObject(static_cast<UInt32>(i)); + if (snapshot->getValue<Int64>("snapshot-id") == current_snapshot_id) + { + const auto path = snapshot->getValue<String>("manifest-list"); + result.manifest_list = std::filesystem::path(configuration.url.key) / metadata_directory / std::filesystem::path(path).filename(); + break; + } + } + return result; + } + + /** + * Manifest list has Avro as default format (and currently we support only Avro). 
+ * Manifest list file format of manifest list is: snap-2819310504515118887-1-c87bfec7-d36c-4075-ad04-600b6b0f2020.avro + * + * `manifest list` has the following contents: + * ┌─manifest_path────────────────────────────────────────────────────────────────────────────────────────┬─manifest_length─┬─partition_spec_id─┬───added_snapshot_id─┬─added_data_files_count─┬─existing_data_files_count─┬─deleted_data_files_count─┬─partitions─┬─added_rows_count─┬─existing_rows_count─┬─deleted_rows_count─┐ + * │ /iceberg_data/db/table_name/metadata/c87bfec7-d36c-4075-ad04-600b6b0f2020-m0.avro │ 5813 │ 0 │ 2819310504515118887 │ 1 │ 0 │ 0 │ [] │ 100 │ 0 │ 0 │ + * └──────────────────────────────────────────────────────────────────────────────────────────────────────┴─────────────────┴───────────────────┴─────────────────────┴────────────────────────┴───────────────────────────┴──────────────────────────┴────────────┴──────────────────┴─────────────────────┴────────────────────┘ + */ + void processManifestList(Metadata & metadata, const Configuration & configuration, ContextPtr context) + { + auto buf = MetadataReadHelper::createReadBuffer(metadata.manifest_list, context, configuration); + auto file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*buf)); + + auto data_type = AvroSchemaReader::avroNodeToDataType(file_reader->dataSchema().root()->leafAt(0)); + Block header{{data_type->createColumn(), data_type, "manifest_path"}}; + auto columns = parseAvro(*file_reader, header, getFormatSettings(context)); + auto & col = columns.at(0); + + if (col->getDataType() != TypeIndex::String) + { + throw Exception( + ErrorCodes::ILLEGAL_COLUMN, + "The parsed column from Avro file of `manifest_path` field should be String type, got {}", + col->getFamilyName()); + } + + const auto * col_str = typeid_cast<ColumnString *>(col.get()); + for (size_t i = 0; i < col_str->size(); ++i) + { + const auto file_path = col_str->getDataAt(i).toView(); + const auto filename = std::filesystem::path(file_path).filename(); + metadata.manifest_files.emplace_back(std::filesystem::path(configuration.url.key) / metadata_directory / filename); + } + } + + /** + * Manifest file has the following format: '/iceberg_data/db/table_name/metadata/c87bfec7-d36c-4075-ad04-600b6b0f2020-m0.avro' + * + * `manifest file` is different in format version V1 and V2 and has the following contents: + * v1 v2 + * status req req + * snapshot_id req opt + * sequence_number opt + * file_sequence_number opt + * data_file req req + * Example format version V1: + * ┌─status─┬─────────snapshot_id─┬─data_file───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐ + * │ 1 │ 2819310504515118887 │ ('/iceberg_data/db/table_name/data/00000-1-3edca534-15a0-4f74-8a28-4733e0bf1270-00001.parquet','PARQUET',(),100,1070,67108864,[(1,233),(2,210)],[(1,100),(2,100)],[(1,0),(2,0)],[],[(1,'\0'),(2,'0')],[(1,'c'),(2,'99')],NULL,[4],0) │ + * └────────┴─────────────────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘ + * Example format version V2: + * 
┌─status─┬─────────snapshot_id─┬─sequence_number─┬─file_sequence_number─┬─data_file───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐ + * │ 1 │ 5887006101709926452 │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ (0,'/iceberg_data/db/table_name/data/00000-1-c8045c90-8799-4eac-b957-79a0484e223c-00001.parquet','PARQUET',(),100,1070,[(1,233),(2,210)],[(1,100),(2,100)],[(1,0),(2,0)],[],[(1,'\0'),(2,'0')],[(1,'c'),(2,'99')],NULL,[4],[],0) │ + * └────────┴─────────────────────┴─────────────────┴──────────────────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘ + * In case of partitioned data we'll have extra directory partition=value: + * ─status─┬─────────snapshot_id─┬─data_file──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐ + * │ 1 │ 2252246380142525104 │ ('/iceberg_data/db/table_name/data/a=0/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00001.parquet','PARQUET',(0),1,631,67108864,[(1,46),(2,48)],[(1,1),(2,1)],[(1,0),(2,0)],[],[(1,'\0\0\0\0\0\0\0\0'),(2,'1')],[(1,'\0\0\0\0\0\0\0\0'),(2,'1')],NULL,[4],0) │ + * │ 1 │ 2252246380142525104 │ ('/iceberg_data/db/table_name/data/a=1/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00002.parquet','PARQUET',(1),1,631,67108864,[(1,46),(2,48)],[(1,1),(2,1)],[(1,0),(2,0)],[],[(1,'\0\0\0\0\0\0\0'),(2,'2')],[(1,'\0\0\0\0\0\0\0'),(2,'2')],NULL,[4],0) │ + * │ 1 │ 2252246380142525104 │ ('/iceberg_data/db/table_name/data/a=2/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00003.parquet','PARQUET',(2),1,631,67108864,[(1,46),(2,48)],[(1,1),(2,1)],[(1,0),(2,0)],[],[(1,'\0\0\0\0\0\0\0'),(2,'3')],[(1,'\0\0\0\0\0\0\0'),(2,'3')],NULL,[4],0) │ + * └────────┴─────────────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘ + */ + Strings getFilesForRead(const Metadata & metadata, const Configuration & configuration, ContextPtr context) + { + NameSet keys; + for (const auto & manifest_file : metadata.manifest_files) + { + auto buffer = MetadataReadHelper::createReadBuffer(manifest_file, context, configuration); + auto file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*buffer)); + + avro::NodePtr root_node = file_reader->dataSchema().root(); + size_t leaves_num = root_node->leaves(); + size_t expected_min_num = metadata.format_version == 1 ? 3 : 2; + if (leaves_num < expected_min_num) + { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Unexpected number of columns {}. 
Expected at least {}", + root_node->leaves(), expected_min_num); + } + + avro::NodePtr status_node = root_node->leafAt(0); + if (status_node->type() != avro::Type::AVRO_INT) + { + throw Exception( + ErrorCodes::ILLEGAL_COLUMN, + "The parsed column from Avro file of `status` field should be Int type, got {}", + magic_enum::enum_name(status_node->type())); + } + + avro::NodePtr data_file_node = root_node->leafAt(static_cast<int>(leaves_num) - 1); + if (data_file_node->type() != avro::Type::AVRO_RECORD) + { + throw Exception( + ErrorCodes::ILLEGAL_COLUMN, + "The parsed column from Avro file of `data_file` field should be Tuple type, got {}", + magic_enum::enum_name(data_file_node->type())); + } + + auto status_col_data_type = AvroSchemaReader::avroNodeToDataType(status_node); + auto data_col_data_type = AvroSchemaReader::avroNodeToDataType(data_file_node); + Block header{ + {status_col_data_type->createColumn(), status_col_data_type, "status"}, + {data_col_data_type->createColumn(), data_col_data_type, "data_file"}}; + + const auto columns = parseAvro(*file_reader, header, getFormatSettings(context)); + if (columns.size() != 2) + { + throw Exception(ErrorCodes::ILLEGAL_COLUMN, + "Unexpected number of columns. Expected 2, got {}", columns.size()); + } + + if (columns.at(0)->getDataType() != TypeIndex::Int32) + { + throw Exception(ErrorCodes::ILLEGAL_COLUMN, + "The parsed column from Avro file of `status` field should be Int32 type, got {}", + columns.at(0)->getFamilyName()); + } + if (columns.at(1)->getDataType() != TypeIndex::Tuple) + { + throw Exception(ErrorCodes::ILLEGAL_COLUMN, + "The parsed column from Avro file of `file_path` field should be Tuple type, got {}", + columns.at(1)->getFamilyName()); + } + + const auto status_int_column = assert_cast<ColumnInt32 *>(columns.at(0).get()); + const auto data_file_tuple_column = assert_cast<ColumnTuple *>(columns.at(1).get()); + + if (status_int_column->size() != data_file_tuple_column->size()) + { + throw Exception(ErrorCodes::ILLEGAL_COLUMN, + "The parsed column from Avro file of `file_path` and `status` have different rows number: {} and {}", + status_int_column->size(), data_file_tuple_column->size()); + } + + const auto * data_file_name_column = metadata.format_version == 1 + ? 
data_file_tuple_column->getColumnPtr(0).get() + : data_file_tuple_column->getColumnPtr(1).get(); + + if (data_file_name_column->getDataType() != TypeIndex::String) + { + throw Exception(ErrorCodes::ILLEGAL_COLUMN, + "The parsed column from Avro file of `file_path` field should be String type, got {}", + data_file_name_column->getFamilyName()); + } + auto file_name_str_column = assert_cast<const ColumnString *>(data_file_name_column); + + for (size_t i = 0; i < status_int_column->size(); ++i) + { + const auto status = status_int_column->getInt(i); + const auto data_path = std::string(file_name_str_column->getDataAt(i).toView()); + const auto pos = data_path.find(configuration.url.key); + const auto file_path = data_path.substr(pos); + if (pos == std::string::npos) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected to find {} in data path: {}", configuration.url.key, data_path); + + if (status == 2) + { + LOG_TEST(log, "Processing delete file for path: {}", file_path); + chassert(!keys.contains(file_path)); + } + else + keys.insert(file_path); + } + } + + return std::vector<std::string>(keys.begin(), keys.end()); + } + + MutableColumns parseAvro( + avro::DataFileReaderBase & file_reader, + const Block & header, + const FormatSettings & settings) + { + auto deserializer = std::make_unique<AvroDeserializer>(header, file_reader.dataSchema(), true, true, settings); + MutableColumns columns = header.cloneEmptyColumns(); + + file_reader.init(); + RowReadExtension ext; + while (file_reader.hasMore()) + { + file_reader.decr(); + deserializer->deserializeRow(columns, file_reader.decoder(), ext); + } + return columns; + } + +}; + + +template <typename Configuration, typename MetadataReadHelper> +IcebergMetadataParser<Configuration, MetadataReadHelper>::IcebergMetadataParser() : impl(std::make_unique<Impl>()) +{ +} + +template <typename Configuration, typename MetadataReadHelper> +Strings IcebergMetadataParser<Configuration, MetadataReadHelper>::getFiles(const Configuration & configuration, ContextPtr context) +{ + auto metadata = impl->processMetadataFile(configuration, context); + + /// When table first created and does not have any data + if (metadata.manifest_list.empty()) + return {}; + + impl->processManifestList(metadata, configuration, context); + return impl->getFilesForRead(metadata, configuration, context); +} + + +template IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::IcebergMetadataParser(); +template Strings IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getFiles(const StorageS3::Configuration & configuration, ContextPtr); + +} + +#endif diff --git a/contrib/clickhouse/src/Storages/DataLakes/IcebergMetadataParser.h b/contrib/clickhouse/src/Storages/DataLakes/IcebergMetadataParser.h new file mode 100644 index 0000000000..226b1bd8b6 --- /dev/null +++ b/contrib/clickhouse/src/Storages/DataLakes/IcebergMetadataParser.h @@ -0,0 +1,26 @@ +#pragma once + +#if USE_AVRO /// StorageIceberg depending on Avro to parse metadata with Avro format. 
+ +#include <Interpreters/Context_fwd.h> +#include <Core/Types.h> + +namespace DB +{ + +template <typename Configuration, typename MetadataReadHelper> +struct IcebergMetadataParser +{ +public: + IcebergMetadataParser<Configuration, MetadataReadHelper>(); + + Strings getFiles(const Configuration & configuration, ContextPtr context); + +private: + struct Impl; + std::shared_ptr<Impl> impl; +}; + +} + +#endif diff --git a/contrib/clickhouse/src/Storages/DataLakes/S3MetadataReader.cpp b/contrib/clickhouse/src/Storages/DataLakes/S3MetadataReader.cpp new file mode 100644 index 0000000000..5535b08e54 --- /dev/null +++ b/contrib/clickhouse/src/Storages/DataLakes/S3MetadataReader.cpp @@ -0,0 +1,86 @@ +#include <clickhouse_config.h> + +#if USE_AWS_S3 + +#include <IO/ReadBufferFromS3.h> +#include <IO/S3/Requests.h> +#include <Interpreters/Context.h> +#include <Storages/DataLakes/S3MetadataReader.h> +#include <aws/core/auth/AWSCredentials.h> +#include <aws/s3/S3Client.h> +#include <aws/s3/model/ListObjectsV2Request.h> + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int S3_ERROR; +} + +std::shared_ptr<ReadBuffer> +S3DataLakeMetadataReadHelper::createReadBuffer(const String & key, ContextPtr context, const StorageS3::Configuration & base_configuration) +{ + S3Settings::RequestSettings request_settings; + request_settings.max_single_read_retries = context->getSettingsRef().s3_max_single_read_retries; + return std::make_shared<ReadBufferFromS3>( + base_configuration.client, + base_configuration.url.bucket, + key, + base_configuration.url.version_id, + request_settings, + context->getReadSettings()); +} + +bool S3DataLakeMetadataReadHelper::exists(const String & key, const StorageS3::Configuration & configuration) +{ + return S3::objectExists(*configuration.client, configuration.url.bucket, key); +} + +std::vector<String> S3DataLakeMetadataReadHelper::listFiles( + const StorageS3::Configuration & base_configuration, const String & prefix, const String & suffix) +{ + const auto & table_path = base_configuration.url.key; + const auto & bucket = base_configuration.url.bucket; + const auto & client = base_configuration.client; + + std::vector<String> res; + S3::ListObjectsV2Request request; + Aws::S3::Model::ListObjectsV2Outcome outcome; + + request.SetBucket(bucket); + request.SetPrefix(std::filesystem::path(table_path) / prefix); + + bool is_finished{false}; + while (!is_finished) + { + outcome = client->ListObjectsV2(request); + if (!outcome.IsSuccess()) + throw S3Exception( + outcome.GetError().GetErrorType(), + "Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}", + quoteString(bucket), + quoteString(base_configuration.url.key), + backQuote(outcome.GetError().GetExceptionName()), + quoteString(outcome.GetError().GetMessage())); + + const auto & result_batch = outcome.GetResult().GetContents(); + for (const auto & obj : result_batch) + { + const auto & filename = obj.GetKey(); + if (filename.ends_with(suffix)) + res.push_back(filename); + } + + request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken()); + is_finished = !outcome.GetResult().GetIsTruncated(); + } + + LOG_TRACE(&Poco::Logger::get("S3DataLakeMetadataReadHelper"), "Listed {} files", res.size()); + + return res; +} + +} +#endif diff --git a/contrib/clickhouse/src/Storages/DataLakes/S3MetadataReader.h b/contrib/clickhouse/src/Storages/DataLakes/S3MetadataReader.h new file mode 100644 index 0000000000..31f94ff9f7 --- /dev/null +++ 
b/contrib/clickhouse/src/Storages/DataLakes/S3MetadataReader.h @@ -0,0 +1,25 @@ +#pragma once + +#include <clickhouse_config.h> + +#if USE_AWS_S3 + +#include <Storages/StorageS3.h> + +class ReadBuffer; + +namespace DB +{ + +struct S3DataLakeMetadataReadHelper +{ + static std::shared_ptr<ReadBuffer> createReadBuffer( + const String & key, ContextPtr context, const StorageS3::Configuration & base_configuration); + + static bool exists(const String & key, const StorageS3::Configuration & configuration); + + static std::vector<String> listFiles(const StorageS3::Configuration & configuration, const std::string & prefix = "", const std::string & suffix = ""); +}; +} + +#endif diff --git a/contrib/clickhouse/src/Storages/DataLakes/StorageDeltaLake.h b/contrib/clickhouse/src/Storages/DataLakes/StorageDeltaLake.h new file mode 100644 index 0000000000..6854dd42b3 --- /dev/null +++ b/contrib/clickhouse/src/Storages/DataLakes/StorageDeltaLake.h @@ -0,0 +1,25 @@ +#pragma once + +#include <Storages/IStorage.h> +#include <Storages/DataLakes/IStorageDataLake.h> +#include <Storages/DataLakes/DeltaLakeMetadataParser.h> +#include "clickhouse_config.h" + +#if USE_AWS_S3 +#include <Storages/DataLakes/S3MetadataReader.h> +#include <Storages/StorageS3.h> +#endif + +namespace DB +{ + +struct StorageDeltaLakeName +{ + static constexpr auto name = "DeltaLake"; +}; + +#if USE_AWS_S3 && USE_PARQUET +using StorageDeltaLakeS3 = IStorageDataLake<StorageS3, StorageDeltaLakeName, DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>>; +#endif + +} diff --git a/contrib/clickhouse/src/Storages/DataLakes/StorageHudi.h b/contrib/clickhouse/src/Storages/DataLakes/StorageHudi.h new file mode 100644 index 0000000000..dd17b68767 --- /dev/null +++ b/contrib/clickhouse/src/Storages/DataLakes/StorageHudi.h @@ -0,0 +1,25 @@ +#pragma once + +#include <Storages/IStorage.h> +#include <Storages/DataLakes/IStorageDataLake.h> +#include <Storages/DataLakes/HudiMetadataParser.h> +#include "clickhouse_config.h" + +#if USE_AWS_S3 +#include <Storages/DataLakes/S3MetadataReader.h> +#include <Storages/StorageS3.h> +#endif + +namespace DB +{ + +struct StorageHudiName +{ + static constexpr auto name = "Hudi"; +}; + +#if USE_AWS_S3 +using StorageHudiS3 = IStorageDataLake<StorageS3, StorageHudiName, HudiMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>>; +#endif + +} diff --git a/contrib/clickhouse/src/Storages/DataLakes/StorageIceberg.h b/contrib/clickhouse/src/Storages/DataLakes/StorageIceberg.h new file mode 100644 index 0000000000..98e75c0b5e --- /dev/null +++ b/contrib/clickhouse/src/Storages/DataLakes/StorageIceberg.h @@ -0,0 +1,25 @@ +#pragma once + +#include <Storages/IStorage.h> +#include <Storages/DataLakes/IStorageDataLake.h> +#include <Storages/DataLakes/IcebergMetadataParser.h> +#include "clickhouse_config.h" + +#if USE_AWS_S3 && USE_AVRO +#include <Storages/DataLakes/S3MetadataReader.h> +#include <Storages/StorageS3.h> +#endif + +namespace DB +{ + +struct StorageIcebergName +{ + static constexpr auto name = "Iceberg"; +}; + +#if USE_AWS_S3 && USE_AVRO +using StorageIcebergS3 = IStorageDataLake<StorageS3, StorageIcebergName, IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>>; +#endif + +} diff --git a/contrib/clickhouse/src/Storages/DataLakes/registerDataLakes.cpp b/contrib/clickhouse/src/Storages/DataLakes/registerDataLakes.cpp new file mode 100644 index 0000000000..b59f847e99 --- /dev/null +++ b/contrib/clickhouse/src/Storages/DataLakes/registerDataLakes.cpp @@ 
-0,0 +1,50 @@
+#include <Storages/DataLakes/IStorageDataLake.h>
+#include "clickhouse_config.h"
+
+#if USE_AWS_S3
+
+#include <Storages/DataLakes/StorageDeltaLake.h>
+#include <Storages/DataLakes/StorageIceberg.h>
+#include <Storages/DataLakes/StorageHudi.h>
+
+
+namespace DB
+{
+
+#define REGISTER_DATA_LAKE_STORAGE(STORAGE, NAME) \
+    factory.registerStorage( \
+        NAME, \
+        [](const StorageFactory::Arguments & args) \
+        { \
+            return createDataLakeStorage<STORAGE>(args);\
+        }, \
+        { \
+            .supports_settings = false, \
+            .supports_schema_inference = true, \
+            .source_access_type = AccessType::S3, \
+        });
+
+#if USE_PARQUET
+void registerStorageDeltaLake(StorageFactory & factory)
+{
+    REGISTER_DATA_LAKE_STORAGE(StorageDeltaLakeS3, StorageDeltaLakeName::name)
+}
+#endif
+
+#if USE_AVRO /// StorageIceberg depending on Avro to parse metadata with Avro format.
+
+void registerStorageIceberg(StorageFactory & factory)
+{
+    REGISTER_DATA_LAKE_STORAGE(StorageIcebergS3, StorageIcebergName::name)
+}
+
+#endif
+
+void registerStorageHudi(StorageFactory & factory)
+{
+    REGISTER_DATA_LAKE_STORAGE(StorageHudiS3, StorageHudiName::name)
+}
+
+}
+
+#endif
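For orientation, here is a minimal, self-contained sketch of the log-replay idea that the comments in DeltaLakeMetadataParser.cpp above describe: version files under `_delta_log` are named with versions zero-padded to 20 digits, and each one carries `add`/`remove` actions that are applied in order to build the current set of data files. This is not the vendored ClickHouse code; the JSON parsing, checkpoint handling and S3 reading are omitted, and the `DeltaAction`/`replayDeltaLog` names are illustrative only.

```cpp
#include <filesystem>
#include <iostream>
#include <map>
#include <set>
#include <string>
#include <vector>

namespace fs = std::filesystem;

/// Illustrative stand-in for one action parsed out of a _delta_log/<version>.json entry.
struct DeltaAction
{
    enum class Kind { Add, Remove } kind;
    std::string path; /// data file path relative to the table root
};

/// Delta log file names are zero-padded to 20 digits, e.g. 00000000000000000003.json.
std::string withPadding(size_t version)
{
    const auto str = std::to_string(version);
    return std::string(20 - str.size(), '0') + str;
}

/// Replay the commit log in version order: "add" introduces a data file,
/// "remove" retires a file added by an earlier commit. The surviving set
/// is what the table engine should actually read.
std::set<std::string> replayDeltaLog(
    const std::string & table_path,
    const std::map<size_t, std::vector<DeltaAction>> & log) /// version -> actions
{
    std::set<std::string> result;
    for (const auto & [version, actions] : log) /// std::map iterates versions in ascending order
    {
        for (const auto & action : actions)
        {
            const auto full_path = (fs::path(table_path) / action.path).string();
            if (action.kind == DeltaAction::Kind::Add)
                result.insert(full_path);
            else
                result.erase(full_path);
        }
    }
    return result;
}

int main()
{
    const std::map<size_t, std::vector<DeltaAction>> log{
        {0, {{DeltaAction::Kind::Add, "part-00000.parquet"}}},
        {1, {{DeltaAction::Kind::Add, "part-00001.parquet"},
             {DeltaAction::Kind::Remove, "part-00000.parquet"}}},
    };

    std::cout << "latest log file: _delta_log/" << withPadding(1) << ".json\n";
    for (const auto & file : replayDeltaLog("db/table_name", log))
        std::cout << file << '\n'; /// prints only db/table_name/part-00001.parquet
}
```

In the real parser, `readJSONObjectPossiblyInvalid()` handles the concatenated-JSON format of each log file and `getCheckpointIfExists()` provides the checkpoint fast path that this sketch skips.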