1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
|
#pragma clang system_header
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstdint>
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/array.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/compute/exec.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/compute/util.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/compute/util_internal.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/type.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/cpu_info.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/logging.h"
/// This file contains lightweight containers for Arrow buffers. These containers
/// make compromises in terms of strong ownership and the range of data types supported
/// in order to gain performance and reduce overhead.
namespace arrow20 {
namespace compute {
/// \brief Resources required by various execution engine operations
///
/// Inside the execution engine this context is supplied by either the node or
/// the plan, and it lives for as long as the plan does. Declaring it here lets
/// this code use those resources without tying its logic to the execution engine.
struct LightContext {
  /// \brief Report whether the AVX2 bit is set in `hardware_flags`
  bool has_avx2() const {
    const auto avx2_bits = hardware_flags & arrow20::internal::CpuInfo::AVX2;
    return avx2_bits > 0;
  }

  /// Bit mask that is tested against the arrow20::internal::CpuInfo feature flags
  int64_t hardware_flags;
  /// Pointer to a temporary vector stack; this struct never frees it
  util::TempVectorStack* stack;
};
/// \brief Layout description for a "key" column
///
/// A "key" column is any column that is neither nested nor a union.
/// Such a column has no children and carries either 0 buffers (null),
/// 2 buffers (e.g. int32) or 3 buffers (e.g. string).
///
/// This metadata object plays the role of arrow20::DataType without allocating
struct ARROW_EXPORT KeyColumnMetadata {
  /// \brief Default construction; no member is assigned a value here
  KeyColumnMetadata() = default;

  /// \brief Construct metadata from explicit field values
  /// \param is_fixed_length_in value stored in `is_fixed_length`
  /// \param fixed_length_in value stored in `fixed_length`
  /// \param is_null_type_in value stored in `is_null_type` (false when omitted)
  KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in,
                    bool is_null_type_in = false)
      : is_fixed_length{is_fixed_length_in},
        is_null_type{is_null_type_in},
        fixed_length{fixed_length_in} {}

  /// \brief True when the column is not a varying-length binary type
  ///
  /// When true, the column has a validity buffer and a data buffer,
  /// while the third buffer goes unused.
  bool is_fixed_length;

  /// \brief True when this column is the null type (NA).
  bool is_null_type;

  /// \brief Size of each item in bytes
  ///
  /// Zero is special: unless the column is a null type column, it denotes a bit
  /// vector holding one bit per value. In general that means a boolean column.
  ///
  /// For a varying-length binary column this is the number of bytes per offset.
  uint32_t fixed_length;
};
/// \brief A lightweight view into a "key" array
///
/// A "key" column is a non-nested, non-union column \see KeyColumnMetadata
///
/// This view object is a zero-allocation analogue of arrow20::ArrayData
class ARROW_EXPORT KeyColumnArray {
 public:
  /// \brief Create an uninitialized KeyColumnArray
  KeyColumnArray() = default;

  /// \brief Create a read-only view from buffers
  ///
  /// This is a view only and does not take ownership of the buffers. The lifetime
  /// of the buffers must exceed the lifetime of this view
  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
                 const uint8_t* validity_buffer, const uint8_t* fixed_length_buffer,
                 const uint8_t* var_length_buffer, int bit_offset_validity = 0,
                 int bit_offset_fixed = 0);

  /// \brief Create a mutable view from buffers
  ///
  /// This is a view only and does not take ownership of the buffers. The lifetime
  /// of the buffers must exceed the lifetime of this view
  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
                 uint8_t* validity_buffer, uint8_t* fixed_length_buffer,
                 uint8_t* var_length_buffer, int bit_offset_validity = 0,
                 int bit_offset_fixed = 0);

  /// \brief Create a sliced view of `this`
  ///
  /// The number of rows used in offset must be divisible by 8
  /// in order to not split bit vectors within a single byte.
  KeyColumnArray Slice(int64_t offset, int64_t length) const;

  /// \brief Create a copy of `this` with a buffer from `other`
  ///
  /// The copy will be identical to `this` except the buffer at buffer_id_to_replace
  /// will be replaced by the corresponding buffer in `other`.
  KeyColumnArray WithBufferFrom(const KeyColumnArray& other,
                                int buffer_id_to_replace) const;

  /// \brief Create a copy of `this` with new metadata
  KeyColumnArray WithMetadata(const KeyColumnMetadata& metadata) const;

  // Constants used for accessing buffers using data() and mutable_data().
  static constexpr int kValidityBuffer = 0;
  static constexpr int kFixedLengthBuffer = 1;
  static constexpr int kVariableLengthBuffer = 2;

  /// \brief Return one of the underlying mutable buffers
  /// \param i buffer index, one of kValidityBuffer, kFixedLengthBuffer or
  ///          kVariableLengthBuffer (checked in debug builds only)
  uint8_t* mutable_data(int i) {
    ARROW_DCHECK(i >= 0 && i < kMaxBuffers);
    return mutable_buffers_[i];
  }

  /// \brief Return one of the underlying read-only buffers
  /// \param i buffer index, one of kValidityBuffer, kFixedLengthBuffer or
  ///          kVariableLengthBuffer (checked in debug builds only)
  const uint8_t* data(int i) const {
    ARROW_DCHECK(i >= 0 && i < kMaxBuffers);
    return buffers_[i];
  }

  /// \brief Return a mutable version of the offsets buffer
  ///
  /// Only valid if this is a view into a varbinary type
  uint32_t* mutable_offsets() {
    DCHECK(!metadata_.is_fixed_length);
    DCHECK_EQ(metadata_.fixed_length, sizeof(uint32_t));
    return reinterpret_cast<uint32_t*>(mutable_data(kFixedLengthBuffer));
  }

  /// \brief Return a read-only version of the offsets buffer
  ///
  /// Only valid if this is a view into a varbinary type
  const uint32_t* offsets() const {
    DCHECK(!metadata_.is_fixed_length);
    DCHECK_EQ(metadata_.fixed_length, sizeof(uint32_t));
    return reinterpret_cast<const uint32_t*>(data(kFixedLengthBuffer));
  }

  /// \brief Return a mutable version of the large-offsets buffer
  ///
  /// Only valid if this is a view into a large varbinary type
  uint64_t* mutable_large_offsets() {
    DCHECK(!metadata_.is_fixed_length);
    DCHECK_EQ(metadata_.fixed_length, sizeof(uint64_t));
    return reinterpret_cast<uint64_t*>(mutable_data(kFixedLengthBuffer));
  }

  /// \brief Return a read-only version of the large-offsets buffer
  ///
  /// Only valid if this is a view into a large varbinary type
  const uint64_t* large_offsets() const {
    DCHECK(!metadata_.is_fixed_length);
    DCHECK_EQ(metadata_.fixed_length, sizeof(uint64_t));
    return reinterpret_cast<const uint64_t*>(data(kFixedLengthBuffer));
  }

  /// \brief Return the type metadata
  const KeyColumnMetadata& metadata() const { return metadata_; }

  /// \brief Return the length (in rows) of the array
  int64_t length() const { return length_; }

  /// \brief Return the bit offset into the corresponding vector
  ///
  /// Bit offsets are stored only for the first two buffers, so `i` must be
  /// kValidityBuffer (0) or kFixedLengthBuffer (1).
  /// If i == 1 then this must be a bool array.
  int bit_offset(int i) const {
    // bit_offset_ holds kMaxBuffers - 1 entries; asserting against kMaxBuffers
    // would let i == 2 read one element past the end of the array.
    ARROW_DCHECK(i >= 0 && i < kMaxBuffers - 1);
    return bit_offset_[i];
  }

 private:
  static constexpr int kMaxBuffers = 3;
  // Read-only buffer pointers, indexed by the k*Buffer constants above.
  const uint8_t* buffers_[kMaxBuffers];
  // Mutable buffer pointers, indexed by the k*Buffer constants above.
  uint8_t* mutable_buffers_[kMaxBuffers];
  KeyColumnMetadata metadata_;
  int64_t length_;
  // Starting bit offset within the first byte (between 0 and 7)
  // to be used when accessing buffers that store bit vectors.
  // There is no entry for the variable-length buffer.
  int bit_offset_[kMaxBuffers - 1];

  // True for fixed-length metadata with a zero item size that is not the null type.
  bool is_bool_type() const {
    return metadata_.is_fixed_length && metadata_.fixed_length == 0 &&
           !metadata_.is_null_type;
  }

  // True for fixed-length metadata with a non-zero item size that is not the null type.
  bool is_fixed_width_types() const {
    return metadata_.is_fixed_length && metadata_.fixed_length != 0 &&
           !metadata_.is_null_type;
  }

  // True for varying-length metadata whose offsets are 4 bytes wide.
  bool is_binary_type() const {
    return !metadata_.is_fixed_length && metadata_.fixed_length == sizeof(uint32_t) &&
           !metadata_.is_null_type;
  }

  // True for varying-length metadata whose offsets are 8 bytes wide.
  bool is_large_binary_type() const {
    return !metadata_.is_fixed_length && metadata_.fixed_length == sizeof(uint64_t) &&
           !metadata_.is_null_type;
  }

  // True for fixed-length metadata with a zero item size that is flagged as null type.
  bool is_null_type() const {
    return metadata_.is_fixed_length && metadata_.fixed_length == 0 &&
           metadata_.is_null_type;
  }
};
/// \brief Create KeyColumnMetadata from a DataType
///
/// If `type` is a dictionary type then this will return the KeyColumnMetadata for
/// the indices type
///
/// This should only be called on "key" columns. Calling this with
/// a non-key column will return Status::TypeError.
///
/// \param type the data type to describe
/// \return the metadata describing `type`, or an error status as noted above
ARROW_EXPORT Result<KeyColumnMetadata> ColumnMetadataFromDataType(
const std::shared_ptr<DataType>& type);
/// \brief Create KeyColumnArray from ArrayData
///
/// If `array_data` has a dictionary type then this will return the KeyColumnArray for
/// the indices array
///
/// The caller should ensure this is only called on "key" columns.
/// \see ColumnMetadataFromDataType for details
///
/// \param array_data the array data to create a view of
/// \param start_row presumably the first row covered by the view (confirm against
///        the implementation)
/// \param num_rows presumably the number of rows covered by the view (confirm against
///        the implementation)
ARROW_EXPORT Result<KeyColumnArray> ColumnArrayFromArrayData(
const std::shared_ptr<ArrayData>& array_data, int64_t start_row, int64_t num_rows);
/// \brief Create KeyColumnArray from ArrayData and KeyColumnMetadata
///
/// If `array_data` has a dictionary type then this will return the KeyColumnArray for
/// the indices array
///
/// The caller should ensure this is only called on "key" columns.
/// \see ColumnMetadataFromDataType for details
///
/// Here the metadata is supplied by the caller, and the view is returned directly
/// rather than wrapped in a Result.
///
/// \param array_data the array data to create a view of
/// \param metadata the column metadata to use for the view
/// \param start_row presumably the first row covered by the view (confirm against
///        the implementation)
/// \param num_rows presumably the number of rows covered by the view (confirm against
///        the implementation)
ARROW_EXPORT KeyColumnArray ColumnArrayFromArrayDataAndMetadata(
const std::shared_ptr<ArrayData>& array_data, const KeyColumnMetadata& metadata,
int64_t start_row, int64_t num_rows);
/// \brief Create KeyColumnMetadata instances from an ExecBatch
///
/// column_metadatas will be resized to fit
///
/// All columns in `batch` must be eligible "key" columns and have an array shape
/// \see ColumnMetadataFromDataType for more details
///
/// \param batch the batch whose columns are described
/// \param[out] column_metadatas receives the metadata instances
ARROW_EXPORT Status ColumnMetadatasFromExecBatch(
const ExecBatch& batch, std::vector<KeyColumnMetadata>* column_metadatas);
/// \brief Create KeyColumnArray instances from a slice of an ExecBatch
///
/// column_arrays will be resized to fit
///
/// All columns in `batch` must be eligible "key" columns and have an array shape
/// \see ColumnArrayFromArrayData for more details
///
/// \param batch the batch whose columns are viewed
/// \param start_row presumably the first row of the slice (confirm against the
///        implementation)
/// \param num_rows presumably the number of rows in the slice (confirm against the
///        implementation)
/// \param[out] column_arrays receives the views
ARROW_EXPORT Status ColumnArraysFromExecBatch(const ExecBatch& batch, int64_t start_row,
int64_t num_rows,
std::vector<KeyColumnArray>* column_arrays);
/// \brief Create KeyColumnArray instances from an ExecBatch
///
/// column_arrays will be resized to fit
///
/// All columns in `batch` must be eligible "key" columns and have an array shape
/// \see ColumnArrayFromArrayData for more details
///
/// \param batch the batch whose columns are viewed
/// \param[out] column_arrays receives the views
ARROW_EXPORT Status ColumnArraysFromExecBatch(const ExecBatch& batch,
std::vector<KeyColumnArray>* column_arrays);
/// A lightweight resizable array for "key" columns
///
/// Unlike KeyColumnArray this instance owns its buffers
///
/// Resizing is handled by arrow20::ResizableBuffer and a doubling approach is
/// used so that resizes will always grow up to the next power of 2
class ARROW_EXPORT ResizableArrayData {
public:
/// \brief Create an uninitialized instance
///
/// Init must be called before calling any other operations
ResizableArrayData()
: log_num_rows_min_(0),
pool_(NULLPTR),
num_rows_(0),
num_rows_allocated_(0),
var_len_buf_size_(0) {}
~ResizableArrayData() { Clear(true); }
/// \brief Initialize the array
/// \param data_type The data type this array is holding data for.
/// \param pool The pool to make allocations on
/// \param log_num_rows_min All resize operations will allocate at least enough
/// space for (1 << log_num_rows_min) rows
Status Init(const std::shared_ptr<DataType>& data_type, MemoryPool* pool,
int log_num_rows_min);
/// \brief Resets the array back to an empty state
/// \param release_buffers If true then allocated memory is released and the
/// next resize operation will have to reallocate memory
void Clear(bool release_buffers);
/// \brief Resize the fixed length buffers
///
/// The buffers will be resized to hold at least `num_rows_new` rows of data
Status ResizeFixedLengthBuffers(int num_rows_new);
/// \brief Resize the varying length buffer if this array is a variable binary type
///
/// This must be called after offsets have been populated and the buffer will be
/// resized to hold at least as much data as the offsets require
///
/// Does nothing if the array is not a variable binary type
Status ResizeVaryingLengthBuffer();
/// \brief The current length (in rows) of the array
int num_rows() const { return num_rows_; }
/// \brief The current allocated length (in rows) of the array
int num_rows_allocated() const { return num_rows_allocated_; }
/// \brief A non-owning view into this array
KeyColumnArray column_array() const;
/// \brief A lightweight descriptor of the data held by this array
Result<KeyColumnMetadata> column_metadata() const {
return ColumnMetadataFromDataType(data_type_);
}
/// \brief Convert the data to an arrow20::ArrayData
///
/// This is a zero copy operation and the created ArrayData will reference the
/// buffers held by this instance.
std::shared_ptr<ArrayData> array_data() const;
// Constants used for accessing buffers using mutable_data().
static constexpr int kValidityBuffer = 0;
static constexpr int kFixedLengthBuffer = 1;
static constexpr int kVariableLengthBuffer = 2;
/// \brief A raw pointer to the requested buffer
///
/// If i is 0 (kValidityBuffer) then this returns the validity buffer
/// If i is 1 (kFixedLengthBuffer) then this returns the buffer used for values (if this
/// is a fixed length data type) or offsets (if this is a variable binary type)
/// If i is 2 (kVariableLengthBuffer) then this returns the buffer used for variable
/// length binary data
uint8_t* mutable_data(int i) { return buffers_[i]->mutable_data(); }
template <typename T>
T* mutable_data_as(int i) {
return reinterpret_cast<T*>(mutable_data(i));
}
private:
static constexpr int64_t kNumPaddingBytes = 64;
int log_num_rows_min_;
std::shared_ptr<DataType> data_type_;
// Would be valid if data_type_ != NULLPTR.
KeyColumnMetadata column_metadata_{};
MemoryPool* pool_;
int num_rows_;
int num_rows_allocated_;
int64_t var_len_buf_size_;
static constexpr int kMaxBuffers = 3;
std::shared_ptr<ResizableBuffer> buffers_[kMaxBuffers];
};
/// \brief Builder that concatenates batches of data into one larger batch
///
/// Stores no more than num_rows_max() rows
class ARROW_EXPORT ExecBatchBuilder {
 public:
  /// \brief Append rows of `source` to the `target` column
  ///
  /// A `target` that is uninitialized or cleared gets initialized to use
  /// the given pool.
  static Status AppendSelected(const std::shared_ptr<ArrayData>& source,
                               ResizableArrayData* target, int num_rows_to_append,
                               const uint16_t* row_ids, MemoryPool* pool);

  /// \brief Append nulls to the `target` column
  ///
  /// A `target` that is uninitialized or cleared gets initialized to use
  /// the given pool.
  static Status AppendNulls(const std::shared_ptr<DataType>& type,
                            ResizableArrayData& target, int num_rows_to_append,
                            MemoryPool* pool);

  /// \brief Append the selected rows of `batch`
  ///
  /// When `col_ids` is null, `num_cols` should be less than batch.num_values() and
  /// the first `num_cols` columns of batch are the ones appended.
  ///
  /// Every column in `batch` must have array shape
  Status AppendSelected(MemoryPool* pool, const ExecBatch& batch, int num_rows_to_append,
                        const uint16_t* row_ids, int num_cols,
                        const int* col_ids = NULLPTR);

  /// \brief Append rows in which every value is null
  Status AppendNulls(MemoryPool* pool,
                     const std::vector<std::shared_ptr<DataType>>& types,
                     int num_rows_to_append);

  /// \brief Produce an ExecBatch from everything appended so far, then
  /// clear this builder so it can be reused
  ///
  /// Call this only when num_rows() is non-zero.
  ExecBatch Flush();

  /// \brief Row count of the first stored column, or 0 when no column is stored
  int num_rows() const {
    if (values_.empty()) {
      return 0;
    }
    return values_[0].num_rows();
  }

  /// \brief The row capacity, i.e. 2 to the power of kLogNumRows
  static constexpr int num_rows_max() { return 1 << kLogNumRows; }

 private:
  static constexpr int kLogNumRows = 15;

  // Work out how many rows at the tail of the sequence of selected rows should
  // be skipped so that the skipped rows together occupy at least the size the
  // caller asked for.
  //
  // Skipping tail rows lets the caller process the remaining rows faster,
  // without bounds checks on the buffers (helpful with SIMD or fixed size
  // memory loads and stores).
  //
  // The row_ids sequence must be non-decreasing. Consecutive rows sharing one
  // row id are skipped together, since they occupy the same space.
  //
  static int NumRowsToSkip(const std::shared_ptr<ArrayData>& column, int num_rows,
                           const uint16_t* row_ids, int num_tail_bytes_to_skip);

  // Invokes the supplied lambda once per row in the given list of rows,
  // passing it:
  // - the index of the row (within the set of selected rows),
  // - a pointer to the value,
  // - the byte length of the value.
  //
  // Null information (the validity bitmap) plays no part in this call and
  // must be handled separately.
  //
  template <class PROCESS_VALUE_FN>
  static void Visit(const std::shared_ptr<ArrayData>& column, int num_rows,
                    const uint16_t* row_ids, PROCESS_VALUE_FN process_value_fn);

  template <bool OUTPUT_BYTE_ALIGNED>
  static void CollectBitsImp(const uint8_t* input_bits, int64_t input_bits_offset,
                             uint8_t* output_bits, int64_t output_bits_offset,
                             int num_rows, const uint16_t* row_ids);

  static void CollectBits(const uint8_t* input_bits, int64_t input_bits_offset,
                          uint8_t* output_bits, int64_t output_bits_offset, int num_rows,
                          const uint16_t* row_ids);

  // One resizable column per value appended so far.
  std::vector<ResizableArrayData> values_;
};
} // namespace compute
} // namespace arrow20
|