aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/Executors/TasksQueue.h
blob: bb8996fc1a6bf77026e27fa649bb815c9dce5d9f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#pragma once
#include <vector>
#include <queue>
#include <Common/Exception.h>

namespace DB
{
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

template <typename Task>
class TaskQueue
{
public:
    void init(size_t num_threads) { queues.resize(num_threads); }

    void push(Task * task, size_t thread_num)
    {
        queues[thread_num].push(task);
        ++num_tasks;
    }

    size_t getAnyThreadWithTasks(size_t from_thread = 0)
    {
        if (num_tasks == 0)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "TaskQueue is empty");

        for (size_t i = 0; i < queues.size(); ++i)
        {
            if (!queues[from_thread].empty())
                return from_thread;

            ++from_thread;
            if (from_thread >= queues.size())
                from_thread = 0;
        }

        throw Exception(ErrorCodes::LOGICAL_ERROR, "TaskQueue is empty");
    }

    Task * pop(size_t thread_num)
    {
        auto thread_with_tasks = getAnyThreadWithTasks(thread_num);

        Task * task = queues[thread_with_tasks].front();
        queues[thread_with_tasks].pop();

        --num_tasks;
        return task;
    }

    size_t size() const { return num_tasks; }
    bool empty() const { return num_tasks == 0; }

private:
    using Queue = std::queue<Task *>;
    std::vector<Queue> queues;
    size_t num_tasks = 0;
};

}