1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
|
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package parquet
import (
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/apache/arrow/go/v13/parquet/compress"
)
// Constants for default property values used for the default reader, writer and column props.
const (
// DefaultBufSize is the default buffer size used for the Reader (4096 * 4 bytes).
DefaultBufSize int64 = 4096 * 4
// DefaultDataPageSize is the default data page size limit (1024 * 1024, i.e. 1 MiB).
// It's not guaranteed, but we will try to cut data pages off at this size where possible.
DefaultDataPageSize int64 = 1024 * 1024
// DefaultDictionaryEnabled: the default is for dictionary encoding to be turned on,
// use the WithDictionaryDefault writer property to change that.
DefaultDictionaryEnabled = true
// DefaultDictionaryPageSizeLimit: if the dictionary reaches the size of this
// limitation, the writer will use the fallback encoding (usually plain) instead
// of continuing to build the dictionary index. It equals DefaultDataPageSize.
DefaultDictionaryPageSizeLimit = DefaultDataPageSize
// DefaultWriteBatchSize: in order to attempt to facilitate data page size limits
// for writing, data is written in batches. Increasing the batch size may improve
// performance but the larger the batch size, the easier it is to overshoot the
// data page limit.
DefaultWriteBatchSize int64 = 1024
// DefaultMaxRowGroupLen is the default maximum number of rows for a single row group.
DefaultMaxRowGroupLen int64 = 64 * 1024 * 1024
// DefaultStatsEnabled: the default is to have stats enabled for all columns, use
// writer properties to change the default, or to enable/disable for specific columns.
DefaultStatsEnabled = true
// DefaultMaxStatsSize: if the stats are larger than 4K the writer will skip
// writing them out anyways.
DefaultMaxStatsSize int64 = 4096
// DefaultCreatedBy is the default "created by" string.
DefaultCreatedBy = "parquet-go version 13.0.0-SNAPSHOT"
// DefaultRootName is the default name of the root schema node.
DefaultRootName = "schema"
)
// ColumnProperties defines the encoding, codec, and so on for a given column.
type ColumnProperties struct {
// Encoding is the column's configured encoding.
Encoding Encoding
// Codec is the compression codec for the column.
Codec compress.Compression
// DictionaryEnabled reports whether dictionary encoding is turned on.
DictionaryEnabled bool
// StatsEnabled reports whether statistics are turned on.
StatsEnabled bool
// MaxStatsSize is the maximum size configured for statistics.
MaxStatsSize int64
// CompressionLevel is the codec-specific compression level.
CompressionLevel int
}
// DefaultColumnProperties returns the default properties used when writing
// columns.
//
// The returned value is populated from the following:
//
//	Encoding:          Encodings.Plain
//	Codec:             compress.Codecs.Uncompressed
//	DictionaryEnabled: DefaultDictionaryEnabled
//	StatsEnabled:      DefaultStatsEnabled
//	MaxStatsSize:      DefaultMaxStatsSize
//	CompressionLevel:  compress.DefaultCompressionLevel
func DefaultColumnProperties() ColumnProperties {
	var props ColumnProperties
	props.Encoding = Encodings.Plain
	props.Codec = compress.Codecs.Uncompressed
	props.DictionaryEnabled = DefaultDictionaryEnabled
	props.StatsEnabled = DefaultStatsEnabled
	props.MaxStatsSize = DefaultMaxStatsSize
	props.CompressionLevel = compress.DefaultCompressionLevel
	return props
}
// writerPropConfig is the mutable state that WriterProperty options operate on
// while a WriterProperties instance is being built. The maps hold per-column
// overrides keyed by column path string.
type writerPropConfig struct {
// wr is the WriterProperties instance being configured.
wr *WriterProperties
// encodings holds per-column encoding overrides.
encodings map[string]Encoding
// codecs holds per-column compression codec overrides.
codecs map[string]compress.Compression
// compressLevel holds per-column compression level overrides.
compressLevel map[string]int
// dictEnabled holds per-column dictionary encoding switches.
dictEnabled map[string]bool
// statsEnabled holds per-column statistics switches.
statsEnabled map[string]bool
}
// WriterProperty is used as the options for building a writer properties instance.
// Each option is a function that mutates the in-progress configuration it is given.
type WriterProperty func(*writerPropConfig)
// WithAllocator returns an option that makes the writer use the given
// memory allocator.
func WithAllocator(mem memory.Allocator) WriterProperty {
	setAllocator := func(c *writerPropConfig) {
		c.wr.mem = mem
	}
	return setAllocator
}
// WithDictionaryDefault returns an option that sets the default for whether
// dictionary encoding is enabled.
func WithDictionaryDefault(dict bool) WriterProperty {
	setDefault := func(c *writerPropConfig) {
		c.wr.defColumnProps.DictionaryEnabled = dict
	}
	return setDefault
}
// WithDictionaryFor returns an option that enables or disables dictionary
// encoding for the column identified by the given column path string.
func WithDictionaryFor(path string, dict bool) WriterProperty {
	setForColumn := func(c *writerPropConfig) {
		c.dictEnabled[path] = dict
	}
	return setForColumn
}
// WithDictionaryPath behaves like WithDictionaryFor, but accepts a ColumnPath
// which is converted to its string form.
func WithDictionaryPath(path ColumnPath, dict bool) WriterProperty {
	column := path.String()
	return WithDictionaryFor(column, dict)
}
// WithDictionaryPageSizeLimit returns an option that sets the dictionary size
// limit at which the writer will fall back to plain encoding instead.
func WithDictionaryPageSizeLimit(limit int64) WriterProperty {
	setLimit := func(c *writerPropConfig) {
		c.wr.dictPagesize = limit
	}
	return setLimit
}
// WithBatchSize returns an option that sets the number of rows to use for
// batch writes to columns.
func WithBatchSize(batch int64) WriterProperty {
	setBatch := func(c *writerPropConfig) {
		c.wr.batchSize = batch
	}
	return setBatch
}
// WithMaxRowGroupLength returns an option that sets the maximum number of rows
// for a given row group in the writer.
func WithMaxRowGroupLength(nrows int64) WriterProperty {
	setMaxRows := func(c *writerPropConfig) {
		c.wr.maxRowGroupLen = nrows
	}
	return setMaxRows
}
// WithDataPageSize returns an option that sets the size to use for splitting
// data pages when writing columns.
func WithDataPageSize(pgsize int64) WriterProperty {
	setPageSize := func(c *writerPropConfig) {
		c.wr.pageSize = pgsize
	}
	return setPageSize
}
// WithDataPageVersion returns an option that selects Version 1 or Version 2
// of the DataPage spec.
func WithDataPageVersion(version DataPageVersion) WriterProperty {
	setPageVersion := func(c *writerPropConfig) {
		c.wr.dataPageVersion = version
	}
	return setPageVersion
}
// WithVersion returns an option that selects which Parquet spec version to
// use for writing.
func WithVersion(version Version) WriterProperty {
	setVersion := func(c *writerPropConfig) {
		c.wr.parquetVersion = version
	}
	return setVersion
}
// WithCreatedBy returns an option that sets the "created by" string used by
// the writer.
func WithCreatedBy(createdby string) WriterProperty {
	setCreatedBy := func(c *writerPropConfig) {
		c.wr.createdBy = createdby
	}
	return setCreatedBy
}
// WithRootName returns an option that customizes the name used for the root
// schema node. This is required to maintain compatibility with other tools.
func WithRootName(name string) WriterProperty {
	setRootName := func(c *writerPropConfig) {
		c.wr.rootName = name
	}
	return setRootName
}
// WithRootRepetition returns an option that customizes the repetition used
// for the root schema node. This is required to maintain compatibility with
// other tools.
func WithRootRepetition(repetition Repetition) WriterProperty {
	setRootRepetition := func(c *writerPropConfig) {
		c.wr.rootRepetition = repetition
	}
	return setRootRepetition
}
// WithEncoding returns an option that sets the default encoding used when
// dictionary encoding is not in effect.
//
// That encoding applies either when dictionary encoding is disabled, or when
// the writer falls back because the dictionary grew too large.
//
// The returned option panics when it is applied if encoding is
// Encodings.PlainDict or Encodings.RLEDict.
func WithEncoding(encoding Encoding) WriterProperty {
	return func(c *writerPropConfig) {
		switch encoding {
		case Encodings.PlainDict, Encodings.RLEDict:
			panic("parquet: can't use dictionary encoding as fallback encoding")
		}
		c.wr.defColumnProps.Encoding = encoding
	}
}
// WithEncodingFor returns an option that sets the encoding for one specific
// column path only. This encoding is used if dictionary encoding is disabled
// for the column, or if the writer falls back because the dictionary grew too
// large.
//
// The returned option panics when it is applied if encoding is
// Encodings.PlainDict or Encodings.RLEDict.
func WithEncodingFor(path string, encoding Encoding) WriterProperty {
	return func(c *writerPropConfig) {
		isDictEncoding := encoding == Encodings.PlainDict || encoding == Encodings.RLEDict
		if isDictEncoding {
			panic("parquet: can't use dictionary encoding as fallback encoding")
		}
		c.encodings[path] = encoding
	}
}
// WithEncodingPath behaves like WithEncodingFor, but accepts a ColumnPath
// which is converted to its string form.
func WithEncodingPath(path ColumnPath, encoding Encoding) WriterProperty {
	column := path.String()
	return WithEncodingFor(column, encoding)
}
// WithCompression returns an option that sets the default compression type
// to use for column writing.
func WithCompression(codec compress.Compression) WriterProperty {
	setCodec := func(c *writerPropConfig) {
		c.wr.defColumnProps.Codec = codec
	}
	return setCodec
}
// WithCompressionFor returns an option that sets the compression type for the
// given column.
func WithCompressionFor(path string, codec compress.Compression) WriterProperty {
	setCodecForColumn := func(c *writerPropConfig) {
		c.codecs[path] = codec
	}
	return setCodecForColumn
}
// WithCompressionPath behaves like WithCompressionFor, but accepts a
// ColumnPath which is converted to its string form.
func WithCompressionPath(path ColumnPath, codec compress.Compression) WriterProperty {
	column := path.String()
	return WithCompressionFor(column, codec)
}
// WithMaxStatsSize returns an option that sets the maximum size for the
// statistics before we decide not to include them.
func WithMaxStatsSize(maxStatsSize int64) WriterProperty {
	setMaxStats := func(c *writerPropConfig) {
		c.wr.defColumnProps.MaxStatsSize = maxStatsSize
	}
	return setMaxStats
}
// WithCompressionLevel returns an option that sets the default compression
// level for the compressor in every column.
//
// The level is compressor specific, so the caller has to know which levels the
// selected compressor supports. If the compressor does not allow selecting
// different compression levels, this option has no effect. Parquet and Arrow
// will not validate the passed compression level. If no level is selected, or
// the special compress.DefaultCompressionLevel value is used, parquet will
// select the compression level.
func WithCompressionLevel(level int) WriterProperty {
	setLevel := func(c *writerPropConfig) {
		c.wr.defColumnProps.CompressionLevel = level
	}
	return setLevel
}
// WithCompressionLevelFor is like WithCompressionLevel, but applies only to
// the given column path.
func WithCompressionLevelFor(path string, level int) WriterProperty {
	setLevelForColumn := func(c *writerPropConfig) {
		c.compressLevel[path] = level
	}
	return setLevelForColumn
}
// WithCompressionLevelPath behaves like WithCompressionLevelFor, but accepts
// a ColumnPath which is converted to its string form.
func WithCompressionLevelPath(path ColumnPath, level int) WriterProperty {
	column := path.String()
	return WithCompressionLevelFor(column, level)
}
// WithStats returns an option that sets the default for whether or not column
// statistics are enabled.
func WithStats(enabled bool) WriterProperty {
	setStats := func(c *writerPropConfig) {
		c.wr.defColumnProps.StatsEnabled = enabled
	}
	return setStats
}
// WithStatsFor returns an option that enables or disables statistics in the
// resulting file for a single column.
func WithStatsFor(path string, enabled bool) WriterProperty {
	setStatsForColumn := func(c *writerPropConfig) {
		c.statsEnabled[path] = enabled
	}
	return setStatsForColumn
}
// WithStatsPath behaves like WithStatsFor, but accepts a ColumnPath which is
// converted to its string form.
func WithStatsPath(path ColumnPath, enabled bool) WriterProperty {
	column := path.String()
	return WithStatsFor(column, enabled)
}
// WithEncryptionProperties returns an option that sets the file level
// encryption handling for writing the file.
func WithEncryptionProperties(props *FileEncryptionProperties) WriterProperty {
	setEncryption := func(c *writerPropConfig) {
		c.wr.encryptionProps = props
	}
	return setEncryption
}
// WriterProperties is the collection of properties to use for writing a parquet file. The values are
// read only once it has been constructed.
type WriterProperties struct {
// mem is the allocator returned by Allocator.
mem memory.Allocator
// dictPagesize is the dictionary page size limit.
dictPagesize int64
// batchSize is the write batch size.
batchSize int64
// maxRowGroupLen is the maximum number of rows in a row group.
maxRowGroupLen int64
// pageSize is the data page size.
pageSize int64
// parquetVersion is the Parquet spec version used for writing.
parquetVersion Version
// createdBy is the "created by" string.
createdBy string
// dataPageVersion is the DataPage spec version.
dataPageVersion DataPageVersion
// rootName is the name of the root schema node.
rootName string
// rootRepetition is the repetition of the root schema node.
rootRepetition Repetition
// defColumnProps holds the properties used for columns without an entry in columnProps.
defColumnProps ColumnProperties
// columnProps holds per-column properties keyed by column path string.
columnProps map[string]*ColumnProperties
// encryptionProps is the file level encryption configuration; may be nil.
encryptionProps *FileEncryptionProperties
}
// defaultWriterProperties allocates a WriterProperties populated with the
// package defaults. columnProps and encryptionProps are left at their zero
// values.
func defaultWriterProperties() *WriterProperties {
	props := new(WriterProperties)
	props.mem = memory.DefaultAllocator
	props.dictPagesize = DefaultDictionaryPageSizeLimit
	props.batchSize = DefaultWriteBatchSize
	props.maxRowGroupLen = DefaultMaxRowGroupLen
	props.pageSize = DefaultDataPageSize
	props.parquetVersion = V2_LATEST
	props.dataPageVersion = DataPageV1
	props.createdBy = DefaultCreatedBy
	props.rootName = DefaultRootName
	props.rootRepetition = Repetitions.Repeated
	props.defColumnProps = DefaultColumnProperties()
	return props
}
// NewWriterProperties builds a WriterProperties from the given options. When
// several options conflict, the last one applied wins. With no options, the
// default properties are used for writing.
//
// The default properties use the following values:
//
//	Allocator:          memory.DefaultAllocator
//	DictionaryPageSize: DefaultDictionaryPageSizeLimit
//	BatchSize:          DefaultWriteBatchSize
//	MaxRowGroupLength:  DefaultMaxRowGroupLen
//	PageSize:           DefaultDataPageSize
//	ParquetVersion:     V2_LATEST
//	DataPageVersion:    DataPageV1
//	CreatedBy:          DefaultCreatedBy
//	RootName:           DefaultRootName
//	RootRepetition:     Repetitions.Repeated
func NewWriterProperties(opts ...WriterProperty) *WriterProperties {
	conf := writerPropConfig{
		wr:            defaultWriterProperties(),
		encodings:     map[string]Encoding{},
		codecs:        map[string]compress.Compression{},
		compressLevel: map[string]int{},
		dictEnabled:   map[string]bool{},
		statsEnabled:  map[string]bool{},
	}
	for i := range opts {
		opts[i](&conf)
	}

	props := conf.wr
	props.columnProps = map[string]*ColumnProperties{}

	// columnFor returns the per-column entry for path, creating it on first
	// use as a copy of the (already finalized) default column properties.
	columnFor := func(path string) *ColumnProperties {
		if existing, ok := props.columnProps[path]; ok {
			return existing
		}
		fresh := props.defColumnProps
		props.columnProps[path] = &fresh
		return &fresh
	}

	// Fold every per-column override into its column entry.
	for path, enc := range conf.encodings {
		columnFor(path).Encoding = enc
	}
	for path, codec := range conf.codecs {
		columnFor(path).Codec = codec
	}
	for path, level := range conf.compressLevel {
		columnFor(path).CompressionLevel = level
	}
	for path, enabled := range conf.dictEnabled {
		columnFor(path).DictionaryEnabled = enabled
	}
	for path, enabled := range conf.statsEnabled {
		columnFor(path).StatsEnabled = enabled
	}
	return props
}
// FileEncryptionProperties returns the encryption properties stored in these
// writer properties; the result may be nil.
func (w *WriterProperties) FileEncryptionProperties() *FileEncryptionProperties {
	encryption := w.encryptionProps
	return encryption
}
// Allocator returns the memory allocator configured for writing.
func (w *WriterProperties) Allocator() memory.Allocator {
	return w.mem
}
// CreatedBy returns the configured "created by" string.
func (w *WriterProperties) CreatedBy() string {
	return w.createdBy
}
// RootName returns the name configured for the root schema node.
func (w *WriterProperties) RootName() string {
	return w.rootName
}
// RootRepetition returns the repetition configured for the root schema node.
func (w *WriterProperties) RootRepetition() Repetition {
	return w.rootRepetition
}
// WriteBatchSize returns the configured batch size for column writes.
func (w *WriterProperties) WriteBatchSize() int64 {
	return w.batchSize
}
// DataPageSize returns the configured data page size.
func (w *WriterProperties) DataPageSize() int64 {
	return w.pageSize
}
// DictionaryPageSizeLimit returns the configured dictionary page size limit.
func (w *WriterProperties) DictionaryPageSizeLimit() int64 {
	return w.dictPagesize
}
// Version returns the configured Parquet spec version.
func (w *WriterProperties) Version() Version {
	return w.parquetVersion
}
// DataPageVersion returns the configured DataPage spec version.
func (w *WriterProperties) DataPageVersion() DataPageVersion {
	return w.dataPageVersion
}
// MaxRowGroupLength returns the configured maximum number of rows per row group.
func (w *WriterProperties) MaxRowGroupLength() int64 {
	return w.maxRowGroupLen
}
// Compression returns the default compression type, used for any column that
// has no column-specific compression defined.
func (w *WriterProperties) Compression() compress.Compression {
	return w.defColumnProps.Codec
}
// CompressionFor returns the compression type recorded for the given column
// path, falling back to the default codec when the column has no entry.
func (w *WriterProperties) CompressionFor(path string) compress.Compression {
	props, found := w.columnProps[path]
	if !found {
		return w.defColumnProps.Codec
	}
	return props.Codec
}
// CompressionPath is the same as CompressionFor but takes a ColumnPath.
func (w *WriterProperties) CompressionPath(path ColumnPath) compress.Compression {
	column := path.String()
	return w.CompressionFor(column)
}
// CompressionLevel returns the default compression level, used for any column
// that has no column-specific level.
func (w *WriterProperties) CompressionLevel() int {
	return w.defColumnProps.CompressionLevel
}
// CompressionLevelFor returns the compression level recorded for the given
// column path, falling back to the default level when the column has no entry.
func (w *WriterProperties) CompressionLevelFor(path string) int {
	props, found := w.columnProps[path]
	if !found {
		return w.defColumnProps.CompressionLevel
	}
	return props.CompressionLevel
}
// CompressionLevelPath is the same as CompressionLevelFor but takes a
// ColumnPath.
func (w *WriterProperties) CompressionLevelPath(path ColumnPath) int {
	column := path.String()
	return w.CompressionLevelFor(column)
}
// Encoding returns the default encoding, used for any column that has no
// column-specific encoding.
func (w *WriterProperties) Encoding() Encoding {
	return w.defColumnProps.Encoding
}
// EncodingFor returns the encoding recorded for the given column path,
// falling back to the default encoding when the column has no entry.
func (w *WriterProperties) EncodingFor(path string) Encoding {
	props, found := w.columnProps[path]
	if !found {
		return w.defColumnProps.Encoding
	}
	return props.Encoding
}
// EncodingPath is the same as EncodingFor but takes a ColumnPath.
func (w *WriterProperties) EncodingPath(path ColumnPath) Encoding {
	column := path.String()
	return w.EncodingFor(column)
}
// DictionaryIndexEncoding returns the encoding used for dictionary index
// values, chosen by parquet version: V1_0 yields PlainDict, any other version
// yields RLEDict.
func (w *WriterProperties) DictionaryIndexEncoding() Encoding {
	switch w.parquetVersion {
	case V1_0:
		return Encodings.PlainDict
	default:
		return Encodings.RLEDict
	}
}
// DictionaryPageEncoding returns the encoding used for the DictionaryPage
// itself, chosen by parquet version: V1_0 yields PlainDict, any other version
// yields Plain.
func (w *WriterProperties) DictionaryPageEncoding() Encoding {
	switch w.parquetVersion {
	case V1_0:
		return Encodings.PlainDict
	default:
		return Encodings.Plain
	}
}
// DictionaryEnabled returns the default for whether dictionary encoding is
// used, which applies to columns that aren't separately specified.
func (w *WriterProperties) DictionaryEnabled() bool {
	return w.defColumnProps.DictionaryEnabled
}
// DictionaryEnabledFor reports whether dictionary encoding is enabled for the
// given column path, falling back to the default when the column has no entry.
func (w *WriterProperties) DictionaryEnabledFor(path string) bool {
	props, found := w.columnProps[path]
	if !found {
		return w.defColumnProps.DictionaryEnabled
	}
	return props.DictionaryEnabled
}
// DictionaryEnabledPath is the same as DictionaryEnabledFor but takes a
// ColumnPath.
func (w *WriterProperties) DictionaryEnabledPath(path ColumnPath) bool {
	column := path.String()
	return w.DictionaryEnabledFor(column)
}
// StatisticsEnabled returns the default for whether stats are written, which
// applies to columns that aren't separately specified.
func (w *WriterProperties) StatisticsEnabled() bool {
	return w.defColumnProps.StatsEnabled
}
// StatisticsEnabledFor reports whether stats are enabled for the given column
// path, falling back to the default when the column has no entry.
func (w *WriterProperties) StatisticsEnabledFor(path string) bool {
	props, found := w.columnProps[path]
	if !found {
		return w.defColumnProps.StatsEnabled
	}
	return props.StatsEnabled
}
// StatisticsEnabledPath is the same as StatisticsEnabledFor but takes a
// ColumnPath.
func (w *WriterProperties) StatisticsEnabledPath(path ColumnPath) bool {
	column := path.String()
	return w.StatisticsEnabledFor(column)
}
// MaxStatsSize returns the default maximum size for stats.
func (w *WriterProperties) MaxStatsSize() int64 {
	return w.defColumnProps.MaxStatsSize
}
// MaxStatsSizeFor returns the maximum stat size recorded for the given column
// path, falling back to the default when the column has no entry.
func (w *WriterProperties) MaxStatsSizeFor(path string) int64 {
	props, found := w.columnProps[path]
	if !found {
		return w.defColumnProps.MaxStatsSize
	}
	return props.MaxStatsSize
}
// MaxStatsSizePath is the same as MaxStatsSizeFor but takes a ColumnPath.
func (w *WriterProperties) MaxStatsSizePath(path ColumnPath) int64 {
	column := path.String()
	return w.MaxStatsSizeFor(column)
}
// ColumnEncryptionProperties returns the encryption properties for the given
// column path, as reported by the file encryption properties. It returns nil
// when no file encryption properties are configured.
func (w *WriterProperties) ColumnEncryptionProperties(path string) *ColumnEncryptionProperties {
	if w.encryptionProps == nil {
		return nil
	}
	return w.encryptionProps.ColumnEncryptionProperties(path)
}
|