1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
|
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <array>
#include <chrono>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>
#include "arrow/util/optional.h"
#include "arrow/util/string_view.h"
#include "parquet/column_writer.h"
#include "parquet/file_writer.h"
namespace parquet {
/// \brief A class for writing Parquet files using an output stream type API.
///
/// The values given must be of the correct type i.e. the type must
/// match the file schema exactly otherwise a ParquetException will be
/// thrown.
///
/// The user must explicitly indicate the end of the row using the
/// EndRow() function or EndRow output manipulator.
///
/// A maximum row group size can be configured, the default size is
/// 512MB. Alternatively the row group size can be set to zero and the
/// user can create new row groups by calling the EndRowGroup()
/// function or using the EndRowGroup output manipulator.
///
/// Required and optional fields are supported:
/// - Required fields are written using operator<<(T)
/// - Optional fields are written using
/// operator<<(arrow::util::optional<T>).
///
/// Note that operator<<(T) can be used to write optional fields.
///
/// Similarly, operator<<(arrow::util::optional<T>) can be used to
/// write required fields. However if the optional parameter does not
/// have a value (i.e. it is nullopt) then a ParquetException will be
/// raised.
///
/// Currently there is no support for repeated fields.
///
class PARQUET_EXPORT StreamWriter {
public:
template <typename T>
using optional = ::arrow::util::optional<T>;
// N.B. Default constructed objects are not usable. This
// constructor is provided so that the object may be move
// assigned afterwards.
StreamWriter() = default;
explicit StreamWriter(std::unique_ptr<ParquetFileWriter> writer);
~StreamWriter() = default;
static void SetDefaultMaxRowGroupSize(int64_t max_size);
void SetMaxRowGroupSize(int64_t max_size);
int current_column() const { return column_index_; }
int64_t current_row() const { return current_row_; }
int num_columns() const;
// Moving is possible.
StreamWriter(StreamWriter&&) = default;
StreamWriter& operator=(StreamWriter&&) = default;
// Copying is not allowed.
StreamWriter(const StreamWriter&) = delete;
StreamWriter& operator=(const StreamWriter&) = delete;
/// \brief Output operators for required fields.
/// These can also be used for optional fields when a value must be set.
StreamWriter& operator<<(bool v);
StreamWriter& operator<<(int8_t v);
StreamWriter& operator<<(uint8_t v);
StreamWriter& operator<<(int16_t v);
StreamWriter& operator<<(uint16_t v);
StreamWriter& operator<<(int32_t v);
StreamWriter& operator<<(uint32_t v);
StreamWriter& operator<<(int64_t v);
StreamWriter& operator<<(uint64_t v);
StreamWriter& operator<<(const std::chrono::milliseconds& v);
StreamWriter& operator<<(const std::chrono::microseconds& v);
StreamWriter& operator<<(float v);
StreamWriter& operator<<(double v);
StreamWriter& operator<<(char v);
/// \brief Helper class to write fixed length strings.
/// This is useful as the standard string view (such as
/// arrow::util::string_view) is for variable length data.
struct PARQUET_EXPORT FixedStringView {
FixedStringView() = default;
explicit FixedStringView(const char* data_ptr);
FixedStringView(const char* data_ptr, std::size_t data_len);
const char* data{NULLPTR};
std::size_t size{0};
};
/// \brief Output operators for fixed length strings.
template <int N>
StreamWriter& operator<<(const char (&v)[N]) {
return WriteFixedLength(v, N);
}
template <std::size_t N>
StreamWriter& operator<<(const std::array<char, N>& v) {
return WriteFixedLength(v.data(), N);
}
StreamWriter& operator<<(FixedStringView v);
/// \brief Output operators for variable length strings.
StreamWriter& operator<<(const char* v);
StreamWriter& operator<<(const std::string& v);
StreamWriter& operator<<(::arrow::util::string_view v);
/// \brief Output operator for optional fields.
template <typename T>
StreamWriter& operator<<(const optional<T>& v) {
if (v) {
return operator<<(*v);
}
SkipOptionalColumn();
return *this;
}
/// \brief Skip the next N columns of optional data. If there are
/// less than N columns remaining then the excess columns are
/// ignored.
/// \throws ParquetException if there is an attempt to skip any
/// required column.
/// \return Number of columns actually skipped.
int64_t SkipColumns(int num_columns_to_skip);
/// \brief Terminate the current row and advance to next one.
/// \throws ParquetException if all columns in the row were not
/// written or skipped.
void EndRow();
/// \brief Terminate the current row group and create new one.
void EndRowGroup();
protected:
template <typename WriterType, typename T>
StreamWriter& Write(const T v) {
auto writer = static_cast<WriterType*>(row_group_writer_->column(column_index_++));
writer->WriteBatch(kBatchSizeOne, &kDefLevelOne, &kRepLevelZero, &v);
if (max_row_group_size_ > 0) {
row_group_size_ += writer->EstimatedBufferedValueBytes();
}
return *this;
}
StreamWriter& WriteVariableLength(const char* data_ptr, std::size_t data_len);
StreamWriter& WriteFixedLength(const char* data_ptr, std::size_t data_len);
void CheckColumn(Type::type physical_type, ConvertedType::type converted_type,
int length = -1);
/// \brief Skip the next column which must be optional.
/// \throws ParquetException if the next column does not exist or is
/// not optional.
void SkipOptionalColumn();
void WriteNullValue(ColumnWriter* writer);
private:
using node_ptr_type = std::shared_ptr<schema::PrimitiveNode>;
struct null_deleter {
void operator()(void*) {}
};
int32_t column_index_{0};
int64_t current_row_{0};
int64_t row_group_size_{0};
int64_t max_row_group_size_{default_row_group_size_};
std::unique_ptr<ParquetFileWriter> file_writer_;
std::unique_ptr<RowGroupWriter, null_deleter> row_group_writer_;
std::vector<node_ptr_type> nodes_;
static constexpr int16_t kDefLevelZero = 0;
static constexpr int16_t kDefLevelOne = 1;
static constexpr int16_t kRepLevelZero = 0;
static constexpr int64_t kBatchSizeOne = 1;
static int64_t default_row_group_size_;
};
/// \brief Empty tag type that selects the EndRow output operator.
struct PARQUET_EXPORT EndRowType {};
/// \brief Output manipulator: stream this into a StreamWriter to
/// indicate the end of the current row.
constexpr EndRowType EndRow = {};
/// \brief Empty tag type that selects the EndRowGroup output operator.
struct PARQUET_EXPORT EndRowGroupType {};
/// \brief Output manipulator: stream this into a StreamWriter to
/// create a new row group.
constexpr EndRowGroupType EndRowGroup = {};
/// \brief Output operator for the EndRow manipulator (defined out of
/// line; presumably forwards to StreamWriter::EndRow()).
PARQUET_EXPORT
StreamWriter& operator<<(StreamWriter&, EndRowType);
/// \brief Output operator for the EndRowGroup manipulator (defined
/// out of line; presumably forwards to StreamWriter::EndRowGroup()).
PARQUET_EXPORT
StreamWriter& operator<<(StreamWriter&, EndRowGroupType);
} // namespace parquet
|