1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <QueryPipeline/QueryPipeline.h>
#include <QueryPipeline/ReadProgressCallback.h>
#include <Poco/Event.h>
#include <Common/setThreadName.h>
#include <Common/ThreadPool.h>
#include <Common/scope_guard_safe.h>
#include <Common/CurrentThread.h>
namespace DB
{
/// Error codes referenced in this file. They are only declared here (extern);
/// their definitions are not part of this translation unit's visible source.
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// State shared between the thread that calls CompletedPipelineExecutor::execute()
/// in interactive mode and the background thread that execute() starts to run the pipeline.
struct CompletedPipelineExecutor::Data
{
/// Executor that runs the pipeline. Created in execute(); cancelled from execute()
/// or from ~CompletedPipelineExecutor().
PipelineExecutorPtr executor;
/// Exception caught in the background thread; execute() rethrows it when has_exception is set.
std::exception_ptr exception;
/// Set by the background thread once execution has ended, with or without an exception.
std::atomic_bool is_finished = false;
/// Set by the background thread after it has stored `exception`.
std::atomic_bool has_exception = false;
/// Background thread that runs threadFunction().
ThreadFromGlobalPool thread;
/// Signalled by the background thread right after is_finished is set;
/// execute() waits on it with a timeout.
Poco::Event finish_event;
/// Joins the background thread (if it was started) before the members are destroyed,
/// because that thread accesses this object through a reference.
~Data()
{
if (thread.joinable())
thread.join();
}
};
/// Body of the background thread started by CompletedPipelineExecutor::execute().
///
/// Runs `data.executor` with the given number of threads and concurrency-control flag.
/// Any exception thrown inside the try block is stored in `data.exception` (and flagged via
/// `data.has_exception`) instead of propagating. Afterwards the run is marked as finished
/// and `data.finish_event` is signalled so that the waiting thread can wake up.
///
/// `thread_group` may be null; attach/detach is performed only when it is not.
static void threadFunction(
CompletedPipelineExecutor::Data & data, ThreadGroupPtr thread_group, size_t num_threads, bool concurrency_control)
{
/// Registered before anything else; presumably runs at scope exit on every path
/// (SCOPE_EXIT_SAFE comes from Common/scope_guard_safe.h) to undo the attach below.
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachFromGroupIfNotDetached();
);
setThreadName("QueryCompPipeEx");
try
{
if (thread_group)
CurrentThread::attachToGroup(thread_group);
data.executor->execute(num_threads, concurrency_control);
}
catch (...)
{
/// Store the exception first, then raise the flag that execute() checks before rethrowing.
data.exception = std::current_exception();
data.has_exception = true;
}
/// Publish completion: set the flag polled by execute(), then wake up its timed wait.
data.is_finished = true;
data.finish_event.set();
}
/// Binds the executor to `pipeline_` (kept by reference).
/// Throws an Exception with code LOGICAL_ERROR if the pipeline does not report itself as completed.
CompletedPipelineExecutor::CompletedPipelineExecutor(QueryPipeline & pipeline_) : pipeline(pipeline_)
{
    const bool is_completed = pipeline.completed();
    if (is_completed)
        return;

    throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline for CompletedPipelineExecutor must be completed");
}
/// Installs a cancellation check for execute().
///
/// @param is_cancelled             Callback polled by execute(); when it returns true the executor is cancelled.
/// @param interactive_timeout_ms_  Polling period in milliseconds. A non-zero value makes execute()
///                                 run the pipeline in a separate thread and poll the callback
///                                 each time this timeout elapses; zero keeps execution synchronous.
void CompletedPipelineExecutor::setCancelCallback(std::function<bool()> is_cancelled, size_t interactive_timeout_ms_)
{
    /// `is_cancelled` is a by-value sink parameter: move it into the member
    /// instead of copying the std::function (and its captured state) a second time.
    is_cancelled_callback = std::move(is_cancelled);
    interactive_timeout_ms = interactive_timeout_ms_;
}
/// Runs the pipeline to completion.
///
/// Interactive mode (non-zero `interactive_timeout_ms`, see setCancelCallback()):
/// the pipeline is executed in a separate thread, while the calling thread waits on the
/// finish event, waking up every `interactive_timeout_ms` milliseconds to poll the cancel
/// callback. Once the callback returns true the executor is cancelled; the loop keeps waiting
/// until the background thread reports completion. An exception captured in the background
/// thread is rethrown here.
///
/// Otherwise the pipeline is executed synchronously in the calling thread.
void CompletedPipelineExecutor::execute()
{
    if (interactive_timeout_ms)
    {
        data = std::make_unique<Data>();
        data->executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element);
        data->executor->setReadProgressCallback(pipeline.getReadProgressCallback());

        /// Avoid passing `this` to the lambda; copy the pointer to data instead.
        /// The destructor of unique_ptr copies the raw pointer into a local variable first
        /// and only then calls the object's destructor.
        auto func = [
            data_ptr = data.get(),
            num_threads = pipeline.getNumThreads(),
            thread_group = CurrentThread::getGroup(),
            concurrency_control = pipeline.getConcurrencyControl()]
        {
            threadFunction(*data_ptr, thread_group, num_threads, concurrency_control);
        };

        data->thread = ThreadFromGlobalPool(std::move(func));

        while (!data->is_finished)
        {
            /// Returns true as soon as the background thread signals completion.
            if (data->finish_event.tryWait(interactive_timeout_ms))
                break;

            /// Invoking an empty std::function throws std::bad_function_call,
            /// so poll the callback only if one was actually provided.
            if (is_cancelled_callback && is_cancelled_callback())
                data->executor->cancel();
        }

        if (data->has_exception)
            std::rethrow_exception(data->exception);
    }
    else
    {
        PipelineExecutor executor(pipeline.processors, pipeline.process_list_element);
        executor.setReadProgressCallback(pipeline.getReadProgressCallback());
        executor.execute(pipeline.getNumThreads(), pipeline.getConcurrencyControl());
    }
}
/// Requests cancellation of the executor if interactive execution ever created one.
/// Nothing thrown by cancel() leaves the destructor: it is logged instead.
/// The background thread itself is joined by Data::~Data() when `data` is destroyed.
CompletedPipelineExecutor::~CompletedPipelineExecutor()
{
    try
    {
        /// Nothing to cancel unless execute() ran in interactive mode and created the executor.
        if (!data || !data->executor)
            return;

        data->executor->cancel();
    }
    catch (...)
    {
        tryLogCurrentException("CompletedPipelineExecutor");
    }
}
}
|