aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages/MergeTree/IExecutableTask.h
blob: 738056e0ea00b9b6c3433ea070e39f7c60bc4622 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#pragma once

#include <memory>
#include <functional>
#include <utility>

#include <boost/noncopyable.hpp>
#include <Interpreters/StorageID.h>
#include <Common/Priority.h>

namespace DB
{

/// Only the error codes used in this header are declared here; `extern` means the definition lives elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR; /// Used by ExecutableLambdaAdapter::getPriority().
}

/**
 * Common interface for background operations, written as a hand-made coroutine.
 * The central method is executeStep(): it returns true when the task wants
 * another 'step' to be executed in the near future, and false otherwise.
 *
 * Every storage assigns operations such as merges, mutations, fetches, etc.
 * Once the current operation is completed, the storage (or some other entity)
 * has to be asked to try to assign the next operation.
 *
 * A task always corresponds to a storage, hence the getStorageID() method.
 * It is required to shut a storage down correctly, e.g. to wait until all of
 * its background operations have completed.
 */
class IExecutableTask
{
public:
    /// Callback type that takes a single bool argument and returns nothing.
    using TaskResultCallback = std::function<void(bool)>;

    /// Virtual destructor: tasks are destroyed through pointers to this interface.
    virtual ~IExecutableTask() = default;

    /// Executes one step of the task.
    /// Returns true if the task wants one more step to be executed later, false otherwise.
    virtual bool executeStep() = 0;

    /// Completion hook.
    /// NOTE(review): presumably invoked by the executor once no more steps are requested - confirm against callers.
    virtual void onCompleted() = 0;

    /// Identifier of the storage this task corresponds to.
    virtual StorageID getStorageID() const = 0;

    /// Query identifier associated with this task.
    virtual String getQueryId() const = 0;

    /// Priority of this task. Implementations may throw if they do not support priorities.
    virtual Priority getPriority() const = 0;
};

/// Shared-ownership pointer to a task, held through the IExecutableTask interface.
using ExecutableTaskPtr = std::shared_ptr<IExecutableTask>;


/**
 * Some background operations are not coroutines, i.e. they do not want to be executed step-by-step.
 * This wrapper adapts a single callable to the IExecutableTask interface.
 *
 * The job is invoked from executeStep() and its boolean result is remembered;
 * onCompleted() then passes the negation of that result to the result callback.
 */
class ExecutableLambdaAdapter : public IExecutableTask, boost::noncopyable
{
public:
    /// @param job_to_execute_ Callable stored as std::function<bool()>; invoked by executeStep().
    /// @param job_result_callback_ Callable stored as TaskResultCallback; invoked by onCompleted().
    /// @param id_ Storage identifier returned by getStorageID() and used to build the query id.
    template <typename Job, typename Callback>
    explicit ExecutableLambdaAdapter(
        Job && job_to_execute_,
        Callback && job_result_callback_,
        StorageID id_)
        : job_to_execute(std::forward<Job>(job_to_execute_))
        , job_result_callback(std::forward<Callback>(job_result_callback_))
        , id(std::move(id_)) /// id_ is already our own copy (taken by value), so move it instead of copying again.
    {
    }

    /// Runs the wrapped job and stores its result.
    /// Always returns false: the job is a single step, so no further steps are requested.
    bool executeStep() override
    {
        res = job_to_execute();
        /// Reset the callable right after the call, so its target (and anything it captured)
        /// is destroyed here rather than together with the adapter.
        job_to_execute = {};
        return false;
    }

    /// Invokes the result callback. Note the negation: the callback receives `true`
    /// when the job returned `false` (or was never executed), and `false` when it returned `true`.
    void onCompleted() override { job_result_callback(!res); }

    /// Returns the storage identifier passed to the constructor.
    StorageID getStorageID() const override { return id; }

    /// Priorities are not supported by this adapter.
    /// @throws Exception with code LOGICAL_ERROR, unconditionally.
    Priority getPriority() const override
    {
        throw Exception(ErrorCodes::LOGICAL_ERROR, "getPriority() method is not supported by LambdaAdapter");
    }

    /// Query id is the short name of the storage id followed by the "::lambda" suffix.
    String getQueryId() const override { return id.getShortName() + "::lambda"; }

private:
    /// Result of the job; stays false until executeStep() has run.
    bool res = false;
    /// The wrapped job; becomes empty after executeStep().
    std::function<bool()> job_to_execute;
    /// Called from onCompleted() with the negated job result.
    IExecutableTask::TaskResultCallback job_result_callback;
    /// Storage this job is attributed to.
    StorageID id;
};


}