aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages/DataLakes/IStorageDataLake.h
blob: d732ff928d5ecd2ae4defffe00e6ac87f488a587 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#pragma once

#include "clickhouse_config.h"

#if USE_AWS_S3

#include <Storages/IStorage.h>
#include <Common/logger_useful.h>
#include <Storages/StorageFactory.h>
#include <Formats/FormatFactory.h>
#include <filesystem>


namespace DB
{

template <typename Storage, typename Name, typename MetadataParser>
class IStorageDataLake : public Storage
{
public:
    static constexpr auto name = Name::name;
    using Configuration = typename Storage::Configuration;

    template <class ...Args>
    explicit IStorageDataLake(const Configuration & configuration_, ContextPtr context_, Args && ...args)
        : Storage(getConfigurationForDataRead(configuration_, context_), context_, std::forward<Args>(args)...)
        , base_configuration(configuration_)
        , log(&Poco::Logger::get(getName())) {}

    String getName() const override { return name; }

    static ColumnsDescription getTableStructureFromData(
        Configuration & base_configuration,
        const std::optional<FormatSettings> & format_settings,
        ContextPtr local_context)
    {
        auto configuration = getConfigurationForDataRead(base_configuration, local_context);
        return Storage::getTableStructureFromData(configuration, format_settings, local_context);
    }

    static Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context)
    {
        return Storage::getConfiguration(engine_args, local_context, /* get_format_from_file */false);
    }

    Configuration updateConfigurationAndGetCopy(ContextPtr local_context) override
    {
        std::lock_guard lock(configuration_update_mutex);
        updateConfigurationImpl(local_context);
        return Storage::getConfiguration();
    }

    void updateConfiguration(ContextPtr local_context) override
    {
        std::lock_guard lock(configuration_update_mutex);
        updateConfigurationImpl(local_context);
    }

private:
    static Configuration getConfigurationForDataRead(
        const Configuration & base_configuration, ContextPtr local_context, const Strings & keys = {})
    {
        auto configuration{base_configuration};
        configuration.update(local_context);
        configuration.static_configuration = true;

        if (keys.empty())
            configuration.keys = getDataFiles(configuration, local_context);
        else
            configuration.keys = keys;

        LOG_TRACE(
            &Poco::Logger::get("DataLake"),
            "New configuration path: {}, keys: {}",
            configuration.getPath(), fmt::join(configuration.keys, ", "));

        configuration.connect(local_context);
        return configuration;
    }

    static Strings getDataFiles(const Configuration & configuration, ContextPtr local_context)
    {
        return MetadataParser().getFiles(configuration, local_context);
    }

    void updateConfigurationImpl(ContextPtr local_context)
    {
        const bool updated = base_configuration.update(local_context);
        auto new_keys = getDataFiles(base_configuration, local_context);

        if (!updated && new_keys == Storage::getConfiguration().keys)
            return;

        Storage::useConfiguration(getConfigurationForDataRead(base_configuration, local_context, new_keys));
    }

    Configuration base_configuration;
    std::mutex configuration_update_mutex;
    Poco::Logger * log;
};


template <typename DataLake>
static StoragePtr createDataLakeStorage(const StorageFactory::Arguments & args)
{
    auto configuration = DataLake::getConfiguration(args.engine_args, args.getLocalContext());

    /// Data lakes use parquet format, no need for schema inference.
    if (configuration.format == "auto")
        configuration.format = "Parquet";

    return std::make_shared<DataLake>(
        configuration, args.getContext(), args.table_id, args.columns, args.constraints,
        args.comment, getFormatSettings(args.getContext()));
}

}

#endif