1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
|
#ifdef OS_LINUX /// Because of 'sigqueue' functions and RT signals.
#include <csignal>
#include <poll.h>
#include <mutex>
#include <filesystem>
#include <unordered_map>
#include <base/scope_guard.h>
#include <Storages/System/StorageSystemStackTrace.h>
#include <Storages/VirtualColumnUtils.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFile.h>
#include <Common/PipeFDs.h>
#include <Common/CurrentThread.h>
#include <Common/HashTable/Hash.h>
#include <Common/logger_useful.h>
#include <Interpreters/Context.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <QueryPipeline/Pipe.h>
#include <base/getThreadId.h>
namespace DB
{
/// Error codes used in this file. They are only declared here (extern); the definitions live elsewhere.
namespace ErrorCodes
{
extern const int CANNOT_SIGQUEUE;
extern const int CANNOT_MANIPULATE_SIGSET;
extern const int CANNOT_SET_SIGNAL_HANDLER;
extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
extern const int LOGICAL_ERROR;
}
namespace
{
/// Pid of this process. Initialized in StorageSystemStackTrace's ctor and used in signalHandler
/// to ignore signals whose sender pid is different.
std::atomic<pid_t> expected_pid;
/// The signal that is sent to a thread to request its stack trace (see sigqueue in the read method).
const int sig = SIGRTMIN;
/// Number of the current request. It is passed as the signal value, written back to the pipe
/// by the signal handler and incremented in the read method after every request.
std::atomic<int> sequence_num = 0;
/// Number of the request for which the signal handler has stored its data.
std::atomic<int> data_ready_num = 0;
/// Set while the signal handler is filling the global variables; a handler that finds it already set returns.
/// Only needed for thread sanitizer.
std::atomic<bool> signal_latch = false;
/** Notes:
* Only one query from the table can be processed at any moment of time.
* This is ensured by the mutex in the read method.
* We obtain information about threads by sending a signal and receiving info from the signal handler.
* Information is passed via global variables and the pipe is used for signaling.
* Actually we could send all information via the pipe, but we read from it with a timeout just in case,
* so it's convenient to use it only for signaling.
*/
/// Stack trace captured by the signal handler; it is read in the read method after the notification arrives.
StackTrace stack_trace{NoCapture{}};
/// Query id of the interrupted thread, copied by the signal handler and truncated to max_query_id_size bytes.
constexpr size_t max_query_id_size = 128;
char query_id_data[max_query_id_size];
size_t query_id_size = 0;
/// Opened in StorageSystemStackTrace's ctor. The signal handler writes to fds_rw[1], the wait function reads from fds_rw[0].
LazyPipeFDs notification_pipe;
/** Handler of the signal `sig`.
  * Stores the stack trace and the query id of the interrupted thread into the global variables
  * (stack_trace, query_id_data, query_id_size), publishes the request number in data_ready_num
  * and writes the same number to notification_pipe to wake up the thread that is waiting in `wait`.
  *
  * The signal is ignored if:
  * - it was sent by a process other than `expected_pid`;
  * - its value does not match the current `sequence_num` (the request is already outdated);
  * - another invocation of the handler currently holds `signal_latch`.
  *
  * info    - si_pid is the sender, si_value.sival_int is the request number.
  * context - ucontext_t of the interrupted thread; the stack trace is built from it.
  */
void signalHandler(int, siginfo_t * info, void * context)
{
    DENY_ALLOCATIONS_IN_SCOPE;
    auto saved_errno = errno; /// We must restore previous value of errno in signal handler.

    /// In case malicious user is sending signals manually (for unknown reason).
    /// If we don't check - it may break our synchronization.
    if (info->si_pid != expected_pid)
        return;

    /// Signal received too late.
    int notification_num = info->si_value.sival_int;
    if (notification_num != sequence_num.load(std::memory_order_acquire))
        return;

    /// Only one handler at a time may write to the global variables below.
    bool expected = false;
    if (!signal_latch.compare_exchange_strong(expected, true, std::memory_order_acquire))
        return;

    /// All these methods are signal-safe.
    const ucontext_t signal_context = *reinterpret_cast<ucontext_t *>(context);
    stack_trace = StackTrace(signal_context);

    /// The query id is truncated to the size of the static buffer.
    auto query_id = CurrentThread::getQueryId();
    query_id_size = std::min(query_id.size(), max_query_id_size);
    if (!query_id.empty())
        memcpy(query_id_data, query_id.data(), query_id_size);

    /// This is unneeded (because we synchronize through pipe) but makes TSan happy.
    data_ready_num.store(notification_num, std::memory_order_release);

    ssize_t res = ::write(notification_pipe.fds_rw[1], &notification_num, sizeof(notification_num));

    /// We cannot do anything if write failed.
    (void)res;

    errno = saved_errno;
    signal_latch.store(false, std::memory_order_release);
}
/** Wait for a notification in the pipe and read it.
  *
  * timeout_ms - timeout passed to poll(); it is also used as a budget that shrinks on every interrupted poll.
  *
  * Returns true if a notification with the current `sequence_num` was read.
  * Returns false if poll timed out or the timeout budget was exhausted by interruptions.
  * Notifications with other numbers (delayed ones) are read and skipped.
  * Throws CANNOT_READ_FROM_FILE_DESCRIPTOR if poll or read fails,
  * and LOGICAL_ERROR if an unexpected number of bytes was read.
  */
bool wait(int timeout_ms)
{
    while (true)
    {
        int fd = notification_pipe.fds_rw[0];
        pollfd poll_fd{fd, POLLIN, 0};

        int poll_res = poll(&poll_fd, 1, timeout_ms);
        if (poll_res < 0)
        {
            if (errno == EINTR)
            {
                /// Quite a hacky way to update timeout. Just to make sure we avoid infinite waiting.
                /// Only a positive timeout is decremented: decrementing zero would produce -1,
                /// which poll() interprets as "wait forever".
                if (timeout_ms > 0 && --timeout_ms == 0)
                    return false;
                continue;
            }

            throwFromErrno("Cannot poll pipe", ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
        }

        if (poll_res == 0)
            return false;

        int notification_num = 0;
        ssize_t read_res = ::read(fd, &notification_num, sizeof(notification_num));

        if (read_res < 0)
        {
            if (errno == EINTR)
                continue;

            throwFromErrno("Cannot read from pipe", ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
        }

        if (read_res == sizeof(notification_num))
        {
            if (notification_num == sequence_num.load(std::memory_order_relaxed))
                return true;
            else
                continue; /// Drain delayed notifications.
        }

        throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: read wrong number of bytes from pipe");
    }
}
/// Lists the entries of /proc/self/task, puts their names (parsed as thread ids) into a UInt64 column
/// named "thread_id", passes the block through VirtualColumnUtils::filterBlockWithQuery
/// and returns the resulting column.
ColumnPtr getFilteredThreadIds(ASTPtr query, ContextPtr context)
{
    MutableColumnPtr candidate_ids = ColumnUInt64::create();

    /// There is no better way to enumerate threads in a process other than looking into procfs.
    for (const auto & task_entry : std::filesystem::directory_iterator("/proc/self/task"))
    {
        pid_t tid = parse<pid_t>(task_entry.path().filename());
        candidate_ids->insert(tid);
    }

    Block block { ColumnWithTypeAndName(std::move(candidate_ids), std::make_shared<DataTypeUInt64>(), "thread_id") };
    VirtualColumnUtils::filterBlockWithQuery(query, block, context);

    return block.getByPosition(0).column;
}
using ThreadIdToName = std::unordered_map<UInt64, String, DefaultHash<UInt64>>;

/** Reads the name of every thread from /proc/self/task/<tid>/comm (an empty name is used if the file does not exist),
  * passes the names as a "thread_name" column through VirtualColumnUtils::filterBlockWithQuery
  * and returns the mapping thread id -> name only for the threads whose name is present in the filtered column.
  */
ThreadIdToName getFilteredThreadNames(ASTPtr query, ContextPtr context, const PaddedPODArray<UInt64> & thread_ids)
{
    ThreadIdToName names_by_tid;
    MutableColumnPtr names_column = ColumnString::create();

    for (UInt64 tid : thread_ids)
    {
        String name;

        std::filesystem::path comm_path = fmt::format("/proc/self/task/{}/comm", tid);
        if (std::filesystem::exists(comm_path))
        {
            constexpr size_t comm_buf_size = 32; /// More than enough for thread name
            ReadBufferFromFile comm(comm_path.string(), comm_buf_size);
            readEscapedStringUntilEOL(name, comm);
            comm.close();
        }

        names_by_tid[tid] = name;
        names_column->insert(name);
    }

    Block block { ColumnWithTypeAndName(std::move(names_column), std::make_shared<DataTypeString>(), "thread_name") };
    VirtualColumnUtils::filterBlockWithQuery(query, block, context);
    ColumnPtr surviving_names = std::move(block.getByPosition(0).column);

    /// Names that are left in the column after filtering.
    std::unordered_set<String> allowed_names;
    const size_t surviving_count = surviving_names->size();
    for (size_t row = 0; row < surviving_count; ++row)
        allowed_names.emplace(surviving_names->getDataAt(row));

    /// Drop the threads whose name was filtered out.
    std::erase_if(names_by_tid, [&allowed_names](const auto & tid_and_name)
    {
        return !allowed_names.contains(tid_and_name.second);
    });

    return names_by_tid;
}
}
/** Declares the columns of the table (thread_name, thread_id, query_id, trace),
  * opens the notification pipe, remembers the pid of the process
  * and installs signalHandler as the handler of the signal `sig`.
  * Throws CANNOT_MANIPULATE_SIGSET or CANNOT_SET_SIGNAL_HANDLER if the handler cannot be installed.
  */
StorageSystemStackTrace::StorageSystemStackTrace(const StorageID & table_id_)
    : IStorage(table_id_)
    , log(&Poco::Logger::get("StorageSystemStackTrace"))
{
    StorageInMemoryMetadata storage_metadata;
    storage_metadata.setColumns(ColumnsDescription(
        {
            { "thread_name", std::make_shared<DataTypeString>() },
            { "thread_id", std::make_shared<DataTypeUInt64>() },
            { "query_id", std::make_shared<DataTypeString>() },
            { "trace", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()) },
        },
        { /* aliases */ }));
    setInMemoryMetadata(storage_metadata);

    notification_pipe.open();

    /// Setup signal handler.
    expected_pid = getpid();

    struct sigaction handler_action{};
    handler_action.sa_sigaction = signalHandler;
    handler_action.sa_flags = SA_SIGINFO;

    /// The signal mask of the handler contains only our signal.
    if (0 != sigemptyset(&handler_action.sa_mask))
        throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_MANIPULATE_SIGSET);

    if (0 != sigaddset(&handler_action.sa_mask, sig))
        throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_MANIPULATE_SIGSET);

    if (0 != sigaction(sig, &handler_action, nullptr))
        throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_SET_SIGNAL_HANDLER);
}
/** Produces a single chunk with one row per thread that passed the filters.
  *
  * If neither "trace" nor "query_id" is requested, no signals are sent and these columns get default values.
  * Otherwise the signal `sig` is sent to every thread in turn and the data stored by the signal handler
  * is copied into the row; if the notification does not arrive within the timeout
  * (setting storage_system_stack_trace_pipe_read_timeout_ms), the columns get default values.
  * Threads that no longer exist (sigqueue fails with ESRCH) are skipped.
  *
  * Throws CANNOT_SIGQUEUE if the signal cannot be sent for any other reason.
  */
Pipe StorageSystemStackTrace::read(
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo & query_info,
    ContextPtr context,
    QueryProcessingStage::Enum /*processed_stage*/,
    const size_t /*max_block_size*/,
    const size_t /*num_streams*/)
{
    storage_snapshot->check(column_names);

    int pipe_read_timeout_ms = static_cast<int>(
        context->getSettingsRef().storage_system_stack_trace_pipe_read_timeout_ms.totalMilliseconds());

    /// It shouldn't be possible to do concurrent reads from this table.
    std::lock_guard lock(mutex);

    /// Find out which columns are needed in the result.
    NameSet names_set(column_names.begin(), column_names.end());
    const bool read_thread_names = names_set.contains("thread_name");
    const bool send_signal = names_set.contains("trace") || names_set.contains("query_id");

    Block sample_block = storage_snapshot->metadata->getSampleBlock();
    MutableColumns res_columns = sample_block.cloneEmptyColumns();

    /// Positions of the columns in the sample block (the order in which they are declared in the constructor).
    constexpr size_t thread_name_pos = 0;
    constexpr size_t thread_id_pos = 1;
    constexpr size_t query_id_pos = 2;
    constexpr size_t trace_pos = 3;

    /// Send a signal to every thread and wait for result.
    /// We must wait for every thread one by one sequentially,
    /// because there is a limit on number of queued signals in OS and otherwise signals may get lost.
    /// Also, non-RT signals are not delivered if previous signal is handled right now (by default; but we use RT signals).
    /// Obviously, results for different threads may be out of sync.

    ColumnPtr thread_ids = getFilteredThreadIds(query_info.query, context);
    const auto & thread_ids_data = assert_cast<const ColumnUInt64 &>(*thread_ids).getData();

    ThreadIdToName thread_names;
    if (read_thread_names)
        thread_names = getFilteredThreadNames(query_info.query, context, thread_ids_data);

    for (UInt64 tid : thread_ids_data)
    {
        String thread_name;
        if (read_thread_names)
        {
            const auto name_it = thread_names.find(tid);
            if (name_it == thread_names.end())
                continue; /// was filtered out by "thread_name" condition
            thread_name = name_it->second;
        }

        /// True if the signal handler has stored the data for this very request.
        bool trace_captured = false;

        if (send_signal)
        {
            sigval sig_value{};
            sig_value.sival_int = sequence_num.load(std::memory_order_acquire);

            if (0 != ::sigqueue(static_cast<int>(tid), sig, sig_value))
            {
                /// The thread may have already finished.
                if (ESRCH == errno)
                    continue;

                throwFromErrno("Cannot send signal with sigqueue", ErrorCodes::CANNOT_SIGQUEUE);
            }

            /// Just in case we will wait for pipe with timeout. In case signal didn't get processed.
            trace_captured = wait(pipe_read_timeout_ms)
                && sig_value.sival_int == data_ready_num.load(std::memory_order_acquire);

            if (!trace_captured)
                LOG_DEBUG(log, "Cannot obtain a stack trace for thread {}", tid);
        }

        Array frames;
        if (trace_captured)
        {
            const size_t stack_trace_size = stack_trace.getSize();
            const size_t stack_trace_offset = stack_trace.getOffset();

            frames.reserve(stack_trace_size - stack_trace_offset);
            for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
                frames.emplace_back(reinterpret_cast<intptr_t>(stack_trace.getFramePointers()[i]));
        }

        res_columns[thread_name_pos]->insert(thread_name);
        res_columns[thread_id_pos]->insert(tid);

        if (trace_captured)
        {
            res_columns[query_id_pos]->insertData(query_id_data, query_id_size);
            res_columns[trace_pos]->insert(frames);
        }
        else
        {
            res_columns[query_id_pos]->insertDefault();
            res_columns[trace_pos]->insertDefault();
        }

        /// Signed integer overflow is undefined behavior in both C and C++. However, according to
        /// C++ standard, Atomic signed integer arithmetic is defined to use two's complement; there
        /// are no undefined results. See https://en.cppreference.com/w/cpp/atomic/atomic and
        /// http://eel.is/c++draft/atomics.types.generic#atomics.types.int-8
        if (send_signal)
            ++sequence_num;
    }

    UInt64 num_rows = res_columns.at(0)->size();
    Chunk chunk(std::move(res_columns), num_rows);

    return Pipe(std::make_shared<SourceFromSingleChunk>(sample_block, std::move(chunk)));
}
}
#endif
|