aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/Formats/Impl/ODBCDriver2BlockOutputFormat.cpp
blob: 7c0428834e0dff2bd0f719b56cd7557787931c05 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#include <Core/Block.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/Impl/ODBCDriver2BlockOutputFormat.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <DataTypes/DataTypeLowCardinality.h>


namespace DB
{
ODBCDriver2BlockOutputFormat::ODBCDriver2BlockOutputFormat(
    WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
    : IOutputFormat(header_, out_), format_settings(format_settings_), serializations(header_.getSerializations())
{
}

static void writeODBCString(WriteBuffer & out, const std::string & str)
{
    writeBinaryLittleEndian(Int32(str.size()), out);
    out.write(str.data(), str.size());
}

void ODBCDriver2BlockOutputFormat::writeRow(const Columns & columns, size_t row_idx, std::string & buffer)
{
    size_t num_columns = columns.size();
    for (size_t column_idx = 0; column_idx < num_columns; ++column_idx)
    {
        buffer.clear();
        const auto & column = columns[column_idx];

        if (column->isNullAt(row_idx))
        {
            writeBinaryLittleEndian(Int32(-1), out);
        }
        else
        {
            {
                WriteBufferFromString text_out(buffer);
                serializations[column_idx]->serializeText(*column, row_idx, text_out, format_settings);
            }
            writeODBCString(out, buffer);
        }
    }
}

void ODBCDriver2BlockOutputFormat::write(Chunk chunk, PortKind)
{
    String text_value;
    const auto & columns = chunk.getColumns();

    const size_t rows = chunk.getNumRows();
    for (size_t i = 0; i < rows; ++i)
        writeRow(columns, i, text_value);
}

void ODBCDriver2BlockOutputFormat::consume(Chunk chunk)
{
    write(std::move(chunk), PortKind::Main);
}

void ODBCDriver2BlockOutputFormat::consumeTotals(Chunk chunk)
{
    write(std::move(chunk), PortKind::Totals);
}

void ODBCDriver2BlockOutputFormat::writePrefix()
{
    const auto & header = getPort(PortKind::Main).getHeader();
    const size_t columns = header.columns();

    /// Number of header rows.
    writeBinaryLittleEndian(Int32(2), out);

    /// Names of columns.
    /// Number of columns + 1 for first name column.
    writeBinaryLittleEndian(Int32(columns + 1), out);
    writeODBCString(out, "name");
    for (size_t i = 0; i < columns; ++i)
    {
        const ColumnWithTypeAndName & col = header.getByPosition(i);
        writeODBCString(out, col.name);
    }

    /// Types of columns.
    writeBinaryLittleEndian(Int32(columns + 1), out);
    writeODBCString(out, "type");
    for (size_t i = 0; i < columns; ++i)
    {
        auto type = header.getByPosition(i).type;
        if (type->lowCardinality())
            type = recursiveRemoveLowCardinality(type);
        writeODBCString(out, type->getName());
    }
}


void registerOutputFormatODBCDriver2(FormatFactory & factory)
{
    factory.registerOutputFormat(
        "ODBCDriver2", [](WriteBuffer & buf, const Block & sample, const FormatSettings & format_settings)
        {
            return std::make_shared<ODBCDriver2BlockOutputFormat>(buf, sample, format_settings);
        });
}

}