aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/Formats/Impl/ArrowBlockOutputFormat.cpp
blob: c163757349c0df505c30eba64ad29ef7cf676c5e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#include "ArrowBlockOutputFormat.h"

#if USE_ARROW

#include <Formats/FormatFactory.h>
#error #include <arrow/ipc/writer.h>
#error #include <arrow/table.h>
#error #include <arrow/result.h>
#include "ArrowBufferedStreams.h"
#include "CHColumnToArrowColumn.h"


namespace DB
{
namespace ErrorCodes
{
    extern const int UNKNOWN_EXCEPTION;
}

namespace
{

/// Maps the user-facing `FormatSettings::ArrowCompression` setting onto the
/// Arrow compression type used for IPC body compression.
///
/// If `method` holds a value the switch does not handle, an exception is thrown
/// instead of falling off the end of this non-void function (undefined behaviour).
arrow::Compression::type getArrowCompression(FormatSettings::ArrowCompression method)
{
    switch (method)
    {
        case FormatSettings::ArrowCompression::NONE:
            return arrow::Compression::type::UNCOMPRESSED;
        case FormatSettings::ArrowCompression::ZSTD:
            return arrow::Compression::type::ZSTD;
        case FormatSettings::ArrowCompression::LZ4_FRAME:
            return arrow::Compression::type::LZ4_FRAME;
    }

    throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
        "Unsupported Arrow output compression method: {}", static_cast<int>(method));
}

}

/// Creates an Arrow output format writing into `out_`.
///
/// `stream_` selects the Arrow IPC streaming layout (used by "ArrowStream")
/// instead of the IPC file layout (used by "Arrow"). Nothing is written here:
/// the converter and the IPC writer are created lazily from consume().
ArrowBlockOutputFormat::ArrowBlockOutputFormat(WriteBuffer & out_, const Block & header_, bool stream_, const FormatSettings & format_settings_)
    : IOutputFormat(header_, out_), stream(stream_), format_settings(format_settings_)
{
}

/// Converts one ClickHouse chunk into an Arrow table and appends it to the IPC output.
///
/// The CH->Arrow column converter is built from the port header the first time it
/// is needed. The IPC writer is opened lazily (whenever none exists yet) using the
/// schema of the converted table.
/// Throws UNKNOWN_EXCEPTION if Arrow reports an error while writing the table.
void ArrowBlockOutputFormat::consume(Chunk chunk)
{
    // Read the column count now: `chunk` is moved into the batch further down.
    const size_t columns_num = chunk.getNumColumns();

    if (!ch_column_to_arrow_column)
    {
        const Block & header = getPort(PortKind::Main).getHeader();
        ch_column_to_arrow_column = std::make_unique<CHColumnToArrowColumn>(
            header,
            "Arrow",
            format_settings.arrow.low_cardinality_as_dictionary,
            format_settings.arrow.output_string_as_string,
            format_settings.arrow.output_fixed_string_as_fixed_byte_array);
    }

    std::vector<Chunk> batch;
    batch.push_back(std::move(chunk));

    std::shared_ptr<arrow::Table> arrow_table;
    ch_column_to_arrow_column->chChunkToArrowTable(arrow_table, batch, columns_num);

    if (!writer)
        prepareWriter(arrow_table->schema());

    // TODO: calculate row_group_size depending on a number of rows and table size
    const auto write_status = writer->WriteTable(*arrow_table, format_settings.arrow.row_group_size);
    if (!write_status.ok())
        throw Exception(ErrorCodes::UNKNOWN_EXCEPTION, "Error while writing a table: {}", write_status.ToString());
}

void ArrowBlockOutputFormat::finalizeImpl()
{
    if (!writer)
    {
        Block header = materializeBlock(getPort(PortKind::Main).getHeader());

        consume(Chunk(header.getColumns(), 0));
    }

    auto status = writer->Close();
    if (!status.ok())
        throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
            "Error while closing a table: {}", status.ToString());
}

/// Drops the IPC writer and its output stream, so the next consume() opens fresh ones.
/// The writer is released before the stream it was created over.
void ArrowBlockOutputFormat::resetFormatterImpl()
{
    writer = nullptr;
    arrow_ostream = nullptr;
}

/// Opens the Arrow IPC writer over `out` for the given `schema`.
///
/// Uses the IPC streaming format when `stream` is set and the IPC file format
/// otherwise. The body compression codec is taken from
/// `format_settings.arrow.output_compression_method`, and
/// `emit_dictionary_deltas` is enabled on the write options.
/// Throws UNKNOWN_EXCEPTION if either the codec or the writer cannot be created.
void ArrowBlockOutputFormat::prepareWriter(const std::shared_ptr<arrow::Schema> & schema)
{
    arrow_ostream = std::make_shared<ArrowBufferedOutputStream>(out);

    arrow::ipc::IpcWriteOptions options = arrow::ipc::IpcWriteOptions::Defaults();

    // Check the codec result explicitly: dereferencing an error arrow::Result calls
    // ValueOrDie(), which aborts the whole process (e.g. when Arrow was built
    // without support for the requested codec) instead of raising an exception.
    auto codec_result = arrow::util::Codec::Create(getArrowCompression(format_settings.arrow.output_compression_method));
    if (!codec_result.ok())
        throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
            "Error while creating a compression codec: {}", codec_result.status().ToString());
    options.codec = std::move(*codec_result);
    options.emit_dictionary_deltas = true;

    // TODO: should we use arrow::ipc::IpcWriteOptions::alignment?
    arrow::Result<std::shared_ptr<arrow::ipc::RecordBatchWriter>> writer_status;
    if (stream)
        writer_status = arrow::ipc::MakeStreamWriter(arrow_ostream.get(), schema, options);
    else
        writer_status = arrow::ipc::MakeFileWriter(arrow_ostream.get(), schema, options);

    if (!writer_status.ok())
        throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
            "Error while opening a table writer: {}", writer_status.status().ToString());

    writer = std::move(*writer_status);
}

/// Registers the "Arrow" (IPC file layout) and "ArrowStream" (IPC streaming layout)
/// output formats. Neither supports appending to existing output;
/// "ArrowStream" is additionally marked as preferring large blocks.
void registerOutputFormatArrow(FormatFactory & factory)
{
    // Registers one Arrow-family output format; `is_stream` picks the IPC layout.
    auto register_arrow_format = [&factory](const char * format_name, bool is_stream)
    {
        factory.registerOutputFormat(
            format_name,
            [is_stream](WriteBuffer & buf, const Block & sample, const FormatSettings & format_settings)
            {
                return std::make_shared<ArrowBlockOutputFormat>(buf, sample, is_stream, format_settings);
            });
        factory.markFormatHasNoAppendSupport(format_name);
    };

    register_arrow_format("Arrow", false);
    register_arrow_format("ArrowStream", true);
    factory.markOutputFormatPrefersLargeBlocks("ArrowStream");
}

}

#else

namespace DB
{
class FormatFactory;

/// Fallback used when the file is built without USE_ARROW:
/// no Arrow output formats are registered.
void registerOutputFormatArrow(FormatFactory & /* factory */) {}
}

#endif