aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/Formats/Impl/LineAsStringRowInputFormat.cpp
blob: 036539c87e7ba7b4c4b9d781c85269b9504c7133 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#include <Processors/Formats/Impl/LineAsStringRowInputFormat.h>
#include <base/find_symbols.h>
#include <IO/ReadHelpers.h>
#include <Columns/ColumnString.h>


namespace DB
{

namespace ErrorCodes
{
    extern const int INCORRECT_QUERY;
    extern const int LOGICAL_ERROR;
}

/// Creates a reader that turns every input line into one value of a String column.
/// The header must consist of exactly one column backed by ColumnString; any other
/// shape is rejected with INCORRECT_QUERY before any input is read.
LineAsStringRowInputFormat::LineAsStringRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_)
    : IRowInputFormat(header_, in_, std::move(params_))
{
    const bool has_single_column = header_.columns() == 1;
    /// Only inspect the first column when it is the only one (mirrors short-circuit of the check).
    const bool is_single_string_column = has_single_column
        && typeid_cast<const ColumnString *>(header_.getByPosition(0).column.get()) != nullptr;

    if (!is_single_string_column)
        throw Exception(ErrorCodes::INCORRECT_QUERY, "This input format is only suitable for tables with a single column of type String.");
}

void LineAsStringRowInputFormat::resetParser()
{
    IRowInputFormat::resetParser();
}

/// Appends the next input line, without its '\n', as a new row of `column`.
/// The value is stored zero-terminated, and the recorded end offset points just
/// past that terminator. The line break itself is consumed if present.
void LineAsStringRowInputFormat::readLineObject(IColumn & column)
{
    auto & string_column = assert_cast<ColumnString &>(column);
    auto & data = string_column.getChars();

    readStringUntilNewlineInto(data, *in);
    data.push_back(0);
    string_column.getOffsets().push_back(data.size());

    /// The line reader stops in front of '\n'; step over it unless the input already ended.
    if (in->eof())
        return;
    in->ignore();
}

/// Reads one line into the single String column.
/// Returns false (and reads nothing) once the input is exhausted.
bool LineAsStringRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
{
    const bool has_data = !in->eof();
    if (has_data)
        readLineObject(*columns[0]);
    return has_data;
}

/// Skips up to `max_block_size` lines without materializing them and returns how
/// many were skipped. Each step advances past the next '\n', or to end of input.
size_t LineAsStringRowInputFormat::countRows(size_t max_block_size)
{
    size_t lines_skipped = 0;
    /// End of input is checked before the limit, matching the original evaluation order.
    for (; !in->eof(); ++lines_skipped)
    {
        if (lines_skipped >= max_block_size)
            break;
        skipToNextLineOrEOF(*in);
    }
    return lines_skipped;
}

/// Registers the "LineAsString" row input format. The created reader ignores the
/// format settings and only needs the buffer, the header sample and the row params.
void registerInputFormatLineAsString(FormatFactory & factory)
{
    auto create_reader = [](ReadBuffer & buf, const Block & sample, const RowInputFormatParams & params, const FormatSettings &)
    {
        return std::make_shared<LineAsStringRowInputFormat>(sample, buf, params);
    };

    factory.registerInputFormat("LineAsString", create_reader);
}


/// File segmentation engine for LineAsString.
/// Starting at the current read position, it scans for '\n' and chooses a cut point right
/// after a newline once either the accumulated size reaches `min_bytes` or `max_rows`
/// newline-terminated lines have been seen. Because the row counter is incremented before
/// it is compared, `max_rows == 0` never matches, so only `min_bytes` limits the chunk then.
/// Scanned bytes are moved into `memory` through loadAtPosition / saveUpToPosition.
///
/// Returns {result of loadAtPosition after the cut (presumably "more input remains" —
/// confirm against ReadHelpers), number of '\n'-terminated lines in the chunk}.
/// A final line with no trailing '\n' is not included in the returned row count.
static std::pair<bool, size_t> segmentationEngine(ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t max_rows)
{
    char * pos = in.position();
    bool need_more_data = true;
    size_t number_of_rows = 0;

    /// NOTE: loadAtPosition is evaluated before need_more_data (short-circuit order), so it is
    /// called once more after the cut point has been chosen. It is also what ends the loop when
    /// no more data can be loaded. Do not swap the operands without checking its side effects.
    while (loadAtPosition(in, memory, pos) && need_more_data)
    {
        pos = find_first_symbols<'\n'>(pos, in.buffer().end());
        /// Defensive: the search must never run past the loaded buffer.
        if (pos > in.buffer().end())
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Position in buffer is out of bounds. There must be a bug.");
        /// No '\n' in the rest of the current buffer: loop back so loadAtPosition can fetch more.
        else if (pos == in.buffer().end())
            continue;

        ++number_of_rows;
        /// Chunk size = bytes already saved in `memory` + bytes scanned in the current buffer.
        if ((memory.size() + static_cast<size_t>(pos - in.position()) >= min_bytes) || (number_of_rows == max_rows))
            need_more_data = false;

        /// pos stopped on a newline (it is before the buffer end here); step past it so the
        /// newline belongs to the current chunk.
        if (*pos == '\n')
            ++pos;
    }

    /// Move everything up to the cut point into `memory`.
    saveUpToPosition(in, memory, pos);

    return {loadAtPosition(in, memory, pos), number_of_rows};
}

/// Registers segmentationEngine for "LineAsString", so that input can be cut into
/// chunks on newline boundaries.
void registerFileSegmentationEngineLineAsString(FormatFactory & factory)
{
    auto * line_splitter = segmentationEngine;
    factory.registerFileSegmentationEngine("LineAsString", line_splitter);
}


/// Registers the external schema reader for "LineAsString". Format settings are
/// not consulted when creating it.
void registerLineAsStringSchemaReader(FormatFactory & factory)
{
    auto create_schema_reader = [](const FormatSettings &)
    {
        return std::make_shared<LinaAsStringSchemaReader>();
    };

    factory.registerExternalSchemaReader("LineAsString", create_schema_reader);
}

}