aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/Executors/PollingQueue.h
blob: 100d762b7319bb690bb4f00b520bddcc183a8c1a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#pragma once
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <atomic>
#include <unordered_map>
#include <Common/Epoll.h>

namespace DB
{

#if defined(OS_LINUX)

/// Queue of file descriptors that are waiting to be polled. In essence, a thin layer on top of epoll.
class PollingQueue
{
public:
    /// A single queue entry. Its fields mirror the arguments of addTask().
    struct TaskData
    {
        /// Thread number associated with the task.
        size_t thread_num = 0;

        /// Opaque pointer attached to the task; nullptr means "no task".
        void * data = nullptr;
        /// Descriptor associated with the task; -1 until one is assigned.
        int fd = -1;

        /// True if and only if `data` is set.
        explicit operator bool() const { return data != nullptr; }
    };

private:
    Epoll epoll;
    /// NOTE(review): presumably a pipe that finish() uses to wake up wait() - confirm in the implementation file.
    int pipe_fd[2];
    /// Starts out false. NOTE(review): presumably raised by finish() - confirm in the implementation file.
    std::atomic_bool is_finished{false};
    /// Tasks currently held by the queue; size() and empty() report on this map.
    /// NOTE(review): how the integer key is derived is not visible in this header.
    std::unordered_map<std::uintptr_t, TaskData> tasks;

public:
    PollingQueue();
    ~PollingQueue();

    /// Number of tasks currently stored in the queue.
    size_t size() const
    {
        return tasks.size();
    }

    /// Whether the queue currently stores no tasks.
    bool empty() const
    {
        return tasks.empty();
    }

    /// Put a new task into the queue.
    void addTask(size_t thread_number, void * data, int fd);

    /// Block until any descriptor is ready. Also blocks when the queue holds no descriptors.
    /// Yields the ptr that was put into the queue, or nullptr once finish() has been called.
    /// The lock is released for the duration of the wait.
    TaskData wait(std::unique_lock<std::mutex> & lock);

    /// Interrupt a wait that is in progress.
    void finish();
};
#else
/// Fallback used when OS_LINUX is not defined: there is nothing to poll,
/// so the queue always reports itself as empty and finish() does nothing.
class PollingQueue
{
public:
    /// Always true: this stub never holds any tasks.
    /// Const-qualified like empty() in the OS_LINUX variant, so code that calls it
    /// through a const reference compiles on every platform.
    bool empty() const { return true; }

    /// No-op: this stub never waits, so there is nothing to interrupt.
    void finish() {}
};
#endif

}