1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
|
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <Common/logger_useful.h>
#include <Common/typeid_cast.h>
#include <Common/formatReadable.h>
#include <IO/WriteHelpers.h>
#include <iomanip>
namespace DB
{
namespace ErrorCodes
{
/// Error codes thrown by the gatherer; the values are defined centrally in the project's ErrorCodes translation unit.
extern const int INCORRECT_NUMBER_OF_COLUMNS;
extern const int EMPTY_DATA_PASSED;
extern const int RECEIVED_EMPTY_DATA;
}
/// Sets up one Source slot per input stream and remembers the row-source
/// buffer that tells merge() which input each output row comes from.
/// Throws EMPTY_DATA_PASSED when there is nothing to gather from.
ColumnGathererStream::ColumnGathererStream(
    size_t num_inputs, ReadBuffer & row_sources_buf_, size_t block_preferred_size_)
    : sources(num_inputs), row_sources_buf(row_sources_buf_)
    , block_preferred_size(block_preferred_size_)
{
    if (0 == num_inputs)
        throw Exception(ErrorCodes::EMPTY_DATA_PASSED, "There are no streams to gather");
}
/// Seeds each source with its first chunk (when one was provided) and
/// creates the empty output column from the first non-empty input.
void ColumnGathererStream::initialize(Inputs inputs)
{
    size_t source_idx = 0;
    for (auto & input : inputs)
    {
        if (input.chunk)
        {
            sources[source_idx].update(input.chunk.detachColumns().at(0));
            if (!result_column)
                result_column = sources[source_idx].column->cloneEmpty();
        }
        ++source_idx;
    }
}
/// Produces the next gathered chunk, driven by row_sources_buf.
/// Returns either a data chunk, a request for more input
/// (Status(next_required_source)), or a finished status.
/// Fix: the fast path that flushes source_to_fully_copy (set on a previous
/// iteration) updated merged_rows but not merged_bytes, unlike the parallel
/// full-copy branch further down — byte statistics reported by
/// ColumnGathererTransform::onFinish were understated.
IMergingAlgorithm::Status ColumnGathererStream::merge()
{
    /// Nothing to read after initialize.
    if (!result_column)
        return Status(Chunk(), true);

    if (source_to_fully_copy) /// Was set on a previous iteration
    {
        Chunk res;
        res.addColumn(source_to_fully_copy->column);
        merged_rows += source_to_fully_copy->size;
        /// Keep the byte accounting consistent with the full-copy branch below.
        merged_bytes += source_to_fully_copy->column->allocatedBytes();
        source_to_fully_copy->pos = source_to_fully_copy->size;
        source_to_fully_copy = nullptr;
        return Status(std::move(res));
    }

    /// Special case: single source and there are no skipped rows
    /// Note: looks like this should never happen because row_sources_buf cannot just skip row info.
    if (sources.size() == 1 && row_sources_buf.eof())
    {
        if (sources.front().pos < sources.front().size)
        {
            next_required_source = 0;
            Chunk res;
            merged_rows += sources.front().column->size();
            merged_bytes += sources.front().column->allocatedBytes();
            res.addColumn(std::move(sources.front().column));
            sources.front().pos = sources.front().size = 0;
            return Status(std::move(res));
        }

        if (next_required_source == -1)
            return Status(Chunk(), true);

        next_required_source = 0;
        return Status(next_required_source);
    }

    if (next_required_source != -1 && sources[next_required_source].size == 0)
        throw Exception(ErrorCodes::RECEIVED_EMPTY_DATA, "Cannot fetch required block. Source {}", toString(next_required_source));

    /// Surprisingly this call may directly change some internal state of ColumnGathererStream.
    /// output_column. See ColumnGathererStream::gather.
    result_column->gather(*this);

    if (next_required_source != -1)
        return Status(next_required_source);

    if (source_to_fully_copy && result_column->empty())
    {
        Chunk res;
        merged_rows += source_to_fully_copy->column->size();
        merged_bytes += source_to_fully_copy->column->allocatedBytes();
        res.addColumn(source_to_fully_copy->column);
        source_to_fully_copy->pos = source_to_fully_copy->size;
        source_to_fully_copy = nullptr;
        return Status(std::move(res));
    }

    /// Hand off the accumulated output column and start a fresh empty one.
    auto col = result_column->cloneEmpty();
    result_column.swap(col);

    Chunk res;
    merged_rows += col->size();
    merged_bytes += col->allocatedBytes();
    res.addColumn(std::move(col));
    return Status(std::move(res), row_sources_buf.eof() && !source_to_fully_copy);
}
/// Refills source `source_num` with the freshly fetched chunk.
/// A source that is still empty afterwards is a protocol violation:
/// the row sources referenced data the input could not provide.
void ColumnGathererStream::consume(Input & input, size_t source_num)
{
    auto & source = sources[source_num];

    if (input.chunk)
        source.update(input.chunk.getColumns().at(0));

    if (source.size == 0)
        throw Exception(ErrorCodes::RECEIVED_EMPTY_DATA, "Fetched block is empty. Source {}", source_num);
}
/// Wraps ColumnGathererStream in the generic merging-transform machinery.
/// Gathering operates on exactly one column per stream, so the header
/// must contain a single column.
ColumnGathererTransform::ColumnGathererTransform(
    const Block & header,
    size_t num_inputs,
    ReadBuffer & row_sources_buf_,
    size_t block_preferred_size_)
    : IMergingTransform<ColumnGathererStream>(
        num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
        num_inputs, row_sources_buf_, block_preferred_size_)
    , log(&Poco::Logger::get("ColumnGathererStream"))
{
    const size_t num_columns = header.columns();
    if (num_columns != 1)
        throw Exception(ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS, "Header should have 1 column, but contains {}",
            toString(num_columns));
}
void ColumnGathererTransform::work()
{
Stopwatch stopwatch;
IMergingTransform<ColumnGathererStream>::work();
elapsed_ns += stopwatch.elapsedNanoseconds();
}
/// Logs a gathering-throughput summary for large parts.
/// Fix: merged_bytes was assigned from getMergedRows() (copy-paste bug),
/// which made "bytes/elem." always 1 and the reported bytes/sec equal to
/// rows/sec; it must come from getMergedBytes().
void ColumnGathererTransform::onFinish()
{
    auto merged_rows = algorithm.getMergedRows();
    auto merged_bytes = algorithm.getMergedBytes();

    /// Don't print info for small parts (< 10M rows)
    if (merged_rows < 10000000)
        return;

    double seconds = static_cast<double>(elapsed_ns) / 1000000000ULL;
    const auto & column_name = getOutputPort().getHeader().getByPosition(0).name;

    if (seconds == 0.0)
        LOG_DEBUG(log, "Gathered column {} ({} bytes/elem.) in 0 sec.",
            column_name, static_cast<double>(merged_bytes) / merged_rows);
    else
        LOG_DEBUG(log, "Gathered column {} ({} bytes/elem.) in {} sec., {} rows/sec., {}/sec.",
            column_name, static_cast<double>(merged_bytes) / merged_rows, seconds,
            merged_rows / seconds, ReadableSize(merged_bytes / seconds));
}
}
|