aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/Formats/Impl/ParquetBlockOutputFormat.h
blob: ae3cfeb99841a9147fba1a538cd963cea82b2f41 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#pragma once
#include "clickhouse_config.h"

#if USE_PARQUET

#include <Processors/Formats/IOutputFormat.h>
#error #include <Processors/Formats/Impl/Parquet/Write.h>
#include <Formats/FormatSettings.h>
#include <Common/ThreadPool.h>

namespace arrow
{
class Array;
class DataType;
}

namespace parquet
{
namespace arrow
{
    class FileWriter;
}
}

namespace DB
{

class CHColumnToArrowColumn;

class ParquetBlockOutputFormat : public IOutputFormat
{
public:
    ParquetBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_);
    ~ParquetBlockOutputFormat() override;

    String getName() const override { return "ParquetBlockOutputFormat"; }

    String getContentType() const override { return "application/octet-stream"; }

private:
    struct MemoryToken
    {
        ParquetBlockOutputFormat * parent;
        size_t bytes = 0;

        explicit MemoryToken(ParquetBlockOutputFormat * p, size_t b = 0) : parent(p)
        {
            set(b);
        }

        MemoryToken(MemoryToken && t)
          : parent(std::exchange(t.parent, nullptr)), bytes(std::exchange(t.bytes, 0)) {}

        MemoryToken & operator=(MemoryToken && t)
        {
            parent = std::exchange(t.parent, nullptr);
            bytes = std::exchange(t.bytes, 0);
            return *this;
        }

        ~MemoryToken()
        {
            set(0);
        }

        void set(size_t new_size)
        {
            if (new_size == bytes)
                return;
            parent->bytes_in_flight += new_size - bytes; // overflow is fine
            bytes = new_size;
        }
    };

    struct ColumnChunk
    {
        Parquet::ColumnChunkWriteState state;
        PODArray<char> serialized;

        MemoryToken mem;

        ColumnChunk(ParquetBlockOutputFormat * p) : mem(p) {}
    };

    struct RowGroupState
    {
        size_t tasks_in_flight = 0;
        std::vector<std::vector<ColumnChunk>> column_chunks;
        size_t num_rows = 0;
    };

    struct Task
    {
        RowGroupState * row_group;
        size_t column_idx;
        size_t subcolumn_idx = 0;

        MemoryToken mem;

        /// If not null, we need to call prepareColumnForWrite().
        /// Otherwise we need to call writeColumnChunkBody().
        DataTypePtr column_type;
        std::string column_name;
        std::vector<ColumnPtr> column_pieces;

        Parquet::ColumnChunkWriteState state;

        Task(RowGroupState * rg, size_t ci, ParquetBlockOutputFormat * p)
            : row_group(rg), column_idx(ci), mem(p) {}
    };

    void consume(Chunk) override;
    void finalizeImpl() override;
    void resetFormatterImpl() override;
    void onCancel() override;

    void writeRowGroup(std::vector<Chunk> chunks);
    void writeUsingArrow(std::vector<Chunk> chunks);
    void writeRowGroupInOneThread(Chunk chunk);
    void writeRowGroupInParallel(std::vector<Chunk> chunks);

    void threadFunction();
    void startMoreThreadsIfNeeded(const std::unique_lock<std::mutex> & lock);

    /// Called in single-threaded fashion. Writes to the file.
    void reapCompletedRowGroups(std::unique_lock<std::mutex> & lock);

    const FormatSettings format_settings;

    /// Chunks to squash together to form a row group.
    std::vector<Chunk> staging_chunks;
    size_t staging_rows = 0;
    size_t staging_bytes = 0;

    std::unique_ptr<parquet::arrow::FileWriter> file_writer;
    std::unique_ptr<CHColumnToArrowColumn> ch_column_to_arrow_column;

    Parquet::WriteOptions options;
    Parquet::SchemaElements schema;
    std::vector<parquet::format::RowGroup> row_groups_complete;
    size_t base_offset = 0;


    std::mutex mutex;
    std::condition_variable condvar; // wakes up consume()
    std::unique_ptr<ThreadPool> pool;

    std::atomic_bool is_stopped{false};
    std::exception_ptr background_exception = nullptr;

    /// Invariant: if there's at least one task then there's at least one thread.
    size_t threads_running = 0;
    std::atomic<size_t> bytes_in_flight{0};

    std::deque<Task> task_queue;
    std::deque<RowGroupState> row_groups;
};

}

#endif