#include <Storages/DataLakes/HudiMetadataParser.h>
#include <Common/logger_useful.h>
#include <ranges>
#include <base/find_symbols.h>
#include <Poco/String.h>
#include "clickhouse_config.h"
#include <filesystem>
#include <IO/ReadHelpers.h>

#if USE_AWS_S3

#include <Storages/DataLakes/S3MetadataReader.h>
#include <Storages/StorageS3.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
template <typename Configuration, typename MetadataReadHelper>
struct HudiMetadataParser<Configuration, MetadataReadHelper>::Impl
{
    /**
     * Useful links:
     * - https://hudi.apache.org/tech-specs/
     * - https://hudi.apache.org/docs/file_layouts/
     */

    /**
     * Hudi tables store metadata files and data files.
     * Metadata files are stored in the .hoodie/metadata directory. However, unlike DeltaLake and Iceberg,
     * the metadata is not required to figure out which files need to be read; moreover,
     * for Hudi the metadata does not always exist.
     *
     * There are two types of data files:
     * 1. base files (columnar file formats like Apache Parquet/ORC)
     * 2. log files
     * Currently we support reading only `base files`.
     * Data file name format:
     * [File Id]_[File Write Token]_[Transaction timestamp].[File Extension]
     *
     * To find the needed parts, we have to find the latest part file for every file group in every partition.
     * Explanation:
     * Hudi reads in and overwrites the entire table/partition with each update.
     * Hudi controls the number of file groups under a single partition according to the
     * hoodie.parquet.max.file.size option. Once a single Parquet file grows too large, Hudi creates a second file group.
     * Each file group is identified by its File Id.
     */
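    /**
     * Illustrative example (the key below is made up, not taken from a real table):
     * for an object key like `table_path/americas/fileid0_1-0-1_20230105143025.parquet`,
     * the file name stem splits on '_' into File Id `fileid0`, File Write Token `1-0-1`
     * and Transaction timestamp `20230105143025`; the partition is the parent directory
     * name (`americas`), and only the key with the greatest timestamp is kept for each
     * (partition, File Id) pair.
     */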
    Strings processMetadataFiles(const Configuration & configuration)
    {
        auto * log = &Poco::Logger::get("HudiMetadataParser");

        const auto keys = MetadataReadHelper::listFiles(configuration, "", Poco::toLower(configuration.format));

        using Partition = std::string;
        using FileID = std::string;

        struct FileInfo
        {
            String key;
            UInt64 timestamp = 0;
        };

        std::unordered_map<Partition, std::unordered_map<FileID, FileInfo>> data_files;
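        /// For each listed key: take the partition from the parent directory, split the file name into
        /// (file id, write token, transaction timestamp), and keep only the latest base file per
        /// (partition, file id) group.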
        for (const auto & key : keys)
        {
            auto key_file = std::filesystem::path(key);
            Strings file_parts;
            const String stem = key_file.stem();
            splitInto<'_'>(file_parts, stem);
            if (file_parts.size() != 3)
                throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected format for file: {}", key);

            const auto partition = key_file.parent_path().stem();
            const auto & file_id = file_parts[0];
            const auto timestamp = parse<UInt64>(file_parts[2]);

            auto & file_info = data_files[partition][file_id];
            if (file_info.timestamp == 0 || file_info.timestamp < timestamp)
            {
                file_info.key = key;
                file_info.timestamp = timestamp;
            }
        }
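        /// Flatten the per-partition map into the final list of data file keys to read.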
        Strings result;
        for (auto & [partition, partition_data] : data_files)
        {
            LOG_TRACE(log, "Adding {} data files from partition {}", partition_data.size(), partition);

            for (auto & [file_id, file_data] : partition_data)
                result.push_back(std::move(file_data.key));
        }
        return result;
    }
};
template <typename Configuration, typename MetadataReadHelper>
HudiMetadataParser<Configuration, MetadataReadHelper>::HudiMetadataParser() : impl(std::make_unique<Impl>())
{
}

template <typename Configuration, typename MetadataReadHelper>
Strings HudiMetadataParser<Configuration, MetadataReadHelper>::getFiles(const Configuration & configuration, ContextPtr)
{
    return impl->processMetadataFiles(configuration);
}

template HudiMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::HudiMetadataParser();
template Strings HudiMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getFiles(
    const StorageS3::Configuration & configuration, ContextPtr);
}
#endif