blob: d35f8de94d1522d887994c2467a9663ac27ad673 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
#pragma once
#include <Processors/Executors/ExecutionThreadContext.h>
#include <Processors/Executors/PollingQueue.h>
#include <Processors/Executors/ThreadsQueue.h>
#include <Processors/Executors/TasksQueue.h>
#include <stack>
namespace DB
{
/// Manages the tasks which are ready for execution. Used in PipelineExecutor.
///
/// Only isFinished() and getThreadContext() are defined inline; every other member
/// function is merely declared here and defined outside of this header.
class ExecutorTasks
{
    /// Set when the query is finished (or cancelled).
    std::atomic<bool> finished{false};

    /// One context per executing thread.
    std::vector<std::unique_ptr<ExecutionThreadContext>> executor_contexts;

    /// Protects only the executor_contexts vector.
    /// Needed to avoid a race between init() and finish().
    std::mutex executor_contexts_mutex;

    /// Common mutex for all the fields declared below.
    std::mutex mutex;

    /// Queue with pointers to tasks. Each thread concurrently reads from it until the finished flag is set.
    /// Stores processors which need to be prepared. The Preparing status is already set for them.
    TaskQueue<ExecutingGraph::Node> task_queue;

    /// Queue of tasks whose processors returned the Async status after prepare.
    /// With multiple threads, the main thread waits for async tasks.
    /// With a single thread, async tasks are waited for only when task_queue is empty.
    PollingQueue async_task_queue;

    /// Maximum number of threads. Constant after initialization, based on the `max_threads` setting.
    size_t num_threads = 0;

    /// Number of started threads (allocated by `ConcurrencyControl`).
    /// Can grow during execution, up to `num_threads`.
    size_t use_threads = 0;

    /// Total number of awaited async tasks which have not been executed yet:
    /// sum(executor_contexts[i].async_tasks.size())
    size_t num_waiting_async_tasks = 0;

    /// The set of threads which are currently waiting.
    ThreadsQueue threads_queue;

public:
    using Stack = std::stack<UInt64>;
    using Queue = std::queue<ExecutingGraph::Node *>;

    /// Reads the `finished` flag.
    bool isFinished() const { return finished.load(); }

    /// Returns the context stored at index `thread_num` of executor_contexts.
    /// The index is not range-checked and executor_contexts_mutex is not taken here.
    ExecutionThreadContext & getThreadContext(size_t thread_num)
    {
        return *executor_contexts[thread_num];
    }

    void init(size_t num_threads_, size_t use_threads_, bool profile_processors, bool trace_processors, ReadProgressCallback * callback);
    void fill(Queue & queue);
    void upscale(size_t use_threads_);
    void finish();

    void tryGetTask(ExecutionThreadContext & context);
    void pushTasks(Queue & queue, Queue & async_queue, ExecutionThreadContext & context);
    void tryWakeUpAnyOtherThreadWithTasks(ExecutionThreadContext & self, std::unique_lock<std::mutex> & lock);
    void processAsyncTasks();

    void rethrowFirstThreadException();
};
}
|