aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages/FileLog/StorageFileLog.h
blob: c0c5ac904b53b854f20f2f7a9118447740b75d0f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
#pragma once

#include <Disks/IDisk.h>

#include <Storages/FileLog/Buffer_fwd.h>
#include <Storages/FileLog/FileLogDirectoryWatcher.h>
#include <Storages/FileLog/FileLogSettings.h>

#include <Core/BackgroundSchedulePool.h>
#include <Storages/IStorage.h>
#include <Common/SettingsChanges.h>

#include <atomic>
#include <condition_variable>
#include <filesystem>
#include <fstream>
#include <mutex>
#include <optional>

namespace DB
{
/// Error codes referenced by inline code in this header.
/// They are only declared here (`extern`); the definitions live elsewhere.
namespace ErrorCodes
{
    /// Used by StorageFileLog::findInMap when the requested key is missing.
    extern const int LOGICAL_ERROR;
}

/// Forward declaration; StorageFileLog holds one through std::unique_ptr (see `directory_watch`).
class FileLogDirectoryWatcher;

/// Storage engine whose name is "FileLog" (see getName()).
///
/// The class keeps bookkeeping for a set of files (FileInfos): per-file runtime state
/// keyed by file name (FileContext) and per-file metadata keyed by inode (FileMeta).
/// Data file paths are resolved against `root_data_path`, metadata file paths against
/// `metadata_base_path` (see getFullDataPath() / getFullMetaPath()).
///
/// All non-inline member functions are defined out of line.
/// NOTE(review): presumably a background task (threadFunc / streamToViews) pushes newly
/// appended file data to dependent views — confirm against the implementation file.
class StorageFileLog final : public IStorage, WithContext
{
public:
    /// NOTE(review): the constructor is defined out of line; `path_`, `metadata_base_path_`,
    /// `format_name_` and `settings` presumably initialize the same-named members, and
    /// `attach` is presumably forwarded to loadMetaFiles(attach) — confirm.
    StorageFileLog(
        const StorageID & table_id_,
        ContextPtr context_,
        const ColumnsDescription & columns_,
        const String & path_,
        const String & metadata_base_path_,
        const String & format_name_,
        std::unique_ptr<FileLogSettings> settings,
        const String & comment,
        bool attach);

    /// A list of file names.
    using Files = std::vector<String>;

    /// Engine name.
    std::string getName() const override { return "FileLog"; }

    /// Always true for this engine.
    bool noPushingToViews() const override { return true; }

    void startup() override;
    void shutdown() override;

    /// Direct read from the table. See the comment on `running_streams`:
    /// two simultaneous SELECTs are intentionally not allowed.
    Pipe read(
        const Names & column_names,
        const StorageSnapshotPtr & storage_snapshot,
        SelectQueryInfo & query_info,
        ContextPtr context,
        QueryProcessingStage::Enum processed_stage,
        size_t max_block_size,
        size_t num_streams) override;

    void drop() override;

    /// Name of the data format, as stored in `format_name`.
    const auto & getFormatName() const { return format_name; }

    /// State of a tracked file.
    /// NOTE(review): only OPEN was documented originally; the meaning of the other
    /// values is presumably what their names suggest — confirm in the implementation.
    enum class FileStatus
    {
        OPEN, /// The file is opened for the first time after table startup.
        NO_CHANGE,
        UPDATED,
        REMOVED,
    };

    /// Runtime state of one file; stored in FileNameToContext, keyed by file name.
    struct FileContext
    {
        /// A freshly created context starts in the OPEN state.
        FileStatus status = FileStatus::OPEN;
        /// Value-initialized to zero until set.
        UInt64 inode{};
        /// Input stream for the file; empty until a reader is created.
        std::optional<std::ifstream> reader = std::nullopt;
    };

    /// Metadata of one file; stored in InodeToFileMeta, keyed by inode.
    struct FileMeta
    {
        String file_name;
        /// Note: the spelling "writen" (sic) is part of the public member name.
        UInt64 last_writen_position = 0;
        UInt64 last_open_end = 0;
        /// True when this metadata is empty, i.e. `file_name` is empty.
        bool operator!() const { return file_name.empty(); }
    };

    using InodeToFileMeta = std::unordered_map<UInt64, FileMeta>;
    using FileNameToContext = std::unordered_map<String, FileContext>;

    /// All per-file bookkeeping of the table.
    struct FileInfos
    {
        InodeToFileMeta meta_by_inode;
        FileNameToContext context_by_name;
        /// File names without path.
        Names file_names;
    };

    /// Mutable access to the file bookkeeping.
    /// The accessor itself takes no lock.
    auto & getFileInfos() { return file_infos; }

    /// Joins `metadata_base_path` and `file_name` into one path.
    String getFullMetaPath(const String & file_name) const { return std::filesystem::path(metadata_base_path) / file_name; }
    /// Joins `root_data_path` and `file_name` into one path.
    String getFullDataPath(const String & file_name) const { return std::filesystem::path(root_data_path) / file_name; }

    NamesAndTypesList getVirtuals() const override;

    static Names getVirtualColumnNames();

    /// Returns the inode of the given file.
    /// NOTE(review): whether `file_name` must be a full path is not visible here — confirm.
    static UInt64 getInode(const String & file_name);

    void openFilesAndSetPos();

    /// Used in FileLogSource when it has finished generating all blocks.
    /// Each stream is responsible for closing its own files and storing their metadata.
    /// NOTE(review): `start` and `end` presumably delimit the stream's range of indexes
    /// in FileInfos::file_names — confirm whether `end` is inclusive.
    void closeFilesAndStoreMeta(size_t start, size_t end);

    /// Used in FileLogSource after generating every block.
    void storeMetas(size_t start, size_t end);

    /// Checks the state of `reader`.
    /// NOTE(review): presumably throws when the stream is in a failed state — confirm.
    static void assertStreamGood(const std::ifstream & reader);

    /// Looks up `key` in `map` and returns a reference to the mapped value.
    /// Throws Exception with ErrorCodes::LOGICAL_ERROR if the key is absent,
    /// so callers use it only for keys that are expected to exist.
    template <typename K, typename V>
    static V & findInMap(std::unordered_map<K, V> & map, const K & key)
    {
        if (auto it = map.find(key); it != map.end())
            return it->second;
        else
            throw Exception(ErrorCodes::LOGICAL_ERROR, "The key {} doesn't exist.", key);
    }

    /// NOTE(review): presumably increment / decrement `running_streams` — confirm.
    void increaseStreams();
    void reduceStreams();

    /// NOTE(review): presumably sets `has_new_events` and notifies `cv` — confirm.
    void wakeUp();

    /// Engine settings; the returned reference is to the owning std::unique_ptr.
    const auto & getFileLogSettings() const { return filelog_settings; }

private:
    std::unique_ptr<FileLogSettings> filelog_settings;

    /// The path argument of the table.
    const String path;
    /// Defaults to true.
    /// NOTE(review): presumably reset when `path` turns out to be a regular file — confirm.
    bool path_is_directory = true;

    /// If the path argument of the table is a regular file, this equals user_files_path;
    /// otherwise it equals user_files_path/ + path_argument/, i.e. the directory itself.
    String root_data_path;
    /// Base directory used by getFullMetaPath().
    String metadata_base_path;

    FileInfos file_infos;

    const String format_name;
    Poco::Logger * log;

    DiskPtr disk;

    /// No in-class initializer: must be assigned before first use.
    uint64_t milliseconds_to_wait;

    /// To avoid data races, a naive trick is used to forbid executing two SELECTs
    /// simultaneously (although reading is not very useful in this engine):
    /// an atomic variable records the number of currently unfinished streams, and
    /// while there are unfinished streams any later SELECT is forbidden.
    std::atomic<int> running_streams = 0;

    /// NOTE(review): `mutex`, `has_new_events` and `cv` look like one wait/notify group
    /// (the flag being the condition guarded by the mutex) — confirm in the implementation.
    std::mutex mutex;
    bool has_new_events = false;
    std::condition_variable cv;

    std::atomic<bool> mv_attached = false;

    /// NOTE(review): presumably guards `file_infos` — confirm which accesses take it.
    std::mutex file_infos_mutex;

    /// Bundles a background-pool task holder with a cancellation flag.
    struct TaskContext
    {
        BackgroundSchedulePool::TaskHolder holder;
        std::atomic<bool> stream_cancelled {false};
        /// Takes ownership of the task holder by moving it in.
        explicit TaskContext(BackgroundSchedulePool::TaskHolder&& task_) : holder(std::move(task_))
        {
        }
    };
    std::shared_ptr<TaskContext> task;

    /// Null until a watcher is created.
    std::unique_ptr<FileLogDirectoryWatcher> directory_watch = nullptr;

    void loadFiles();

    void loadMetaFiles(bool attach);

    /// NOTE(review): presumably the body of the background task held in `task` — confirm.
    void threadFunc();

    size_t getPollMaxBatchSize() const;
    size_t getMaxBlockSize() const;
    size_t getPollTimeoutMillisecond() const;

    bool streamToViews();
    bool checkDependencies(const StorageID & table_id);

    bool updateFileInfos();

    size_t getTableDependentCount() const;

    /// Used in shutdown()
    void serialize() const;
    /// Overload for the metadata of a single file identified by `inode`.
    /// NOTE(review): the original comment said "Used in FileSource
    /// closeFileAndStoreMeta(file_name)", but no such function is declared here; the
    /// closest visible member is closeFilesAndStoreMeta(start, end) — confirm the caller.
    void serialize(UInt64 inode, const FileMeta & file_meta) const;

    void deserialize();
    void checkOffsetIsValid(const String & filename, UInt64 offset) const;

    /// Result of readMetadata(): the file's metadata together with its inode.
    struct ReadMetadataResult
    {
        FileMeta metadata;
        UInt64 inode = 0;
    };
    ReadMetadataResult readMetadata(const String & filename) const;
};

}