#include <Processors/Executors/ExecutionThreadContext.h>
#include <QueryPipeline/ReadProgressCallback.h>
#include <Common/Stopwatch.h>
#include <Interpreters/OpenTelemetrySpanLog.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int TOO_MANY_ROWS_OR_BYTES;
    extern const int QUOTA_EXCEEDED;
    extern const int QUERY_WAS_CANCELLED;
}

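/// Block the calling thread until the pipeline is finished or another thread signals it via wakeUp().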
void ExecutionThreadContext::wait(std::atomic_bool & finished)
{
    std::unique_lock lock(mutex);

    condvar.wait(lock, [&]
    {
        return finished || wake_flag;
    });

    wake_flag = false;
}

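/// Set the wake flag under the mutex and notify the thread waiting in wait().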
void ExecutionThreadContext::wakeUp()
{
    std::lock_guard guard(mutex);
    wake_flag = true;
    condvar.notify_one();
}

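/// Used by executeJob() to decide whether a propagating exception may be annotated with the processor name.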
static bool checkCanAddAdditionalInfoToException(const DB::Exception & exception)
{
    /// Don't add additional info to limits and quota exceptions, and in case of kill query (to pass tests).
    return exception.code() != ErrorCodes::TOO_MANY_ROWS_OR_BYTES
        && exception.code() != ErrorCodes::QUOTA_EXCEEDED
        && exception.code() != ErrorCodes::QUERY_WAS_CANCELLED;
}

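/// Run one unit of work for a processor. For source nodes (no back edges, i.e. no inputs),
/// also report read progress to the callback; if the callback rejects the progress, the processor is cancelled.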
static void executeJob(ExecutingGraph::Node * node, ReadProgressCallback * read_progress_callback)
{
    try
    {
        node->processor->work();

        /// Update read progress only for source nodes.
        bool is_source = node->back_edges.empty();

        if (is_source && read_progress_callback)
        {
            if (auto read_progress = node->processor->getReadProgress())
            {
                if (read_progress->counters.total_rows_approx)
                    read_progress_callback->addTotalRowsApprox(read_progress->counters.total_rows_approx);

                if (read_progress->counters.total_bytes)
                    read_progress_callback->addTotalBytes(read_progress->counters.total_bytes);

                if (!read_progress_callback->onProgress(read_progress->counters.read_rows, read_progress->counters.read_bytes, read_progress->limits))
                    node->processor->cancel();
            }
        }
    }
    catch (Exception & exception)
    {
        if (checkCanAddAdditionalInfoToException(exception))
            exception.addMessage("While executing " + node->processor->getName());
        throw;
    }
}

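/// Execute the job of the node currently assigned to this thread, optionally wrapping it in an
/// OpenTelemetry span and timing it for processor profiling. Any exception is stored on the node;
/// returns true if the node has no stored exception.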
bool ExecutionThreadContext::executeTask()
{
    std::unique_ptr<OpenTelemetry::SpanHolder> span;

    if (trace_processors)
    {
        span = std::make_unique<OpenTelemetry::SpanHolder>(node->processor->getName());
        span->addAttribute("thread_number", thread_number);
    }

    std::optional<Stopwatch> execution_time_watch;

#ifndef NDEBUG
    execution_time_watch.emplace();
#else
    if (profile_processors)
        execution_time_watch.emplace();
#endif

    try
    {
        executeJob(node, read_progress_callback);
        ++node->num_executed_jobs;
    }
    catch (...)
    {
        node->exception = std::current_exception();
    }

    if (profile_processors)
    {
        UInt64 elapsed_microseconds = execution_time_watch->elapsedMicroseconds();
        node->processor->elapsed_us += elapsed_microseconds;
        if (trace_processors)
            span->addAttribute("execution_time_ms", elapsed_microseconds);
    }

#ifndef NDEBUG
    execution_time_ns += execution_time_watch->elapsed();
    if (trace_processors)
        span->addAttribute("execution_time_ns", execution_time_watch->elapsed());
#endif

    return node->exception == nullptr;
}

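/// Rethrow the exception stored in this thread context, if there is one.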
void ExecutionThreadContext::rethrowExceptionIfHas()
{
    if (exception)
        std::rethrow_exception(exception);
}

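/// Pop the next asynchronous task from this thread's queue, or return nullptr if the queue is empty.
/// Clears has_async_tasks once the last task has been taken.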
ExecutingGraph::Node * ExecutionThreadContext::tryPopAsyncTask()
{
    ExecutingGraph::Node * task = nullptr;

    if (!async_tasks.empty())
    {
        task = async_tasks.front();
        async_tasks.pop();

        if (async_tasks.empty())
            has_async_tasks = false;
    }

    return task;
}

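/// Enqueue an asynchronous task for this thread and mark that async work is pending.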
void ExecutionThreadContext::pushAsyncTask(ExecutingGraph::Node * async_task)
{
    async_tasks.push(async_task);
    has_async_tasks = true;
}

}