blob: bb8996fc1a6bf77026e27fa649bb815c9dce5d9f (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
#pragma once
#include <vector>
#include <queue>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
template <typename Task>
class TaskQueue
{
public:
void init(size_t num_threads) { queues.resize(num_threads); }
void push(Task * task, size_t thread_num)
{
queues[thread_num].push(task);
++num_tasks;
}
size_t getAnyThreadWithTasks(size_t from_thread = 0)
{
if (num_tasks == 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "TaskQueue is empty");
for (size_t i = 0; i < queues.size(); ++i)
{
if (!queues[from_thread].empty())
return from_thread;
++from_thread;
if (from_thread >= queues.size())
from_thread = 0;
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "TaskQueue is empty");
}
Task * pop(size_t thread_num)
{
auto thread_with_tasks = getAnyThreadWithTasks(thread_num);
Task * task = queues[thread_with_tasks].front();
queues[thread_with_tasks].pop();
--num_tasks;
return task;
}
size_t size() const { return num_tasks; }
bool empty() const { return num_tasks == 0; }
private:
using Queue = std::queue<Task *>;
std::vector<Queue> queues;
size_t num_tasks = 0;
};
}
|