1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
#include <Processors/Formats/Impl/LineAsStringRowInputFormat.h>
#include <base/find_symbols.h>
#include <IO/ReadHelpers.h>
#include <Columns/ColumnString.h>
namespace DB
{
namespace ErrorCodes
{
    /// Error codes referenced in this file; their definitions live elsewhere in the project.
    extern const int LOGICAL_ERROR;   /// thrown by segmentationEngine on an out-of-bounds scan position
    extern const int INCORRECT_QUERY; /// thrown by the constructor when the header is not a single String column
}
LineAsStringRowInputFormat::LineAsStringRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_)
    : IRowInputFormat(header_, in_, std::move(params_))
{
    /// Every input line is appended to a ColumnString (see readLineObject), so the target
    /// header must be exactly one column whose column object is a ColumnString.
    const bool has_exactly_one_column = header_.columns() == 1;
    if (has_exactly_one_column && typeid_cast<const ColumnString *>(header_.getByPosition(0).column.get()))
        return;

    throw Exception(ErrorCodes::INCORRECT_QUERY, "This input format is only suitable for tables with a single column of type String.");
}
void LineAsStringRowInputFormat::resetParser()
{
IRowInputFormat::resetParser();
}
void LineAsStringRowInputFormat::readLineObject(IColumn & column)
{
    /// Appends the text up to the next '\n' (or EOF) as one value of the String column.
    auto & target = assert_cast<ColumnString &>(column);
    auto & data = target.getChars();

    readStringUntilNewlineInto(data, *in);

    /// Each value is stored zero-terminated; its offset points just past the terminator.
    data.push_back(0);
    target.getOffsets().push_back(data.size());

    /// At EOF there is no line terminator left to consume.
    if (in->eof())
        return;

    in->ignore(); /// Skip '\n'
}
bool LineAsStringRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
{
    /// One input line becomes one row in the single String column.
    /// Returns false only when the input is exhausted.
    const bool has_input = !in->eof();
    if (has_input)
        readLineObject(*columns[0]);
    return has_input;
}
size_t LineAsStringRowInputFormat::countRows(size_t max_block_size)
{
    /// Counts rows without materializing them. Like readRow, a row exists whenever the
    /// input is not at EOF. Each iteration skips past one line via skipToNextLineOrEOF.
    /// eof() is checked before the limit, matching the original evaluation order.
    size_t counted = 0;
    for (; !in->eof() && counted < max_block_size; ++counted)
        skipToNextLineOrEOF(*in);
    return counted;
}
void registerInputFormatLineAsString(FormatFactory & factory)
{
    /// Builds the row input format for "LineAsString"; FormatSettings are not consulted.
    auto creator = [](ReadBuffer & buf, const Block & sample, const RowInputFormatParams & params, const FormatSettings &)
    {
        return std::make_shared<LineAsStringRowInputFormat>(sample, buf, params);
    };

    factory.registerInputFormat("LineAsString", creator);
}
/// File segmentation engine for LineAsString. Starting at in.position(), it scans forward for
/// '\n' characters and collects a run of whole lines into `memory`.
///
/// @param in        buffer to read from; the chunk starts at its current position.
/// @param memory    receives the chunk's bytes (via loadAtPosition / saveUpToPosition).
/// @param min_bytes the chunk is closed at the first '\n' where memory.size() plus the bytes
///                  scanned in the current buffer (before that '\n') reaches this value.
/// @param max_rows  the chunk is also closed once this many '\n'-terminated lines were counted.
///                  Because the count is at least 1 when compared, max_rows == 0 never matches.
/// @return {result of the final loadAtPosition call, number of '\n'-terminated lines counted}.
///         NOTE(review): the bool presumably means "more input is available"; confirm against
///         loadAtPosition.
/// NOTE(review): a trailing line with no '\n' before EOF is presumably not counted in the
/// returned row count (the counter only advances when a '\n' is found); confirm this is intended.
static std::pair<bool, size_t> segmentationEngine(ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t max_rows)
{
char * pos = in.position();
bool need_more_data = true;
size_t number_of_rows = 0;
/// loadAtPosition is evaluated before need_more_data, so it is still called once more after
/// the chunk is complete. Do not reorder this condition without re-checking its side effects.
while (loadAtPosition(in, memory, pos) && need_more_data)
{
pos = find_first_symbols<'\n'>(pos, in.buffer().end());
/// Sanity check: the search must never move past the end of the current buffer.
/// If no '\n' remains in this buffer, loop again so loadAtPosition can presumably
/// bring in more data.
if (pos > in.buffer().end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Position in buffer is out of bounds. There must be a bug.");
else if (pos == in.buffer().end())
continue;
/// pos now points at a '\n': one more complete line.
++number_of_rows;
if ((memory.size() + static_cast<size_t>(pos - in.position()) >= min_bytes) || (number_of_rows == max_rows))
need_more_data = false;
/// NOTE(review): given the end-of-buffer check above, *pos should always be '\n' here,
/// so this condition looks redundant; it steps past the line terminator.
if (*pos == '\n')
++pos;
}
/// Copy everything up to pos (the end of the last complete line) into memory.
saveUpToPosition(in, memory, pos);
return {loadAtPosition(in, memory, pos), number_of_rows};
}
void registerFileSegmentationEngineLineAsString(FormatFactory & factory)
{
    /// Registers the newline-based splitter defined above (segmentationEngine) for "LineAsString".
    const auto engine = &segmentationEngine;
    factory.registerFileSegmentationEngine("LineAsString", engine);
}
void registerLineAsStringSchemaReader(FormatFactory & factory)
{
    /// The schema reader is constructed without arguments; FormatSettings are ignored.
    /// NOTE(review): "LinaAsString" looks like a typo, but the class is declared outside this
    /// file, so it is referenced exactly as spelled there.
    auto make_schema_reader = [](const FormatSettings &)
    {
        return std::make_shared<LinaAsStringSchemaReader>();
    };

    factory.registerExternalSchemaReader("LineAsString", make_schema_reader);
}
}
|