1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
|
#pragma once
#include <string>
#include <Processors/IProcessor.h>
#include <Processors/RowsBeforeLimitCounter.h>
#include <IO/Progress.h>
#include <Common/Stopwatch.h>
namespace DB
{
class WriteBuffer;
/** Output format has three inputs and no outputs. It writes data to WriteBuffer.
*
* The first input is for the main result set, the second is for "totals" and the third is for "extremes".
* It is not necessary to connect the "totals" or "extremes" ports (they may remain dangling).
*
* Data from input ports is pulled in order: first from the main input, then totals, then extremes.
*
* By default, data for "totals" and "extremes" is ignored.
*/
class IOutputFormat : public IProcessor
{
public:
    /// Kinds of input ports. The numeric value is used as the position of the port in `inputs` (see getPort).
    enum PortKind { Main = 0, Totals = 1, Extremes = 2 };

    IOutputFormat(const Block & header_, WriteBuffer & out_);

    Status prepare() override;
    void work() override;

    /// Flush output buffers, if there are any.
    virtual void flush();

    /// Turn on the auto_flush flag.
    void setAutoFlush()
    {
        auto_flush = true;
    }

    /// Value for the rows_before_limit_at_least field. The default implementation ignores it.
    virtual void setRowsBeforeLimit(size_t /*rows_before_limit*/) {}

    /// Counter used to calculate rows_before_limit_at_least in the processors pipeline.
    void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) override
    {
        rows_before_limit_counter.swap(counter);
    }

    /// Notification about progress. The method may be called from different threads.
    /// The passed value is a delta; deltas have to be summed up by the receiver.
    /// The default implementation does nothing.
    virtual void onProgress(const Progress & /*progress*/) {}

    /// Content-Type to set when sending an HTTP response.
    virtual std::string getContentType() const
    {
        return "text/plain; charset=UTF-8";
    }

    /// Input port that corresponds to the given kind.
    InputPort & getPort(PortKind kind)
    {
        return *std::next(inputs.begin(), kind);
    }

    /// Compatibility with the old interface.
    /// TODO: separate formats and processors.
    void write(const Block & block);
    void finalize();

    /// Returns true by default; formats may override.
    virtual bool expectMaterializedColumns() const
    {
        return true;
    }

    /// Writes the suffix if it is still pending, passes the totals block to consumeTotals()
    /// and then records that totals were written.
    void setTotals(const Block & totals)
    {
        writeSuffixIfNeeded();
        consumeTotals(Chunk(totals.getColumns(), totals.rows()));
        setTotalsAreWritten();
    }

    /// Writes the suffix if it is still pending and passes the extremes block to consumeExtremes().
    void setExtremes(const Block & extremes)
    {
        writeSuffixIfNeeded();
        consumeExtremes(Chunk(extremes.getColumns(), extremes.rows()));
    }

    size_t getResultRows() const
    {
        return result_rows;
    }

    size_t getResultBytes() const
    {
        return result_bytes;
    }

    /// Suppress the prefix: writePrefixIfNeeded() will not call writePrefix() afterwards.
    void doNotWritePrefix()
    {
        need_write_prefix = false;
    }

    /// Restore the finalized/prefix/suffix flags to their initial values,
    /// then let the implementation reset its own state.
    void resetFormatter()
    {
        finalized = false;
        need_write_prefix = true;
        need_write_suffix = true;
        resetFormatterImpl();
    }

    /// Reset the statistics watch to a specific point in time.
    /// If is_running is false, the watch is stopped right in this call (elapsed = now() - given start).
    void setStartTime(UInt64 start, bool is_running)
    {
        statistics.watch = Stopwatch(CLOCK_MONOTONIC, start, true);
        if (is_running)
            return;

        statistics.watch.stop();
    }

    /// Calls writePrefix() at most once until the flag is raised again (see resetFormatter).
    void writePrefixIfNeeded()
    {
        if (!need_write_prefix)
            return;

        writePrefix();
        need_write_prefix = false;
    }

protected:
    friend class ParallelFormattingOutputFormat;

    /// Calls writeSuffix() at most once until the flag is raised again (see resetFormatter).
    void writeSuffixIfNeeded()
    {
        if (!need_write_suffix)
            return;

        writeSuffix();
        need_write_suffix = false;
    }

    /// Hooks for concrete formats. Only consume() is mandatory; the others are no-ops by default.
    virtual void consume(Chunk) = 0;
    virtual void consumeTotals(Chunk) {}
    virtual void consumeExtremes(Chunk) {}
    virtual void finalizeImpl() {}
    virtual void finalizeBuffers() {}
    virtual void writePrefix() {}
    virtual void writeSuffix() {}
    virtual void resetFormatterImpl() {}

    /// Helper methods for parallel formatting.

    /// Set the number of rows that were already read in
    /// parallel formatting before this formatter was created.
    void setRowsReadBefore(size_t first_row_number_)
    {
        rows_read_before = first_row_number_;
        onRowsReadBeforeUpdate();
    }

    size_t getRowsReadBefore() const
    {
        return rows_read_before;
    }

    /// Update the state according to the new rows_read_before.
    virtual void onRowsReadBeforeUpdate() {}

    /// Some formats output statistics after the data.
    /// In parallel formatting these statistics are collected outside the
    /// underlying format and are set to the format before finalizing.
    struct Statistics
    {
        Stopwatch watch;
        Progress progress;
        bool applied_limit = false;
        size_t rows_before_limit = 0;
        Chunk totals;
        Chunk extremes;
    };

    /// In some formats the way extremes are printed depends on
    /// whether totals were printed or not. In that case parallel formatting
    /// has to notify the underlying format that totals were printed.
    void setTotalsAreWritten()
    {
        are_totals_written = true;
    }

    bool areTotalsWritten() const
    {
        return are_totals_written;
    }

    /// Return true if the format saves totals and extremes in consumeTotals/consumeExtremes and
    /// outputs them in the finalize() method.
    virtual bool areTotalsAndExtremesUsedInFinalize() const
    {
        return false;
    }

    WriteBuffer & out;

    Chunk current_chunk;
    PortKind current_block_kind = PortKind::Main;
    bool has_input = false;
    bool finished = false;
    bool finalized = false;

    /// Flush data on each consumed chunk. Intended for interactive applications, to output data as soon as it is ready.
    bool auto_flush = false;

    bool need_write_prefix = true;
    bool need_write_suffix = true;

    RowsBeforeLimitCounterPtr rows_before_limit_counter;
    Statistics statistics;

private:
    size_t rows_read_before = 0;
    bool are_totals_written = false;

    /// Counters for consumed chunks. Used for QueryLog.
    size_t result_rows = 0;
    size_t result_bytes = 0;
};
}
|