1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
#pragma once
#include <atomic>
#include <cstddef>
#include <functional>
#include <base/types.h>
#include <Core/Defines.h>
namespace DB
{
class ReadBuffer;
class WriteBuffer;
/// See Progress.
struct ProgressValues
{
UInt64 read_rows = 0;
UInt64 read_bytes = 0;
UInt64 total_rows_to_read = 0;
UInt64 total_bytes_to_read = 0;
UInt64 written_rows = 0;
UInt64 written_bytes = 0;
UInt64 result_rows = 0;
UInt64 result_bytes = 0;
UInt64 elapsed_ns = 0;
void read(ReadBuffer & in, UInt64 server_revision);
void write(WriteBuffer & out, UInt64 client_revision) const;
void writeJSON(WriteBuffer & out) const;
};
struct ReadProgress
{
UInt64 read_rows = 0;
UInt64 read_bytes = 0;
UInt64 total_rows_to_read = 0;
UInt64 total_bytes_to_read = 0;
ReadProgress(UInt64 read_rows_, UInt64 read_bytes_, UInt64 total_rows_to_read_ = 0, UInt64 total_bytes_to_read_ = 0)
: read_rows(read_rows_), read_bytes(read_bytes_), total_rows_to_read(total_rows_to_read_), total_bytes_to_read(total_bytes_to_read_) {}
};
struct WriteProgress
{
UInt64 written_rows = 0;
UInt64 written_bytes = 0;
WriteProgress(UInt64 written_rows_, UInt64 written_bytes_)
: written_rows(written_rows_), written_bytes(written_bytes_) {}
};
struct ResultProgress
{
UInt64 result_rows = 0;
UInt64 result_bytes = 0;
ResultProgress(UInt64 result_rows_, UInt64 result_bytes_)
: result_rows(result_rows_), result_bytes(result_bytes_) {}
};
struct FileProgress
{
/// Here read_bytes (raw bytes) - do not equal ReadProgress::read_bytes, which are calculated according to column types.
UInt64 read_bytes = 0;
UInt64 total_bytes_to_read = 0;
explicit FileProgress(UInt64 read_bytes_, UInt64 total_bytes_to_read_ = 0) : read_bytes(read_bytes_), total_bytes_to_read(total_bytes_to_read_) {}
};
/** Progress of query execution.
* Values, transferred over network are deltas - how much was done after previously sent value.
* The same struct is also used for summarized values.
*/
struct Progress
{
std::atomic<UInt64> read_rows {0}; /// Rows (source) processed.
std::atomic<UInt64> read_bytes {0}; /// Bytes (uncompressed, source) processed.
/** How much rows/bytes must be processed, in total, approximately. Non-zero value is sent when there is information about
* some new part of job. Received values must be summed to get estimate of total rows to process.
*/
std::atomic<UInt64> total_rows_to_read {0};
std::atomic<UInt64> total_bytes_to_read {0};
std::atomic<UInt64> written_rows {0};
std::atomic<UInt64> written_bytes {0};
std::atomic<UInt64> result_rows {0};
std::atomic<UInt64> result_bytes {0};
std::atomic<UInt64> elapsed_ns {0};
Progress() = default;
Progress(UInt64 read_rows_, UInt64 read_bytes_, UInt64 total_rows_to_read_ = 0, UInt64 total_bytes_to_read_ = 0)
: read_rows(read_rows_), read_bytes(read_bytes_), total_rows_to_read(total_rows_to_read_), total_bytes_to_read(total_bytes_to_read_) {}
explicit Progress(ReadProgress read_progress)
: read_rows(read_progress.read_rows), read_bytes(read_progress.read_bytes), total_rows_to_read(read_progress.total_rows_to_read) {}
explicit Progress(WriteProgress write_progress)
: written_rows(write_progress.written_rows), written_bytes(write_progress.written_bytes) {}
explicit Progress(ResultProgress result_progress)
: result_rows(result_progress.result_rows), result_bytes(result_progress.result_bytes) {}
explicit Progress(FileProgress file_progress)
: read_bytes(file_progress.read_bytes), total_bytes_to_read(file_progress.total_bytes_to_read) {}
void read(ReadBuffer & in, UInt64 server_revision);
void write(WriteBuffer & out, UInt64 client_revision) const;
/// Progress in JSON format (single line, without whitespaces) is used in HTTP headers.
void writeJSON(WriteBuffer & out) const;
/// Each value separately is changed atomically (but not whole object).
bool incrementPiecewiseAtomically(const Progress & rhs);
void reset();
ProgressValues getValues() const;
ProgressValues fetchValuesAndResetPiecewiseAtomically();
Progress fetchAndResetPiecewiseAtomically();
Progress & operator=(Progress && other) noexcept;
Progress(Progress && other) noexcept
{
*this = std::move(other);
}
};
/** Callback to track the progress of the query.
* Used in QueryPipeline and Context.
* The function takes the number of rows in the last block, the number of bytes in the last block.
* Note that the callback can be called from different threads.
*/
using ProgressCallback = std::function<void(const Progress & progress)>;
}
|