blob: eb048f8ab091c484f9f1a324f2f2a6537386a022 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
#pragma once
#include <Processors/Executors/ExecutingGraph.h>
#include <queue>
#include <condition_variable>
namespace DB
{
class ReadProgressCallback;
/// Context for each executing thread of PipelineExecutor.
class ExecutionThreadContext
{
private:
/// A queue of async tasks. Task is added to queue when waited.
std::queue<ExecutingGraph::Node *> async_tasks;
std::atomic_bool has_async_tasks = false;
/// This objects are used to wait for next available task.
std::condition_variable condvar;
std::mutex mutex;
bool wake_flag = false;
/// Currently processing node.
ExecutingGraph::Node * node = nullptr;
/// Exception from executing thread itself.
std::exception_ptr exception;
/// Callback for read progress.
ReadProgressCallback * read_progress_callback = nullptr;
public:
#ifndef NDEBUG
/// Time for different processing stages.
UInt64 total_time_ns = 0;
UInt64 execution_time_ns = 0;
UInt64 processing_time_ns = 0;
UInt64 wait_time_ns = 0;
#endif
const size_t thread_number;
const bool profile_processors;
const bool trace_processors;
void wait(std::atomic_bool & finished);
void wakeUp();
/// Methods to access/change currently executing task.
bool hasTask() const { return node != nullptr; }
void setTask(ExecutingGraph::Node * task) { node = task; }
bool executeTask();
uint64_t getProcessorID() const { return node->processors_id; }
/// Methods to manage async tasks.
ExecutingGraph::Node * tryPopAsyncTask();
void pushAsyncTask(ExecutingGraph::Node * async_task);
bool hasAsyncTasks() const { return has_async_tasks; }
std::unique_lock<std::mutex> lockStatus() const { return std::unique_lock(node->status_mutex); }
void setException(std::exception_ptr exception_) { exception = exception_; }
void rethrowExceptionIfHas();
explicit ExecutionThreadContext(size_t thread_number_, bool profile_processors_, bool trace_processors_, ReadProgressCallback * callback)
: read_progress_callback(callback)
, thread_number(thread_number_)
, profile_processors(profile_processors_)
, trace_processors(trace_processors_)
{}
};
}
|