blob: 9ccdf6bcc092237da810a4d25e4b6b34eb7d1d85 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
#pragma once
#include <Common/Exception.h>
#include <base/defines.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Simple struct which stores threads with numbers [0 .. num_threads - 1].
/// Allows to push and pop specified thread, or pop any thread if has.
/// All operations (except init) are O(1). No memory allocations after init happen.
struct ThreadsQueue
{
void init(size_t num_threads)
{
stack_size = 0;
stack.clear();
thread_pos_in_stack.clear();
stack.reserve(num_threads);
thread_pos_in_stack.reserve(num_threads);
for (size_t thread = 0; thread < num_threads; ++thread)
{
stack.emplace_back(thread);
thread_pos_in_stack.emplace_back(thread);
}
}
bool has(size_t thread) const { return thread_pos_in_stack[thread] < stack_size; }
size_t size() const { return stack_size; }
bool empty() const { return stack_size == 0; }
void push(size_t thread)
{
if (unlikely(has(thread)))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't push thread because it is already in threads queue");
swapThreads(thread, stack[stack_size]);
++stack_size;
}
void pop(size_t thread)
{
if (unlikely(!has(thread)))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't pop thread because it is not in threads queue");
--stack_size;
swapThreads(thread, stack[stack_size]);
}
size_t popAny()
{
if (unlikely(stack_size == 0))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't pop from empty queue");
--stack_size;
return stack[stack_size];
}
private:
std::vector<size_t> stack;
std::vector<size_t> thread_pos_in_stack;
size_t stack_size = 0;
void swapThreads(size_t first, size_t second)
{
std::swap(thread_pos_in_stack[first], thread_pos_in_stack[second]);
std::swap(stack[thread_pos_in_stack[first]], stack[thread_pos_in_stack[second]]);
}
};
}
|