aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages/System/StorageSystemDDLWorkerQueue.cpp
blob: bae7a266dcd35347b5392655b715600aa913e2ce (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
#include <Storages/System/StorageSystemDDLWorkerQueue.h>
#include <Interpreters/DDLTask.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeMap.h>
#include <Interpreters/Context.h>
#include <Interpreters/ZooKeeperLog.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/ParserQuery.h>
#include <Parsers/parseQuery.h>


namespace fs = std::filesystem;


namespace DB
{

/// Per-host state of a DDL task as shown in the `status` column.
/// The numeric values are what gets stored in the Enum8 column (see getStatusEnumsAndValues),
/// so they are spelled out explicitly.
enum class Status
{
    INACTIVE = 0,
    ACTIVE = 1,
    FINISHED = 2,
    REMOVING = 3,
    UNKNOWN = 4,
};

/// Pending results of single-node reads from ZooKeeper, and batches of them.
using GetResponseFuture = std::future<Coordination::GetResponse>;
using GetResponseFutures = std::vector<GetResponseFuture>;
/// Pending results of child-listing requests to ZooKeeper, and batches of them.
using ListResponseFuture = std::future<Coordination::ListResponse>;
using ListResponseFutures = std::vector<ListResponseFuture>;

static std::vector<std::pair<String, Int8>> getStatusEnumsAndValues()
{
    return std::vector<std::pair<String, Int8>>{
        {"Inactive",    static_cast<Int8>(Status::INACTIVE)},
        {"Active",      static_cast<Int8>(Status::ACTIVE)},
        {"Finished",    static_cast<Int8>(Status::FINISHED)},
        {"Removing",    static_cast<Int8>(Status::REMOVING)},
        {"Unknown",     static_cast<Int8>(Status::UNKNOWN)},
    };
}

/// Table schema. The first group of columns describes the task itself and is the same for
/// every row of a task; the second group describes one host's progress on that task.
NamesAndTypesList StorageSystemDDLWorkerQueue::getNamesAndTypes()
{
    /// Wraps a freshly created nested type into Nullable(...).
    auto nullable = [](const auto & nested) { return std::make_shared<DataTypeNullable>(nested); };

    return {
        /// Task-level columns
        {"entry",               std::make_shared<DataTypeString>()},
        {"entry_version",       nullable(std::make_shared<DataTypeUInt8>())},
        {"initiator_host",      nullable(std::make_shared<DataTypeString>())},
        {"initiator_port",      nullable(std::make_shared<DataTypeUInt16>())},
        {"cluster",             std::make_shared<DataTypeString>()},
        {"query",               std::make_shared<DataTypeString>()},
        {"settings",            std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeString>())},
        {"query_create_time",   std::make_shared<DataTypeDateTime>()},

        /// Host-level columns
        {"host",                nullable(std::make_shared<DataTypeString>())},
        {"port",                nullable(std::make_shared<DataTypeUInt16>())},
        {"status",              nullable(std::make_shared<DataTypeEnum8>(getStatusEnumsAndValues()))},
        {"exception_code",      nullable(std::make_shared<DataTypeUInt16>())},
        {"exception_text",      nullable(std::make_shared<DataTypeString>())},
        {"query_finish_time",   nullable(std::make_shared<DataTypeDateTime>())},
        {"query_duration_ms",   nullable(std::make_shared<DataTypeUInt64>())},
    };
}

/// Parses the task's query text with the current context's parser limits and returns the
/// cluster named in its ON CLUSTER clause, or an empty string if the parsed AST is not an
/// ASTQueryWithOnCluster. Parsing errors propagate to the caller.
static String clusterNameFromDDLQuery(ContextPtr context, const DDLTask & task)
{
    const auto & query_text = task.entry.query;
    const char * query_begin = query_text.data();
    const char * query_end = query_begin + query_text.size();
    const auto & settings = context->getSettingsRef();

    /// Used by the parser to tell where the query came from in error messages.
    String parse_description = fmt::format("from {}", task.entry_path);

    ParserQuery parser(query_end, settings.allow_settings_after_format_in_insert);
    ASTPtr ast = parseQuery(parser, query_begin, query_end, parse_description,
                            settings.max_query_size,
                            settings.max_parser_depth);

    const auto * on_cluster_query = dynamic_cast<const ASTQueryWithOnCluster *>(ast.get());
    return on_cluster_query ? on_cluster_query->cluster : String{};
}

/// Appends one value to each task-level column, starting at index `col` and leaving `col`
/// just past them. Columns, in order: entry, entry_version, initiator_host, initiator_port,
/// cluster, query, settings, query_create_time.
/// initiator_host/initiator_port are NULL when the entry has no initiator; settings is an
/// empty map when the entry carries no settings; query_create_time is stored in seconds.
static void fillCommonColumns(MutableColumns & res_columns, size_t & col, const DDLTask & task, const String & cluster_name, UInt64 query_create_time_ms)
{
    /// Returns the column at the current position and moves the position forward.
    auto next_column = [&]() -> auto & { return *res_columns[col++]; };

    next_column().insert(task.entry_name);      /// entry
    next_column().insert(task.entry.version);   /// entry_version

    if (!task.entry.initiator.empty())
    {
        HostID initiator_host_id = HostID::fromString(task.entry.initiator);
        next_column().insert(initiator_host_id.host_name);  /// initiator_host
        next_column().insert(initiator_host_id.port);       /// initiator_port
    }
    else
    {
        next_column().insert(Field{});  /// initiator_host
        next_column().insert(Field{});  /// initiator_port
    }

    next_column().insert(cluster_name);         /// cluster
    next_column().insert(task.entry.query);     /// query

    /// Each setting change becomes a (name, value rendered as string) map entry.
    Map settings_map;
    if (task.entry.settings)
    {
        for (const auto & setting_change : *task.entry.settings)
        {
            Tuple key_value;
            key_value.push_back(setting_change.name);
            key_value.push_back(toString(setting_change.value));
            settings_map.push_back(std::move(key_value));
        }
    }
    next_column().insert(settings_map);         /// settings

    next_column().insert(static_cast<UInt64>(query_create_time_ms / 1000));    /// query_create_time
}

/// Makes sure the first `num_filled_columns` (task-level) columns have a value for the row that
/// is about to be completed by host-level columns.
/// If the task-level columns are exactly one row ahead of column `num_filled_columns`, the pending
/// row has not been paired with a host yet and nothing is done. Otherwise (same size) the last
/// value of every task-level column is appended again.
static void repeatValuesInCommonColumns(MutableColumns & res_columns, size_t num_filled_columns)
{
    const size_t common_rows = res_columns[num_filled_columns - 1]->size();
    const size_t host_rows = res_columns[num_filled_columns]->size();

    if (common_rows == host_rows + 1)
        return;

    assert(common_rows == host_rows);
    for (size_t column_idx = 0; column_idx < num_filled_columns; ++column_idx)
    {
        auto & column = *res_columns[column_idx];
        column.insert(column[column.size() - 1]);
    }
}

/// Appends the `host` and `port` columns for `host_id` at position `col` and advances `col` by two.
/// host_id.host_name may be a domain name or an IP address; it is shown verbatim, without any
/// forward or reverse resolution, because that was not considered necessary.
static void fillHostnameColumns(MutableColumns & res_columns, size_t & col, const HostID & host_id)
{
    const size_t host_column_idx = col;
    const size_t port_column_idx = col + 1;
    col += 2;

    res_columns[host_column_idx]->insert(host_id.host_name);
    res_columns[port_column_idx]->insert(host_id.port);
}

/// Appends `status` to the status column at position `col`, followed by NULL for the four
/// detail columns that are only known for finished hosts (exception_code, exception_text,
/// query_finish_time, query_duration_ms). Advances `col` by five.
static void fillStatusColumnsWithNulls(MutableColumns & res_columns, size_t & col, Status status)
{
    res_columns[col++]->insert(static_cast<Int8>(status));

    constexpr size_t num_detail_columns = 4;
    for (size_t i = 0; i < num_detail_columns; ++i)
        res_columns[col++]->insert(Field{});
}

/// Appends the status-related columns (status, exception_code, exception_text, query_finish_time,
/// query_duration_ms) for a host listed in the task's "finished" directory, advancing `col`.
///
/// @param finished_data_future  pending read of the host's "finished" status node; consumed here.
/// @param query_create_time_ms  ctime of the task node, used to compute query_duration_ms.
///
/// If the status node no longer exists (ZNONODE), the task is being removed concurrently and the
/// host is reported as REMOVING with NULL details.
static void fillStatusColumns(MutableColumns & res_columns, size_t & col,
                                GetResponseFuture & finished_data_future,
                                UInt64 query_create_time_ms)
{
    auto maybe_finished_status = finished_data_future.get();
    if (maybe_finished_status.error == Coordination::Error::ZNONODE)
    {
        fillStatusColumnsWithNulls(res_columns, col, Status::REMOVING);
        return;
    }

    /// asyncTryGet should throw on other error codes
    assert(maybe_finished_status.error == Coordination::Error::ZOK);

    /// status
    res_columns[col++]->insert(static_cast<Int8>(Status::FINISHED));

    auto execution_status = ExecutionStatus::fromText(maybe_finished_status.data);
    /// exception_code
    res_columns[col++]->insert(execution_status.code);
    /// exception_text
    res_columns[col++]->insert(execution_status.message);

    UInt64 query_finish_time_ms = maybe_finished_status.stat.ctime;
    /// query_finish_time
    res_columns[col++]->insert(static_cast<UInt64>(query_finish_time_ms / 1000));

    /// query_duration_ms
    /// Both values are wall-clock ctimes of two different znodes, so nothing guarantees that the
    /// finish time is not earlier than the create time. Clamp to zero instead of letting the
    /// unsigned subtraction wrap around to a huge bogus duration.
    const UInt64 query_duration_ms = query_finish_time_ms >= query_create_time_ms
        ? query_finish_time_ms - query_create_time_ms
        : 0;
    res_columns[col++]->insert(query_duration_ms);
}


/// Fills the table from the DDLWorker queue directory in ZooKeeper.
///
/// Each task node under the queue directory yields one row per host that can be attributed to the
/// task. Hosts come in this order: those listed in "<task>/finished", then those listed in
/// "<task>/active" but not finished, then the remaining hosts from task.entry.hosts. If no host is
/// known at all, a single row with NULL host/port and UNKNOWN status is emitted.
/// Task-level columns are filled once per task and duplicated for each additional host row by
/// repeatValuesInCommonColumns.
///
/// Tasks that disappear between listing and reading are skipped. The function throws if a task
/// entry cannot be parsed, or if listing a status dir fails with an error other than ZNONODE.
void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
{
    auto& ddl_worker = context->getDDLWorker();
    fs::path ddl_zookeeper_path = ddl_worker.getQueueDir();
    zkutil::ZooKeeperPtr zookeeper = ddl_worker.getAndSetZooKeeper();
    Strings ddl_task_paths = zookeeper->getChildren(ddl_zookeeper_path);

    /// Index i of each vector below corresponds to ddl_task_paths[i].
    GetResponseFutures ddl_task_futures;
    ListResponseFutures active_nodes_futures;
    ListResponseFutures finished_nodes_futures;

    /// Issue all requests for all tasks before waiting on any of them; results are consumed below.
    for (const auto & task_path : ddl_task_paths)
    {
        ddl_task_futures.push_back(zookeeper->asyncTryGet(ddl_zookeeper_path / task_path));
        /// List status dirs. Active host may become finished, so we list active first.
        active_nodes_futures.push_back(zookeeper->asyncTryGetChildrenNoThrow(ddl_zookeeper_path / task_path / "active"));
        finished_nodes_futures.push_back(zookeeper->asyncTryGetChildrenNoThrow(ddl_zookeeper_path / task_path / "finished"));
    }

    for (size_t i = 0; i < ddl_task_paths.size(); ++i)
    {
        auto maybe_task = ddl_task_futures[i].get();
        if (maybe_task.error != Coordination::Error::ZOK)
        {
            /// Task is removed
            assert(maybe_task.error == Coordination::Error::ZNONODE);
            continue;
        }

        DDLTask task{ddl_task_paths[i], ddl_zookeeper_path / ddl_task_paths[i]};
        try
        {
            task.entry.parse(maybe_task.data);
        }
        catch (Exception & e)
        {
            /// Add the entry path and raw contents to the error for easier diagnosis.
            e.addMessage("On parsing DDL entry {}: {}", task.entry_path, maybe_task.data);
            throw;
        }

        String cluster_name = clusterNameFromDDLQuery(context, task);
        UInt64 query_create_time_ms = maybe_task.stat.ctime;

        /// After this call `col` is the index of the first host-level column. Every host row below
        /// starts from `col` using its own cursor (rest_col), so `col` itself stays fixed.
        size_t col = 0;
        fillCommonColumns(res_columns, col, task, cluster_name, query_create_time_ms);

        /// At first we process finished nodes, to avoid duplication if some host was active
        /// and suddenly become finished during status dirs listing.
        /// Then we process active (but not finished) hosts.
        /// And then we process the rest hosts from task.entry.hosts list.
        /// NOTE: It's not guaranteed that task.entry.hosts contains all host ids from status dirs.
        std::unordered_set<String> processed_hosts;

        /// Race condition with DDLWorker::cleanupQueue(...) is possible.
        /// We may get incorrect list of finished nodes if task is currently removing.
        /// To avoid showing INACTIVE status for hosts that have actually executed query,
        /// we will detect if someone is removing task and show special REMOVING status.
        /// Also we should distinguish it from another case when status dirs are not created yet (extremely rare case).
        bool is_removing_task = false;

        auto maybe_finished_hosts = finished_nodes_futures[i].get();
        if (maybe_finished_hosts.error == Coordination::Error::ZOK)
        {
            /// Request all status nodes first, then read them in the same order as the names.
            GetResponseFutures finished_status_futures;
            for (const auto & host_id_str : maybe_finished_hosts.names)
                finished_status_futures.push_back(zookeeper->asyncTryGet(fs::path(task.entry_path) / "finished" / host_id_str));

            for (size_t host_idx = 0; host_idx < maybe_finished_hosts.names.size(); ++host_idx)
            {
                const auto & host_id_str = maybe_finished_hosts.names[host_idx];
                HostID host_id = HostID::fromString(host_id_str);
                repeatValuesInCommonColumns(res_columns, col);
                size_t rest_col = col;
                fillHostnameColumns(res_columns, rest_col, host_id);
                fillStatusColumns(res_columns, rest_col, finished_status_futures[host_idx], query_create_time_ms);
                processed_hosts.insert(host_id_str);
            }
        }
        else if (maybe_finished_hosts.error == Coordination::Error::ZNONODE)
        {
            /// Rare case: Either status dirs are not created yet or already removed.
            /// We can distinguish it by checking if task node exists, because "query-xxx" and "query-xxx/finished"
            /// are removed in single multi-request
            is_removing_task = !zookeeper->exists(task.entry_path);
        }
        else
        {
            throw Coordination::Exception::fromPath(maybe_finished_hosts.error, fs::path(task.entry_path) / "finished");
        }

        /// Process active nodes
        auto maybe_active_hosts = active_nodes_futures[i].get();
        if (maybe_active_hosts.error == Coordination::Error::ZOK)
        {
            for (const auto & host_id_str : maybe_active_hosts.names)
            {
                /// Already reported as finished.
                if (processed_hosts.contains(host_id_str))
                    continue;

                HostID host_id = HostID::fromString(host_id_str);
                repeatValuesInCommonColumns(res_columns, col);
                size_t rest_col = col;
                fillHostnameColumns(res_columns, rest_col, host_id);
                fillStatusColumnsWithNulls(res_columns, rest_col, Status::ACTIVE);
                processed_hosts.insert(host_id_str);
            }
        }
        else if (maybe_active_hosts.error == Coordination::Error::ZNONODE)
        {
            /// Rare case: Either status dirs are not created yet or task is currently removing.
            /// When removing a task, at first we remove "query-xxx/active" (not recursively),
            /// then recursively remove everything except "query-xxx/finished"
            /// and then remove "query-xxx" and "query-xxx/finished".
            is_removing_task = is_removing_task ||
                (zookeeper->exists(fs::path(task.entry_path) / "finished") && !zookeeper->exists(fs::path(task.entry_path) / "active")) ||
                !zookeeper->exists(task.entry_path);
        }
        else
        {
            throw Coordination::Exception::fromPath(maybe_active_hosts.error, fs::path(task.entry_path) / "active");
        }

        /// Process the rest hosts
        /// Hosts from the entry that were seen in neither status dir are INACTIVE, or REMOVING if
        /// the task was detected to be under removal above.
        for (const auto & host_id : task.entry.hosts)
        {
            if (processed_hosts.contains(host_id.toString()))
                continue;

            Status status = is_removing_task ? Status::REMOVING : Status::INACTIVE;
            repeatValuesInCommonColumns(res_columns, col);
            size_t rest_col = col;
            fillHostnameColumns(res_columns, rest_col, host_id);
            fillStatusColumnsWithNulls(res_columns, rest_col, status);
            processed_hosts.insert(host_id.toString());
        }

        if (processed_hosts.empty())
        {
            /// We don't know any hosts, just fill the rest columns with nulls.
            /// The common columns filled above are still one row ahead, so they are not repeated here.
            /// host
            res_columns[col++]->insert(Field{});
            /// port
            res_columns[col++]->insert(Field{});
            fillStatusColumnsWithNulls(res_columns, col, Status::UNKNOWN);
        }
    }
}

}