1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
|
#include <Interpreters/AsynchronousMetricLog.h>
#include <Interpreters/CrashLog.h>
#include <Interpreters/MetricLog.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/QueryLog.h>
#include <Interpreters/QueryThreadLog.h>
#include <Interpreters/QueryViewsLog.h>
#include <Interpreters/SessionLog.h>
#include <Interpreters/TextLog.h>
#include <Interpreters/TraceLog.h>
#include <Interpreters/FilesystemCacheLog.h>
#include <Interpreters/FilesystemReadPrefetchesLog.h>
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/ZooKeeperLog.h>
#include <Interpreters/TransactionsInfoLog.h>
#include <Interpreters/AsynchronousInsertLog.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Common/SystemLogBase.h>
#include <Common/ThreadPool.h>
#include <Common/logger_useful.h>
#include <base/scope_guard.h>
namespace DB
{
/// Error codes referenced in this file. They are only declared here (`extern`),
/// so their definitions must be provided by another translation unit.
namespace ErrorCodes
{
/// Thrown by SystemLogQueue::waitFlush when the flush does not complete in time.
extern const int TIMEOUT_EXCEEDED;
}
/// Out-of-line definition of the defaulted ISystemLog destructor.
ISystemLog::~ISystemLog() = default;
/// Constructs the queue for one system log table.
///
/// The logger is named "SystemLogQueue (<database>.<table>)" after the target table
/// taken from `settings_`. Space for `reserved_size_rows` elements is reserved up front.
/// When `turn_off_logger` is set, the logger level is set to 0, which makes the
/// LOG_* calls issued by this queue no-ops.
template <typename LogElement>
SystemLogQueue<LogElement>::SystemLogQueue(const SystemLogQueueSettings & settings_)
    : log(&Poco::Logger::get("SystemLogQueue (" + settings_.database + "." + settings_.table + ")"))
    , settings(settings_)
{
    queue.reserve(settings.reserved_size_rows);

    if (settings.turn_off_logger)
    {
        log->setLevel(0);
    }
}
/// Per-thread flag that is true while SystemLogQueue::push is running on the current thread.
/// push() checks it on entry and drops the element if it is already set (a re-entrant call).
static thread_local bool recursive_push_call = false;
/// Appends `element` to the in-memory queue.
///
/// The element is silently dropped when:
///  - push() is re-entered on the same thread (see `recursive_push_call`);
///  - the queue has been shut down;
///  - the queue already holds `max_size_rows` elements (an error is logged once per
///    queue front index, i.e. once until the queue is drained again).
///
/// When the queue size is exactly `buffer_size_rows_flush_threshold` at the time of the call,
/// a flush up to the current end of the queue is requested and the flushing thread is woken up.
///
/// All log messages are emitted without holding `mutex`.
template <typename LogElement>
void SystemLogQueue<LogElement>::push(LogElement&& element)
{
    /// It is possible that the method will be called recursively.
    /// Better to drop these events to avoid complications.
    if (recursive_push_call)
        return;
    recursive_push_call = true;
    SCOPE_EXIT({ recursive_push_call = false; });

    /// Memory can be allocated while resizing on queue.push_back.
    /// The size of allocation can be in order of a few megabytes.
    /// But this should not be accounted for query memory usage.
    /// Otherwise the tests like 01017_uniqCombined_memory_usage.sql will be flaky.
    MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;

    /// Should not log messages under mutex.
    bool buffer_size_rows_flush_threshold_exceeded = false;

    {
        std::unique_lock lock(mutex);

        if (is_shutdown)
            return;

        if (queue.size() == settings.buffer_size_rows_flush_threshold)
        {
            buffer_size_rows_flush_threshold_exceeded = true;

            // The queue has reached the flush threshold, time to flush.
            // We only check for strict equality, because messages are added one
            // by one, under exclusive lock, so we will see each message count.
            // It is enough to only wake the flushing thread once, when the message
            // count reaches the threshold.
            const uint64_t queue_end = queue_front_index + queue.size();
            if (requested_flush_up_to < queue_end)
                requested_flush_up_to = queue_end;

            flush_event.notify_all();
        }

        if (queue.size() >= settings.max_size_rows)
        {
            // Ignore all further entries until the queue is flushed.
            // Log a message about that. Don't spam it -- this might be especially
            // problematic in case of trace log. Remember what the front index of the
            // queue was when we last logged the message. If it changed, it means the
            // queue was flushed, and we can log again.
            if (queue_front_index != logged_queue_full_at_index)
            {
                logged_queue_full_at_index = queue_front_index;

                // Take a snapshot while the mutex is still held: pop() advances
                // queue_front_index under the same mutex, so reading the member after
                // unlock() would be an unsynchronized read.
                const auto front_index_snapshot = queue_front_index;

                // TextLog sets its logger level to 0, so this log is a noop and
                // there is no recursive logging.
                lock.unlock();
                LOG_ERROR(log, "Queue is full for system log '{}' at {}. max_size_rows {}",
                    demangle(typeid(*this).name()),
                    front_index_snapshot,
                    settings.max_size_rows);
            }

            return;
        }

        queue.push_back(std::move(element));
    }

    if (buffer_size_rows_flush_threshold_exceeded)
        LOG_INFO(log, "Queue is half full for system log '{}'. buffer_size_rows_flush_threshold {}",
            demangle(typeid(*this).name()), settings.buffer_size_rows_flush_threshold);
}
/// Crash hook: requests a forced flush, but only when `notify_flush_on_crash`
/// is enabled in the settings. Does not wait for the flush to complete.
template <typename LogElement>
void SystemLogQueue<LogElement>::handleCrash()
{
    if (!settings.notify_flush_on_crash)
        return;

    notifyFlush(/* force */ true);
}
/// Blocks until everything up to `expected_flushed_up_to` has been flushed and no forced
/// table preparation is pending.
///
/// Throws Exception with code TIMEOUT_EXCEEDED if that does not happen within the timeout.
template <typename LogElement>
void SystemLogQueue<LogElement>::waitFlush(uint64_t expected_flushed_up_to)
{
    // Use an arbitrary timeout to avoid endless waiting. 60s proved to be
    // too fast for our parallel functional tests, probably because they
    // heavily load the disk.
    const int timeout_seconds = 180;

    std::unique_lock lock(mutex);

    const auto flush_completed = [&]
    {
        return flushed_up_to >= expected_flushed_up_to && !is_force_prepare_tables;
    };

    if (flush_event.wait_for(lock, std::chrono::seconds(timeout_seconds), flush_completed))
        return;

    throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Timeout exceeded ({} s) while flushing system log '{}'.",
        toString(timeout_seconds), demangle(typeid(*this).name()));
}
/// Requests a flush of everything currently in the queue and wakes up waiters on `flush_event`.
///
/// `should_prepare_tables_anyway` is OR-ed into the pending force-prepare flag.
/// Returns the queue offset (front index + current size) that was requested,
/// or uint64_t(-1) if the queue has already been shut down.
/// Does not wait for the flush itself.
template <typename LogElement>
uint64_t SystemLogQueue<LogElement>::notifyFlush(bool should_prepare_tables_anyway)
{
    uint64_t flush_target;

    {
        std::lock_guard lock(mutex);

        if (is_shutdown)
            return static_cast<uint64_t>(-1);

        flush_target = queue_front_index + queue.size();

        // Publish our flush request, taking care not to overwrite the requests
        // made by other threads: the flag is only ever raised here and the
        // requested offset only ever grows.
        is_force_prepare_tables |= should_prepare_tables_anyway;
        if (requested_flush_up_to < flush_target)
            requested_flush_up_to = flush_target;

        flush_event.notify_all();
    }

    // Logged outside the critical section.
    LOG_DEBUG(log, "Requested flush up to offset {}", flush_target);

    return flush_target;
}
/// Records that the log has been flushed up to `to_flush_end`, clears the pending
/// force-prepare flag and wakes up every thread waiting on `flush_event`
/// (e.g. callers blocked in waitFlush).
template <typename LogElement>
void SystemLogQueue<LogElement>::confirm(uint64_t to_flush_end)
{
    std::lock_guard lock(mutex);

    flushed_up_to = to_flush_end;
    is_force_prepare_tables = false;

    flush_event.notify_all();
}
/// Takes all currently queued elements out of the queue.
///
/// Waits up to `flush_interval_milliseconds` for a pending flush request, a shutdown or a
/// forced table preparation. The wait result is not checked: the queue is drained whether
/// the wait was satisfied or timed out.
///
/// `output` receives the queued elements (its previous contents are discarded).
/// `should_prepare_tables_anyway` receives the current force-prepare flag.
/// `exit_this_thread` receives the current shutdown flag.
/// Returns the queue index just past the elements handed out in `output`.
template <typename LogElement>
typename SystemLogQueue<LogElement>::Index SystemLogQueue<LogElement>::pop(std::vector<LogElement> & output,
                                                                           bool & should_prepare_tables_anyway,
                                                                           bool & exit_this_thread)
{
    std::unique_lock lock(mutex);

    const auto has_work = [&]
    {
        return is_shutdown || is_force_prepare_tables || requested_flush_up_to > flushed_up_to;
    };
    flush_event.wait_for(lock, std::chrono::milliseconds(settings.flush_interval_milliseconds), has_work);

    // Everything currently queued is about to be handed out.
    queue_front_index += queue.size();

    // Swap with existing array from previous flush, to save memory
    // allocations.
    output.clear();
    output.swap(queue);

    exit_this_thread = is_shutdown;
    should_prepare_tables_anyway = is_force_prepare_tables;

    return queue_front_index;
}
/// Marks the queue as shut down and wakes up every thread waiting on `flush_event`.
/// After this, push() drops new elements and notifyFlush() returns uint64_t(-1).
template <typename LogElement>
void SystemLogQueue<LogElement>::shutdown()
{
    std::unique_lock lock(mutex);

    is_shutdown = true;

    /// Tell thread to shutdown.
    flush_event.notify_all();
}
/// Uses the supplied queue if `queue_` is non-null; otherwise creates a new
/// SystemLogQueue from `settings_`.
template <typename LogElement>
SystemLogBase<LogElement>::SystemLogBase(
    const SystemLogQueueSettings & settings_,
    std::shared_ptr<SystemLogQueue<LogElement>> queue_)
    : queue(queue_ ? queue_ : std::make_shared<SystemLogQueue<LogElement>>(settings_))
{
}
/// Requests a flush of everything queued so far and blocks until it completes.
/// Returns immediately if the queue reports (via uint64_t(-1)) that it is shut down.
/// `force` is forwarded to the queue's notifyFlush.
/// Exceptions thrown by the queue's waitFlush are not caught here.
template <typename LogElement>
void SystemLogBase<LogElement>::flush(bool force)
{
    const uint64_t requested_offset = queue->notifyFlush(force);

    if (requested_offset != static_cast<uint64_t>(-1))
        queue->waitFlush(requested_offset);
}
/// Crash hook: delegates to the underlying queue's handleCrash().
template <typename LogElement>
void SystemLogBase<LogElement>::handleCrash()
{
    queue->handleCrash();
}
/// Starts the background thread that runs savingThreadFunction().
/// The thread handle is assigned while holding `thread_mutex`.
template <typename LogElement>
void SystemLogBase<LogElement>::startup()
{
    std::lock_guard lock(thread_mutex);

    saving_thread = std::make_unique<ThreadFromGlobalPool>(
        [this]
        {
            savingThreadFunction();
        });
}
/// Hands `element` over to the queue. The element is taken by value and moved into
/// the queue's push(), which may drop it (for example after shutdown or when the queue is full).
template <typename LogElement>
void SystemLogBase<LogElement>::add(LogElement element)
{
    queue->push(std::move(element));
}
/// Requests a flush from the queue without waiting for it to complete.
/// The offset returned by the queue is discarded.
template <typename LogElement>
void SystemLogBase<LogElement>::notifyFlush(bool force)
{
    queue->notifyFlush(force);
}
/// Explicit template instantiations: the member definitions above live in this .cpp file,
/// so each used specialization is instantiated here.
/// NOTE(review): SYSTEM_LOG_ELEMENTS is defined outside this view; presumably it applies
/// the given macro to every system log element type -- confirm against its definition.
#define INSTANTIATE_SYSTEM_LOG_BASE(ELEMENT) template class SystemLogBase<ELEMENT>;
SYSTEM_LOG_ELEMENTS(INSTANTIATE_SYSTEM_LOG_BASE)
#define INSTANTIATE_SYSTEM_LOG_QUEUE(ELEMENT) template class SystemLogQueue<ELEMENT>;
SYSTEM_LOG_ELEMENTS(INSTANTIATE_SYSTEM_LOG_QUEUE)
}
|