aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Common/AsyncTaskExecutor.h
blob: 3123990c1cd4e5683b68fd94820f8f73689a633f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#pragma once

#include <Common/Epoll.h>
#include <Common/Fiber.h>
#include <Common/FiberStack.h>
#include <Poco/Timespan.h>

#include <atomic>
#include <cstdint>
#include <exception>
#include <functional>
#include <memory>
#include <mutex>
#include <string>

#if defined(OS_LINUX)
#include <sys/epoll.h>
#endif


namespace DB
{

/// Identifies which kind of timeout accompanies an asynchronous event.
/// The numeric values are written out explicitly; they are exactly the ones
/// the compiler would otherwise assign implicitly (0, 1, 2, 3).
enum class AsyncEventTimeoutType
{
    CONNECT = 0,
    RECEIVE = 1,
    SEND = 2,
    NONE = 3,
};

/// Callback a task invokes when an operation on a file descriptor may block its execution.
/// The parameter names are documentation only (they are not part of the type); they mirror
/// AsyncTaskExecutor::processAsyncEvent, which takes the same parameter list.
using AsyncCallback = std::function<void(
    int fd,
    Poco::Timespan timeout,
    AsyncEventTimeoutType timeout_type,
    const std::string & fd_description,
    uint32_t async_events)>;

/// Callback a task may invoke to suspend the current fiber explicitly.
using SuspendCallback = std::function<void()>;

/// Plain record pairing a fiber pointer with a link to another FiberInfo.
/// Both pointers are null in a default-constructed record.
struct FiberInfo
{
    /// Fiber this record refers to.
    const Fiber * fiber{nullptr};
    /// Record of the parent fiber, as the name indicates.
    /// NOTE(review): presumably null for a top-level fiber - confirm with the code that fills it in.
    const FiberInfo * parent_fiber_info{nullptr};
};

/// Interface of a task that is executed inside a fiber.
/// The single entry point, run(), receives two callbacks:
///  * async_callback   - to be invoked when the task is about to perform an operation on a file
///                       descriptor (e.g. reading from a socket) that could block its execution;
///  * suspend_callback - may be invoked to suspend the current fiber explicitly.
struct AsyncTask
{
    /// Executes the task; see the interface description for the meaning of both callbacks.
    virtual void run(
        AsyncCallback async_callback,
        SuspendCallback suspend_callback) = 0;

    /// Virtual so that implementations can be destroyed through a pointer to AsyncTask.
    virtual ~AsyncTask() = default;
};

/// Base class for executing tasks inside a fiber.
/// The class is abstract: derived classes supply the protected hooks that are called
/// around fiber resumption, on async events and during cancellation.
class AsyncTaskExecutor
{
public:
    /// Takes ownership of the task to execute.
    /// Declared explicit so that a std::unique_ptr<AsyncTask> is never treated as an
    /// implicit conversion to an executor.
    explicit AsyncTaskExecutor(std::unique_ptr<AsyncTask> task_);

    /// Resume task execution. This method returns when task is completed or suspended.
    void resume();

    /// Cancel task execution. Fiber will be destroyed even if task wasn't finished.
    void cancel();

    /// Restart task execution. Current fiber will be destroyed
    /// and the new one will be created with the same task.
    /// The next resume() call will start the new task from the beginning
    void restart();

    /// Returns the current value of the cancellation flag.
    bool isCancelled() const { return is_cancelled; }

    virtual ~AsyncTaskExecutor() = default;


    /// Event bit values. On Linux they are the corresponding epoll constants,
    /// on other platforms they are distinct single-bit values.
#if defined(OS_LINUX)
    enum Event
    {
        READ = EPOLLIN,
        WRITE = EPOLLOUT,
        ERROR = EPOLLERR,
    };
#else
    enum Event
    {
        READ = 1,
        WRITE = 2,
        ERROR = 4,
    };
#endif

protected:
    /// Method that is called in resume() before actual fiber resuming.
    /// If it returns false, resume() will return immediately without actual fiber resuming.
    virtual bool checkBeforeTaskResume() = 0;

    /// Method that is called in resume() after fiber resuming (when it was finished or suspended).
    virtual void afterTaskResume() = 0;

    /// Method that is called on async event (when async callback is called) before fiber is suspended.
    /// The parameters are the arguments that were passed to the async callback.
    virtual void processAsyncEvent(int fd, Poco::Timespan timeout, AsyncEventTimeoutType timeout_type, const std::string & fd_description, uint32_t async_events) = 0;

    /// Method that is called when task is resumed after it was suspended on async event.
    virtual void clearAsyncEvent() = 0;

    /// Process exception caught while task execution. It's called after fiber resume if exception happened.
    /// The default implementation rethrows the exception.
    virtual void processException(std::exception_ptr e) { std::rethrow_exception(e); }

    /// Method that is called in cancel() before fiber destruction. Does nothing by default.
    virtual void cancelBefore() { }
    /// Method that is called in cancel() after fiber destruction. Does nothing by default.
    virtual void cancelAfter() { }

    /// Resume fiber explicitly without mutex locking.
    /// Can be called in cancelBefore().
    void resumeUnlocked();

private:
    /// Defined outside this header.
    struct Routine;

    void createFiber();
    void destroyFiber();

    FiberStack fiber_stack;
    Fiber fiber;
    /// NOTE(review): presumably the mutex that resumeUnlocked() skips - confirm in the implementation.
    std::mutex fiber_lock;
    /// NOTE(review): presumably the exception later handed to processException() - confirm in the implementation.
    std::exception_ptr exception;

    std::atomic_bool routine_is_finished = false;
    /// Flag reported by isCancelled().
    std::atomic_bool is_cancelled = false;

    /// The task owned by this executor.
    std::unique_ptr<AsyncTask> task;
};

/// NOTE(review): only the declaration is visible in this header. Judging by the name and the
/// parameters, this returns the "timeout exceeded" message for the socket described by
/// `socket_description`, given the timeout type and the timeout value - confirm against the definition.
String getSocketTimeoutExceededMessageByTimeoutType(AsyncEventTimeoutType type, Poco::Timespan timeout, const String & socket_description);

}