author     vitalyisaev <vitalyisaev@ydb.tech>  2023-11-14 09:58:56 +0300
committer  vitalyisaev <vitalyisaev@ydb.tech>  2023-11-14 10:20:20 +0300
commit     c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch)
tree       cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/Storages/DataLakes
parent     d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff)
YQ Connector: move tests from yql to ydb (OSS)
Move the folder with Connector tests from the yql folder to the ydb folder (kept in sync with GitHub).
Diffstat (limited to 'contrib/clickhouse/src/Storages/DataLakes')
-rw-r--r--  contrib/clickhouse/src/Storages/DataLakes/DeltaLakeMetadataParser.cpp  | 341
-rw-r--r--  contrib/clickhouse/src/Storages/DataLakes/DeltaLakeMetadataParser.h    |  22
-rw-r--r--  contrib/clickhouse/src/Storages/DataLakes/HudiMetadataParser.cpp       | 116
-rw-r--r--  contrib/clickhouse/src/Storages/DataLakes/HudiMetadataParser.h         |  22
-rw-r--r--  contrib/clickhouse/src/Storages/DataLakes/IStorageDataLake.h           | 119
-rw-r--r--  contrib/clickhouse/src/Storages/DataLakes/IcebergMetadataParser.cpp    | 361
-rw-r--r--  contrib/clickhouse/src/Storages/DataLakes/IcebergMetadataParser.h      |  26
-rw-r--r--  contrib/clickhouse/src/Storages/DataLakes/S3MetadataReader.cpp         |  86
-rw-r--r--  contrib/clickhouse/src/Storages/DataLakes/S3MetadataReader.h           |  25
-rw-r--r--  contrib/clickhouse/src/Storages/DataLakes/StorageDeltaLake.h           |  25
-rw-r--r--  contrib/clickhouse/src/Storages/DataLakes/StorageHudi.h                |  25
-rw-r--r--  contrib/clickhouse/src/Storages/DataLakes/StorageIceberg.h             |  25
-rw-r--r--  contrib/clickhouse/src/Storages/DataLakes/registerDataLakes.cpp        |  50
13 files changed, 1243 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/Storages/DataLakes/DeltaLakeMetadataParser.cpp b/contrib/clickhouse/src/Storages/DataLakes/DeltaLakeMetadataParser.cpp
new file mode 100644
index 0000000000..9fca42bfdc
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/DataLakes/DeltaLakeMetadataParser.cpp
@@ -0,0 +1,341 @@
+#include <Storages/DataLakes/DeltaLakeMetadataParser.h>
+#include <base/JSON.h>
+#include "clickhouse_config.h"
+#include <set>
+
+#if USE_AWS_S3 && USE_PARQUET
+#include <Storages/DataLakes/S3MetadataReader.h>
+#include <Storages/StorageS3.h>
+#error #include <parquet/file_reader.h>
+#include <Processors/Formats/Impl/ArrowBufferedStreams.h>
+#include <Processors/Formats/Impl/ParquetBlockInputFormat.h>
+#include <Processors/Formats/Impl/ArrowColumnToCHColumn.h>
+#include <Formats/FormatFactory.h>
+#include <Columns/ColumnString.h>
+#include <Columns/ColumnNullable.h>
+#include <IO/ReadHelpers.h>
+#include <boost/algorithm/string/case_conv.hpp>
+#error #include <parquet/arrow/reader.h>
+#include <ranges>
+
+namespace fs = std::filesystem;
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int INCORRECT_DATA;
+ extern const int BAD_ARGUMENTS;
+}
+
+template <typename Configuration, typename MetadataReadHelper>
+struct DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::Impl
+{
+ /**
+ * Useful links:
+ * - https://github.com/delta-io/delta/blob/master/PROTOCOL.md#data-files
+ */
+
+ /**
+ * DeltaLake tables store metadata files and data files.
+ * Metadata files are stored as JSON in a directory at the root of the table named _delta_log,
+ * and together with checkpoints make up the log of all changes that have occurred to a table.
+ *
+ * Delta files are the unit of atomicity for a table,
+ * and are named using the next available version number, zero-padded to 20 digits.
+ * For example:
+ * ./_delta_log/00000000000000000000.json
+ */
+ static constexpr auto deltalake_metadata_directory = "_delta_log";
+ static constexpr auto metadata_file_suffix = ".json";
+
+ std::string withPadding(size_t version)
+ {
+ /// File names are zero-padded to 20 digits.
+ static constexpr auto padding = 20;
+
+ const auto version_str = toString(version);
+ return std::string(padding - version_str.size(), '0') + version_str;
+ }
+
+ /**
+ * A delta file, n.json, contains an atomic set of actions that should be applied to the
+ * previous table state (n-1.json) in order to construct the n-th snapshot of the table.
+ * An action changes one aspect of the table's state, for example, adding or removing a file.
+ * Note: it is not a single valid JSON document, but a list of JSON objects, so we read it in a loop.
+ */
+ std::set<String> processMetadataFiles(const Configuration & configuration, ContextPtr context)
+ {
+ std::set<String> result_files;
+ const auto checkpoint_version = getCheckpointIfExists(result_files, configuration, context);
+
+ if (checkpoint_version)
+ {
+ auto current_version = checkpoint_version;
+ while (true)
+ {
+ const auto filename = withPadding(++current_version) + metadata_file_suffix;
+ const auto file_path = fs::path(configuration.getPath()) / deltalake_metadata_directory / filename;
+
+ if (!MetadataReadHelper::exists(file_path, configuration))
+ break;
+
+ processMetadataFile(file_path, result_files, configuration, context);
+ }
+
+ LOG_TRACE(
+ log, "Processed metadata files from checkpoint {} to {}",
+ checkpoint_version, current_version - 1);
+ }
+ else
+ {
+ const auto keys = MetadataReadHelper::listFiles(
+ configuration, deltalake_metadata_directory, metadata_file_suffix);
+
+ for (const String & key : keys)
+ processMetadataFile(key, result_files, configuration, context);
+ }
+
+ return result_files;
+ }
+
+ /**
+ * Example of content of a single .json metadata file:
+ * "
+ * {"commitInfo":{
+ * "timestamp":1679424650713,
+ * "operation":"WRITE",
+ * "operationMetrics":{"numFiles":"1","numOutputRows":"100","numOutputBytes":"2560"},
+ * ...}
+ * {"protocol":{"minReaderVersion":2,"minWriterVersion":5}}
+ * {"metaData":{
+ * "id":"bd11ad96-bc2c-40b0-be1f-6fdd90d04459",
+ * "format":{"provider":"parquet","options":{}},
+ * "schemaString":"{...}",
+ * "partitionColumns":[],
+ * "configuration":{...},
+ * "createdTime":1679424648640}}
+ * {"add":{
+ * "path":"part-00000-ecf8ed08-d04a-4a71-a5ec-57d8bb2ab4ee-c000.parquet",
+ * "partitionValues":{},
+ * "size":2560,
+ * "modificationTime":1679424649568,
+ * "dataChange":true,
+ * "stats":"{
+ * \"numRecords\":100,
+ * \"minValues\":{\"col-6c990940-59bb-4709-8f2e-17083a82c01a\":0},
+ * \"maxValues\":{\"col-6c990940-59bb-4709-8f2e-17083a82c01a\":99},
+ * \"nullCount\":{\"col-6c990940-59bb-4709-8f2e-17083a82c01a\":0,\"col-763cd7e2-7627-4d8e-9fb7-9e85d0c8845b\":0}}"}}
+ * "
+ */
+ void processMetadataFile(
+ const String & key,
+ std::set<String> & result,
+ const Configuration & configuration,
+ ContextPtr context)
+ {
+ auto buf = MetadataReadHelper::createReadBuffer(key, context, configuration);
+
+ char c;
+ while (!buf->eof())
+ {
+ /// There may be some invalid characters before the JSON object.
+ while (buf->peek(c) && c != '{')
+ buf->ignore();
+
+ if (buf->eof())
+ break;
+
+ String json_str;
+ readJSONObjectPossiblyInvalid(json_str, *buf);
+
+ if (json_str.empty())
+ continue;
+
+ const JSON json(json_str);
+ if (json.has("add"))
+ {
+ const auto path = json["add"]["path"].getString();
+ const auto [_, inserted] = result.insert(fs::path(configuration.getPath()) / path);
+ if (!inserted)
+ throw Exception(ErrorCodes::INCORRECT_DATA, "File already exists {}", path);
+ }
+ else if (json.has("remove"))
+ {
+ const auto path = json["remove"]["path"].getString();
+ const bool erase = result.erase(fs::path(configuration.getPath()) / path);
+ if (!erase)
+ throw Exception(ErrorCodes::INCORRECT_DATA, "File doesn't exist {}", path);
+ }
+ }
+ }
+
+ /**
+ * Checkpoints in DeltaLake are created every 10 commits by default.
+ * The latest checkpoint is recorded in the _last_checkpoint file: _delta_log/_last_checkpoint
+ *
+ * _last_checkpoint contains the following:
+ * {"version":20,
+ * "size":23,
+ * "sizeInBytes":14057,
+ * "numOfAddFiles":21,
+ * "checkpointSchema":{...}}
+ *
+ * We need to get "version", which is the version of the checkpoint we need to read.
+ */
+ size_t readLastCheckpointIfExists(const Configuration & configuration, ContextPtr context)
+ {
+ const auto last_checkpoint_file = fs::path(configuration.getPath()) / deltalake_metadata_directory / "_last_checkpoint";
+ if (!MetadataReadHelper::exists(last_checkpoint_file, configuration))
+ return 0;
+
+ String json_str;
+ auto buf = MetadataReadHelper::createReadBuffer(last_checkpoint_file, context, configuration);
+ readJSONObjectPossiblyInvalid(json_str, *buf);
+
+ const JSON json(json_str);
+ const auto version = json["version"].getUInt();
+
+ LOG_TRACE(log, "Last checkpoint file version: {}", version);
+ return version;
+ }
+
+ /**
+ * The format of the checkpoint file name can take one of two forms:
+ * 1. A single checkpoint file for version n of the table will be named n.checkpoint.parquet.
+ * For example:
+ * 00000000000000000010.checkpoint.parquet
+ * 2. A multi-part checkpoint for version n can be fragmented into p files. Fragment o of p is
+ * named n.checkpoint.o.p.parquet. For example:
+ * 00000000000000000010.checkpoint.0000000001.0000000003.parquet
+ * 00000000000000000010.checkpoint.0000000002.0000000003.parquet
+ * 00000000000000000010.checkpoint.0000000003.0000000003.parquet
+ * TODO: Only (1) is supported, need to support (2).
+ *
+ * Such Parquet checkpoint files contain data with the following contents:
+ *
+ * Row 1:
+ * ──────
+ * txn: (NULL,NULL,NULL)
+ * add: ('part-00000-1e9cd0c1-57b5-43b4-9ed8-39854287b83a-c000.parquet',{},1070,1680614340485,false,{},'{"numRecords":1,"minValues":{"col-360dade5-6d0e-4831-8467-a25d64695975":13,"col-e27b0253-569a-4fe1-8f02-f3342c54d08b":"14"},"maxValues":{"col-360dade5-6d0e-4831-8467-a25d64695975":13,"col-e27b0253-569a-4fe1-8f02-f3342c54d08b":"14"},"nullCount":{"col-360dade5-6d0e-4831-8467-a25d64695975":0,"col-e27b0253-569a-4fe1-8f02-f3342c54d08b":0}}')
+ * remove: (NULL,NULL,NULL,NULL,{},NULL,{})
+ * metaData: (NULL,NULL,NULL,(NULL,{}),NULL,[],{},NULL)
+ * protocol: (NULL,NULL)
+ *
+ * Row 2:
+ * ──────
+ * txn: (NULL,NULL,NULL)
+ * add: ('part-00000-8887e898-91dd-4951-a367-48f7eb7bd5fd-c000.parquet',{},1063,1680614318485,false,{},'{"numRecords":1,"minValues":{"col-360dade5-6d0e-4831-8467-a25d64695975":2,"col-e27b0253-569a-4fe1-8f02-f3342c54d08b":"3"},"maxValues":{"col-360dade5-6d0e-4831-8467-a25d64695975":2,"col-e27b0253-569a-4fe1-8f02-f3342c54d08b":"3"},"nullCount":{"col-360dade5-6d0e-4831-8467-a25d64695975":0,"col-e27b0253-569a-4fe1-8f02-f3342c54d08b":0}}')
+ * remove: (NULL,NULL,NULL,NULL,{},NULL,{})
+ * metaData: (NULL,NULL,NULL,(NULL,{}),NULL,[],{},NULL)
+ * protocol: (NULL,NULL)
+ *
+ * We need to check only the `add` column; the `remove` column has no intersection with the `add` column.
+ * ...
+ */
+ #define THROW_ARROW_NOT_OK(status) \
+ do \
+ { \
+ if (const ::arrow::Status & _s = (status); !_s.ok()) \
+ throw Exception(ErrorCodes::BAD_ARGUMENTS, "Arrow error: {}", _s.ToString()); \
+ } while (false)
+
+ size_t getCheckpointIfExists(std::set<String> & result, const Configuration & configuration, ContextPtr context)
+ {
+ const auto version = readLastCheckpointIfExists(configuration, context);
+ if (!version)
+ return 0;
+
+ const auto checkpoint_filename = withPadding(version) + ".checkpoint.parquet";
+ const auto checkpoint_path = fs::path(configuration.getPath()) / deltalake_metadata_directory / checkpoint_filename;
+
+ LOG_TRACE(log, "Using checkpoint file: {}", checkpoint_path.string());
+
+ auto buf = MetadataReadHelper::createReadBuffer(checkpoint_path, context, configuration);
+ auto format_settings = getFormatSettings(context);
+
+ /// Force nullable, because for some reason this parquet file does not mark the columns
+ /// as nullable in its metadata, while the types are in fact nullable.
+ format_settings.schema_inference_make_columns_nullable = true;
+ auto columns = ParquetSchemaReader(*buf, format_settings).readSchema();
+
+ /// Read only columns that we need.
+ columns.filterColumns(NameSet{"add", "remove"});
+ Block header;
+ for (const auto & column : columns)
+ header.insert({column.type->createColumn(), column.type, column.name});
+
+ std::atomic<int> is_stopped{0};
+ auto arrow_file = asArrowFile(*buf, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES);
+
+ std::unique_ptr<parquet::arrow::FileReader> reader;
+ THROW_ARROW_NOT_OK(
+ parquet::arrow::OpenFile(
+ arrow_file,
+ arrow::default_memory_pool(),
+ &reader));
+
+ std::shared_ptr<arrow::Schema> schema;
+ THROW_ARROW_NOT_OK(reader->GetSchema(&schema));
+
+ ArrowColumnToCHColumn column_reader(
+ header, "Parquet",
+ format_settings.parquet.allow_missing_columns,
+ /* null_as_default */true,
+ /* case_insensitive_column_matching */false);
+
+ Chunk res;
+ std::shared_ptr<arrow::Table> table;
+ THROW_ARROW_NOT_OK(reader->ReadTable(&table));
+
+ column_reader.arrowTableToCHChunk(res, table, reader->parquet_reader()->metadata()->num_rows());
+ const auto & res_columns = res.getColumns();
+
+ if (res_columns.size() != 2)
+ {
+ throw Exception(
+ ErrorCodes::INCORRECT_DATA,
+ "Unexpected number of columns: {} (having: {}, expected: {})",
+ res_columns.size(), res.dumpStructure(), header.dumpStructure());
+ }
+
+ const auto * tuple_column = assert_cast<const ColumnTuple *>(res_columns[0].get());
+ const auto & nullable_column = assert_cast<const ColumnNullable &>(tuple_column->getColumn(0));
+ const auto & path_column = assert_cast<const ColumnString &>(nullable_column.getNestedColumn());
+ for (size_t i = 0; i < path_column.size(); ++i)
+ {
+ const auto filename = String(path_column.getDataAt(i));
+ if (filename.empty())
+ continue;
+ LOG_TEST(log, "Adding {}", filename);
+ const auto [_, inserted] = result.insert(fs::path(configuration.getPath()) / filename);
+ if (!inserted)
+ throw Exception(ErrorCodes::INCORRECT_DATA, "File already exists {}", filename);
+ }
+
+ return version;
+ }
+
+ Poco::Logger * log = &Poco::Logger::get("DeltaLakeMetadataParser");
+};
+
+
+template <typename Configuration, typename MetadataReadHelper>
+DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::DeltaLakeMetadataParser() : impl(std::make_unique<Impl>())
+{
+}
+
+template <typename Configuration, typename MetadataReadHelper>
+Strings DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::getFiles(const Configuration & configuration, ContextPtr context)
+{
+ auto result = impl->processMetadataFiles(configuration, context);
+ return Strings(result.begin(), result.end());
+}
+
+template DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::DeltaLakeMetadataParser();
+template Strings DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getFiles(
+ const StorageS3::Configuration & configuration, ContextPtr);
+}
+
+#endif
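The parser above walks _delta_log by zero-padded version number and treats every delta .json file as a list of JSON actions rather than a single document. A minimal standalone sketch of that traversal, assuming a local directory layout instead of S3 (the table path and the starting version are illustrative, not part of the committed code):

// Sketch of the _delta_log traversal described in DeltaLakeMetadataParser.cpp above.
#include <filesystem>
#include <fstream>
#include <iostream>
#include <string>

namespace fs = std::filesystem;

/// Delta file names are zero-padded to 20 digits, e.g. 00000000000000000011.json.
static std::string withPadding(size_t version)
{
    std::string s = std::to_string(version);
    return std::string(20 - s.size(), '0') + s;
}

int main()
{
    const fs::path table_path = "/tmp/delta_table";   /// hypothetical table root
    const fs::path log_dir = table_path / "_delta_log";
    size_t version = 10;                               /// e.g. the last checkpoint version

    /// Walk version files one by one until the next one does not exist,
    /// mirroring processMetadataFiles() above.
    while (true)
    {
        const fs::path delta_file = log_dir / (withPadding(++version) + ".json");
        if (!fs::exists(delta_file))
            break;

        /// Each delta file is a sequence of JSON action objects ("add", "remove", ...),
        /// newline-delimited in practice, so this sketch reads it line by line;
        /// the parser above is more forgiving and scans for '{' instead.
        std::ifstream in(delta_file);
        for (std::string line; std::getline(in, line);)
            std::cout << delta_file.filename() << ": " << line << "\n";
    }
}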
diff --git a/contrib/clickhouse/src/Storages/DataLakes/DeltaLakeMetadataParser.h b/contrib/clickhouse/src/Storages/DataLakes/DeltaLakeMetadataParser.h
new file mode 100644
index 0000000000..df7276b90b
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/DataLakes/DeltaLakeMetadataParser.h
@@ -0,0 +1,22 @@
+#pragma once
+
+#include <Interpreters/Context_fwd.h>
+#include <Core/Types.h>
+
+namespace DB
+{
+
+template <typename Configuration, typename MetadataReadHelper>
+struct DeltaLakeMetadataParser
+{
+public:
+ DeltaLakeMetadataParser<Configuration, MetadataReadHelper>();
+
+ Strings getFiles(const Configuration & configuration, ContextPtr context);
+
+private:
+ struct Impl;
+ std::shared_ptr<Impl> impl;
+};
+
+}
diff --git a/contrib/clickhouse/src/Storages/DataLakes/HudiMetadataParser.cpp b/contrib/clickhouse/src/Storages/DataLakes/HudiMetadataParser.cpp
new file mode 100644
index 0000000000..6857bb3500
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/DataLakes/HudiMetadataParser.cpp
@@ -0,0 +1,116 @@
+#include <Storages/DataLakes/HudiMetadataParser.h>
+#include <Common/logger_useful.h>
+#include <ranges>
+#include <base/find_symbols.h>
+#include <Poco/String.h>
+#include "clickhouse_config.h"
+#include <filesystem>
+#include <IO/ReadHelpers.h>
+
+#if USE_AWS_S3
+#include <Storages/DataLakes/S3MetadataReader.h>
+#include <Storages/StorageS3.h>
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int LOGICAL_ERROR;
+}
+
+template <typename Configuration, typename MetadataReadHelper>
+struct HudiMetadataParser<Configuration, MetadataReadHelper>::Impl
+{
+ /**
+ * Useful links:
+ * - https://hudi.apache.org/tech-specs/
+ * - https://hudi.apache.org/docs/file_layouts/
+ */
+
+ /**
+ * Hudi tables store metadata files and data files.
+ * Metadata files are stored in the .hoodie/metadata directory. However, unlike DeltaLake and Iceberg,
+ * the metadata is not required in order to understand which files we need to read;
+ * moreover, for Hudi the metadata does not always exist.
+ *
+ * There can be two types of data files
+ * 1. base files (columnar file formats like Apache Parquet/Orc)
+ * 2. log files
+ * Currently we support reading only `base files`.
+ * Data file name format:
+ * [File Id]_[File Write Token]_[Transaction timestamp].[File Extension]
+ *
+ * To find the needed parts we have to determine the latest part file for every file group in every partition.
+ * The reason: Hudi reads in and overwrites the entire table/partition with each update.
+ * Hudi controls the number of file groups under a single partition according to the
+ * hoodie.parquet.max.file.size option. Once a single Parquet file becomes too large, Hudi creates a second file group.
+ * Each file group is identified by its File Id.
+ */
+ Strings processMetadataFiles(const Configuration & configuration)
+ {
+ auto * log = &Poco::Logger::get("HudiMetadataParser");
+
+ const auto keys = MetadataReadHelper::listFiles(configuration, "", Poco::toLower(configuration.format));
+
+ using Partition = std::string;
+ using FileID = std::string;
+ struct FileInfo
+ {
+ String key;
+ UInt64 timestamp = 0;
+ };
+ std::unordered_map<Partition, std::unordered_map<FileID, FileInfo>> data_files;
+
+ for (const auto & key : keys)
+ {
+ auto key_file = std::filesystem::path(key);
+ Strings file_parts;
+ const String stem = key_file.stem();
+ splitInto<'_'>(file_parts, stem);
+ if (file_parts.size() != 3)
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected format for file: {}", key);
+
+ const auto partition = key_file.parent_path().stem();
+ const auto & file_id = file_parts[0];
+ const auto timestamp = parse<UInt64>(file_parts[2]);
+
+ auto & file_info = data_files[partition][file_id];
+ if (file_info.timestamp == 0 || file_info.timestamp < timestamp)
+ {
+ file_info.key = std::move(key);
+ file_info.timestamp = timestamp;
+ }
+ }
+
+ Strings result;
+ for (auto & [partition, partition_data] : data_files)
+ {
+ LOG_TRACE(log, "Adding {} data files from partition {}", partition, partition_data.size());
+ for (auto & [file_id, file_data] : partition_data)
+ result.push_back(std::move(file_data.key));
+ }
+ return result;
+ }
+};
+
+
+template <typename Configuration, typename MetadataReadHelper>
+HudiMetadataParser<Configuration, MetadataReadHelper>::HudiMetadataParser() : impl(std::make_unique<Impl>())
+{
+}
+
+template <typename Configuration, typename MetadataReadHelper>
+Strings HudiMetadataParser<Configuration, MetadataReadHelper>::getFiles(const Configuration & configuration, ContextPtr)
+{
+ return impl->processMetadataFiles(configuration);
+}
+
+template HudiMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::HudiMetadataParser();
+template Strings HudiMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getFiles(
+ const StorageS3::Configuration & configuration, ContextPtr);
+
+}
+
+#endif
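The Hudi parser above keeps, for every (partition, File Id) pair, only the base file with the largest transaction timestamp. A minimal standalone sketch of that selection, run on hard-coded example keys (the names follow the [File Id]_[File Write Token]_[Transaction timestamp].[File Extension] format described above but are otherwise made up):

// Sketch of the "latest base file per file group" selection from HudiMetadataParser.cpp above.
#include <cstdint>
#include <filesystem>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

int main()
{
    /// Illustrative keys: partition directory / [File Id]_[Write Token]_[Timestamp].parquet
    const std::vector<std::string> keys = {
        "part0/fileid1_0-7-21_20230301.parquet",
        "part0/fileid1_0-9-42_20230315.parquet",   /// newer file for the same file group
        "part1/fileid2_0-3-11_20230310.parquet",
    };

    struct FileInfo { std::string key; uint64_t timestamp = 0; };
    /// partition -> file id -> latest file seen so far
    std::unordered_map<std::string, std::unordered_map<std::string, FileInfo>> data_files;

    for (const auto & key : keys)
    {
        const std::filesystem::path path(key);
        const std::string stem = path.stem().string();

        /// Split the stem: file id before the first '_', timestamp after the last '_'.
        const auto first = stem.find('_');
        const auto last = stem.rfind('_');
        const std::string file_id = stem.substr(0, first);
        const uint64_t timestamp = std::stoull(stem.substr(last + 1));

        auto & info = data_files[path.parent_path().string()][file_id];
        if (info.timestamp < timestamp)
            info = {key, timestamp};
    }

    for (const auto & [partition, files] : data_files)
        for (const auto & [file_id, info] : files)
            std::cout << partition << " " << file_id << " -> " << info.key << "\n";
}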
diff --git a/contrib/clickhouse/src/Storages/DataLakes/HudiMetadataParser.h b/contrib/clickhouse/src/Storages/DataLakes/HudiMetadataParser.h
new file mode 100644
index 0000000000..6727ba2f71
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/DataLakes/HudiMetadataParser.h
@@ -0,0 +1,22 @@
+#pragma once
+
+#include <Interpreters/Context_fwd.h>
+#include <Core/Types.h>
+
+namespace DB
+{
+
+template <typename Configuration, typename MetadataReadHelper>
+struct HudiMetadataParser
+{
+public:
+ HudiMetadataParser<Configuration, MetadataReadHelper>();
+
+ Strings getFiles(const Configuration & configuration, ContextPtr context);
+
+private:
+ struct Impl;
+ std::shared_ptr<Impl> impl;
+};
+
+}
diff --git a/contrib/clickhouse/src/Storages/DataLakes/IStorageDataLake.h b/contrib/clickhouse/src/Storages/DataLakes/IStorageDataLake.h
new file mode 100644
index 0000000000..d732ff928d
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/DataLakes/IStorageDataLake.h
@@ -0,0 +1,119 @@
+#pragma once
+
+#include "clickhouse_config.h"
+
+#if USE_AWS_S3
+
+#include <Storages/IStorage.h>
+#include <Common/logger_useful.h>
+#include <Storages/StorageFactory.h>
+#include <Formats/FormatFactory.h>
+#include <filesystem>
+
+
+namespace DB
+{
+
+template <typename Storage, typename Name, typename MetadataParser>
+class IStorageDataLake : public Storage
+{
+public:
+ static constexpr auto name = Name::name;
+ using Configuration = typename Storage::Configuration;
+
+ template <class ...Args>
+ explicit IStorageDataLake(const Configuration & configuration_, ContextPtr context_, Args && ...args)
+ : Storage(getConfigurationForDataRead(configuration_, context_), context_, std::forward<Args>(args)...)
+ , base_configuration(configuration_)
+ , log(&Poco::Logger::get(getName())) {}
+
+ String getName() const override { return name; }
+
+ static ColumnsDescription getTableStructureFromData(
+ Configuration & base_configuration,
+ const std::optional<FormatSettings> & format_settings,
+ ContextPtr local_context)
+ {
+ auto configuration = getConfigurationForDataRead(base_configuration, local_context);
+ return Storage::getTableStructureFromData(configuration, format_settings, local_context);
+ }
+
+ static Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context)
+ {
+ return Storage::getConfiguration(engine_args, local_context, /* get_format_from_file */false);
+ }
+
+ Configuration updateConfigurationAndGetCopy(ContextPtr local_context) override
+ {
+ std::lock_guard lock(configuration_update_mutex);
+ updateConfigurationImpl(local_context);
+ return Storage::getConfiguration();
+ }
+
+ void updateConfiguration(ContextPtr local_context) override
+ {
+ std::lock_guard lock(configuration_update_mutex);
+ updateConfigurationImpl(local_context);
+ }
+
+private:
+ static Configuration getConfigurationForDataRead(
+ const Configuration & base_configuration, ContextPtr local_context, const Strings & keys = {})
+ {
+ auto configuration{base_configuration};
+ configuration.update(local_context);
+ configuration.static_configuration = true;
+
+ if (keys.empty())
+ configuration.keys = getDataFiles(configuration, local_context);
+ else
+ configuration.keys = keys;
+
+ LOG_TRACE(
+ &Poco::Logger::get("DataLake"),
+ "New configuration path: {}, keys: {}",
+ configuration.getPath(), fmt::join(configuration.keys, ", "));
+
+ configuration.connect(local_context);
+ return configuration;
+ }
+
+ static Strings getDataFiles(const Configuration & configuration, ContextPtr local_context)
+ {
+ return MetadataParser().getFiles(configuration, local_context);
+ }
+
+ void updateConfigurationImpl(ContextPtr local_context)
+ {
+ const bool updated = base_configuration.update(local_context);
+ auto new_keys = getDataFiles(base_configuration, local_context);
+
+ if (!updated && new_keys == Storage::getConfiguration().keys)
+ return;
+
+ Storage::useConfiguration(getConfigurationForDataRead(base_configuration, local_context, new_keys));
+ }
+
+ Configuration base_configuration;
+ std::mutex configuration_update_mutex;
+ Poco::Logger * log;
+};
+
+
+template <typename DataLake>
+static StoragePtr createDataLakeStorage(const StorageFactory::Arguments & args)
+{
+ auto configuration = DataLake::getConfiguration(args.engine_args, args.getLocalContext());
+
+ /// Data lakes use parquet format, no need for schema inference.
+ if (configuration.format == "auto")
+ configuration.format = "Parquet";
+
+ return std::make_shared<DataLake>(
+ configuration, args.getContext(), args.table_id, args.columns, args.constraints,
+ args.comment, getFormatSettings(args.getContext()));
+}
+
+}
+
+#endif
diff --git a/contrib/clickhouse/src/Storages/DataLakes/IcebergMetadataParser.cpp b/contrib/clickhouse/src/Storages/DataLakes/IcebergMetadataParser.cpp
new file mode 100644
index 0000000000..a5c0dcc434
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/DataLakes/IcebergMetadataParser.cpp
@@ -0,0 +1,361 @@
+#include "clickhouse_config.h"
+
+#if USE_AWS_S3 && USE_AVRO
+
+#include <Common/logger_useful.h>
+
+#include <Columns/ColumnString.h>
+#include <Columns/ColumnTuple.h>
+#include <Columns/IColumn.h>
+#include <Storages/DataLakes/IcebergMetadataParser.h>
+#include <Storages/DataLakes/S3MetadataReader.h>
+#include <Storages/StorageS3.h>
+#include <Processors/Formats/Impl/AvroRowInputFormat.h>
+#include <Formats/FormatFactory.h>
+#include <IO/ReadHelpers.h>
+
+#include <Poco/JSON/Array.h>
+#include <Poco/JSON/Object.h>
+#include <Poco/JSON/Parser.h>
+
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int FILE_DOESNT_EXIST;
+ extern const int ILLEGAL_COLUMN;
+ extern const int BAD_ARGUMENTS;
+}
+
+template <typename Configuration, typename MetadataReadHelper>
+struct IcebergMetadataParser<Configuration, MetadataReadHelper>::Impl
+{
+ Poco::Logger * log = &Poco::Logger::get("IcebergMetadataParser");
+
+ /**
+ * Useful links:
+ * - https://iceberg.apache.org/spec/
+ */
+
+ /**
+ * Iceberg has two format versions; currently we support only format V1.
+ *
+ * Unlike DeltaLake, Iceberg has several metadata layers: `table metadata`, `manifest list` and `manifest_files`.
+ * Metadata file – a JSON file.
+ * Manifest list – a file that lists manifest files; one per snapshot.
+ * Manifest file – a file that lists data or delete files; a subset of a snapshot.
+ * All changes to table state create a new metadata file and replace the old metadata with an atomic swap.
+ */
+
+ static constexpr auto metadata_directory = "metadata";
+
+ /**
+ * Each version of table metadata is stored in a `metadata` directory and
+ * has format: v<V>.metadata.json, where V - metadata version.
+ */
+ String getMetadataFile(const Configuration & configuration)
+ {
+ static constexpr auto metadata_file_suffix = ".metadata.json";
+
+ const auto metadata_files = MetadataReadHelper::listFiles(configuration, metadata_directory, metadata_file_suffix);
+ if (metadata_files.empty())
+ {
+ throw Exception(
+ ErrorCodes::FILE_DOESNT_EXIST,
+ "The metadata file for Iceberg table with path {} doesn't exist",
+ configuration.url.key);
+ }
+
+ /// Get the latest version of metadata file: v<V>.metadata.json
+ return *std::max_element(metadata_files.begin(), metadata_files.end());
+ }
+
+ /**
+ * In order to find out which data files to read, we need to find the `manifest list`
+ * which corresponds to the latest snapshot. We find it by checking a list of snapshots
+ * in metadata's "snapshots" section.
+ *
+ * Example of metadata.json file.
+ *
+ * {
+ * "format-version" : 1,
+ * "table-uuid" : "ca2965ad-aae2-4813-8cf7-2c394e0c10f5",
+ * "location" : "/iceberg_data/db/table_name",
+ * "last-updated-ms" : 1680206743150,
+ * "last-column-id" : 2,
+ * "schema" : { "type" : "struct", "schema-id" : 0, "fields" : [ {<field1_info>}, {<field2_info>}, ... ] },
+ * "current-schema-id" : 0,
+ * "schemas" : [ ],
+ * ...
+ * "current-snapshot-id" : 2819310504515118887,
+ * "refs" : { "main" : { "snapshot-id" : 2819310504515118887, "type" : "branch" } },
+ * "snapshots" : [ {
+ * "snapshot-id" : 2819310504515118887,
+ * "timestamp-ms" : 1680206743150,
+ * "summary" : {
+ * "operation" : "append", "spark.app.id" : "local-1680206733239",
+ * "added-data-files" : "1", "added-records" : "100",
+ * "added-files-size" : "1070", "changed-partition-count" : "1",
+ * "total-records" : "100", "total-files-size" : "1070", "total-data-files" : "1", "total-delete-files" : "0",
+ * "total-position-deletes" : "0", "total-equality-deletes" : "0"
+ * },
+ * "manifest-list" : "/iceberg_data/db/table_name/metadata/snap-2819310504515118887-1-c87bfec7-d36c-4075-ad04-600b6b0f2020.avro",
+ * "schema-id" : 0
+ * } ],
+ * "statistics" : [ ],
+ * "snapshot-log" : [ ... ],
+ * "metadata-log" : [ ]
+ * }
+ */
+ struct Metadata
+ {
+ int format_version;
+ String manifest_list;
+ Strings manifest_files;
+ };
+ Metadata processMetadataFile(const Configuration & configuration, ContextPtr context)
+ {
+ const auto metadata_file_path = getMetadataFile(configuration);
+ auto buf = MetadataReadHelper::createReadBuffer(metadata_file_path, context, configuration);
+ String json_str;
+ readJSONObjectPossiblyInvalid(json_str, *buf);
+
+ Poco::JSON::Parser parser; /// For some reason base/base/JSON.h can not parse this json file
+ Poco::Dynamic::Var json = parser.parse(json_str);
+ Poco::JSON::Object::Ptr object = json.extract<Poco::JSON::Object::Ptr>();
+
+ Metadata result;
+ result.format_version = object->getValue<int>("format-version");
+
+ auto current_snapshot_id = object->getValue<Int64>("current-snapshot-id");
+ auto snapshots = object->get("snapshots").extract<Poco::JSON::Array::Ptr>();
+
+ for (size_t i = 0; i < snapshots->size(); ++i)
+ {
+ const auto snapshot = snapshots->getObject(static_cast<UInt32>(i));
+ if (snapshot->getValue<Int64>("snapshot-id") == current_snapshot_id)
+ {
+ const auto path = snapshot->getValue<String>("manifest-list");
+ result.manifest_list = std::filesystem::path(configuration.url.key) / metadata_directory / std::filesystem::path(path).filename();
+ break;
+ }
+ }
+ return result;
+ }
+
+ /**
+ * The manifest list uses Avro as its default format (and currently we support only Avro).
+ * A manifest list file name looks like: snap-2819310504515118887-1-c87bfec7-d36c-4075-ad04-600b6b0f2020.avro
+ *
+ * `manifest list` has the following contents:
+ * ┌─manifest_path────────────────────────────────────────────────────────────────────────────────────────┬─manifest_length─┬─partition_spec_id─┬───added_snapshot_id─┬─added_data_files_count─┬─existing_data_files_count─┬─deleted_data_files_count─┬─partitions─┬─added_rows_count─┬─existing_rows_count─┬─deleted_rows_count─┐
+ * │ /iceberg_data/db/table_name/metadata/c87bfec7-d36c-4075-ad04-600b6b0f2020-m0.avro │ 5813 │ 0 │ 2819310504515118887 │ 1 │ 0 │ 0 │ [] │ 100 │ 0 │ 0 │
+ * └──────────────────────────────────────────────────────────────────────────────────────────────────────┴─────────────────┴───────────────────┴─────────────────────┴────────────────────────┴───────────────────────────┴──────────────────────────┴────────────┴──────────────────┴─────────────────────┴────────────────────┘
+ */
+ void processManifestList(Metadata & metadata, const Configuration & configuration, ContextPtr context)
+ {
+ auto buf = MetadataReadHelper::createReadBuffer(metadata.manifest_list, context, configuration);
+ auto file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*buf));
+
+ auto data_type = AvroSchemaReader::avroNodeToDataType(file_reader->dataSchema().root()->leafAt(0));
+ Block header{{data_type->createColumn(), data_type, "manifest_path"}};
+ auto columns = parseAvro(*file_reader, header, getFormatSettings(context));
+ auto & col = columns.at(0);
+
+ if (col->getDataType() != TypeIndex::String)
+ {
+ throw Exception(
+ ErrorCodes::ILLEGAL_COLUMN,
+ "The parsed column from Avro file of `manifest_path` field should be String type, got {}",
+ col->getFamilyName());
+ }
+
+ const auto * col_str = typeid_cast<ColumnString *>(col.get());
+ for (size_t i = 0; i < col_str->size(); ++i)
+ {
+ const auto file_path = col_str->getDataAt(i).toView();
+ const auto filename = std::filesystem::path(file_path).filename();
+ metadata.manifest_files.emplace_back(std::filesystem::path(configuration.url.key) / metadata_directory / filename);
+ }
+ }
+
+ /**
+ * Manifest file has the following format: '/iceberg_data/db/table_name/metadata/c87bfec7-d36c-4075-ad04-600b6b0f2020-m0.avro'
+ *
+ * `manifest file` is different in format version V1 and V2 and has the following contents:
+ * v1 v2
+ * status req req
+ * snapshot_id req opt
+ * sequence_number opt
+ * file_sequence_number opt
+ * data_file req req
+ * Example format version V1:
+ * ┌─status─┬─────────snapshot_id─┬─data_file───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
+ * │ 1 │ 2819310504515118887 │ ('/iceberg_data/db/table_name/data/00000-1-3edca534-15a0-4f74-8a28-4733e0bf1270-00001.parquet','PARQUET',(),100,1070,67108864,[(1,233),(2,210)],[(1,100),(2,100)],[(1,0),(2,0)],[],[(1,'\0'),(2,'0')],[(1,'c'),(2,'99')],NULL,[4],0) │
+ * └────────┴─────────────────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
+ * Example format version V2:
+ * ┌─status─┬─────────snapshot_id─┬─sequence_number─┬─file_sequence_number─┬─data_file───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
+ * │ 1 │ 5887006101709926452 │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ (0,'/iceberg_data/db/table_name/data/00000-1-c8045c90-8799-4eac-b957-79a0484e223c-00001.parquet','PARQUET',(),100,1070,[(1,233),(2,210)],[(1,100),(2,100)],[(1,0),(2,0)],[],[(1,'\0'),(2,'0')],[(1,'c'),(2,'99')],NULL,[4],[],0) │
+ * └────────┴─────────────────────┴─────────────────┴──────────────────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
+ * In case of partitioned data we'll have extra directory partition=value:
+ * ─status─┬─────────snapshot_id─┬─data_file──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
+ * │ 1 │ 2252246380142525104 │ ('/iceberg_data/db/table_name/data/a=0/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00001.parquet','PARQUET',(0),1,631,67108864,[(1,46),(2,48)],[(1,1),(2,1)],[(1,0),(2,0)],[],[(1,'\0\0\0\0\0\0\0\0'),(2,'1')],[(1,'\0\0\0\0\0\0\0\0'),(2,'1')],NULL,[4],0) │
+ * │ 1 │ 2252246380142525104 │ ('/iceberg_data/db/table_name/data/a=1/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00002.parquet','PARQUET',(1),1,631,67108864,[(1,46),(2,48)],[(1,1),(2,1)],[(1,0),(2,0)],[],[(1,'\0\0\0\0\0\0\0'),(2,'2')],[(1,'\0\0\0\0\0\0\0'),(2,'2')],NULL,[4],0) │
+ * │ 1 │ 2252246380142525104 │ ('/iceberg_data/db/table_name/data/a=2/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00003.parquet','PARQUET',(2),1,631,67108864,[(1,46),(2,48)],[(1,1),(2,1)],[(1,0),(2,0)],[],[(1,'\0\0\0\0\0\0\0'),(2,'3')],[(1,'\0\0\0\0\0\0\0'),(2,'3')],NULL,[4],0) │
+ * └────────┴─────────────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
+ */
+ Strings getFilesForRead(const Metadata & metadata, const Configuration & configuration, ContextPtr context)
+ {
+ NameSet keys;
+ for (const auto & manifest_file : metadata.manifest_files)
+ {
+ auto buffer = MetadataReadHelper::createReadBuffer(manifest_file, context, configuration);
+ auto file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*buffer));
+
+ avro::NodePtr root_node = file_reader->dataSchema().root();
+ size_t leaves_num = root_node->leaves();
+ size_t expected_min_num = metadata.format_version == 1 ? 3 : 2;
+ if (leaves_num < expected_min_num)
+ {
+ throw Exception(
+ ErrorCodes::BAD_ARGUMENTS,
+ "Unexpected number of columns {}. Expected at least {}",
+ root_node->leaves(), expected_min_num);
+ }
+
+ avro::NodePtr status_node = root_node->leafAt(0);
+ if (status_node->type() != avro::Type::AVRO_INT)
+ {
+ throw Exception(
+ ErrorCodes::ILLEGAL_COLUMN,
+ "The parsed column from Avro file of `status` field should be Int type, got {}",
+ magic_enum::enum_name(status_node->type()));
+ }
+
+ avro::NodePtr data_file_node = root_node->leafAt(static_cast<int>(leaves_num) - 1);
+ if (data_file_node->type() != avro::Type::AVRO_RECORD)
+ {
+ throw Exception(
+ ErrorCodes::ILLEGAL_COLUMN,
+ "The parsed column from Avro file of `data_file` field should be Tuple type, got {}",
+ magic_enum::enum_name(data_file_node->type()));
+ }
+
+ auto status_col_data_type = AvroSchemaReader::avroNodeToDataType(status_node);
+ auto data_col_data_type = AvroSchemaReader::avroNodeToDataType(data_file_node);
+ Block header{
+ {status_col_data_type->createColumn(), status_col_data_type, "status"},
+ {data_col_data_type->createColumn(), data_col_data_type, "data_file"}};
+
+ const auto columns = parseAvro(*file_reader, header, getFormatSettings(context));
+ if (columns.size() != 2)
+ {
+ throw Exception(ErrorCodes::ILLEGAL_COLUMN,
+ "Unexpected number of columns. Expected 2, got {}", columns.size());
+ }
+
+ if (columns.at(0)->getDataType() != TypeIndex::Int32)
+ {
+ throw Exception(ErrorCodes::ILLEGAL_COLUMN,
+ "The parsed column from Avro file of `status` field should be Int32 type, got {}",
+ columns.at(0)->getFamilyName());
+ }
+ if (columns.at(1)->getDataType() != TypeIndex::Tuple)
+ {
+ throw Exception(ErrorCodes::ILLEGAL_COLUMN,
+ "The parsed column from Avro file of `file_path` field should be Tuple type, got {}",
+ columns.at(1)->getFamilyName());
+ }
+
+ const auto status_int_column = assert_cast<ColumnInt32 *>(columns.at(0).get());
+ const auto data_file_tuple_column = assert_cast<ColumnTuple *>(columns.at(1).get());
+
+ if (status_int_column->size() != data_file_tuple_column->size())
+ {
+ throw Exception(ErrorCodes::ILLEGAL_COLUMN,
+ "The parsed column from Avro file of `file_path` and `status` have different rows number: {} and {}",
+ status_int_column->size(), data_file_tuple_column->size());
+ }
+
+ const auto * data_file_name_column = metadata.format_version == 1
+ ? data_file_tuple_column->getColumnPtr(0).get()
+ : data_file_tuple_column->getColumnPtr(1).get();
+
+ if (data_file_name_column->getDataType() != TypeIndex::String)
+ {
+ throw Exception(ErrorCodes::ILLEGAL_COLUMN,
+ "The parsed column from Avro file of `file_path` field should be String type, got {}",
+ data_file_name_column->getFamilyName());
+ }
+ auto file_name_str_column = assert_cast<const ColumnString *>(data_file_name_column);
+
+ for (size_t i = 0; i < status_int_column->size(); ++i)
+ {
+ const auto status = status_int_column->getInt(i);
+ const auto data_path = std::string(file_name_str_column->getDataAt(i).toView());
+ const auto pos = data_path.find(configuration.url.key);
+ if (pos == std::string::npos)
+ throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected to find {} in data path: {}", configuration.url.key, data_path);
+ const auto file_path = data_path.substr(pos);
+
+ if (status == 2)
+ {
+ LOG_TEST(log, "Processing delete file for path: {}", file_path);
+ chassert(!keys.contains(file_path));
+ }
+ else
+ keys.insert(file_path);
+ }
+ }
+
+ return std::vector<std::string>(keys.begin(), keys.end());
+ }
+
+ MutableColumns parseAvro(
+ avro::DataFileReaderBase & file_reader,
+ const Block & header,
+ const FormatSettings & settings)
+ {
+ auto deserializer = std::make_unique<AvroDeserializer>(header, file_reader.dataSchema(), true, true, settings);
+ MutableColumns columns = header.cloneEmptyColumns();
+
+ file_reader.init();
+ RowReadExtension ext;
+ while (file_reader.hasMore())
+ {
+ file_reader.decr();
+ deserializer->deserializeRow(columns, file_reader.decoder(), ext);
+ }
+ return columns;
+ }
+
+};
+
+
+template <typename Configuration, typename MetadataReadHelper>
+IcebergMetadataParser<Configuration, MetadataReadHelper>::IcebergMetadataParser() : impl(std::make_unique<Impl>())
+{
+}
+
+template <typename Configuration, typename MetadataReadHelper>
+Strings IcebergMetadataParser<Configuration, MetadataReadHelper>::getFiles(const Configuration & configuration, ContextPtr context)
+{
+ auto metadata = impl->processMetadataFile(configuration, context);
+
+ /// When the table has just been created and does not yet contain any data.
+ if (metadata.manifest_list.empty())
+ return {};
+
+ impl->processManifestList(metadata, configuration, context);
+ return impl->getFilesForRead(metadata, configuration, context);
+}
+
+
+template IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::IcebergMetadataParser();
+template Strings IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getFiles(const StorageS3::Configuration & configuration, ContextPtr);
+
+}
+
+#endif
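processMetadataFile() above finds the manifest list of the current snapshot by scanning the "snapshots" array of metadata.json. A minimal standalone sketch of that lookup with Poco::JSON, run on a trimmed-down copy of the sample metadata shown in the comments (the values are illustrative):

// Sketch of the "manifest list of the current snapshot" lookup from IcebergMetadataParser.cpp above.
#include <Poco/JSON/Array.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <iostream>
#include <string>

int main()
{
    /// Trimmed-down version of the metadata.json example from the comments above.
    const std::string json_str = R"({
        "format-version": 1,
        "current-snapshot-id": 2819310504515118887,
        "snapshots": [
            { "snapshot-id": 1111111111111111111, "manifest-list": "/iceberg_data/db/table_name/metadata/snap-old.avro" },
            { "snapshot-id": 2819310504515118887, "manifest-list": "/iceberg_data/db/table_name/metadata/snap-2819310504515118887-1-c87bfec7-d36c-4075-ad04-600b6b0f2020.avro" }
        ]
    })";

    Poco::JSON::Parser parser;
    auto object = parser.parse(json_str).extract<Poco::JSON::Object::Ptr>();

    const auto current_snapshot_id = object->getValue<Poco::Int64>("current-snapshot-id");
    auto snapshots = object->get("snapshots").extract<Poco::JSON::Array::Ptr>();

    /// Scan the snapshot list for the entry matching current-snapshot-id
    /// and take its manifest-list path.
    for (unsigned i = 0; i < snapshots->size(); ++i)
    {
        const auto snapshot = snapshots->getObject(i);
        if (snapshot->getValue<Poco::Int64>("snapshot-id") == current_snapshot_id)
        {
            std::cout << "manifest list: " << snapshot->getValue<std::string>("manifest-list") << "\n";
            break;
        }
    }
}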
diff --git a/contrib/clickhouse/src/Storages/DataLakes/IcebergMetadataParser.h b/contrib/clickhouse/src/Storages/DataLakes/IcebergMetadataParser.h
new file mode 100644
index 0000000000..226b1bd8b6
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/DataLakes/IcebergMetadataParser.h
@@ -0,0 +1,26 @@
+#pragma once
+
+#if USE_AVRO /// StorageIceberg depends on Avro to parse metadata in Avro format.
+
+#include <Interpreters/Context_fwd.h>
+#include <Core/Types.h>
+
+namespace DB
+{
+
+template <typename Configuration, typename MetadataReadHelper>
+struct IcebergMetadataParser
+{
+public:
+ IcebergMetadataParser<Configuration, MetadataReadHelper>();
+
+ Strings getFiles(const Configuration & configuration, ContextPtr context);
+
+private:
+ struct Impl;
+ std::shared_ptr<Impl> impl;
+};
+
+}
+
+#endif
diff --git a/contrib/clickhouse/src/Storages/DataLakes/S3MetadataReader.cpp b/contrib/clickhouse/src/Storages/DataLakes/S3MetadataReader.cpp
new file mode 100644
index 0000000000..5535b08e54
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/DataLakes/S3MetadataReader.cpp
@@ -0,0 +1,86 @@
+#include <clickhouse_config.h>
+
+#if USE_AWS_S3
+
+#include <IO/ReadBufferFromS3.h>
+#include <IO/S3/Requests.h>
+#include <Interpreters/Context.h>
+#include <Storages/DataLakes/S3MetadataReader.h>
+#include <aws/core/auth/AWSCredentials.h>
+#include <aws/s3/S3Client.h>
+#include <aws/s3/model/ListObjectsV2Request.h>
+
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int S3_ERROR;
+}
+
+std::shared_ptr<ReadBuffer>
+S3DataLakeMetadataReadHelper::createReadBuffer(const String & key, ContextPtr context, const StorageS3::Configuration & base_configuration)
+{
+ S3Settings::RequestSettings request_settings;
+ request_settings.max_single_read_retries = context->getSettingsRef().s3_max_single_read_retries;
+ return std::make_shared<ReadBufferFromS3>(
+ base_configuration.client,
+ base_configuration.url.bucket,
+ key,
+ base_configuration.url.version_id,
+ request_settings,
+ context->getReadSettings());
+}
+
+bool S3DataLakeMetadataReadHelper::exists(const String & key, const StorageS3::Configuration & configuration)
+{
+ return S3::objectExists(*configuration.client, configuration.url.bucket, key);
+}
+
+std::vector<String> S3DataLakeMetadataReadHelper::listFiles(
+ const StorageS3::Configuration & base_configuration, const String & prefix, const String & suffix)
+{
+ const auto & table_path = base_configuration.url.key;
+ const auto & bucket = base_configuration.url.bucket;
+ const auto & client = base_configuration.client;
+
+ std::vector<String> res;
+ S3::ListObjectsV2Request request;
+ Aws::S3::Model::ListObjectsV2Outcome outcome;
+
+ request.SetBucket(bucket);
+ request.SetPrefix(std::filesystem::path(table_path) / prefix);
+
+ bool is_finished{false};
+ while (!is_finished)
+ {
+ outcome = client->ListObjectsV2(request);
+ if (!outcome.IsSuccess())
+ throw S3Exception(
+ outcome.GetError().GetErrorType(),
+ "Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
+ quoteString(bucket),
+ quoteString(base_configuration.url.key),
+ backQuote(outcome.GetError().GetExceptionName()),
+ quoteString(outcome.GetError().GetMessage()));
+
+ const auto & result_batch = outcome.GetResult().GetContents();
+ for (const auto & obj : result_batch)
+ {
+ const auto & filename = obj.GetKey();
+ if (filename.ends_with(suffix))
+ res.push_back(filename);
+ }
+
+ request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
+ is_finished = !outcome.GetResult().GetIsTruncated();
+ }
+
+ LOG_TRACE(&Poco::Logger::get("S3DataLakeMetadataReadHelper"), "Listed {} files", res.size());
+
+ return res;
+}
+
+}
+#endif
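listFiles() above pages through ListObjectsV2 responses with a continuation token until the result is no longer truncated. A minimal standalone sketch of the same pagination pattern written directly against the AWS C++ SDK (bucket, prefix and suffix are placeholders):

// Sketch of the ListObjectsV2 pagination pattern used by S3DataLakeMetadataReadHelper::listFiles() above.
#include <aws/core/Aws.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <iostream>
#include <vector>

int main()
{
    Aws::SDKOptions options;
    Aws::InitAPI(options);
    {
        Aws::S3::S3Client client;   /// credentials and region come from the default provider chain

        Aws::S3::Model::ListObjectsV2Request request;
        request.SetBucket("my-bucket");            /// placeholder
        request.SetPrefix("table/_delta_log/");    /// placeholder
        const Aws::String suffix = ".json";

        std::vector<Aws::String> keys;
        bool is_finished = false;
        while (!is_finished)
        {
            auto outcome = client.ListObjectsV2(request);
            if (!outcome.IsSuccess())
            {
                std::cerr << outcome.GetError().GetMessage() << "\n";
                break;
            }

            /// Collect keys with the requested suffix from the current page.
            for (const auto & obj : outcome.GetResult().GetContents())
            {
                const Aws::String & key = obj.GetKey();
                if (key.size() >= suffix.size() && key.compare(key.size() - suffix.size(), suffix.size(), suffix) == 0)
                    keys.push_back(key);
            }

            /// A truncated result means there are more pages; continue from the returned token.
            request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
            is_finished = !outcome.GetResult().GetIsTruncated();
        }

        for (const auto & key : keys)
            std::cout << key << "\n";
    }
    Aws::ShutdownAPI(options);
}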
diff --git a/contrib/clickhouse/src/Storages/DataLakes/S3MetadataReader.h b/contrib/clickhouse/src/Storages/DataLakes/S3MetadataReader.h
new file mode 100644
index 0000000000..31f94ff9f7
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/DataLakes/S3MetadataReader.h
@@ -0,0 +1,25 @@
+#pragma once
+
+#include <clickhouse_config.h>
+
+#if USE_AWS_S3
+
+#include <Storages/StorageS3.h>
+
+class ReadBuffer;
+
+namespace DB
+{
+
+struct S3DataLakeMetadataReadHelper
+{
+ static std::shared_ptr<ReadBuffer> createReadBuffer(
+ const String & key, ContextPtr context, const StorageS3::Configuration & base_configuration);
+
+ static bool exists(const String & key, const StorageS3::Configuration & configuration);
+
+ static std::vector<String> listFiles(const StorageS3::Configuration & configuration, const std::string & prefix = "", const std::string & suffix = "");
+};
+}
+
+#endif
diff --git a/contrib/clickhouse/src/Storages/DataLakes/StorageDeltaLake.h b/contrib/clickhouse/src/Storages/DataLakes/StorageDeltaLake.h
new file mode 100644
index 0000000000..6854dd42b3
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/DataLakes/StorageDeltaLake.h
@@ -0,0 +1,25 @@
+#pragma once
+
+#include <Storages/IStorage.h>
+#include <Storages/DataLakes/IStorageDataLake.h>
+#include <Storages/DataLakes/DeltaLakeMetadataParser.h>
+#include "clickhouse_config.h"
+
+#if USE_AWS_S3
+#include <Storages/DataLakes/S3MetadataReader.h>
+#include <Storages/StorageS3.h>
+#endif
+
+namespace DB
+{
+
+struct StorageDeltaLakeName
+{
+ static constexpr auto name = "DeltaLake";
+};
+
+#if USE_AWS_S3 && USE_PARQUET
+using StorageDeltaLakeS3 = IStorageDataLake<StorageS3, StorageDeltaLakeName, DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>>;
+#endif
+
+}
diff --git a/contrib/clickhouse/src/Storages/DataLakes/StorageHudi.h b/contrib/clickhouse/src/Storages/DataLakes/StorageHudi.h
new file mode 100644
index 0000000000..dd17b68767
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/DataLakes/StorageHudi.h
@@ -0,0 +1,25 @@
+#pragma once
+
+#include <Storages/IStorage.h>
+#include <Storages/DataLakes/IStorageDataLake.h>
+#include <Storages/DataLakes/HudiMetadataParser.h>
+#include "clickhouse_config.h"
+
+#if USE_AWS_S3
+#include <Storages/DataLakes/S3MetadataReader.h>
+#include <Storages/StorageS3.h>
+#endif
+
+namespace DB
+{
+
+struct StorageHudiName
+{
+ static constexpr auto name = "Hudi";
+};
+
+#if USE_AWS_S3
+using StorageHudiS3 = IStorageDataLake<StorageS3, StorageHudiName, HudiMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>>;
+#endif
+
+}
diff --git a/contrib/clickhouse/src/Storages/DataLakes/StorageIceberg.h b/contrib/clickhouse/src/Storages/DataLakes/StorageIceberg.h
new file mode 100644
index 0000000000..98e75c0b5e
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/DataLakes/StorageIceberg.h
@@ -0,0 +1,25 @@
+#pragma once
+
+#include <Storages/IStorage.h>
+#include <Storages/DataLakes/IStorageDataLake.h>
+#include <Storages/DataLakes/IcebergMetadataParser.h>
+#include "clickhouse_config.h"
+
+#if USE_AWS_S3 && USE_AVRO
+#include <Storages/DataLakes/S3MetadataReader.h>
+#include <Storages/StorageS3.h>
+#endif
+
+namespace DB
+{
+
+struct StorageIcebergName
+{
+ static constexpr auto name = "Iceberg";
+};
+
+#if USE_AWS_S3 && USE_AVRO
+using StorageIcebergS3 = IStorageDataLake<StorageS3, StorageIcebergName, IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>>;
+#endif
+
+}
diff --git a/contrib/clickhouse/src/Storages/DataLakes/registerDataLakes.cpp b/contrib/clickhouse/src/Storages/DataLakes/registerDataLakes.cpp
new file mode 100644
index 0000000000..b59f847e99
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/DataLakes/registerDataLakes.cpp
@@ -0,0 +1,50 @@
+#include <Storages/DataLakes/IStorageDataLake.h>
+#include "clickhouse_config.h"
+
+#if USE_AWS_S3
+
+#include <Storages/DataLakes/StorageDeltaLake.h>
+#include <Storages/DataLakes/StorageIceberg.h>
+#include <Storages/DataLakes/StorageHudi.h>
+
+
+namespace DB
+{
+
+#define REGISTER_DATA_LAKE_STORAGE(STORAGE, NAME) \
+ factory.registerStorage( \
+ NAME, \
+ [](const StorageFactory::Arguments & args) \
+ { \
+ return createDataLakeStorage<STORAGE>(args);\
+ }, \
+ { \
+ .supports_settings = false, \
+ .supports_schema_inference = true, \
+ .source_access_type = AccessType::S3, \
+ });
+
+#if USE_PARQUET
+void registerStorageDeltaLake(StorageFactory & factory)
+{
+ REGISTER_DATA_LAKE_STORAGE(StorageDeltaLakeS3, StorageDeltaLakeName::name)
+}
+#endif
+
+#if USE_AVRO /// StorageIceberg depends on Avro to parse metadata in Avro format.
+
+void registerStorageIceberg(StorageFactory & factory)
+{
+ REGISTER_DATA_LAKE_STORAGE(StorageIcebergS3, StorageIcebergName::name)
+}
+
+#endif
+
+void registerStorageHudi(StorageFactory & factory)
+{
+ REGISTER_DATA_LAKE_STORAGE(StorageHudiS3, StorageHudiName::name)
+}
+
+}
+
+#endif
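The register functions above are presumably invoked from the common storage registration code, which is not part of this diff. A hedged sketch of what such a call site might look like (the registerDataLakeStorages name and the surrounding file are assumptions, not part of the commit):

// Hypothetical aggregator: wires the data lake engines into the storage factory,
// respecting the same build-time guards as registerDataLakes.cpp above.
#include "clickhouse_config.h"

namespace DB
{

class StorageFactory;

/// Declarations of the functions defined in registerDataLakes.cpp above.
#if USE_AWS_S3
#if USE_PARQUET
void registerStorageDeltaLake(StorageFactory & factory);
#endif
#if USE_AVRO
void registerStorageIceberg(StorageFactory & factory);
#endif
void registerStorageHudi(StorageFactory & factory);
#endif

void registerDataLakeStorages(StorageFactory & factory)
{
#if USE_AWS_S3
#if USE_PARQUET
    registerStorageDeltaLake(factory);
#endif
#if USE_AVRO
    registerStorageIceberg(factory);
#endif
    registerStorageHudi(factory);
#else
    (void)factory;   /// nothing to register without S3 support
#endif
}

}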