// contrib/clickhouse/src/Storages/DataLakes/HudiMetadataParser.cpp
#include <Storages/DataLakes/HudiMetadataParser.h>
#include <Common/logger_useful.h>
#include <ranges>
#include <base/find_symbols.h>
#include <Poco/String.h>
#include "clickhouse_config.h"
#include <filesystem>
#include <IO/ReadHelpers.h>

#if USE_AWS_S3
#include <Storages/DataLakes/S3MetadataReader.h>
#include <Storages/StorageS3.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

template <typename Configuration, typename MetadataReadHelper>
struct HudiMetadataParser<Configuration, MetadataReadHelper>::Impl
{
    /**
     * Useful links:
     * - https://hudi.apache.org/tech-specs/
     * - https://hudi.apache.org/docs/file_layouts/
     */

    /**
      * Hudi tables store metadata files and data files.
      * Metadata files are stored in the .hoodie/metadata directory. Unlike DeltaLake and Iceberg,
      * however, the metadata is not required to determine which files need to be read;
      * moreover, for Hudi the metadata does not always exist.
      *
      * There are two types of data files:
      * 1. base files (columnar file formats like Apache Parquet/ORC)
      * 2. log files
      * Currently only `base files` are supported for reading.
      * Data file name format:
      * [File Id]_[File Write Token]_[Transaction timestamp].[File Extension]
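      * e.g. (a hypothetical name) `fileid0_1-0-1_20240101000000.parquet`, where
      * `20240101000000` is the transaction timestamp of the commit that wrote the file.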
      *
      * To find the needed parts, we have to determine the latest part file for every file group in every partition.
      * Why:
      *    Hudi rewrites the entire table/partition with each update.
      *    Hudi controls the number of file groups under a single partition via the
      *    hoodie.parquet.max.file.size option: once a Parquet file grows too large, Hudi creates a new file group.
      *    Each file group is identified by File Id.
      */
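
    /**
      * A worked example with hypothetical keys:
      *   part0/fid0_w1_1.parquet
      *   part0/fid0_w2_2.parquet
      *   part0/fid1_w1_1.parquet
      * File group `fid0` has two versions; the one with the larger transaction
      * timestamp (2) wins, so the parser returns
      *   { part0/fid0_w2_2.parquet, part0/fid1_w1_1.parquet }.
      */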
    Strings processMetadataFiles(const Configuration & configuration)
    {
        auto * log = &Poco::Logger::get("HudiMetadataParser");

        const auto keys = MetadataReadHelper::listFiles(configuration, "", Poco::toLower(configuration.format));

        using Partition = std::string;
        using FileID = std::string;
        struct FileInfo
        {
            String key;
            UInt64 timestamp = 0;
        };
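        /// Partition -> (file id -> the latest-seen file of that file group).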
        std::unordered_map<Partition, std::unordered_map<FileID, FileInfo>> data_files;

        for (const auto & key : keys)
        {
            auto key_file = std::filesystem::path(key);
            Strings file_parts;
            const String stem = key_file.stem();
            splitInto<'_'>(file_parts, stem);
            if (file_parts.size() != 3)
                throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected format for file: {}", key);

            const auto partition = key_file.parent_path().stem();
            const auto & file_id = file_parts[0];
            const auto timestamp = parse<UInt64>(file_parts[2]);

            auto & file_info = data_files[partition][file_id];
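            /// Keep only the file with the highest transaction timestamp in each file group.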
            if (file_info.timestamp == 0 || file_info.timestamp < timestamp)
            {
                file_info.key = key;
                file_info.timestamp = timestamp;
            }
        }

        Strings result;
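        /// Flatten the map: collect the winning file of each file group across all partitions.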
        for (auto & [partition, partition_data] : data_files)
        {
            LOG_TRACE(log, "Adding {} data files from partition {}", partition_data.size(), partition);
            for (auto & [file_id, file_data] : partition_data)
                result.push_back(std::move(file_data.key));
        }
        return result;
    }
};


template <typename Configuration, typename MetadataReadHelper>
HudiMetadataParser<Configuration, MetadataReadHelper>::HudiMetadataParser() : impl(std::make_unique<Impl>())
{
}

template <typename Configuration, typename MetadataReadHelper>
Strings HudiMetadataParser<Configuration, MetadataReadHelper>::getFiles(const Configuration & configuration, ContextPtr)
{
    return impl->processMetadataFiles(configuration);
}

template HudiMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::HudiMetadataParser();
template Strings HudiMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getFiles(
    const StorageS3::Configuration & configuration, ContextPtr);

}

#endif