diff options
author | thegeorg <thegeorg@yandex-team.ru> | 2022-02-10 16:45:08 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:08 +0300 |
commit | 4e839db24a3bbc9f1c610c43d6faaaa99824dcca (patch) | |
tree | 506dac10f5df94fab310584ee51b24fc5a081c22 /contrib/libs/apache/arrow/cpp/src/parquet/column_scanner.h | |
parent | 2d37894b1b037cf24231090eda8589bbb44fb6fc (diff) | |
download | ydb-4e839db24a3bbc9f1c610c43d6faaaa99824dcca.tar.gz |
Restoring authorship annotation for <thegeorg@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/libs/apache/arrow/cpp/src/parquet/column_scanner.h')
-rw-r--r-- | contrib/libs/apache/arrow/cpp/src/parquet/column_scanner.h | 524 |
1 files changed, 262 insertions, 262 deletions
diff --git a/contrib/libs/apache/arrow/cpp/src/parquet/column_scanner.h b/contrib/libs/apache/arrow/cpp/src/parquet/column_scanner.h index d53435f03c..61d0884140 100644 --- a/contrib/libs/apache/arrow/cpp/src/parquet/column_scanner.h +++ b/contrib/libs/apache/arrow/cpp/src/parquet/column_scanner.h @@ -1,262 +1,262 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include <stdio.h> - -#include <cstdint> -#include <memory> -#include <ostream> -#include <string> -#include <utility> -#include <vector> - -#include "parquet/column_reader.h" -#include "parquet/exception.h" -#include "parquet/platform.h" -#include "parquet/schema.h" -#include "parquet/types.h" - -namespace parquet { - -static constexpr int64_t DEFAULT_SCANNER_BATCH_SIZE = 128; - -class PARQUET_EXPORT Scanner { - public: - explicit Scanner(std::shared_ptr<ColumnReader> reader, - int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE, - ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) - : batch_size_(batch_size), - level_offset_(0), - levels_buffered_(0), - value_buffer_(AllocateBuffer(pool)), - value_offset_(0), - values_buffered_(0), - reader_(std::move(reader)) { - def_levels_.resize(descr()->max_definition_level() > 0 ? batch_size_ : 0); - rep_levels_.resize(descr()->max_repetition_level() > 0 ? batch_size_ : 0); - } - - virtual ~Scanner() {} - - static std::shared_ptr<Scanner> Make( - std::shared_ptr<ColumnReader> col_reader, - int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE, - ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); - - virtual void PrintNext(std::ostream& out, int width, bool with_levels = false) = 0; - - bool HasNext() { return level_offset_ < levels_buffered_ || reader_->HasNext(); } - - const ColumnDescriptor* descr() const { return reader_->descr(); } - - int64_t batch_size() const { return batch_size_; } - - void SetBatchSize(int64_t batch_size) { batch_size_ = batch_size; } - - protected: - int64_t batch_size_; - - std::vector<int16_t> def_levels_; - std::vector<int16_t> rep_levels_; - int level_offset_; - int levels_buffered_; - - std::shared_ptr<ResizableBuffer> value_buffer_; - int value_offset_; - int64_t values_buffered_; - std::shared_ptr<ColumnReader> reader_; -}; - -template <typename DType> -class PARQUET_TEMPLATE_CLASS_EXPORT TypedScanner : public Scanner { - public: - typedef typename DType::c_type T; - - explicit TypedScanner(std::shared_ptr<ColumnReader> reader, - int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE, - ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) - : Scanner(std::move(reader), batch_size, pool) { - typed_reader_ = static_cast<TypedColumnReader<DType>*>(reader_.get()); - int value_byte_size = type_traits<DType::type_num>::value_byte_size; - PARQUET_THROW_NOT_OK(value_buffer_->Resize(batch_size_ * value_byte_size)); - values_ = reinterpret_cast<T*>(value_buffer_->mutable_data()); - } - - virtual ~TypedScanner() {} - - bool NextLevels(int16_t* def_level, int16_t* rep_level) { - if (level_offset_ == levels_buffered_) { - levels_buffered_ = static_cast<int>( - typed_reader_->ReadBatch(static_cast<int>(batch_size_), def_levels_.data(), - rep_levels_.data(), values_, &values_buffered_)); - - value_offset_ = 0; - level_offset_ = 0; - if (!levels_buffered_) { - return false; - } - } - *def_level = descr()->max_definition_level() > 0 ? def_levels_[level_offset_] : 0; - *rep_level = descr()->max_repetition_level() > 0 ? rep_levels_[level_offset_] : 0; - level_offset_++; - return true; - } - - bool Next(T* val, int16_t* def_level, int16_t* rep_level, bool* is_null) { - if (level_offset_ == levels_buffered_) { - if (!HasNext()) { - // Out of data pages - return false; - } - } - - NextLevels(def_level, rep_level); - *is_null = *def_level < descr()->max_definition_level(); - - if (*is_null) { - return true; - } - - if (value_offset_ == values_buffered_) { - throw ParquetException("Value was non-null, but has not been buffered"); - } - *val = values_[value_offset_++]; - return true; - } - - // Returns true if there is a next value - bool NextValue(T* val, bool* is_null) { - if (level_offset_ == levels_buffered_) { - if (!HasNext()) { - // Out of data pages - return false; - } - } - - // Out of values - int16_t def_level = -1; - int16_t rep_level = -1; - NextLevels(&def_level, &rep_level); - *is_null = def_level < descr()->max_definition_level(); - - if (*is_null) { - return true; - } - - if (value_offset_ == values_buffered_) { - throw ParquetException("Value was non-null, but has not been buffered"); - } - *val = values_[value_offset_++]; - return true; - } - - virtual void PrintNext(std::ostream& out, int width, bool with_levels = false) { - T val{}; - int16_t def_level = -1; - int16_t rep_level = -1; - bool is_null = false; - char buffer[80]; - - if (!Next(&val, &def_level, &rep_level, &is_null)) { - throw ParquetException("No more values buffered"); - } - - if (with_levels) { - out << " D:" << def_level << " R:" << rep_level << " "; - if (!is_null) { - out << "V:"; - } - } - - if (is_null) { - std::string null_fmt = format_fwf<ByteArrayType>(width); - snprintf(buffer, sizeof(buffer), null_fmt.c_str(), "NULL"); - } else { - FormatValue(&val, buffer, sizeof(buffer), width); - } - out << buffer; - } - - private: - // The ownership of this object is expressed through the reader_ variable in the base - TypedColumnReader<DType>* typed_reader_; - - inline void FormatValue(void* val, char* buffer, int bufsize, int width); - - T* values_; -}; - -template <typename DType> -inline void TypedScanner<DType>::FormatValue(void* val, char* buffer, int bufsize, - int width) { - std::string fmt = format_fwf<DType>(width); - snprintf(buffer, bufsize, fmt.c_str(), *reinterpret_cast<T*>(val)); -} - -template <> -inline void TypedScanner<Int96Type>::FormatValue(void* val, char* buffer, int bufsize, - int width) { - std::string fmt = format_fwf<Int96Type>(width); - std::string result = Int96ToString(*reinterpret_cast<Int96*>(val)); - snprintf(buffer, bufsize, fmt.c_str(), result.c_str()); -} - -template <> -inline void TypedScanner<ByteArrayType>::FormatValue(void* val, char* buffer, int bufsize, - int width) { - std::string fmt = format_fwf<ByteArrayType>(width); - std::string result = ByteArrayToString(*reinterpret_cast<ByteArray*>(val)); - snprintf(buffer, bufsize, fmt.c_str(), result.c_str()); -} - -template <> -inline void TypedScanner<FLBAType>::FormatValue(void* val, char* buffer, int bufsize, - int width) { - std::string fmt = format_fwf<FLBAType>(width); - std::string result = FixedLenByteArrayToString( - *reinterpret_cast<FixedLenByteArray*>(val), descr()->type_length()); - snprintf(buffer, bufsize, fmt.c_str(), result.c_str()); -} - -typedef TypedScanner<BooleanType> BoolScanner; -typedef TypedScanner<Int32Type> Int32Scanner; -typedef TypedScanner<Int64Type> Int64Scanner; -typedef TypedScanner<Int96Type> Int96Scanner; -typedef TypedScanner<FloatType> FloatScanner; -typedef TypedScanner<DoubleType> DoubleScanner; -typedef TypedScanner<ByteArrayType> ByteArrayScanner; -typedef TypedScanner<FLBAType> FixedLenByteArrayScanner; - -template <typename RType> -int64_t ScanAll(int32_t batch_size, int16_t* def_levels, int16_t* rep_levels, - uint8_t* values, int64_t* values_buffered, - parquet::ColumnReader* reader) { - typedef typename RType::T Type; - auto typed_reader = static_cast<RType*>(reader); - auto vals = reinterpret_cast<Type*>(&values[0]); - return typed_reader->ReadBatch(batch_size, def_levels, rep_levels, vals, - values_buffered); -} - -int64_t PARQUET_EXPORT ScanAllValues(int32_t batch_size, int16_t* def_levels, - int16_t* rep_levels, uint8_t* values, - int64_t* values_buffered, - parquet::ColumnReader* reader); - -} // namespace parquet +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <stdio.h> + +#include <cstdint> +#include <memory> +#include <ostream> +#include <string> +#include <utility> +#include <vector> + +#include "parquet/column_reader.h" +#include "parquet/exception.h" +#include "parquet/platform.h" +#include "parquet/schema.h" +#include "parquet/types.h" + +namespace parquet { + +static constexpr int64_t DEFAULT_SCANNER_BATCH_SIZE = 128; + +class PARQUET_EXPORT Scanner { + public: + explicit Scanner(std::shared_ptr<ColumnReader> reader, + int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) + : batch_size_(batch_size), + level_offset_(0), + levels_buffered_(0), + value_buffer_(AllocateBuffer(pool)), + value_offset_(0), + values_buffered_(0), + reader_(std::move(reader)) { + def_levels_.resize(descr()->max_definition_level() > 0 ? batch_size_ : 0); + rep_levels_.resize(descr()->max_repetition_level() > 0 ? batch_size_ : 0); + } + + virtual ~Scanner() {} + + static std::shared_ptr<Scanner> Make( + std::shared_ptr<ColumnReader> col_reader, + int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); + + virtual void PrintNext(std::ostream& out, int width, bool with_levels = false) = 0; + + bool HasNext() { return level_offset_ < levels_buffered_ || reader_->HasNext(); } + + const ColumnDescriptor* descr() const { return reader_->descr(); } + + int64_t batch_size() const { return batch_size_; } + + void SetBatchSize(int64_t batch_size) { batch_size_ = batch_size; } + + protected: + int64_t batch_size_; + + std::vector<int16_t> def_levels_; + std::vector<int16_t> rep_levels_; + int level_offset_; + int levels_buffered_; + + std::shared_ptr<ResizableBuffer> value_buffer_; + int value_offset_; + int64_t values_buffered_; + std::shared_ptr<ColumnReader> reader_; +}; + +template <typename DType> +class PARQUET_TEMPLATE_CLASS_EXPORT TypedScanner : public Scanner { + public: + typedef typename DType::c_type T; + + explicit TypedScanner(std::shared_ptr<ColumnReader> reader, + int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) + : Scanner(std::move(reader), batch_size, pool) { + typed_reader_ = static_cast<TypedColumnReader<DType>*>(reader_.get()); + int value_byte_size = type_traits<DType::type_num>::value_byte_size; + PARQUET_THROW_NOT_OK(value_buffer_->Resize(batch_size_ * value_byte_size)); + values_ = reinterpret_cast<T*>(value_buffer_->mutable_data()); + } + + virtual ~TypedScanner() {} + + bool NextLevels(int16_t* def_level, int16_t* rep_level) { + if (level_offset_ == levels_buffered_) { + levels_buffered_ = static_cast<int>( + typed_reader_->ReadBatch(static_cast<int>(batch_size_), def_levels_.data(), + rep_levels_.data(), values_, &values_buffered_)); + + value_offset_ = 0; + level_offset_ = 0; + if (!levels_buffered_) { + return false; + } + } + *def_level = descr()->max_definition_level() > 0 ? def_levels_[level_offset_] : 0; + *rep_level = descr()->max_repetition_level() > 0 ? rep_levels_[level_offset_] : 0; + level_offset_++; + return true; + } + + bool Next(T* val, int16_t* def_level, int16_t* rep_level, bool* is_null) { + if (level_offset_ == levels_buffered_) { + if (!HasNext()) { + // Out of data pages + return false; + } + } + + NextLevels(def_level, rep_level); + *is_null = *def_level < descr()->max_definition_level(); + + if (*is_null) { + return true; + } + + if (value_offset_ == values_buffered_) { + throw ParquetException("Value was non-null, but has not been buffered"); + } + *val = values_[value_offset_++]; + return true; + } + + // Returns true if there is a next value + bool NextValue(T* val, bool* is_null) { + if (level_offset_ == levels_buffered_) { + if (!HasNext()) { + // Out of data pages + return false; + } + } + + // Out of values + int16_t def_level = -1; + int16_t rep_level = -1; + NextLevels(&def_level, &rep_level); + *is_null = def_level < descr()->max_definition_level(); + + if (*is_null) { + return true; + } + + if (value_offset_ == values_buffered_) { + throw ParquetException("Value was non-null, but has not been buffered"); + } + *val = values_[value_offset_++]; + return true; + } + + virtual void PrintNext(std::ostream& out, int width, bool with_levels = false) { + T val{}; + int16_t def_level = -1; + int16_t rep_level = -1; + bool is_null = false; + char buffer[80]; + + if (!Next(&val, &def_level, &rep_level, &is_null)) { + throw ParquetException("No more values buffered"); + } + + if (with_levels) { + out << " D:" << def_level << " R:" << rep_level << " "; + if (!is_null) { + out << "V:"; + } + } + + if (is_null) { + std::string null_fmt = format_fwf<ByteArrayType>(width); + snprintf(buffer, sizeof(buffer), null_fmt.c_str(), "NULL"); + } else { + FormatValue(&val, buffer, sizeof(buffer), width); + } + out << buffer; + } + + private: + // The ownership of this object is expressed through the reader_ variable in the base + TypedColumnReader<DType>* typed_reader_; + + inline void FormatValue(void* val, char* buffer, int bufsize, int width); + + T* values_; +}; + +template <typename DType> +inline void TypedScanner<DType>::FormatValue(void* val, char* buffer, int bufsize, + int width) { + std::string fmt = format_fwf<DType>(width); + snprintf(buffer, bufsize, fmt.c_str(), *reinterpret_cast<T*>(val)); +} + +template <> +inline void TypedScanner<Int96Type>::FormatValue(void* val, char* buffer, int bufsize, + int width) { + std::string fmt = format_fwf<Int96Type>(width); + std::string result = Int96ToString(*reinterpret_cast<Int96*>(val)); + snprintf(buffer, bufsize, fmt.c_str(), result.c_str()); +} + +template <> +inline void TypedScanner<ByteArrayType>::FormatValue(void* val, char* buffer, int bufsize, + int width) { + std::string fmt = format_fwf<ByteArrayType>(width); + std::string result = ByteArrayToString(*reinterpret_cast<ByteArray*>(val)); + snprintf(buffer, bufsize, fmt.c_str(), result.c_str()); +} + +template <> +inline void TypedScanner<FLBAType>::FormatValue(void* val, char* buffer, int bufsize, + int width) { + std::string fmt = format_fwf<FLBAType>(width); + std::string result = FixedLenByteArrayToString( + *reinterpret_cast<FixedLenByteArray*>(val), descr()->type_length()); + snprintf(buffer, bufsize, fmt.c_str(), result.c_str()); +} + +typedef TypedScanner<BooleanType> BoolScanner; +typedef TypedScanner<Int32Type> Int32Scanner; +typedef TypedScanner<Int64Type> Int64Scanner; +typedef TypedScanner<Int96Type> Int96Scanner; +typedef TypedScanner<FloatType> FloatScanner; +typedef TypedScanner<DoubleType> DoubleScanner; +typedef TypedScanner<ByteArrayType> ByteArrayScanner; +typedef TypedScanner<FLBAType> FixedLenByteArrayScanner; + +template <typename RType> +int64_t ScanAll(int32_t batch_size, int16_t* def_levels, int16_t* rep_levels, + uint8_t* values, int64_t* values_buffered, + parquet::ColumnReader* reader) { + typedef typename RType::T Type; + auto typed_reader = static_cast<RType*>(reader); + auto vals = reinterpret_cast<Type*>(&values[0]); + return typed_reader->ReadBatch(batch_size, def_levels, rep_levels, vals, + values_buffered); +} + +int64_t PARQUET_EXPORT ScanAllValues(int32_t batch_size, int16_t* def_levels, + int16_t* rep_levels, uint8_t* values, + int64_t* values_buffered, + parquet::ColumnReader* reader); + +} // namespace parquet |