diff options
author | iaz1607 <iaz1607@yandex-team.ru> | 2022-02-10 16:45:37 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:37 +0300 |
commit | e5437feb4ac2d2dc044e1090b9312dde5ef197e0 (patch) | |
tree | f5a238c69dd20a1fa2092127a31b8aff25020f7d /contrib/libs/apache/orc/c++/src/ColumnWriter.cc | |
parent | f4945d0a44b8770f0801de3056aa41639b0b7bd2 (diff) | |
download | ydb-e5437feb4ac2d2dc044e1090b9312dde5ef197e0.tar.gz |
Restoring authorship annotation for <iaz1607@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/libs/apache/orc/c++/src/ColumnWriter.cc')
-rw-r--r-- | contrib/libs/apache/orc/c++/src/ColumnWriter.cc | 6024 |
1 files changed, 3012 insertions, 3012 deletions
diff --git a/contrib/libs/apache/orc/c++/src/ColumnWriter.cc b/contrib/libs/apache/orc/c++/src/ColumnWriter.cc index 1408a15457..8d4d00cc61 100644 --- a/contrib/libs/apache/orc/c++/src/ColumnWriter.cc +++ b/contrib/libs/apache/orc/c++/src/ColumnWriter.cc @@ -1,3013 +1,3013 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "orc/Int128.hh" -#include "orc/Writer.hh" - -#include "ByteRLE.hh" -#include "ColumnWriter.hh" -#include "RLE.hh" -#include "Statistics.hh" -#include "Timezone.hh" - -namespace orc { - StreamsFactory::~StreamsFactory() { - //PASS - } - - class StreamsFactoryImpl : public StreamsFactory { - public: - StreamsFactoryImpl( - const WriterOptions& writerOptions, - OutputStream* outputStream) : - options(writerOptions), - outStream(outputStream) { - } - - virtual std::unique_ptr<BufferedOutputStream> - createStream(proto::Stream_Kind kind) const override; - private: - const WriterOptions& options; - OutputStream* outStream; - }; - - std::unique_ptr<BufferedOutputStream> StreamsFactoryImpl::createStream( - proto::Stream_Kind) const { - // In the future, we can decide compression strategy and modifier - // based on stream kind. But for now we just use the setting from - // WriterOption - return createCompressor( - options.getCompression(), - outStream, - options.getCompressionStrategy(), - // BufferedOutputStream initial capacity - 1 * 1024 * 1024, - options.getCompressionBlockSize(), - *options.getMemoryPool()); - } - - std::unique_ptr<StreamsFactory> createStreamsFactory( - const WriterOptions& options, - OutputStream* outStream) { - return std::unique_ptr<StreamsFactory>( - new StreamsFactoryImpl(options, outStream)); - } - - RowIndexPositionRecorder::~RowIndexPositionRecorder() { - // PASS - } - - proto::ColumnEncoding_Kind RleVersionMapper(RleVersion rleVersion) - { - switch (rleVersion) - { - case RleVersion_1: - return proto::ColumnEncoding_Kind_DIRECT; - case RleVersion_2: - return proto::ColumnEncoding_Kind_DIRECT_V2; - default: - throw InvalidArgument("Invalid param"); - } - } - - ColumnWriter::ColumnWriter( - const Type& type, - const StreamsFactory& factory, - const WriterOptions& options) : - columnId(type.getColumnId()), - colIndexStatistics(), - colStripeStatistics(), - colFileStatistics(), - enableIndex(options.getEnableIndex()), - rowIndex(), - rowIndexEntry(), - rowIndexPosition(), - enableBloomFilter(false), - memPool(*options.getMemoryPool()), - indexStream(), - bloomFilterStream() { - - std::unique_ptr<BufferedOutputStream> presentStream = - factory.createStream(proto::Stream_Kind_PRESENT); - notNullEncoder = createBooleanRleEncoder(std::move(presentStream)); - - colIndexStatistics = createColumnStatistics(type); - colStripeStatistics = createColumnStatistics(type); - colFileStatistics = createColumnStatistics(type); - - if (enableIndex) { - rowIndex = std::unique_ptr<proto::RowIndex>(new proto::RowIndex()); - rowIndexEntry = - std::unique_ptr<proto::RowIndexEntry>(new proto::RowIndexEntry()); - rowIndexPosition = std::unique_ptr<RowIndexPositionRecorder>( - new RowIndexPositionRecorder(*rowIndexEntry)); - indexStream = - factory.createStream(proto::Stream_Kind_ROW_INDEX); - - // BloomFilters for non-UTF8 strings and non-UTC timestamps are not supported - if (options.isColumnUseBloomFilter(columnId) - && options.getBloomFilterVersion() == BloomFilterVersion::UTF8) { - enableBloomFilter = true; - bloomFilter.reset(new BloomFilterImpl( - options.getRowIndexStride(), options.getBloomFilterFPP())); - bloomFilterIndex.reset(new proto::BloomFilterIndex()); - bloomFilterStream = factory.createStream(proto::Stream_Kind_BLOOM_FILTER_UTF8); - } - } - } - - ColumnWriter::~ColumnWriter() { - // PASS - } - - void ColumnWriter::add(ColumnVectorBatch& batch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) { - notNullEncoder->add(batch.notNull.data() + offset, numValues, incomingMask); - } - - void ColumnWriter::flush(std::vector<proto::Stream>& streams) { - proto::Stream stream; - stream.set_kind(proto::Stream_Kind_PRESENT); - stream.set_column(static_cast<uint32_t>(columnId)); - stream.set_length(notNullEncoder->flush()); - streams.push_back(stream); - } - - uint64_t ColumnWriter::getEstimatedSize() const { - return notNullEncoder->getBufferSize(); - } - - void ColumnWriter::getStripeStatistics( - std::vector<proto::ColumnStatistics>& stats) const { - getProtoBufStatistics(stats, colStripeStatistics.get()); - } - - void ColumnWriter::mergeStripeStatsIntoFileStats() { - colFileStatistics->merge(*colStripeStatistics); - colStripeStatistics->reset(); - } - - void ColumnWriter::mergeRowGroupStatsIntoStripeStats() { - colStripeStatistics->merge(*colIndexStatistics); - colIndexStatistics->reset(); - } - - void ColumnWriter::getFileStatistics( - std::vector<proto::ColumnStatistics>& stats) const { - getProtoBufStatistics(stats, colFileStatistics.get()); - } - - void ColumnWriter::createRowIndexEntry() { - proto::ColumnStatistics *indexStats = rowIndexEntry->mutable_statistics(); - colIndexStatistics->toProtoBuf(*indexStats); - - *rowIndex->add_entry() = *rowIndexEntry; - - rowIndexEntry->clear_positions(); - rowIndexEntry->clear_statistics(); - - colStripeStatistics->merge(*colIndexStatistics); - colIndexStatistics->reset(); - - addBloomFilterEntry(); - - recordPosition(); - } - - void ColumnWriter::addBloomFilterEntry() { - if (enableBloomFilter) { - BloomFilterUTF8Utils::serialize(*bloomFilter, *bloomFilterIndex->add_bloomfilter()); - bloomFilter->reset(); - } - } - - void ColumnWriter::writeIndex(std::vector<proto::Stream> &streams) const { - // write row index to output stream - rowIndex->SerializeToZeroCopyStream(indexStream.get()); - - // construct row index stream - proto::Stream stream; - stream.set_kind(proto::Stream_Kind_ROW_INDEX); - stream.set_column(static_cast<uint32_t>(columnId)); - stream.set_length(indexStream->flush()); - streams.push_back(stream); - - // write BLOOM_FILTER_UTF8 stream - if (enableBloomFilter) { - if (!bloomFilterIndex->SerializeToZeroCopyStream(bloomFilterStream.get())) { - throw std::logic_error("Failed to write bloom filter stream."); - } - stream.set_kind(proto::Stream_Kind_BLOOM_FILTER_UTF8); - stream.set_column(static_cast<uint32_t>(columnId)); - stream.set_length(bloomFilterStream->flush()); - streams.push_back(stream); - } - } - - void ColumnWriter::recordPosition() const { - notNullEncoder->recordPosition(rowIndexPosition.get()); - } - - void ColumnWriter::reset() { - if (enableIndex) { - // clear row index - rowIndex->clear_entry(); - rowIndexEntry->clear_positions(); - rowIndexEntry->clear_statistics(); - - // write current positions - recordPosition(); - } - - if (enableBloomFilter) { - bloomFilter->reset(); - bloomFilterIndex->clear_bloomfilter(); - } - } - - void ColumnWriter::writeDictionary() { - // PASS - } - - class StructColumnWriter : public ColumnWriter { - public: - StructColumnWriter( - const Type& type, - const StreamsFactory& factory, - const WriterOptions& options); - ~StructColumnWriter() override; - - virtual void add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) override; - - virtual void flush(std::vector<proto::Stream>& streams) override; - - virtual uint64_t getEstimatedSize() const override; - virtual void getColumnEncoding( - std::vector<proto::ColumnEncoding>& encodings) const override; - - virtual void getStripeStatistics( - std::vector<proto::ColumnStatistics>& stats) const override; - - virtual void getFileStatistics( - std::vector<proto::ColumnStatistics>& stats) const override; - - virtual void mergeStripeStatsIntoFileStats() override; - - virtual void mergeRowGroupStatsIntoStripeStats() override; - - virtual void createRowIndexEntry() override; - - virtual void writeIndex( - std::vector<proto::Stream> &streams) const override; - - virtual void writeDictionary() override; - - virtual void reset() override; - - private: - std::vector<ColumnWriter *> children; - }; - - StructColumnWriter::StructColumnWriter( - const Type& type, - const StreamsFactory& factory, - const WriterOptions& options) : - ColumnWriter(type, factory, options) { - for(unsigned int i = 0; i < type.getSubtypeCount(); ++i) { - const Type& child = *type.getSubtype(i); - children.push_back(buildWriter(child, factory, options).release()); - } - - if (enableIndex) { - recordPosition(); - } - } - - StructColumnWriter::~StructColumnWriter() { - for (uint32_t i = 0; i < children.size(); ++i) { - delete children[i]; - } - } - - void StructColumnWriter::add( - ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) { - const StructVectorBatch* structBatch = - dynamic_cast<const StructVectorBatch *>(&rowBatch); - if (structBatch == nullptr) { - throw InvalidArgument("Failed to cast to StructVectorBatch"); - } - - ColumnWriter::add(rowBatch, offset, numValues, incomingMask); - const char* notNull = structBatch->hasNulls ? - structBatch->notNull.data() + offset : nullptr; - for (uint32_t i = 0; i < children.size(); ++i) { - children[i]->add(*structBatch->fields[i], offset, numValues, notNull); - } - - // update stats - if (!notNull) { - colIndexStatistics->increase(numValues); - } else { - uint64_t count = 0; - for (uint64_t i = 0; i < numValues; ++i) { - if (notNull[i]) { - ++count; - } - } - colIndexStatistics->increase(count); - if (count < numValues) { - colIndexStatistics->setHasNull(true); - } - } - } - - void StructColumnWriter::flush(std::vector<proto::Stream>& streams) { - ColumnWriter::flush(streams); - for (uint32_t i = 0; i < children.size(); ++i) { - children[i]->flush(streams); - } - } - - void StructColumnWriter::writeIndex( - std::vector<proto::Stream> &streams) const { - ColumnWriter::writeIndex(streams); - for (uint32_t i = 0; i < children.size(); ++i) { - children[i]->writeIndex(streams); - } - } - - uint64_t StructColumnWriter::getEstimatedSize() const { - uint64_t size = ColumnWriter::getEstimatedSize(); - for (uint32_t i = 0; i < children.size(); ++i) { - size += children[i]->getEstimatedSize(); - } - return size; - } - - void StructColumnWriter::getColumnEncoding( - std::vector<proto::ColumnEncoding>& encodings) const { - proto::ColumnEncoding encoding; - encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT); - encoding.set_dictionarysize(0); - encodings.push_back(encoding); - for (uint32_t i = 0; i < children.size(); ++i) { - children[i]->getColumnEncoding(encodings); - } - } - - void StructColumnWriter::getStripeStatistics( - std::vector<proto::ColumnStatistics>& stats) const { - ColumnWriter::getStripeStatistics(stats); - - for (uint32_t i = 0; i < children.size(); ++i) { - children[i]->getStripeStatistics(stats); - } - } - - void StructColumnWriter::mergeStripeStatsIntoFileStats() { - ColumnWriter::mergeStripeStatsIntoFileStats(); - - for (uint32_t i = 0; i < children.size(); ++i) { - children[i]->mergeStripeStatsIntoFileStats(); - } - } - - void StructColumnWriter::getFileStatistics( - std::vector<proto::ColumnStatistics>& stats) const { - ColumnWriter::getFileStatistics(stats); - - for (uint32_t i = 0; i < children.size(); ++i) { - children[i]->getFileStatistics(stats); - } - } - - void StructColumnWriter::mergeRowGroupStatsIntoStripeStats() { - ColumnWriter::mergeRowGroupStatsIntoStripeStats(); - - for (uint32_t i = 0; i < children.size(); ++i) { - children[i]->mergeRowGroupStatsIntoStripeStats(); - } - } - - void StructColumnWriter::createRowIndexEntry() { - ColumnWriter::createRowIndexEntry(); - - for (uint32_t i = 0; i < children.size(); ++i) { - children[i]->createRowIndexEntry(); - } - } - - void StructColumnWriter::reset() { - ColumnWriter::reset(); - - for (uint32_t i = 0; i < children.size(); ++i) { - children[i]->reset(); - } - } - - void StructColumnWriter::writeDictionary() { - for (uint32_t i = 0; i < children.size(); ++i) { - children[i]->writeDictionary(); - } - } - - class IntegerColumnWriter : public ColumnWriter { - public: - IntegerColumnWriter( - const Type& type, - const StreamsFactory& factory, - const WriterOptions& options); - - virtual void add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) override; - - virtual void flush(std::vector<proto::Stream>& streams) override; - - virtual uint64_t getEstimatedSize() const override; - - virtual void getColumnEncoding( - std::vector<proto::ColumnEncoding>& encodings) const override; - - virtual void recordPosition() const override; - - protected: - std::unique_ptr<RleEncoder> rleEncoder; - - private: - RleVersion rleVersion; - }; - - IntegerColumnWriter::IntegerColumnWriter( - const Type& type, - const StreamsFactory& factory, - const WriterOptions& options) : - ColumnWriter(type, factory, options), - rleVersion(options.getRleVersion()) { - std::unique_ptr<BufferedOutputStream> dataStream = - factory.createStream(proto::Stream_Kind_DATA); - rleEncoder = createRleEncoder( - std::move(dataStream), - true, - rleVersion, - memPool, - options.getAlignedBitpacking()); - - if (enableIndex) { - recordPosition(); - } - } - - void IntegerColumnWriter::add( - ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) { - const LongVectorBatch* longBatch = - dynamic_cast<const LongVectorBatch*>(&rowBatch); - if (longBatch == nullptr) { - throw InvalidArgument("Failed to cast to LongVectorBatch"); - } - IntegerColumnStatisticsImpl* intStats = - dynamic_cast<IntegerColumnStatisticsImpl*>(colIndexStatistics.get()); - if (intStats == nullptr) { - throw InvalidArgument("Failed to cast to IntegerColumnStatisticsImpl"); - } - - ColumnWriter::add(rowBatch, offset, numValues, incomingMask); - - const int64_t* data = longBatch->data.data() + offset; - const char* notNull = longBatch->hasNulls ? - longBatch->notNull.data() + offset : nullptr; - - rleEncoder->add(data, numValues, notNull); - - // update stats - uint64_t count = 0; - for (uint64_t i = 0; i < numValues; ++i) { - if (notNull == nullptr || notNull[i]) { - ++count; - if (enableBloomFilter) { - bloomFilter->addLong(data[i]); - } - intStats->update(data[i], 1); - } - } - intStats->increase(count); - if (count < numValues) { - intStats->setHasNull(true); - } - } - - void IntegerColumnWriter::flush(std::vector<proto::Stream>& streams) { - ColumnWriter::flush(streams); - - proto::Stream stream; - stream.set_kind(proto::Stream_Kind_DATA); - stream.set_column(static_cast<uint32_t>(columnId)); - stream.set_length(rleEncoder->flush()); - streams.push_back(stream); - } - - uint64_t IntegerColumnWriter::getEstimatedSize() const { - uint64_t size = ColumnWriter::getEstimatedSize(); - size += rleEncoder->getBufferSize(); - return size; - } - - void IntegerColumnWriter::getColumnEncoding( - std::vector<proto::ColumnEncoding>& encodings) const { - proto::ColumnEncoding encoding; - encoding.set_kind(RleVersionMapper(rleVersion)); - encoding.set_dictionarysize(0); - if (enableBloomFilter) { - encoding.set_bloomencoding(BloomFilterVersion::UTF8); - } - encodings.push_back(encoding); - } - - void IntegerColumnWriter::recordPosition() const { - ColumnWriter::recordPosition(); - rleEncoder->recordPosition(rowIndexPosition.get()); - } - - class ByteColumnWriter : public ColumnWriter { - public: - ByteColumnWriter(const Type& type, - const StreamsFactory& factory, - const WriterOptions& options); - - virtual void add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) override; - - virtual void flush(std::vector<proto::Stream>& streams) override; - - virtual uint64_t getEstimatedSize() const override; - - virtual void getColumnEncoding( - std::vector<proto::ColumnEncoding>& encodings) const override; - - virtual void recordPosition() const override; - - private: - std::unique_ptr<ByteRleEncoder> byteRleEncoder; - }; - - ByteColumnWriter::ByteColumnWriter( - const Type& type, - const StreamsFactory& factory, - const WriterOptions& options) : - ColumnWriter(type, factory, options) { - std::unique_ptr<BufferedOutputStream> dataStream = - factory.createStream(proto::Stream_Kind_DATA); - byteRleEncoder = createByteRleEncoder(std::move(dataStream)); - - if (enableIndex) { - recordPosition(); - } - } - - void ByteColumnWriter::add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) { - LongVectorBatch* byteBatch = dynamic_cast<LongVectorBatch*>(&rowBatch); - if (byteBatch == nullptr) { - throw InvalidArgument("Failed to cast to LongVectorBatch"); - } - IntegerColumnStatisticsImpl* intStats = - dynamic_cast<IntegerColumnStatisticsImpl*>(colIndexStatistics.get()); - if (intStats == nullptr) { - throw InvalidArgument("Failed to cast to IntegerColumnStatisticsImpl"); - } - - ColumnWriter::add(rowBatch, offset, numValues, incomingMask); - - int64_t* data = byteBatch->data.data() + offset; - const char* notNull = byteBatch->hasNulls ? - byteBatch->notNull.data() + offset : nullptr; - - char* byteData = reinterpret_cast<char*>(data); - for (uint64_t i = 0; i < numValues; ++i) { - byteData[i] = static_cast<char>(data[i]); - } - byteRleEncoder->add(byteData, numValues, notNull); - - uint64_t count = 0; - for (uint64_t i = 0; i < numValues; ++i) { - if (notNull == nullptr || notNull[i]) { - ++count; - if (enableBloomFilter) { - bloomFilter->addLong(data[i]); - } - intStats->update(static_cast<int64_t>(byteData[i]), 1); - } - } - intStats->increase(count); - if (count < numValues) { - intStats->setHasNull(true); - } - } - - void ByteColumnWriter::flush(std::vector<proto::Stream>& streams) { - ColumnWriter::flush(streams); - - proto::Stream stream; - stream.set_kind(proto::Stream_Kind_DATA); - stream.set_column(static_cast<uint32_t>(columnId)); - stream.set_length(byteRleEncoder->flush()); - streams.push_back(stream); - } - - uint64_t ByteColumnWriter::getEstimatedSize() const { - uint64_t size = ColumnWriter::getEstimatedSize(); - size += byteRleEncoder->getBufferSize(); - return size; - } - - void ByteColumnWriter::getColumnEncoding( - std::vector<proto::ColumnEncoding>& encodings) const { - proto::ColumnEncoding encoding; - encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT); - encoding.set_dictionarysize(0); - if (enableBloomFilter) { - encoding.set_bloomencoding(BloomFilterVersion::UTF8); - } - encodings.push_back(encoding); - } - - void ByteColumnWriter::recordPosition() const { - ColumnWriter::recordPosition(); - byteRleEncoder->recordPosition(rowIndexPosition.get()); - } - - class BooleanColumnWriter : public ColumnWriter { - public: - BooleanColumnWriter(const Type& type, - const StreamsFactory& factory, - const WriterOptions& options); - - virtual void add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) override; - - virtual void flush(std::vector<proto::Stream>& streams) override; - - virtual uint64_t getEstimatedSize() const override; - - virtual void getColumnEncoding( - std::vector<proto::ColumnEncoding>& encodings) const override; - - virtual void recordPosition() const override; - - private: - std::unique_ptr<ByteRleEncoder> rleEncoder; - }; - - BooleanColumnWriter::BooleanColumnWriter( - const Type& type, - const StreamsFactory& factory, - const WriterOptions& options) : - ColumnWriter(type, factory, options) { - std::unique_ptr<BufferedOutputStream> dataStream = - factory.createStream(proto::Stream_Kind_DATA); - rleEncoder = createBooleanRleEncoder(std::move(dataStream)); - - if (enableIndex) { - recordPosition(); - } - } - - void BooleanColumnWriter::add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) { - LongVectorBatch* byteBatch = dynamic_cast<LongVectorBatch*>(&rowBatch); - if (byteBatch == nullptr) { - throw InvalidArgument("Failed to cast to LongVectorBatch"); - } - BooleanColumnStatisticsImpl* boolStats = - dynamic_cast<BooleanColumnStatisticsImpl*>(colIndexStatistics.get()); - if (boolStats == nullptr) { - throw InvalidArgument("Failed to cast to BooleanColumnStatisticsImpl"); - } - - ColumnWriter::add(rowBatch, offset, numValues, incomingMask); - - int64_t* data = byteBatch->data.data() + offset; - const char* notNull = byteBatch->hasNulls ? - byteBatch->notNull.data() + offset : nullptr; - - char* byteData = reinterpret_cast<char*>(data); - for (uint64_t i = 0; i < numValues; ++i) { - byteData[i] = static_cast<char>(data[i]); - } - rleEncoder->add(byteData, numValues, notNull); - - uint64_t count = 0; - for (uint64_t i = 0; i < numValues; ++i) { - if (notNull == nullptr || notNull[i]) { - ++count; - if (enableBloomFilter) { - bloomFilter->addLong(data[i]); - } - boolStats->update(byteData[i] != 0, 1); - } - } - boolStats->increase(count); - if (count < numValues) { - boolStats->setHasNull(true); - } - } - - void BooleanColumnWriter::flush(std::vector<proto::Stream>& streams) { - ColumnWriter::flush(streams); - - proto::Stream stream; - stream.set_kind(proto::Stream_Kind_DATA); - stream.set_column(static_cast<uint32_t>(columnId)); - stream.set_length(rleEncoder->flush()); - streams.push_back(stream); - } - - uint64_t BooleanColumnWriter::getEstimatedSize() const { - uint64_t size = ColumnWriter::getEstimatedSize(); - size += rleEncoder->getBufferSize(); - return size; - } - - void BooleanColumnWriter::getColumnEncoding( - std::vector<proto::ColumnEncoding>& encodings) const { - proto::ColumnEncoding encoding; - encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT); - encoding.set_dictionarysize(0); - if (enableBloomFilter) { - encoding.set_bloomencoding(BloomFilterVersion::UTF8); - } - encodings.push_back(encoding); - } - - void BooleanColumnWriter::recordPosition() const { - ColumnWriter::recordPosition(); - rleEncoder->recordPosition(rowIndexPosition.get()); - } - - class DoubleColumnWriter : public ColumnWriter { - public: - DoubleColumnWriter(const Type& type, - const StreamsFactory& factory, - const WriterOptions& options, - bool isFloat); - - virtual void add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) override; - - virtual void flush(std::vector<proto::Stream>& streams) override; - - virtual uint64_t getEstimatedSize() const override; - - virtual void getColumnEncoding( - std::vector<proto::ColumnEncoding>& encodings) const override; - - virtual void recordPosition() const override; - - private: - bool isFloat; - std::unique_ptr<AppendOnlyBufferedStream> dataStream; - DataBuffer<char> buffer; - }; - - DoubleColumnWriter::DoubleColumnWriter( - const Type& type, - const StreamsFactory& factory, - const WriterOptions& options, - bool isFloatType) : - ColumnWriter(type, factory, options), - isFloat(isFloatType), - buffer(*options.getMemoryPool()) { - dataStream.reset(new AppendOnlyBufferedStream( - factory.createStream(proto::Stream_Kind_DATA))); - buffer.resize(isFloat ? 4 : 8); - - if (enableIndex) { - recordPosition(); - } - } - - // Floating point types are stored using IEEE 754 floating point bit layout. - // Float columns use 4 bytes per value and double columns use 8 bytes. - template <typename FLOAT_TYPE, typename INTEGER_TYPE> - inline void encodeFloatNum(FLOAT_TYPE input, char* output) { - INTEGER_TYPE* intBits = reinterpret_cast<INTEGER_TYPE*>(&input); - for (size_t i = 0; i < sizeof(INTEGER_TYPE); ++i) { - output[i] = static_cast<char>(((*intBits) >> (8 * i)) & 0xff); - } - } - - void DoubleColumnWriter::add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) { - const DoubleVectorBatch* dblBatch = - dynamic_cast<const DoubleVectorBatch*>(&rowBatch); - if (dblBatch == nullptr) { - throw InvalidArgument("Failed to cast to DoubleVectorBatch"); - } - DoubleColumnStatisticsImpl* doubleStats = - dynamic_cast<DoubleColumnStatisticsImpl*>(colIndexStatistics.get()); - if (doubleStats == nullptr) { - throw InvalidArgument("Failed to cast to DoubleColumnStatisticsImpl"); - } - - ColumnWriter::add(rowBatch, offset, numValues, incomingMask); - - const double* doubleData = dblBatch->data.data() + offset; - const char* notNull = dblBatch->hasNulls ? - dblBatch->notNull.data() + offset : nullptr; - - size_t bytes = isFloat ? 4 : 8; - char* data = buffer.data(); - uint64_t count = 0; - for (uint64_t i = 0; i < numValues; ++i) { - if (!notNull || notNull[i]) { - if (isFloat) { - encodeFloatNum<float, int32_t>(static_cast<float>(doubleData[i]), data); - } else { - encodeFloatNum<double, int64_t>(doubleData[i], data); - } - dataStream->write(data, bytes); - ++count; - if (enableBloomFilter) { - bloomFilter->addDouble(doubleData[i]); - } - doubleStats->update(doubleData[i]); - } - } - doubleStats->increase(count); - if (count < numValues) { - doubleStats->setHasNull(true); - } - } - - void DoubleColumnWriter::flush(std::vector<proto::Stream>& streams) { - ColumnWriter::flush(streams); - - proto::Stream stream; - stream.set_kind(proto::Stream_Kind_DATA); - stream.set_column(static_cast<uint32_t>(columnId)); - stream.set_length(dataStream->flush()); - streams.push_back(stream); - } - - uint64_t DoubleColumnWriter::getEstimatedSize() const { - uint64_t size = ColumnWriter::getEstimatedSize(); - size += dataStream->getSize(); - return size; - } - - void DoubleColumnWriter::getColumnEncoding( - std::vector<proto::ColumnEncoding>& encodings) const { - proto::ColumnEncoding encoding; - encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT); - encoding.set_dictionarysize(0); - if (enableBloomFilter) { - encoding.set_bloomencoding(BloomFilterVersion::UTF8); - } - encodings.push_back(encoding); - } - - void DoubleColumnWriter::recordPosition() const { - ColumnWriter::recordPosition(); - dataStream->recordPosition(rowIndexPosition.get()); - } - - /** - * Implementation of increasing sorted string dictionary - */ - class SortedStringDictionary { - public: - struct DictEntry { - DictEntry(const char * str, size_t len):data(str),length(len) {} - const char * data; - size_t length; - }; - - SortedStringDictionary():totalLength(0) {} - - // insert a new string into dictionary, return its insertion order - size_t insert(const char * data, size_t len); - - // write dictionary data & length to output buffer - void flush(AppendOnlyBufferedStream * dataStream, - RleEncoder * lengthEncoder) const; - - // reorder input index buffer from insertion order to dictionary order - void reorder(std::vector<int64_t>& idxBuffer) const; - - // get dict entries in insertion order - void getEntriesInInsertionOrder(std::vector<const DictEntry *>&) const; - - // return count of entries - size_t size() const; - - // return total length of strings in the dictioanry - uint64_t length() const; - - void clear(); - - private: - struct LessThan { - bool operator()(const DictEntry& left, const DictEntry& right) const { - int ret = memcmp(left.data, right.data, std::min(left.length, right.length)); - if (ret != 0) { - return ret < 0; - } - return left.length < right.length; - } - }; - - std::map<DictEntry, size_t, LessThan> dict; - std::vector<std::vector<char>> data; - uint64_t totalLength; - - // use friend class here to avoid being bothered by const function calls - friend class StringColumnWriter; - friend class CharColumnWriter; - friend class VarCharColumnWriter; - // store indexes of insertion order in the dictionary for not-null rows - std::vector<int64_t> idxInDictBuffer; - }; - - // insert a new string into dictionary, return its insertion order - size_t SortedStringDictionary::insert(const char * str, size_t len) { - auto ret = dict.insert({DictEntry(str, len), dict.size()}); - if (ret.second) { - // make a copy to internal storage - data.push_back(std::vector<char>(len)); - memcpy(data.back().data(), str, len); - // update dictionary entry to link pointer to internal storage - DictEntry * entry = const_cast<DictEntry *>(&(ret.first->first)); - entry->data = data.back().data(); - totalLength += len; - } - return ret.first->second; - } - - // write dictionary data & length to output buffer - void SortedStringDictionary::flush(AppendOnlyBufferedStream * dataStream, - RleEncoder * lengthEncoder) const { - for (auto it = dict.cbegin(); it != dict.cend(); ++it) { - dataStream->write(it->first.data, it->first.length); - lengthEncoder->write(static_cast<int64_t>(it->first.length)); - } - } - - /** - * Reorder input index buffer from insertion order to dictionary order - * - * We require this function because string values are buffered by indexes - * in their insertion order. Until the entire dictionary is complete can - * we get their sorted indexes in the dictionary in that ORC specification - * demands dictionary should be ordered. Therefore this function transforms - * the indexes from insertion order to dictionary value order for final - * output. - */ - void SortedStringDictionary::reorder(std::vector<int64_t>& idxBuffer) const { - // iterate the dictionary to get mapping from insertion order to value order - std::vector<size_t> mapping(dict.size()); - size_t dictIdx = 0; - for (auto it = dict.cbegin(); it != dict.cend(); ++it) { - mapping[it->second] = dictIdx++; - } - - // do the transformation - for (size_t i = 0; i != idxBuffer.size(); ++i) { - idxBuffer[i] = static_cast<int64_t>( - mapping[static_cast<size_t>(idxBuffer[i])]); - } - } - - // get dict entries in insertion order - void SortedStringDictionary::getEntriesInInsertionOrder( - std::vector<const DictEntry *>& entries) const { - entries.resize(dict.size()); - for (auto it = dict.cbegin(); it != dict.cend(); ++it) { - entries[it->second] = &(it->first); - } - } - - // return count of entries - size_t SortedStringDictionary::size() const { - return dict.size(); - } - - // return total length of strings in the dictioanry - uint64_t SortedStringDictionary::length() const { - return totalLength; - } - - void SortedStringDictionary::clear() { - totalLength = 0; - data.clear(); - dict.clear(); - } - - class StringColumnWriter : public ColumnWriter { - public: - StringColumnWriter(const Type& type, - const StreamsFactory& factory, - const WriterOptions& options); - - virtual void add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) override; - - virtual void flush(std::vector<proto::Stream>& streams) override; - - virtual uint64_t getEstimatedSize() const override; - - virtual void getColumnEncoding( - std::vector<proto::ColumnEncoding>& encodings) const override; - - virtual void recordPosition() const override; - - virtual void createRowIndexEntry() override; - - virtual void writeDictionary() override; - - virtual void reset() override; - - private: - /** - * dictionary related functions - */ - bool checkDictionaryKeyRatio(); - void createDirectStreams(); - void createDictStreams(); - void deleteDictStreams(); - void fallbackToDirectEncoding(); - - protected: - RleVersion rleVersion; - bool useCompression; - const StreamsFactory& streamsFactory; - bool alignedBitPacking; - - // direct encoding streams - std::unique_ptr<RleEncoder> directLengthEncoder; - std::unique_ptr<AppendOnlyBufferedStream> directDataStream; - - // dictionary encoding streams - std::unique_ptr<RleEncoder> dictDataEncoder; - std::unique_ptr<RleEncoder> dictLengthEncoder; - std::unique_ptr<AppendOnlyBufferedStream> dictStream; - - /** - * dictionary related variables - */ - SortedStringDictionary dictionary; - // whether or not dictionary checking is done - bool doneDictionaryCheck; - // whether or not it should be used - bool useDictionary; - // keys in the dictionary should not exceed this ratio - double dictSizeThreshold; - - // record start row of each row group; null rows are skipped - mutable std::vector<size_t> startOfRowGroups; - }; - - StringColumnWriter::StringColumnWriter( - const Type& type, - const StreamsFactory& factory, - const WriterOptions& options) : - ColumnWriter(type, factory, options), - rleVersion(options.getRleVersion()), - useCompression(options.getCompression() != CompressionKind_NONE), - streamsFactory(factory), - alignedBitPacking(options.getAlignedBitpacking()), - doneDictionaryCheck(false), - useDictionary(options.getEnableDictionary()), - dictSizeThreshold(options.getDictionaryKeySizeThreshold()){ - if (type.getKind() == TypeKind::BINARY) { - useDictionary = false; - doneDictionaryCheck = true; - } - - if (useDictionary) { - createDictStreams(); - } else { - doneDictionaryCheck = true; - createDirectStreams(); - } - - if (enableIndex) { - recordPosition(); - } - } - - void StringColumnWriter::add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) { - const StringVectorBatch* stringBatch = - dynamic_cast<const StringVectorBatch*>(&rowBatch); - if (stringBatch == nullptr) { - throw InvalidArgument("Failed to cast to StringVectorBatch"); - } - - StringColumnStatisticsImpl* strStats = - dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get()); - if (strStats == nullptr) { - throw InvalidArgument("Failed to cast to StringColumnStatisticsImpl"); - } - - ColumnWriter::add(rowBatch, offset, numValues, incomingMask); - - char *const * data = stringBatch->data.data() + offset; - const int64_t* length = stringBatch->length.data() + offset; - const char* notNull = stringBatch->hasNulls ? - stringBatch->notNull.data() + offset : nullptr; - - if (!useDictionary){ - directLengthEncoder->add(length, numValues, notNull); - } - - uint64_t count = 0; - for (uint64_t i = 0; i < numValues; ++i) { - if (!notNull || notNull[i]) { - const size_t len = static_cast<size_t>(length[i]); - if (useDictionary) { - size_t index = dictionary.insert(data[i], len); - dictionary.idxInDictBuffer.push_back(static_cast<int64_t>(index)); - } else { - directDataStream->write(data[i], len); - } - if (enableBloomFilter) { - bloomFilter->addBytes(data[i], static_cast<int64_t>(len)); - } - strStats->update(data[i], len); - ++count; - } - } - strStats->increase(count); - if (count < numValues) { - strStats->setHasNull(true); - } - } - - void StringColumnWriter::flush(std::vector<proto::Stream>& streams) { - ColumnWriter::flush(streams); - - if (useDictionary) { - proto::Stream data; - data.set_kind(proto::Stream_Kind_DATA); - data.set_column(static_cast<uint32_t>(columnId)); - data.set_length(dictDataEncoder->flush()); - streams.push_back(data); - - proto::Stream dict; - dict.set_kind(proto::Stream_Kind_DICTIONARY_DATA); - dict.set_column(static_cast<uint32_t>(columnId)); - dict.set_length(dictStream->flush()); - streams.push_back(dict); - - proto::Stream length; - length.set_kind(proto::Stream_Kind_LENGTH); - length.set_column(static_cast<uint32_t>(columnId)); - length.set_length(dictLengthEncoder->flush()); - streams.push_back(length); - } else { - proto::Stream length; - length.set_kind(proto::Stream_Kind_LENGTH); - length.set_column(static_cast<uint32_t>(columnId)); - length.set_length(directLengthEncoder->flush()); - streams.push_back(length); - - proto::Stream data; - data.set_kind(proto::Stream_Kind_DATA); - data.set_column(static_cast<uint32_t>(columnId)); - data.set_length(directDataStream->flush()); - streams.push_back(data); - } - } - - uint64_t StringColumnWriter::getEstimatedSize() const { - uint64_t size = ColumnWriter::getEstimatedSize(); - if (!useDictionary) { - size += directLengthEncoder->getBufferSize(); - size += directDataStream->getSize(); - } else { - size += dictionary.length(); - size += dictionary.size() * sizeof(int32_t); - size += dictionary.idxInDictBuffer.size() * sizeof(int32_t); - if (useCompression) { - size /= 3; // estimated ratio is 3:1 - } - } - return size; - } - - void StringColumnWriter::getColumnEncoding( - std::vector<proto::ColumnEncoding>& encodings) const { - proto::ColumnEncoding encoding; - if (!useDictionary) { - encoding.set_kind(rleVersion == RleVersion_1 ? - proto::ColumnEncoding_Kind_DIRECT : - proto::ColumnEncoding_Kind_DIRECT_V2); - } else { - encoding.set_kind(rleVersion == RleVersion_1 ? - proto::ColumnEncoding_Kind_DICTIONARY : - proto::ColumnEncoding_Kind_DICTIONARY_V2); - } - encoding.set_dictionarysize(static_cast<uint32_t>(dictionary.size())); - if (enableBloomFilter) { - encoding.set_bloomencoding(BloomFilterVersion::UTF8); - } - encodings.push_back(encoding); - } - - void StringColumnWriter::recordPosition() const { - ColumnWriter::recordPosition(); - if (!useDictionary) { - directDataStream->recordPosition(rowIndexPosition.get()); - directLengthEncoder->recordPosition(rowIndexPosition.get()); - } else { - if (enableIndex) { - startOfRowGroups.push_back(dictionary.idxInDictBuffer.size()); - } - } - } - - bool StringColumnWriter::checkDictionaryKeyRatio() { - if (!doneDictionaryCheck) { - useDictionary = dictionary.size() <= static_cast<size_t>( - static_cast<double>(dictionary.idxInDictBuffer.size()) * dictSizeThreshold); - doneDictionaryCheck = true; - } - - return useDictionary; - } - - void StringColumnWriter::createRowIndexEntry() { - if (useDictionary && !doneDictionaryCheck) { - if (!checkDictionaryKeyRatio()) { - fallbackToDirectEncoding(); - } - } - ColumnWriter::createRowIndexEntry(); - } - - void StringColumnWriter::reset() { - ColumnWriter::reset(); - - dictionary.clear(); - dictionary.idxInDictBuffer.resize(0); - startOfRowGroups.clear(); - startOfRowGroups.push_back(0); - } - - void StringColumnWriter::createDirectStreams() { - std::unique_ptr<BufferedOutputStream> directLengthStream = - streamsFactory.createStream(proto::Stream_Kind_LENGTH); - directLengthEncoder = createRleEncoder(std::move(directLengthStream), - false, - rleVersion, - memPool, - alignedBitPacking); - directDataStream.reset(new AppendOnlyBufferedStream( - streamsFactory.createStream(proto::Stream_Kind_DATA))); - } - - void StringColumnWriter::createDictStreams() { - std::unique_ptr<BufferedOutputStream> dictDataStream = - streamsFactory.createStream(proto::Stream_Kind_DATA); - dictDataEncoder = createRleEncoder(std::move(dictDataStream), - false, - rleVersion, - memPool, - alignedBitPacking); - std::unique_ptr<BufferedOutputStream> dictLengthStream = - streamsFactory.createStream(proto::Stream_Kind_LENGTH); - dictLengthEncoder = createRleEncoder(std::move(dictLengthStream), - false, - rleVersion, - memPool, - alignedBitPacking); - dictStream.reset(new AppendOnlyBufferedStream( - streamsFactory.createStream(proto::Stream_Kind_DICTIONARY_DATA))); - } - - void StringColumnWriter::deleteDictStreams() { - dictDataEncoder.reset(nullptr); - dictLengthEncoder.reset(nullptr); - dictStream.reset(nullptr); - - dictionary.clear(); - dictionary.idxInDictBuffer.clear(); - startOfRowGroups.clear(); - } - - void StringColumnWriter::writeDictionary() { - if (useDictionary && !doneDictionaryCheck) { - // when index is disabled, dictionary check happens while writing 1st stripe - if (!checkDictionaryKeyRatio()) { - fallbackToDirectEncoding(); - return; - } - } - - if (useDictionary) { - // flush dictionary data & length streams - dictionary.flush(dictStream.get(), dictLengthEncoder.get()); - - // convert index from insertion order to dictionary order - dictionary.reorder(dictionary.idxInDictBuffer); - - // write data sequences - int64_t * data = dictionary.idxInDictBuffer.data(); - if (enableIndex) { - size_t prevOffset = 0; - for (size_t i = 0; i < startOfRowGroups.size(); ++i) { - // write sequences in batch for a row group stride - size_t offset = startOfRowGroups[i]; - dictDataEncoder->add(data + prevOffset, offset - prevOffset, nullptr); - - // update index positions - int rowGroupId = static_cast<int>(i); - proto::RowIndexEntry* indexEntry = - (rowGroupId < rowIndex->entry_size()) ? - rowIndex->mutable_entry(rowGroupId) : rowIndexEntry.get(); - - // add positions for direct streams - RowIndexPositionRecorder recorder(*indexEntry); - dictDataEncoder->recordPosition(&recorder); - - prevOffset = offset; - } - - dictDataEncoder->add(data + prevOffset, - dictionary.idxInDictBuffer.size() - prevOffset, - nullptr); - } else { - dictDataEncoder->add(data, dictionary.idxInDictBuffer.size(), nullptr); - } - } - } - - void StringColumnWriter::fallbackToDirectEncoding() { - createDirectStreams(); - - if (enableIndex) { - // fallback happens at the 1st row group; - // simply complete positions for direct streams - proto::RowIndexEntry * indexEntry = rowIndexEntry.get(); - RowIndexPositionRecorder recorder(*indexEntry); - directDataStream->recordPosition(&recorder); - directLengthEncoder->recordPosition(&recorder); - } - - // get dictionary entries in insertion order - std::vector<const SortedStringDictionary::DictEntry *> entries; - dictionary.getEntriesInInsertionOrder(entries); - - // store each length of the data into a vector - const SortedStringDictionary::DictEntry * dictEntry = nullptr; - for (uint64_t i = 0; i != dictionary.idxInDictBuffer.size(); ++i) { - // write one row data in direct encoding - dictEntry = entries[static_cast<size_t>(dictionary.idxInDictBuffer[i])]; - directDataStream->write(dictEntry->data, dictEntry->length); - directLengthEncoder->write(static_cast<int64_t>(dictEntry->length)); - } - - deleteDictStreams(); - } - - struct Utf8Utils { - /** - * Counts how many utf-8 chars of the input data - */ - static uint64_t charLength(const char * data, uint64_t length) { - uint64_t chars = 0; - for (uint64_t i = 0; i < length; i++) { - if (isUtfStartByte(data[i])) { - chars++; - } - } - return chars; - } - - /** - * Return the number of bytes required to read at most maxCharLength - * characters in full from a utf-8 encoded byte array provided - * by data. This does not validate utf-8 data, but - * operates correctly on already valid utf-8 data. - * - * @param maxCharLength number of characters required - * @param data the bytes of UTF-8 - * @param length the length of data to truncate - */ - static uint64_t truncateBytesTo(uint64_t maxCharLength, - const char * data, - uint64_t length) { - uint64_t chars = 0; - if (length <= maxCharLength) { - return length; - } - for (uint64_t i = 0; i < length; i++) { - if (isUtfStartByte(data[i])) { - chars++; - } - if (chars > maxCharLength) { - return i; - } - } - // everything fits - return length; - } - - /** - * Checks if b is the first byte of a UTF-8 character. - */ - inline static bool isUtfStartByte(char b) { - return (b & 0xC0) != 0x80; - } - - /** - * Find the start of the last character that ends in the current string. - * @param text the bytes of the utf-8 - * @param from the first byte location - * @param until the last byte location - * @return the index of the last character - */ - static uint64_t findLastCharacter(const char * text, uint64_t from, uint64_t until) { - uint64_t posn = until; - /* we don't expect characters more than 5 bytes */ - while (posn >= from) { - if (isUtfStartByte(text[posn])) { - return posn; - } - posn -= 1; - } - /* beginning of a valid char not found */ - throw std::logic_error( - "Could not truncate string, beginning of a valid char not found"); - } - }; - - class CharColumnWriter : public StringColumnWriter { - public: - CharColumnWriter(const Type& type, - const StreamsFactory& factory, - const WriterOptions& options) : - StringColumnWriter(type, factory, options), - maxLength(type.getMaximumLength()), - padBuffer(*options.getMemoryPool()) { - // utf-8 is currently 4 bytes long, but it could be up to 6 - padBuffer.resize(maxLength * 6); - } - - virtual void add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) override; - - private: - uint64_t maxLength; - DataBuffer<char> padBuffer; - }; - - void CharColumnWriter::add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) { - StringVectorBatch* charsBatch = dynamic_cast<StringVectorBatch*>(&rowBatch); - if (charsBatch == nullptr) { - throw InvalidArgument("Failed to cast to StringVectorBatch"); - } - - StringColumnStatisticsImpl* strStats = - dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get()); - if (strStats == nullptr) { - throw InvalidArgument("Failed to cast to StringColumnStatisticsImpl"); - } - - ColumnWriter::add(rowBatch, offset, numValues, incomingMask); - - char** data = charsBatch->data.data() + offset; - int64_t* length = charsBatch->length.data() + offset; - const char* notNull = charsBatch->hasNulls ? - charsBatch->notNull.data() + offset : nullptr; - - uint64_t count = 0; - for (uint64_t i = 0; i < numValues; ++i) { - if (!notNull || notNull[i]) { - const char * charData = nullptr; - uint64_t originLength = static_cast<uint64_t>(length[i]); - uint64_t charLength = Utf8Utils::charLength(data[i], originLength); - if (charLength >= maxLength) { - charData = data[i]; - length[i] = static_cast<int64_t>( - Utf8Utils::truncateBytesTo(maxLength, data[i], originLength)); - } else { - charData = padBuffer.data(); - // the padding is exactly 1 byte per char - length[i] = length[i] + static_cast<int64_t>(maxLength - charLength); - memcpy(padBuffer.data(), data[i], originLength); - memset(padBuffer.data() + originLength, - ' ', - static_cast<size_t>(length[i]) - originLength); - } - - if (useDictionary) { - size_t index = dictionary.insert(charData, static_cast<size_t>(length[i])); - dictionary.idxInDictBuffer.push_back(static_cast<int64_t>(index)); - } else { - directDataStream->write(charData, static_cast<size_t>(length[i])); - } - - if (enableBloomFilter) { - bloomFilter->addBytes(data[i], length[i]); - } - strStats->update(charData, static_cast<size_t>(length[i])); - ++count; - } - } - - if (!useDictionary) { - directLengthEncoder->add(length, numValues, notNull); - } - - strStats->increase(count); - if (count < numValues) { - strStats->setHasNull(true); - } - } - - class VarCharColumnWriter : public StringColumnWriter { - public: - VarCharColumnWriter(const Type& type, - const StreamsFactory& factory, - const WriterOptions& options) : - StringColumnWriter(type, factory, options), - maxLength(type.getMaximumLength()) { - // PASS - } - - virtual void add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) override; - - private: - uint64_t maxLength; - }; - - void VarCharColumnWriter::add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) { - StringVectorBatch* charsBatch = dynamic_cast<StringVectorBatch*>(&rowBatch); - if (charsBatch == nullptr) { - throw InvalidArgument("Failed to cast to StringVectorBatch"); - } - - StringColumnStatisticsImpl* strStats = - dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get()); - if (strStats == nullptr) { - throw InvalidArgument("Failed to cast to StringColumnStatisticsImpl"); - } - - ColumnWriter::add(rowBatch, offset, numValues, incomingMask); - - char* const* data = charsBatch->data.data() + offset; - int64_t* length = charsBatch->length.data() + offset; - const char* notNull = charsBatch->hasNulls ? - charsBatch->notNull.data() + offset : nullptr; - - uint64_t count = 0; - for (uint64_t i = 0; i < numValues; ++i) { - if (!notNull || notNull[i]) { - uint64_t itemLength = Utf8Utils::truncateBytesTo( - maxLength, data[i], static_cast<uint64_t>(length[i])); - length[i] = static_cast<int64_t>(itemLength); - - if (useDictionary) { - size_t index = dictionary.insert(data[i], static_cast<size_t>(length[i])); - dictionary.idxInDictBuffer.push_back(static_cast<int64_t>(index)); - } else { - directDataStream->write(data[i], static_cast<size_t>(length[i])); - } - - if (enableBloomFilter) { - bloomFilter->addBytes(data[i], length[i]); - } - strStats->update(data[i], static_cast<size_t>(length[i])); - ++count; - } - } - - if (!useDictionary) { - directLengthEncoder->add(length, numValues, notNull); - } - - strStats->increase(count); - if (count < numValues) { - strStats->setHasNull(true); - } - } - - class BinaryColumnWriter : public StringColumnWriter { - public: - BinaryColumnWriter(const Type& type, - const StreamsFactory& factory, - const WriterOptions& options) : - StringColumnWriter(type, factory, options) { - // PASS - } - - virtual void add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) override; - }; - - void BinaryColumnWriter::add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) { - StringVectorBatch* binBatch = dynamic_cast<StringVectorBatch*>(&rowBatch); - if (binBatch == nullptr) { - throw InvalidArgument("Failed to cast to StringVectorBatch"); - } - - BinaryColumnStatisticsImpl* binStats = - dynamic_cast<BinaryColumnStatisticsImpl*>(colIndexStatistics.get()); - if (binStats == nullptr) { - throw InvalidArgument("Failed to cast to BinaryColumnStatisticsImpl"); - } - - ColumnWriter::add(rowBatch, offset, numValues, incomingMask); - - char** data = binBatch->data.data() + offset; - int64_t* length = binBatch->length.data() + offset; - const char* notNull = binBatch->hasNulls ? - binBatch->notNull.data() + offset : nullptr; - - uint64_t count = 0; - for (uint64_t i = 0; i < numValues; ++i) { - uint64_t unsignedLength = static_cast<uint64_t>(length[i]); - if (!notNull || notNull[i]) { - directDataStream->write(data[i], unsignedLength); - - binStats->update(unsignedLength); - ++count; - } - } - directLengthEncoder->add(length, numValues, notNull); - binStats->increase(count); - if (count < numValues) { - binStats->setHasNull(true); - } - } - - class TimestampColumnWriter : public ColumnWriter { - public: - TimestampColumnWriter(const Type& type, - const StreamsFactory& factory, - const WriterOptions& options); - - virtual void add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) override; - - virtual void flush(std::vector<proto::Stream>& streams) override; - - virtual uint64_t getEstimatedSize() const override; - - virtual void getColumnEncoding( - std::vector<proto::ColumnEncoding>& encodings) const override; - - virtual void recordPosition() const override; - - protected: - std::unique_ptr<RleEncoder> secRleEncoder, nanoRleEncoder; - - private: - RleVersion rleVersion; - const Timezone& timezone; - }; - - TimestampColumnWriter::TimestampColumnWriter( - const Type& type, - const StreamsFactory& factory, - const WriterOptions& options) : - ColumnWriter(type, factory, options), - rleVersion(options.getRleVersion()), - timezone(getTimezoneByName("GMT")){ - std::unique_ptr<BufferedOutputStream> dataStream = - factory.createStream(proto::Stream_Kind_DATA); - std::unique_ptr<BufferedOutputStream> secondaryStream = - factory.createStream(proto::Stream_Kind_SECONDARY); - secRleEncoder = createRleEncoder(std::move(dataStream), - true, - rleVersion, - memPool, - options.getAlignedBitpacking()); - nanoRleEncoder = createRleEncoder(std::move(secondaryStream), - false, - rleVersion, - memPool, - options.getAlignedBitpacking()); - - if (enableIndex) { - recordPosition(); - } - } - - // Because the number of nanoseconds often has a large number of trailing zeros, - // the number has trailing decimal zero digits removed and the last three bits - // are used to record how many zeros were removed if the trailing zeros are - // more than 2. Thus 1000 nanoseconds would be serialized as 0x0a and - // 100000 would be serialized as 0x0c. - static int64_t formatNano(int64_t nanos) { - if (nanos == 0) { - return 0; - } else if (nanos % 100 != 0) { - return (nanos) << 3; - } else { - nanos /= 100; - int64_t trailingZeros = 1; - while (nanos % 10 == 0 && trailingZeros < 7) { - nanos /= 10; - trailingZeros += 1; - } - return (nanos) << 3 | trailingZeros; - } - } - - void TimestampColumnWriter::add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) { - TimestampVectorBatch* tsBatch = - dynamic_cast<TimestampVectorBatch*>(&rowBatch); - if (tsBatch == nullptr) { - throw InvalidArgument("Failed to cast to TimestampVectorBatch"); - } - - TimestampColumnStatisticsImpl* tsStats = - dynamic_cast<TimestampColumnStatisticsImpl*>(colIndexStatistics.get()); - if (tsStats == nullptr) { - throw InvalidArgument("Failed to cast to TimestampColumnStatisticsImpl"); - } - - ColumnWriter::add(rowBatch, offset, numValues, incomingMask); - - const char* notNull = tsBatch->hasNulls ? - tsBatch->notNull.data() + offset : nullptr; - int64_t *secs = tsBatch->data.data() + offset; - int64_t *nanos = tsBatch->nanoseconds.data() + offset; - - uint64_t count = 0; - for (uint64_t i = 0; i < numValues; ++i) { - if (notNull == nullptr || notNull[i]) { - // TimestampVectorBatch already stores data in UTC - int64_t millsUTC = secs[i] * 1000 + nanos[i] / 1000000; - ++count; - if (enableBloomFilter) { - bloomFilter->addLong(millsUTC); - } - tsStats->update(millsUTC); - +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "orc/Int128.hh" +#include "orc/Writer.hh" + +#include "ByteRLE.hh" +#include "ColumnWriter.hh" +#include "RLE.hh" +#include "Statistics.hh" +#include "Timezone.hh" + +namespace orc { + StreamsFactory::~StreamsFactory() { + //PASS + } + + class StreamsFactoryImpl : public StreamsFactory { + public: + StreamsFactoryImpl( + const WriterOptions& writerOptions, + OutputStream* outputStream) : + options(writerOptions), + outStream(outputStream) { + } + + virtual std::unique_ptr<BufferedOutputStream> + createStream(proto::Stream_Kind kind) const override; + private: + const WriterOptions& options; + OutputStream* outStream; + }; + + std::unique_ptr<BufferedOutputStream> StreamsFactoryImpl::createStream( + proto::Stream_Kind) const { + // In the future, we can decide compression strategy and modifier + // based on stream kind. But for now we just use the setting from + // WriterOption + return createCompressor( + options.getCompression(), + outStream, + options.getCompressionStrategy(), + // BufferedOutputStream initial capacity + 1 * 1024 * 1024, + options.getCompressionBlockSize(), + *options.getMemoryPool()); + } + + std::unique_ptr<StreamsFactory> createStreamsFactory( + const WriterOptions& options, + OutputStream* outStream) { + return std::unique_ptr<StreamsFactory>( + new StreamsFactoryImpl(options, outStream)); + } + + RowIndexPositionRecorder::~RowIndexPositionRecorder() { + // PASS + } + + proto::ColumnEncoding_Kind RleVersionMapper(RleVersion rleVersion) + { + switch (rleVersion) + { + case RleVersion_1: + return proto::ColumnEncoding_Kind_DIRECT; + case RleVersion_2: + return proto::ColumnEncoding_Kind_DIRECT_V2; + default: + throw InvalidArgument("Invalid param"); + } + } + + ColumnWriter::ColumnWriter( + const Type& type, + const StreamsFactory& factory, + const WriterOptions& options) : + columnId(type.getColumnId()), + colIndexStatistics(), + colStripeStatistics(), + colFileStatistics(), + enableIndex(options.getEnableIndex()), + rowIndex(), + rowIndexEntry(), + rowIndexPosition(), + enableBloomFilter(false), + memPool(*options.getMemoryPool()), + indexStream(), + bloomFilterStream() { + + std::unique_ptr<BufferedOutputStream> presentStream = + factory.createStream(proto::Stream_Kind_PRESENT); + notNullEncoder = createBooleanRleEncoder(std::move(presentStream)); + + colIndexStatistics = createColumnStatistics(type); + colStripeStatistics = createColumnStatistics(type); + colFileStatistics = createColumnStatistics(type); + + if (enableIndex) { + rowIndex = std::unique_ptr<proto::RowIndex>(new proto::RowIndex()); + rowIndexEntry = + std::unique_ptr<proto::RowIndexEntry>(new proto::RowIndexEntry()); + rowIndexPosition = std::unique_ptr<RowIndexPositionRecorder>( + new RowIndexPositionRecorder(*rowIndexEntry)); + indexStream = + factory.createStream(proto::Stream_Kind_ROW_INDEX); + + // BloomFilters for non-UTF8 strings and non-UTC timestamps are not supported + if (options.isColumnUseBloomFilter(columnId) + && options.getBloomFilterVersion() == BloomFilterVersion::UTF8) { + enableBloomFilter = true; + bloomFilter.reset(new BloomFilterImpl( + options.getRowIndexStride(), options.getBloomFilterFPP())); + bloomFilterIndex.reset(new proto::BloomFilterIndex()); + bloomFilterStream = factory.createStream(proto::Stream_Kind_BLOOM_FILTER_UTF8); + } + } + } + + ColumnWriter::~ColumnWriter() { + // PASS + } + + void ColumnWriter::add(ColumnVectorBatch& batch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) { + notNullEncoder->add(batch.notNull.data() + offset, numValues, incomingMask); + } + + void ColumnWriter::flush(std::vector<proto::Stream>& streams) { + proto::Stream stream; + stream.set_kind(proto::Stream_Kind_PRESENT); + stream.set_column(static_cast<uint32_t>(columnId)); + stream.set_length(notNullEncoder->flush()); + streams.push_back(stream); + } + + uint64_t ColumnWriter::getEstimatedSize() const { + return notNullEncoder->getBufferSize(); + } + + void ColumnWriter::getStripeStatistics( + std::vector<proto::ColumnStatistics>& stats) const { + getProtoBufStatistics(stats, colStripeStatistics.get()); + } + + void ColumnWriter::mergeStripeStatsIntoFileStats() { + colFileStatistics->merge(*colStripeStatistics); + colStripeStatistics->reset(); + } + + void ColumnWriter::mergeRowGroupStatsIntoStripeStats() { + colStripeStatistics->merge(*colIndexStatistics); + colIndexStatistics->reset(); + } + + void ColumnWriter::getFileStatistics( + std::vector<proto::ColumnStatistics>& stats) const { + getProtoBufStatistics(stats, colFileStatistics.get()); + } + + void ColumnWriter::createRowIndexEntry() { + proto::ColumnStatistics *indexStats = rowIndexEntry->mutable_statistics(); + colIndexStatistics->toProtoBuf(*indexStats); + + *rowIndex->add_entry() = *rowIndexEntry; + + rowIndexEntry->clear_positions(); + rowIndexEntry->clear_statistics(); + + colStripeStatistics->merge(*colIndexStatistics); + colIndexStatistics->reset(); + + addBloomFilterEntry(); + + recordPosition(); + } + + void ColumnWriter::addBloomFilterEntry() { + if (enableBloomFilter) { + BloomFilterUTF8Utils::serialize(*bloomFilter, *bloomFilterIndex->add_bloomfilter()); + bloomFilter->reset(); + } + } + + void ColumnWriter::writeIndex(std::vector<proto::Stream> &streams) const { + // write row index to output stream + rowIndex->SerializeToZeroCopyStream(indexStream.get()); + + // construct row index stream + proto::Stream stream; + stream.set_kind(proto::Stream_Kind_ROW_INDEX); + stream.set_column(static_cast<uint32_t>(columnId)); + stream.set_length(indexStream->flush()); + streams.push_back(stream); + + // write BLOOM_FILTER_UTF8 stream + if (enableBloomFilter) { + if (!bloomFilterIndex->SerializeToZeroCopyStream(bloomFilterStream.get())) { + throw std::logic_error("Failed to write bloom filter stream."); + } + stream.set_kind(proto::Stream_Kind_BLOOM_FILTER_UTF8); + stream.set_column(static_cast<uint32_t>(columnId)); + stream.set_length(bloomFilterStream->flush()); + streams.push_back(stream); + } + } + + void ColumnWriter::recordPosition() const { + notNullEncoder->recordPosition(rowIndexPosition.get()); + } + + void ColumnWriter::reset() { + if (enableIndex) { + // clear row index + rowIndex->clear_entry(); + rowIndexEntry->clear_positions(); + rowIndexEntry->clear_statistics(); + + // write current positions + recordPosition(); + } + + if (enableBloomFilter) { + bloomFilter->reset(); + bloomFilterIndex->clear_bloomfilter(); + } + } + + void ColumnWriter::writeDictionary() { + // PASS + } + + class StructColumnWriter : public ColumnWriter { + public: + StructColumnWriter( + const Type& type, + const StreamsFactory& factory, + const WriterOptions& options); + ~StructColumnWriter() override; + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) override; + + virtual void flush(std::vector<proto::Stream>& streams) override; + + virtual uint64_t getEstimatedSize() const override; + virtual void getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const override; + + virtual void getStripeStatistics( + std::vector<proto::ColumnStatistics>& stats) const override; + + virtual void getFileStatistics( + std::vector<proto::ColumnStatistics>& stats) const override; + + virtual void mergeStripeStatsIntoFileStats() override; + + virtual void mergeRowGroupStatsIntoStripeStats() override; + + virtual void createRowIndexEntry() override; + + virtual void writeIndex( + std::vector<proto::Stream> &streams) const override; + + virtual void writeDictionary() override; + + virtual void reset() override; + + private: + std::vector<ColumnWriter *> children; + }; + + StructColumnWriter::StructColumnWriter( + const Type& type, + const StreamsFactory& factory, + const WriterOptions& options) : + ColumnWriter(type, factory, options) { + for(unsigned int i = 0; i < type.getSubtypeCount(); ++i) { + const Type& child = *type.getSubtype(i); + children.push_back(buildWriter(child, factory, options).release()); + } + + if (enableIndex) { + recordPosition(); + } + } + + StructColumnWriter::~StructColumnWriter() { + for (uint32_t i = 0; i < children.size(); ++i) { + delete children[i]; + } + } + + void StructColumnWriter::add( + ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) { + const StructVectorBatch* structBatch = + dynamic_cast<const StructVectorBatch *>(&rowBatch); + if (structBatch == nullptr) { + throw InvalidArgument("Failed to cast to StructVectorBatch"); + } + + ColumnWriter::add(rowBatch, offset, numValues, incomingMask); + const char* notNull = structBatch->hasNulls ? + structBatch->notNull.data() + offset : nullptr; + for (uint32_t i = 0; i < children.size(); ++i) { + children[i]->add(*structBatch->fields[i], offset, numValues, notNull); + } + + // update stats + if (!notNull) { + colIndexStatistics->increase(numValues); + } else { + uint64_t count = 0; + for (uint64_t i = 0; i < numValues; ++i) { + if (notNull[i]) { + ++count; + } + } + colIndexStatistics->increase(count); + if (count < numValues) { + colIndexStatistics->setHasNull(true); + } + } + } + + void StructColumnWriter::flush(std::vector<proto::Stream>& streams) { + ColumnWriter::flush(streams); + for (uint32_t i = 0; i < children.size(); ++i) { + children[i]->flush(streams); + } + } + + void StructColumnWriter::writeIndex( + std::vector<proto::Stream> &streams) const { + ColumnWriter::writeIndex(streams); + for (uint32_t i = 0; i < children.size(); ++i) { + children[i]->writeIndex(streams); + } + } + + uint64_t StructColumnWriter::getEstimatedSize() const { + uint64_t size = ColumnWriter::getEstimatedSize(); + for (uint32_t i = 0; i < children.size(); ++i) { + size += children[i]->getEstimatedSize(); + } + return size; + } + + void StructColumnWriter::getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const { + proto::ColumnEncoding encoding; + encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT); + encoding.set_dictionarysize(0); + encodings.push_back(encoding); + for (uint32_t i = 0; i < children.size(); ++i) { + children[i]->getColumnEncoding(encodings); + } + } + + void StructColumnWriter::getStripeStatistics( + std::vector<proto::ColumnStatistics>& stats) const { + ColumnWriter::getStripeStatistics(stats); + + for (uint32_t i = 0; i < children.size(); ++i) { + children[i]->getStripeStatistics(stats); + } + } + + void StructColumnWriter::mergeStripeStatsIntoFileStats() { + ColumnWriter::mergeStripeStatsIntoFileStats(); + + for (uint32_t i = 0; i < children.size(); ++i) { + children[i]->mergeStripeStatsIntoFileStats(); + } + } + + void StructColumnWriter::getFileStatistics( + std::vector<proto::ColumnStatistics>& stats) const { + ColumnWriter::getFileStatistics(stats); + + for (uint32_t i = 0; i < children.size(); ++i) { + children[i]->getFileStatistics(stats); + } + } + + void StructColumnWriter::mergeRowGroupStatsIntoStripeStats() { + ColumnWriter::mergeRowGroupStatsIntoStripeStats(); + + for (uint32_t i = 0; i < children.size(); ++i) { + children[i]->mergeRowGroupStatsIntoStripeStats(); + } + } + + void StructColumnWriter::createRowIndexEntry() { + ColumnWriter::createRowIndexEntry(); + + for (uint32_t i = 0; i < children.size(); ++i) { + children[i]->createRowIndexEntry(); + } + } + + void StructColumnWriter::reset() { + ColumnWriter::reset(); + + for (uint32_t i = 0; i < children.size(); ++i) { + children[i]->reset(); + } + } + + void StructColumnWriter::writeDictionary() { + for (uint32_t i = 0; i < children.size(); ++i) { + children[i]->writeDictionary(); + } + } + + class IntegerColumnWriter : public ColumnWriter { + public: + IntegerColumnWriter( + const Type& type, + const StreamsFactory& factory, + const WriterOptions& options); + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) override; + + virtual void flush(std::vector<proto::Stream>& streams) override; + + virtual uint64_t getEstimatedSize() const override; + + virtual void getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const override; + + virtual void recordPosition() const override; + + protected: + std::unique_ptr<RleEncoder> rleEncoder; + + private: + RleVersion rleVersion; + }; + + IntegerColumnWriter::IntegerColumnWriter( + const Type& type, + const StreamsFactory& factory, + const WriterOptions& options) : + ColumnWriter(type, factory, options), + rleVersion(options.getRleVersion()) { + std::unique_ptr<BufferedOutputStream> dataStream = + factory.createStream(proto::Stream_Kind_DATA); + rleEncoder = createRleEncoder( + std::move(dataStream), + true, + rleVersion, + memPool, + options.getAlignedBitpacking()); + + if (enableIndex) { + recordPosition(); + } + } + + void IntegerColumnWriter::add( + ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) { + const LongVectorBatch* longBatch = + dynamic_cast<const LongVectorBatch*>(&rowBatch); + if (longBatch == nullptr) { + throw InvalidArgument("Failed to cast to LongVectorBatch"); + } + IntegerColumnStatisticsImpl* intStats = + dynamic_cast<IntegerColumnStatisticsImpl*>(colIndexStatistics.get()); + if (intStats == nullptr) { + throw InvalidArgument("Failed to cast to IntegerColumnStatisticsImpl"); + } + + ColumnWriter::add(rowBatch, offset, numValues, incomingMask); + + const int64_t* data = longBatch->data.data() + offset; + const char* notNull = longBatch->hasNulls ? + longBatch->notNull.data() + offset : nullptr; + + rleEncoder->add(data, numValues, notNull); + + // update stats + uint64_t count = 0; + for (uint64_t i = 0; i < numValues; ++i) { + if (notNull == nullptr || notNull[i]) { + ++count; + if (enableBloomFilter) { + bloomFilter->addLong(data[i]); + } + intStats->update(data[i], 1); + } + } + intStats->increase(count); + if (count < numValues) { + intStats->setHasNull(true); + } + } + + void IntegerColumnWriter::flush(std::vector<proto::Stream>& streams) { + ColumnWriter::flush(streams); + + proto::Stream stream; + stream.set_kind(proto::Stream_Kind_DATA); + stream.set_column(static_cast<uint32_t>(columnId)); + stream.set_length(rleEncoder->flush()); + streams.push_back(stream); + } + + uint64_t IntegerColumnWriter::getEstimatedSize() const { + uint64_t size = ColumnWriter::getEstimatedSize(); + size += rleEncoder->getBufferSize(); + return size; + } + + void IntegerColumnWriter::getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const { + proto::ColumnEncoding encoding; + encoding.set_kind(RleVersionMapper(rleVersion)); + encoding.set_dictionarysize(0); + if (enableBloomFilter) { + encoding.set_bloomencoding(BloomFilterVersion::UTF8); + } + encodings.push_back(encoding); + } + + void IntegerColumnWriter::recordPosition() const { + ColumnWriter::recordPosition(); + rleEncoder->recordPosition(rowIndexPosition.get()); + } + + class ByteColumnWriter : public ColumnWriter { + public: + ByteColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options); + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) override; + + virtual void flush(std::vector<proto::Stream>& streams) override; + + virtual uint64_t getEstimatedSize() const override; + + virtual void getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const override; + + virtual void recordPosition() const override; + + private: + std::unique_ptr<ByteRleEncoder> byteRleEncoder; + }; + + ByteColumnWriter::ByteColumnWriter( + const Type& type, + const StreamsFactory& factory, + const WriterOptions& options) : + ColumnWriter(type, factory, options) { + std::unique_ptr<BufferedOutputStream> dataStream = + factory.createStream(proto::Stream_Kind_DATA); + byteRleEncoder = createByteRleEncoder(std::move(dataStream)); + + if (enableIndex) { + recordPosition(); + } + } + + void ByteColumnWriter::add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) { + LongVectorBatch* byteBatch = dynamic_cast<LongVectorBatch*>(&rowBatch); + if (byteBatch == nullptr) { + throw InvalidArgument("Failed to cast to LongVectorBatch"); + } + IntegerColumnStatisticsImpl* intStats = + dynamic_cast<IntegerColumnStatisticsImpl*>(colIndexStatistics.get()); + if (intStats == nullptr) { + throw InvalidArgument("Failed to cast to IntegerColumnStatisticsImpl"); + } + + ColumnWriter::add(rowBatch, offset, numValues, incomingMask); + + int64_t* data = byteBatch->data.data() + offset; + const char* notNull = byteBatch->hasNulls ? + byteBatch->notNull.data() + offset : nullptr; + + char* byteData = reinterpret_cast<char*>(data); + for (uint64_t i = 0; i < numValues; ++i) { + byteData[i] = static_cast<char>(data[i]); + } + byteRleEncoder->add(byteData, numValues, notNull); + + uint64_t count = 0; + for (uint64_t i = 0; i < numValues; ++i) { + if (notNull == nullptr || notNull[i]) { + ++count; + if (enableBloomFilter) { + bloomFilter->addLong(data[i]); + } + intStats->update(static_cast<int64_t>(byteData[i]), 1); + } + } + intStats->increase(count); + if (count < numValues) { + intStats->setHasNull(true); + } + } + + void ByteColumnWriter::flush(std::vector<proto::Stream>& streams) { + ColumnWriter::flush(streams); + + proto::Stream stream; + stream.set_kind(proto::Stream_Kind_DATA); + stream.set_column(static_cast<uint32_t>(columnId)); + stream.set_length(byteRleEncoder->flush()); + streams.push_back(stream); + } + + uint64_t ByteColumnWriter::getEstimatedSize() const { + uint64_t size = ColumnWriter::getEstimatedSize(); + size += byteRleEncoder->getBufferSize(); + return size; + } + + void ByteColumnWriter::getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const { + proto::ColumnEncoding encoding; + encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT); + encoding.set_dictionarysize(0); + if (enableBloomFilter) { + encoding.set_bloomencoding(BloomFilterVersion::UTF8); + } + encodings.push_back(encoding); + } + + void ByteColumnWriter::recordPosition() const { + ColumnWriter::recordPosition(); + byteRleEncoder->recordPosition(rowIndexPosition.get()); + } + + class BooleanColumnWriter : public ColumnWriter { + public: + BooleanColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options); + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) override; + + virtual void flush(std::vector<proto::Stream>& streams) override; + + virtual uint64_t getEstimatedSize() const override; + + virtual void getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const override; + + virtual void recordPosition() const override; + + private: + std::unique_ptr<ByteRleEncoder> rleEncoder; + }; + + BooleanColumnWriter::BooleanColumnWriter( + const Type& type, + const StreamsFactory& factory, + const WriterOptions& options) : + ColumnWriter(type, factory, options) { + std::unique_ptr<BufferedOutputStream> dataStream = + factory.createStream(proto::Stream_Kind_DATA); + rleEncoder = createBooleanRleEncoder(std::move(dataStream)); + + if (enableIndex) { + recordPosition(); + } + } + + void BooleanColumnWriter::add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) { + LongVectorBatch* byteBatch = dynamic_cast<LongVectorBatch*>(&rowBatch); + if (byteBatch == nullptr) { + throw InvalidArgument("Failed to cast to LongVectorBatch"); + } + BooleanColumnStatisticsImpl* boolStats = + dynamic_cast<BooleanColumnStatisticsImpl*>(colIndexStatistics.get()); + if (boolStats == nullptr) { + throw InvalidArgument("Failed to cast to BooleanColumnStatisticsImpl"); + } + + ColumnWriter::add(rowBatch, offset, numValues, incomingMask); + + int64_t* data = byteBatch->data.data() + offset; + const char* notNull = byteBatch->hasNulls ? + byteBatch->notNull.data() + offset : nullptr; + + char* byteData = reinterpret_cast<char*>(data); + for (uint64_t i = 0; i < numValues; ++i) { + byteData[i] = static_cast<char>(data[i]); + } + rleEncoder->add(byteData, numValues, notNull); + + uint64_t count = 0; + for (uint64_t i = 0; i < numValues; ++i) { + if (notNull == nullptr || notNull[i]) { + ++count; + if (enableBloomFilter) { + bloomFilter->addLong(data[i]); + } + boolStats->update(byteData[i] != 0, 1); + } + } + boolStats->increase(count); + if (count < numValues) { + boolStats->setHasNull(true); + } + } + + void BooleanColumnWriter::flush(std::vector<proto::Stream>& streams) { + ColumnWriter::flush(streams); + + proto::Stream stream; + stream.set_kind(proto::Stream_Kind_DATA); + stream.set_column(static_cast<uint32_t>(columnId)); + stream.set_length(rleEncoder->flush()); + streams.push_back(stream); + } + + uint64_t BooleanColumnWriter::getEstimatedSize() const { + uint64_t size = ColumnWriter::getEstimatedSize(); + size += rleEncoder->getBufferSize(); + return size; + } + + void BooleanColumnWriter::getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const { + proto::ColumnEncoding encoding; + encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT); + encoding.set_dictionarysize(0); + if (enableBloomFilter) { + encoding.set_bloomencoding(BloomFilterVersion::UTF8); + } + encodings.push_back(encoding); + } + + void BooleanColumnWriter::recordPosition() const { + ColumnWriter::recordPosition(); + rleEncoder->recordPosition(rowIndexPosition.get()); + } + + class DoubleColumnWriter : public ColumnWriter { + public: + DoubleColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options, + bool isFloat); + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) override; + + virtual void flush(std::vector<proto::Stream>& streams) override; + + virtual uint64_t getEstimatedSize() const override; + + virtual void getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const override; + + virtual void recordPosition() const override; + + private: + bool isFloat; + std::unique_ptr<AppendOnlyBufferedStream> dataStream; + DataBuffer<char> buffer; + }; + + DoubleColumnWriter::DoubleColumnWriter( + const Type& type, + const StreamsFactory& factory, + const WriterOptions& options, + bool isFloatType) : + ColumnWriter(type, factory, options), + isFloat(isFloatType), + buffer(*options.getMemoryPool()) { + dataStream.reset(new AppendOnlyBufferedStream( + factory.createStream(proto::Stream_Kind_DATA))); + buffer.resize(isFloat ? 4 : 8); + + if (enableIndex) { + recordPosition(); + } + } + + // Floating point types are stored using IEEE 754 floating point bit layout. + // Float columns use 4 bytes per value and double columns use 8 bytes. + template <typename FLOAT_TYPE, typename INTEGER_TYPE> + inline void encodeFloatNum(FLOAT_TYPE input, char* output) { + INTEGER_TYPE* intBits = reinterpret_cast<INTEGER_TYPE*>(&input); + for (size_t i = 0; i < sizeof(INTEGER_TYPE); ++i) { + output[i] = static_cast<char>(((*intBits) >> (8 * i)) & 0xff); + } + } + + void DoubleColumnWriter::add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) { + const DoubleVectorBatch* dblBatch = + dynamic_cast<const DoubleVectorBatch*>(&rowBatch); + if (dblBatch == nullptr) { + throw InvalidArgument("Failed to cast to DoubleVectorBatch"); + } + DoubleColumnStatisticsImpl* doubleStats = + dynamic_cast<DoubleColumnStatisticsImpl*>(colIndexStatistics.get()); + if (doubleStats == nullptr) { + throw InvalidArgument("Failed to cast to DoubleColumnStatisticsImpl"); + } + + ColumnWriter::add(rowBatch, offset, numValues, incomingMask); + + const double* doubleData = dblBatch->data.data() + offset; + const char* notNull = dblBatch->hasNulls ? + dblBatch->notNull.data() + offset : nullptr; + + size_t bytes = isFloat ? 4 : 8; + char* data = buffer.data(); + uint64_t count = 0; + for (uint64_t i = 0; i < numValues; ++i) { + if (!notNull || notNull[i]) { + if (isFloat) { + encodeFloatNum<float, int32_t>(static_cast<float>(doubleData[i]), data); + } else { + encodeFloatNum<double, int64_t>(doubleData[i], data); + } + dataStream->write(data, bytes); + ++count; + if (enableBloomFilter) { + bloomFilter->addDouble(doubleData[i]); + } + doubleStats->update(doubleData[i]); + } + } + doubleStats->increase(count); + if (count < numValues) { + doubleStats->setHasNull(true); + } + } + + void DoubleColumnWriter::flush(std::vector<proto::Stream>& streams) { + ColumnWriter::flush(streams); + + proto::Stream stream; + stream.set_kind(proto::Stream_Kind_DATA); + stream.set_column(static_cast<uint32_t>(columnId)); + stream.set_length(dataStream->flush()); + streams.push_back(stream); + } + + uint64_t DoubleColumnWriter::getEstimatedSize() const { + uint64_t size = ColumnWriter::getEstimatedSize(); + size += dataStream->getSize(); + return size; + } + + void DoubleColumnWriter::getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const { + proto::ColumnEncoding encoding; + encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT); + encoding.set_dictionarysize(0); + if (enableBloomFilter) { + encoding.set_bloomencoding(BloomFilterVersion::UTF8); + } + encodings.push_back(encoding); + } + + void DoubleColumnWriter::recordPosition() const { + ColumnWriter::recordPosition(); + dataStream->recordPosition(rowIndexPosition.get()); + } + + /** + * Implementation of increasing sorted string dictionary + */ + class SortedStringDictionary { + public: + struct DictEntry { + DictEntry(const char * str, size_t len):data(str),length(len) {} + const char * data; + size_t length; + }; + + SortedStringDictionary():totalLength(0) {} + + // insert a new string into dictionary, return its insertion order + size_t insert(const char * data, size_t len); + + // write dictionary data & length to output buffer + void flush(AppendOnlyBufferedStream * dataStream, + RleEncoder * lengthEncoder) const; + + // reorder input index buffer from insertion order to dictionary order + void reorder(std::vector<int64_t>& idxBuffer) const; + + // get dict entries in insertion order + void getEntriesInInsertionOrder(std::vector<const DictEntry *>&) const; + + // return count of entries + size_t size() const; + + // return total length of strings in the dictioanry + uint64_t length() const; + + void clear(); + + private: + struct LessThan { + bool operator()(const DictEntry& left, const DictEntry& right) const { + int ret = memcmp(left.data, right.data, std::min(left.length, right.length)); + if (ret != 0) { + return ret < 0; + } + return left.length < right.length; + } + }; + + std::map<DictEntry, size_t, LessThan> dict; + std::vector<std::vector<char>> data; + uint64_t totalLength; + + // use friend class here to avoid being bothered by const function calls + friend class StringColumnWriter; + friend class CharColumnWriter; + friend class VarCharColumnWriter; + // store indexes of insertion order in the dictionary for not-null rows + std::vector<int64_t> idxInDictBuffer; + }; + + // insert a new string into dictionary, return its insertion order + size_t SortedStringDictionary::insert(const char * str, size_t len) { + auto ret = dict.insert({DictEntry(str, len), dict.size()}); + if (ret.second) { + // make a copy to internal storage + data.push_back(std::vector<char>(len)); + memcpy(data.back().data(), str, len); + // update dictionary entry to link pointer to internal storage + DictEntry * entry = const_cast<DictEntry *>(&(ret.first->first)); + entry->data = data.back().data(); + totalLength += len; + } + return ret.first->second; + } + + // write dictionary data & length to output buffer + void SortedStringDictionary::flush(AppendOnlyBufferedStream * dataStream, + RleEncoder * lengthEncoder) const { + for (auto it = dict.cbegin(); it != dict.cend(); ++it) { + dataStream->write(it->first.data, it->first.length); + lengthEncoder->write(static_cast<int64_t>(it->first.length)); + } + } + + /** + * Reorder input index buffer from insertion order to dictionary order + * + * We require this function because string values are buffered by indexes + * in their insertion order. Until the entire dictionary is complete can + * we get their sorted indexes in the dictionary in that ORC specification + * demands dictionary should be ordered. Therefore this function transforms + * the indexes from insertion order to dictionary value order for final + * output. + */ + void SortedStringDictionary::reorder(std::vector<int64_t>& idxBuffer) const { + // iterate the dictionary to get mapping from insertion order to value order + std::vector<size_t> mapping(dict.size()); + size_t dictIdx = 0; + for (auto it = dict.cbegin(); it != dict.cend(); ++it) { + mapping[it->second] = dictIdx++; + } + + // do the transformation + for (size_t i = 0; i != idxBuffer.size(); ++i) { + idxBuffer[i] = static_cast<int64_t>( + mapping[static_cast<size_t>(idxBuffer[i])]); + } + } + + // get dict entries in insertion order + void SortedStringDictionary::getEntriesInInsertionOrder( + std::vector<const DictEntry *>& entries) const { + entries.resize(dict.size()); + for (auto it = dict.cbegin(); it != dict.cend(); ++it) { + entries[it->second] = &(it->first); + } + } + + // return count of entries + size_t SortedStringDictionary::size() const { + return dict.size(); + } + + // return total length of strings in the dictioanry + uint64_t SortedStringDictionary::length() const { + return totalLength; + } + + void SortedStringDictionary::clear() { + totalLength = 0; + data.clear(); + dict.clear(); + } + + class StringColumnWriter : public ColumnWriter { + public: + StringColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options); + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) override; + + virtual void flush(std::vector<proto::Stream>& streams) override; + + virtual uint64_t getEstimatedSize() const override; + + virtual void getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const override; + + virtual void recordPosition() const override; + + virtual void createRowIndexEntry() override; + + virtual void writeDictionary() override; + + virtual void reset() override; + + private: + /** + * dictionary related functions + */ + bool checkDictionaryKeyRatio(); + void createDirectStreams(); + void createDictStreams(); + void deleteDictStreams(); + void fallbackToDirectEncoding(); + + protected: + RleVersion rleVersion; + bool useCompression; + const StreamsFactory& streamsFactory; + bool alignedBitPacking; + + // direct encoding streams + std::unique_ptr<RleEncoder> directLengthEncoder; + std::unique_ptr<AppendOnlyBufferedStream> directDataStream; + + // dictionary encoding streams + std::unique_ptr<RleEncoder> dictDataEncoder; + std::unique_ptr<RleEncoder> dictLengthEncoder; + std::unique_ptr<AppendOnlyBufferedStream> dictStream; + + /** + * dictionary related variables + */ + SortedStringDictionary dictionary; + // whether or not dictionary checking is done + bool doneDictionaryCheck; + // whether or not it should be used + bool useDictionary; + // keys in the dictionary should not exceed this ratio + double dictSizeThreshold; + + // record start row of each row group; null rows are skipped + mutable std::vector<size_t> startOfRowGroups; + }; + + StringColumnWriter::StringColumnWriter( + const Type& type, + const StreamsFactory& factory, + const WriterOptions& options) : + ColumnWriter(type, factory, options), + rleVersion(options.getRleVersion()), + useCompression(options.getCompression() != CompressionKind_NONE), + streamsFactory(factory), + alignedBitPacking(options.getAlignedBitpacking()), + doneDictionaryCheck(false), + useDictionary(options.getEnableDictionary()), + dictSizeThreshold(options.getDictionaryKeySizeThreshold()){ + if (type.getKind() == TypeKind::BINARY) { + useDictionary = false; + doneDictionaryCheck = true; + } + + if (useDictionary) { + createDictStreams(); + } else { + doneDictionaryCheck = true; + createDirectStreams(); + } + + if (enableIndex) { + recordPosition(); + } + } + + void StringColumnWriter::add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) { + const StringVectorBatch* stringBatch = + dynamic_cast<const StringVectorBatch*>(&rowBatch); + if (stringBatch == nullptr) { + throw InvalidArgument("Failed to cast to StringVectorBatch"); + } + + StringColumnStatisticsImpl* strStats = + dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get()); + if (strStats == nullptr) { + throw InvalidArgument("Failed to cast to StringColumnStatisticsImpl"); + } + + ColumnWriter::add(rowBatch, offset, numValues, incomingMask); + + char *const * data = stringBatch->data.data() + offset; + const int64_t* length = stringBatch->length.data() + offset; + const char* notNull = stringBatch->hasNulls ? + stringBatch->notNull.data() + offset : nullptr; + + if (!useDictionary){ + directLengthEncoder->add(length, numValues, notNull); + } + + uint64_t count = 0; + for (uint64_t i = 0; i < numValues; ++i) { + if (!notNull || notNull[i]) { + const size_t len = static_cast<size_t>(length[i]); + if (useDictionary) { + size_t index = dictionary.insert(data[i], len); + dictionary.idxInDictBuffer.push_back(static_cast<int64_t>(index)); + } else { + directDataStream->write(data[i], len); + } + if (enableBloomFilter) { + bloomFilter->addBytes(data[i], static_cast<int64_t>(len)); + } + strStats->update(data[i], len); + ++count; + } + } + strStats->increase(count); + if (count < numValues) { + strStats->setHasNull(true); + } + } + + void StringColumnWriter::flush(std::vector<proto::Stream>& streams) { + ColumnWriter::flush(streams); + + if (useDictionary) { + proto::Stream data; + data.set_kind(proto::Stream_Kind_DATA); + data.set_column(static_cast<uint32_t>(columnId)); + data.set_length(dictDataEncoder->flush()); + streams.push_back(data); + + proto::Stream dict; + dict.set_kind(proto::Stream_Kind_DICTIONARY_DATA); + dict.set_column(static_cast<uint32_t>(columnId)); + dict.set_length(dictStream->flush()); + streams.push_back(dict); + + proto::Stream length; + length.set_kind(proto::Stream_Kind_LENGTH); + length.set_column(static_cast<uint32_t>(columnId)); + length.set_length(dictLengthEncoder->flush()); + streams.push_back(length); + } else { + proto::Stream length; + length.set_kind(proto::Stream_Kind_LENGTH); + length.set_column(static_cast<uint32_t>(columnId)); + length.set_length(directLengthEncoder->flush()); + streams.push_back(length); + + proto::Stream data; + data.set_kind(proto::Stream_Kind_DATA); + data.set_column(static_cast<uint32_t>(columnId)); + data.set_length(directDataStream->flush()); + streams.push_back(data); + } + } + + uint64_t StringColumnWriter::getEstimatedSize() const { + uint64_t size = ColumnWriter::getEstimatedSize(); + if (!useDictionary) { + size += directLengthEncoder->getBufferSize(); + size += directDataStream->getSize(); + } else { + size += dictionary.length(); + size += dictionary.size() * sizeof(int32_t); + size += dictionary.idxInDictBuffer.size() * sizeof(int32_t); + if (useCompression) { + size /= 3; // estimated ratio is 3:1 + } + } + return size; + } + + void StringColumnWriter::getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const { + proto::ColumnEncoding encoding; + if (!useDictionary) { + encoding.set_kind(rleVersion == RleVersion_1 ? + proto::ColumnEncoding_Kind_DIRECT : + proto::ColumnEncoding_Kind_DIRECT_V2); + } else { + encoding.set_kind(rleVersion == RleVersion_1 ? + proto::ColumnEncoding_Kind_DICTIONARY : + proto::ColumnEncoding_Kind_DICTIONARY_V2); + } + encoding.set_dictionarysize(static_cast<uint32_t>(dictionary.size())); + if (enableBloomFilter) { + encoding.set_bloomencoding(BloomFilterVersion::UTF8); + } + encodings.push_back(encoding); + } + + void StringColumnWriter::recordPosition() const { + ColumnWriter::recordPosition(); + if (!useDictionary) { + directDataStream->recordPosition(rowIndexPosition.get()); + directLengthEncoder->recordPosition(rowIndexPosition.get()); + } else { + if (enableIndex) { + startOfRowGroups.push_back(dictionary.idxInDictBuffer.size()); + } + } + } + + bool StringColumnWriter::checkDictionaryKeyRatio() { + if (!doneDictionaryCheck) { + useDictionary = dictionary.size() <= static_cast<size_t>( + static_cast<double>(dictionary.idxInDictBuffer.size()) * dictSizeThreshold); + doneDictionaryCheck = true; + } + + return useDictionary; + } + + void StringColumnWriter::createRowIndexEntry() { + if (useDictionary && !doneDictionaryCheck) { + if (!checkDictionaryKeyRatio()) { + fallbackToDirectEncoding(); + } + } + ColumnWriter::createRowIndexEntry(); + } + + void StringColumnWriter::reset() { + ColumnWriter::reset(); + + dictionary.clear(); + dictionary.idxInDictBuffer.resize(0); + startOfRowGroups.clear(); + startOfRowGroups.push_back(0); + } + + void StringColumnWriter::createDirectStreams() { + std::unique_ptr<BufferedOutputStream> directLengthStream = + streamsFactory.createStream(proto::Stream_Kind_LENGTH); + directLengthEncoder = createRleEncoder(std::move(directLengthStream), + false, + rleVersion, + memPool, + alignedBitPacking); + directDataStream.reset(new AppendOnlyBufferedStream( + streamsFactory.createStream(proto::Stream_Kind_DATA))); + } + + void StringColumnWriter::createDictStreams() { + std::unique_ptr<BufferedOutputStream> dictDataStream = + streamsFactory.createStream(proto::Stream_Kind_DATA); + dictDataEncoder = createRleEncoder(std::move(dictDataStream), + false, + rleVersion, + memPool, + alignedBitPacking); + std::unique_ptr<BufferedOutputStream> dictLengthStream = + streamsFactory.createStream(proto::Stream_Kind_LENGTH); + dictLengthEncoder = createRleEncoder(std::move(dictLengthStream), + false, + rleVersion, + memPool, + alignedBitPacking); + dictStream.reset(new AppendOnlyBufferedStream( + streamsFactory.createStream(proto::Stream_Kind_DICTIONARY_DATA))); + } + + void StringColumnWriter::deleteDictStreams() { + dictDataEncoder.reset(nullptr); + dictLengthEncoder.reset(nullptr); + dictStream.reset(nullptr); + + dictionary.clear(); + dictionary.idxInDictBuffer.clear(); + startOfRowGroups.clear(); + } + + void StringColumnWriter::writeDictionary() { + if (useDictionary && !doneDictionaryCheck) { + // when index is disabled, dictionary check happens while writing 1st stripe + if (!checkDictionaryKeyRatio()) { + fallbackToDirectEncoding(); + return; + } + } + + if (useDictionary) { + // flush dictionary data & length streams + dictionary.flush(dictStream.get(), dictLengthEncoder.get()); + + // convert index from insertion order to dictionary order + dictionary.reorder(dictionary.idxInDictBuffer); + + // write data sequences + int64_t * data = dictionary.idxInDictBuffer.data(); + if (enableIndex) { + size_t prevOffset = 0; + for (size_t i = 0; i < startOfRowGroups.size(); ++i) { + // write sequences in batch for a row group stride + size_t offset = startOfRowGroups[i]; + dictDataEncoder->add(data + prevOffset, offset - prevOffset, nullptr); + + // update index positions + int rowGroupId = static_cast<int>(i); + proto::RowIndexEntry* indexEntry = + (rowGroupId < rowIndex->entry_size()) ? + rowIndex->mutable_entry(rowGroupId) : rowIndexEntry.get(); + + // add positions for direct streams + RowIndexPositionRecorder recorder(*indexEntry); + dictDataEncoder->recordPosition(&recorder); + + prevOffset = offset; + } + + dictDataEncoder->add(data + prevOffset, + dictionary.idxInDictBuffer.size() - prevOffset, + nullptr); + } else { + dictDataEncoder->add(data, dictionary.idxInDictBuffer.size(), nullptr); + } + } + } + + void StringColumnWriter::fallbackToDirectEncoding() { + createDirectStreams(); + + if (enableIndex) { + // fallback happens at the 1st row group; + // simply complete positions for direct streams + proto::RowIndexEntry * indexEntry = rowIndexEntry.get(); + RowIndexPositionRecorder recorder(*indexEntry); + directDataStream->recordPosition(&recorder); + directLengthEncoder->recordPosition(&recorder); + } + + // get dictionary entries in insertion order + std::vector<const SortedStringDictionary::DictEntry *> entries; + dictionary.getEntriesInInsertionOrder(entries); + + // store each length of the data into a vector + const SortedStringDictionary::DictEntry * dictEntry = nullptr; + for (uint64_t i = 0; i != dictionary.idxInDictBuffer.size(); ++i) { + // write one row data in direct encoding + dictEntry = entries[static_cast<size_t>(dictionary.idxInDictBuffer[i])]; + directDataStream->write(dictEntry->data, dictEntry->length); + directLengthEncoder->write(static_cast<int64_t>(dictEntry->length)); + } + + deleteDictStreams(); + } + + struct Utf8Utils { + /** + * Counts how many utf-8 chars of the input data + */ + static uint64_t charLength(const char * data, uint64_t length) { + uint64_t chars = 0; + for (uint64_t i = 0; i < length; i++) { + if (isUtfStartByte(data[i])) { + chars++; + } + } + return chars; + } + + /** + * Return the number of bytes required to read at most maxCharLength + * characters in full from a utf-8 encoded byte array provided + * by data. This does not validate utf-8 data, but + * operates correctly on already valid utf-8 data. + * + * @param maxCharLength number of characters required + * @param data the bytes of UTF-8 + * @param length the length of data to truncate + */ + static uint64_t truncateBytesTo(uint64_t maxCharLength, + const char * data, + uint64_t length) { + uint64_t chars = 0; + if (length <= maxCharLength) { + return length; + } + for (uint64_t i = 0; i < length; i++) { + if (isUtfStartByte(data[i])) { + chars++; + } + if (chars > maxCharLength) { + return i; + } + } + // everything fits + return length; + } + + /** + * Checks if b is the first byte of a UTF-8 character. + */ + inline static bool isUtfStartByte(char b) { + return (b & 0xC0) != 0x80; + } + + /** + * Find the start of the last character that ends in the current string. + * @param text the bytes of the utf-8 + * @param from the first byte location + * @param until the last byte location + * @return the index of the last character + */ + static uint64_t findLastCharacter(const char * text, uint64_t from, uint64_t until) { + uint64_t posn = until; + /* we don't expect characters more than 5 bytes */ + while (posn >= from) { + if (isUtfStartByte(text[posn])) { + return posn; + } + posn -= 1; + } + /* beginning of a valid char not found */ + throw std::logic_error( + "Could not truncate string, beginning of a valid char not found"); + } + }; + + class CharColumnWriter : public StringColumnWriter { + public: + CharColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options) : + StringColumnWriter(type, factory, options), + maxLength(type.getMaximumLength()), + padBuffer(*options.getMemoryPool()) { + // utf-8 is currently 4 bytes long, but it could be up to 6 + padBuffer.resize(maxLength * 6); + } + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) override; + + private: + uint64_t maxLength; + DataBuffer<char> padBuffer; + }; + + void CharColumnWriter::add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) { + StringVectorBatch* charsBatch = dynamic_cast<StringVectorBatch*>(&rowBatch); + if (charsBatch == nullptr) { + throw InvalidArgument("Failed to cast to StringVectorBatch"); + } + + StringColumnStatisticsImpl* strStats = + dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get()); + if (strStats == nullptr) { + throw InvalidArgument("Failed to cast to StringColumnStatisticsImpl"); + } + + ColumnWriter::add(rowBatch, offset, numValues, incomingMask); + + char** data = charsBatch->data.data() + offset; + int64_t* length = charsBatch->length.data() + offset; + const char* notNull = charsBatch->hasNulls ? + charsBatch->notNull.data() + offset : nullptr; + + uint64_t count = 0; + for (uint64_t i = 0; i < numValues; ++i) { + if (!notNull || notNull[i]) { + const char * charData = nullptr; + uint64_t originLength = static_cast<uint64_t>(length[i]); + uint64_t charLength = Utf8Utils::charLength(data[i], originLength); + if (charLength >= maxLength) { + charData = data[i]; + length[i] = static_cast<int64_t>( + Utf8Utils::truncateBytesTo(maxLength, data[i], originLength)); + } else { + charData = padBuffer.data(); + // the padding is exactly 1 byte per char + length[i] = length[i] + static_cast<int64_t>(maxLength - charLength); + memcpy(padBuffer.data(), data[i], originLength); + memset(padBuffer.data() + originLength, + ' ', + static_cast<size_t>(length[i]) - originLength); + } + + if (useDictionary) { + size_t index = dictionary.insert(charData, static_cast<size_t>(length[i])); + dictionary.idxInDictBuffer.push_back(static_cast<int64_t>(index)); + } else { + directDataStream->write(charData, static_cast<size_t>(length[i])); + } + + if (enableBloomFilter) { + bloomFilter->addBytes(data[i], length[i]); + } + strStats->update(charData, static_cast<size_t>(length[i])); + ++count; + } + } + + if (!useDictionary) { + directLengthEncoder->add(length, numValues, notNull); + } + + strStats->increase(count); + if (count < numValues) { + strStats->setHasNull(true); + } + } + + class VarCharColumnWriter : public StringColumnWriter { + public: + VarCharColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options) : + StringColumnWriter(type, factory, options), + maxLength(type.getMaximumLength()) { + // PASS + } + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) override; + + private: + uint64_t maxLength; + }; + + void VarCharColumnWriter::add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) { + StringVectorBatch* charsBatch = dynamic_cast<StringVectorBatch*>(&rowBatch); + if (charsBatch == nullptr) { + throw InvalidArgument("Failed to cast to StringVectorBatch"); + } + + StringColumnStatisticsImpl* strStats = + dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get()); + if (strStats == nullptr) { + throw InvalidArgument("Failed to cast to StringColumnStatisticsImpl"); + } + + ColumnWriter::add(rowBatch, offset, numValues, incomingMask); + + char* const* data = charsBatch->data.data() + offset; + int64_t* length = charsBatch->length.data() + offset; + const char* notNull = charsBatch->hasNulls ? + charsBatch->notNull.data() + offset : nullptr; + + uint64_t count = 0; + for (uint64_t i = 0; i < numValues; ++i) { + if (!notNull || notNull[i]) { + uint64_t itemLength = Utf8Utils::truncateBytesTo( + maxLength, data[i], static_cast<uint64_t>(length[i])); + length[i] = static_cast<int64_t>(itemLength); + + if (useDictionary) { + size_t index = dictionary.insert(data[i], static_cast<size_t>(length[i])); + dictionary.idxInDictBuffer.push_back(static_cast<int64_t>(index)); + } else { + directDataStream->write(data[i], static_cast<size_t>(length[i])); + } + + if (enableBloomFilter) { + bloomFilter->addBytes(data[i], length[i]); + } + strStats->update(data[i], static_cast<size_t>(length[i])); + ++count; + } + } + + if (!useDictionary) { + directLengthEncoder->add(length, numValues, notNull); + } + + strStats->increase(count); + if (count < numValues) { + strStats->setHasNull(true); + } + } + + class BinaryColumnWriter : public StringColumnWriter { + public: + BinaryColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options) : + StringColumnWriter(type, factory, options) { + // PASS + } + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) override; + }; + + void BinaryColumnWriter::add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) { + StringVectorBatch* binBatch = dynamic_cast<StringVectorBatch*>(&rowBatch); + if (binBatch == nullptr) { + throw InvalidArgument("Failed to cast to StringVectorBatch"); + } + + BinaryColumnStatisticsImpl* binStats = + dynamic_cast<BinaryColumnStatisticsImpl*>(colIndexStatistics.get()); + if (binStats == nullptr) { + throw InvalidArgument("Failed to cast to BinaryColumnStatisticsImpl"); + } + + ColumnWriter::add(rowBatch, offset, numValues, incomingMask); + + char** data = binBatch->data.data() + offset; + int64_t* length = binBatch->length.data() + offset; + const char* notNull = binBatch->hasNulls ? + binBatch->notNull.data() + offset : nullptr; + + uint64_t count = 0; + for (uint64_t i = 0; i < numValues; ++i) { + uint64_t unsignedLength = static_cast<uint64_t>(length[i]); + if (!notNull || notNull[i]) { + directDataStream->write(data[i], unsignedLength); + + binStats->update(unsignedLength); + ++count; + } + } + directLengthEncoder->add(length, numValues, notNull); + binStats->increase(count); + if (count < numValues) { + binStats->setHasNull(true); + } + } + + class TimestampColumnWriter : public ColumnWriter { + public: + TimestampColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options); + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) override; + + virtual void flush(std::vector<proto::Stream>& streams) override; + + virtual uint64_t getEstimatedSize() const override; + + virtual void getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const override; + + virtual void recordPosition() const override; + + protected: + std::unique_ptr<RleEncoder> secRleEncoder, nanoRleEncoder; + + private: + RleVersion rleVersion; + const Timezone& timezone; + }; + + TimestampColumnWriter::TimestampColumnWriter( + const Type& type, + const StreamsFactory& factory, + const WriterOptions& options) : + ColumnWriter(type, factory, options), + rleVersion(options.getRleVersion()), + timezone(getTimezoneByName("GMT")){ + std::unique_ptr<BufferedOutputStream> dataStream = + factory.createStream(proto::Stream_Kind_DATA); + std::unique_ptr<BufferedOutputStream> secondaryStream = + factory.createStream(proto::Stream_Kind_SECONDARY); + secRleEncoder = createRleEncoder(std::move(dataStream), + true, + rleVersion, + memPool, + options.getAlignedBitpacking()); + nanoRleEncoder = createRleEncoder(std::move(secondaryStream), + false, + rleVersion, + memPool, + options.getAlignedBitpacking()); + + if (enableIndex) { + recordPosition(); + } + } + + // Because the number of nanoseconds often has a large number of trailing zeros, + // the number has trailing decimal zero digits removed and the last three bits + // are used to record how many zeros were removed if the trailing zeros are + // more than 2. Thus 1000 nanoseconds would be serialized as 0x0a and + // 100000 would be serialized as 0x0c. + static int64_t formatNano(int64_t nanos) { + if (nanos == 0) { + return 0; + } else if (nanos % 100 != 0) { + return (nanos) << 3; + } else { + nanos /= 100; + int64_t trailingZeros = 1; + while (nanos % 10 == 0 && trailingZeros < 7) { + nanos /= 10; + trailingZeros += 1; + } + return (nanos) << 3 | trailingZeros; + } + } + + void TimestampColumnWriter::add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) { + TimestampVectorBatch* tsBatch = + dynamic_cast<TimestampVectorBatch*>(&rowBatch); + if (tsBatch == nullptr) { + throw InvalidArgument("Failed to cast to TimestampVectorBatch"); + } + + TimestampColumnStatisticsImpl* tsStats = + dynamic_cast<TimestampColumnStatisticsImpl*>(colIndexStatistics.get()); + if (tsStats == nullptr) { + throw InvalidArgument("Failed to cast to TimestampColumnStatisticsImpl"); + } + + ColumnWriter::add(rowBatch, offset, numValues, incomingMask); + + const char* notNull = tsBatch->hasNulls ? + tsBatch->notNull.data() + offset : nullptr; + int64_t *secs = tsBatch->data.data() + offset; + int64_t *nanos = tsBatch->nanoseconds.data() + offset; + + uint64_t count = 0; + for (uint64_t i = 0; i < numValues; ++i) { + if (notNull == nullptr || notNull[i]) { + // TimestampVectorBatch already stores data in UTC + int64_t millsUTC = secs[i] * 1000 + nanos[i] / 1000000; + ++count; + if (enableBloomFilter) { + bloomFilter->addLong(millsUTC); + } + tsStats->update(millsUTC); + if (secs[i] < 0 && nanos[i] > 999999) { - secs[i] += 1; - } - - secs[i] -= timezone.getEpoch(); - nanos[i] = formatNano(nanos[i]); - } - } - tsStats->increase(count); - if (count < numValues) { - tsStats->setHasNull(true); - } - - secRleEncoder->add(secs, numValues, notNull); - nanoRleEncoder->add(nanos, numValues, notNull); - } - - void TimestampColumnWriter::flush(std::vector<proto::Stream>& streams) { - ColumnWriter::flush(streams); - - proto::Stream dataStream; - dataStream.set_kind(proto::Stream_Kind_DATA); - dataStream.set_column(static_cast<uint32_t>(columnId)); - dataStream.set_length(secRleEncoder->flush()); - streams.push_back(dataStream); - - proto::Stream secondaryStream; - secondaryStream.set_kind(proto::Stream_Kind_SECONDARY); - secondaryStream.set_column(static_cast<uint32_t>(columnId)); - secondaryStream.set_length(nanoRleEncoder->flush()); - streams.push_back(secondaryStream); - } - - uint64_t TimestampColumnWriter::getEstimatedSize() const { - uint64_t size = ColumnWriter::getEstimatedSize(); - size += secRleEncoder->getBufferSize(); - size += nanoRleEncoder->getBufferSize(); - return size; - } - - void TimestampColumnWriter::getColumnEncoding( - std::vector<proto::ColumnEncoding>& encodings) const { - proto::ColumnEncoding encoding; - encoding.set_kind(RleVersionMapper(rleVersion)); - encoding.set_dictionarysize(0); - if (enableBloomFilter) { - encoding.set_bloomencoding(BloomFilterVersion::UTF8); - } - encodings.push_back(encoding); - } - - void TimestampColumnWriter::recordPosition() const { - ColumnWriter::recordPosition(); - secRleEncoder->recordPosition(rowIndexPosition.get()); - nanoRleEncoder->recordPosition(rowIndexPosition.get()); - } - - class DateColumnWriter : public IntegerColumnWriter { - public: - DateColumnWriter(const Type& type, - const StreamsFactory& factory, - const WriterOptions& options); - - virtual void add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) override; - }; - - DateColumnWriter::DateColumnWriter( - const Type &type, - const StreamsFactory &factory, - const WriterOptions &options) : - IntegerColumnWriter(type, factory, options) { - // PASS - } - - void DateColumnWriter::add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) { - const LongVectorBatch* longBatch = - dynamic_cast<const LongVectorBatch*>(&rowBatch); - if (longBatch == nullptr) { - throw InvalidArgument("Failed to cast to LongVectorBatch"); - } - - DateColumnStatisticsImpl* dateStats = - dynamic_cast<DateColumnStatisticsImpl*>(colIndexStatistics.get()); - if (dateStats == nullptr) { - throw InvalidArgument("Failed to cast to DateColumnStatisticsImpl"); - } - - ColumnWriter::add(rowBatch, offset, numValues, incomingMask); - - const int64_t* data = longBatch->data.data() + offset; - const char* notNull = longBatch->hasNulls ? - longBatch->notNull.data() + offset : nullptr; - - rleEncoder->add(data, numValues, notNull); - - uint64_t count = 0; - for (uint64_t i = 0; i < numValues; ++i) { - if (!notNull || notNull[i]) { - ++count; - dateStats->update(static_cast<int32_t>(data[i])); - if (enableBloomFilter) { - bloomFilter->addLong(data[i]); - } - } - } - dateStats->increase(count); - if (count < numValues) { - dateStats->setHasNull(true); - } - } - - class Decimal64ColumnWriter : public ColumnWriter { - public: - static const uint32_t MAX_PRECISION_64 = 18; - static const uint32_t MAX_PRECISION_128 = 38; - - Decimal64ColumnWriter(const Type& type, - const StreamsFactory& factory, - const WriterOptions& options); - - virtual void add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) override; - - virtual void flush(std::vector<proto::Stream>& streams) override; - - virtual uint64_t getEstimatedSize() const override; - - virtual void getColumnEncoding( - std::vector<proto::ColumnEncoding>& encodings) const override; - - virtual void recordPosition() const override; - - protected: - RleVersion rleVersion; - uint64_t precision; - uint64_t scale; - std::unique_ptr<AppendOnlyBufferedStream> valueStream; - std::unique_ptr<RleEncoder> scaleEncoder; - - private: - char buffer[10]; - }; - - Decimal64ColumnWriter::Decimal64ColumnWriter( - const Type& type, - const StreamsFactory& factory, - const WriterOptions& options) : - ColumnWriter(type, factory, options), - rleVersion(options.getRleVersion()), - precision(type.getPrecision()), - scale(type.getScale()) { - valueStream.reset(new AppendOnlyBufferedStream( - factory.createStream(proto::Stream_Kind_DATA))); - std::unique_ptr<BufferedOutputStream> scaleStream = - factory.createStream(proto::Stream_Kind_SECONDARY); - scaleEncoder = createRleEncoder(std::move(scaleStream), - true, - rleVersion, - memPool, - options.getAlignedBitpacking()); - - if (enableIndex) { - recordPosition(); - } - } - - void Decimal64ColumnWriter::add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) { - const Decimal64VectorBatch* decBatch = - dynamic_cast<const Decimal64VectorBatch*>(&rowBatch); - if (decBatch == nullptr) { - throw InvalidArgument("Failed to cast to Decimal64VectorBatch"); - } - - DecimalColumnStatisticsImpl* decStats = - dynamic_cast<DecimalColumnStatisticsImpl*>(colIndexStatistics.get()); - if (decStats == nullptr) { - throw InvalidArgument("Failed to cast to DecimalColumnStatisticsImpl"); - } - - ColumnWriter::add(rowBatch, offset, numValues, incomingMask); - - const char* notNull = decBatch->hasNulls ? - decBatch->notNull.data() + offset : nullptr; - const int64_t* values = decBatch->values.data() + offset; - - uint64_t count = 0; - for (uint64_t i = 0; i < numValues; ++i) { - if (!notNull || notNull[i]) { - int64_t val = zigZag(values[i]); - char* data = buffer; - while (true) { - if ((val & ~0x7f) == 0) { - *(data++) = (static_cast<char>(val)); - break; - } else { - *(data++) = static_cast<char>(0x80 | (val & 0x7f)); - // cast val to unsigned so as to force 0-fill right shift - val = (static_cast<uint64_t>(val) >> 7); - } - } - valueStream->write(buffer, static_cast<size_t>(data - buffer)); - ++count; - if (enableBloomFilter) { - std::string decimal = Decimal( - values[i], static_cast<int32_t>(scale)).toString(); - bloomFilter->addBytes( - decimal.c_str(), static_cast<int64_t>(decimal.size())); - } - decStats->update(Decimal(values[i], static_cast<int32_t>(scale))); - } - } - decStats->increase(count); - if (count < numValues) { - decStats->setHasNull(true); - } - std::vector<int64_t> scales(numValues, static_cast<int64_t>(scale)); - scaleEncoder->add(scales.data(), numValues, notNull); - } - - void Decimal64ColumnWriter::flush(std::vector<proto::Stream>& streams) { - ColumnWriter::flush(streams); - - proto::Stream dataStream; - dataStream.set_kind(proto::Stream_Kind_DATA); - dataStream.set_column(static_cast<uint32_t>(columnId)); - dataStream.set_length(valueStream->flush()); - streams.push_back(dataStream); - - proto::Stream secondaryStream; - secondaryStream.set_kind(proto::Stream_Kind_SECONDARY); - secondaryStream.set_column(static_cast<uint32_t>(columnId)); - secondaryStream.set_length(scaleEncoder->flush()); - streams.push_back(secondaryStream); - } - - uint64_t Decimal64ColumnWriter::getEstimatedSize() const { - uint64_t size = ColumnWriter::getEstimatedSize(); - size += valueStream->getSize(); - size += scaleEncoder->getBufferSize(); - return size; - } - - void Decimal64ColumnWriter::getColumnEncoding( - std::vector<proto::ColumnEncoding>& encodings) const { - proto::ColumnEncoding encoding; - encoding.set_kind(RleVersionMapper(rleVersion)); - encoding.set_dictionarysize(0); - if (enableBloomFilter) { - encoding.set_bloomencoding(BloomFilterVersion::UTF8); - } - encodings.push_back(encoding); - } - - void Decimal64ColumnWriter::recordPosition() const { - ColumnWriter::recordPosition(); - valueStream->recordPosition(rowIndexPosition.get()); - scaleEncoder->recordPosition(rowIndexPosition.get()); - } - - class Decimal128ColumnWriter : public Decimal64ColumnWriter { - public: - Decimal128ColumnWriter(const Type& type, - const StreamsFactory& factory, - const WriterOptions& options); - - virtual void add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) override; - - private: - char buffer[20]; - }; - - Decimal128ColumnWriter::Decimal128ColumnWriter( - const Type& type, - const StreamsFactory& factory, - const WriterOptions& options) : - Decimal64ColumnWriter(type, factory, options) { - // PASS - } - - // Zigzag encoding moves the sign bit to the least significant bit using the - // expression (val « 1) ^ (val » 63) and derives its name from the fact that - // positive and negative numbers alternate once encoded. - Int128 zigZagInt128(const Int128& value) { - bool isNegative = value < 0; - Int128 val = value.abs(); - val <<= 1; - if (isNegative) { - val -= 1; - } - return val; - } - - void Decimal128ColumnWriter::add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) { - const Decimal128VectorBatch* decBatch = - dynamic_cast<const Decimal128VectorBatch*>(&rowBatch); - if (decBatch == nullptr) { - throw InvalidArgument("Failed to cast to Decimal128VectorBatch"); - } - - DecimalColumnStatisticsImpl* decStats = - dynamic_cast<DecimalColumnStatisticsImpl*>(colIndexStatistics.get()); - if (decStats == nullptr) { - throw InvalidArgument("Failed to cast to DecimalColumnStatisticsImpl"); - } - - ColumnWriter::add(rowBatch, offset, numValues, incomingMask); - - const char* notNull = decBatch->hasNulls ? - decBatch->notNull.data() + offset : nullptr; - const Int128* values = decBatch->values.data() + offset; - - // The current encoding of decimal columns stores the integer representation - // of the value as an unbounded length zigzag encoded base 128 varint. - uint64_t count = 0; - for (uint64_t i = 0; i < numValues; ++i) { - if (!notNull || notNull[i]) { - Int128 val = zigZagInt128(values[i]); - char* data = buffer; - while (true) { - if ((val & ~0x7f) == 0) { - *(data++) = (static_cast<char>(val.getLowBits())); - break; - } else { - *(data++) = static_cast<char>(0x80 | (val.getLowBits() & 0x7f)); - val >>= 7; - } - } - valueStream->write(buffer, static_cast<size_t>(data - buffer)); - - ++count; - if (enableBloomFilter) { - std::string decimal = Decimal( - values[i], static_cast<int32_t>(scale)).toString(); - bloomFilter->addBytes( - decimal.c_str(), static_cast<int64_t>(decimal.size())); - } - decStats->update(Decimal(values[i], static_cast<int32_t>(scale))); - } - } - decStats->increase(count); - if (count < numValues) { - decStats->setHasNull(true); - } - std::vector<int64_t> scales(numValues, static_cast<int64_t>(scale)); - scaleEncoder->add(scales.data(), numValues, notNull); - } - - class ListColumnWriter : public ColumnWriter { - public: - ListColumnWriter(const Type& type, - const StreamsFactory& factory, - const WriterOptions& options); - ~ListColumnWriter() override; - - virtual void add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) override; - - virtual void flush(std::vector<proto::Stream>& streams) override; - - virtual uint64_t getEstimatedSize() const override; - - virtual void getColumnEncoding( - std::vector<proto::ColumnEncoding>& encodings) const override; - - virtual void getStripeStatistics( - std::vector<proto::ColumnStatistics>& stats) const override; - - virtual void getFileStatistics( - std::vector<proto::ColumnStatistics>& stats) const override; - - virtual void mergeStripeStatsIntoFileStats() override; - - virtual void mergeRowGroupStatsIntoStripeStats() override; - - virtual void createRowIndexEntry() override; - - virtual void writeIndex( - std::vector<proto::Stream> &streams) const override; - - virtual void recordPosition() const override; - - virtual void writeDictionary() override; - - virtual void reset() override; - - private: - std::unique_ptr<RleEncoder> lengthEncoder; - RleVersion rleVersion; - std::unique_ptr<ColumnWriter> child; - }; - - ListColumnWriter::ListColumnWriter(const Type& type, - const StreamsFactory& factory, - const WriterOptions& options) : - ColumnWriter(type, factory, options), - rleVersion(options.getRleVersion()){ - - std::unique_ptr<BufferedOutputStream> lengthStream = - factory.createStream(proto::Stream_Kind_LENGTH); - lengthEncoder = createRleEncoder(std::move(lengthStream), - false, - rleVersion, - memPool, - options.getAlignedBitpacking()); - - if (type.getSubtypeCount() == 1) { - child = buildWriter(*type.getSubtype(0), factory, options); - } - - if (enableIndex) { - recordPosition(); - } - } - - ListColumnWriter::~ListColumnWriter() { - // PASS - } - - void ListColumnWriter::add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) { - ListVectorBatch* listBatch = dynamic_cast<ListVectorBatch*>(&rowBatch); - if (listBatch == nullptr) { - throw InvalidArgument("Failed to cast to ListVectorBatch"); - } - - ColumnWriter::add(rowBatch, offset, numValues, incomingMask); - - int64_t* offsets = listBatch->offsets.data() + offset; - const char* notNull = listBatch->hasNulls ? - listBatch->notNull.data() + offset : nullptr; - - uint64_t elemOffset = static_cast<uint64_t>(offsets[0]); - uint64_t totalNumValues = static_cast<uint64_t>(offsets[numValues] - offsets[0]); - - // translate offsets to lengths - for (uint64_t i = 0; i != numValues; ++i) { - offsets[i] = offsets[i + 1] - offsets[i]; - } - - // unnecessary to deal with null as elements are packed together - if (child.get()) { - child->add(*listBatch->elements, elemOffset, totalNumValues, nullptr); - } - lengthEncoder->add(offsets, numValues, notNull); - - if (enableIndex) { - if (!notNull) { - colIndexStatistics->increase(numValues); - } else { - uint64_t count = 0; - for (uint64_t i = 0; i < numValues; ++i) { - if (notNull[i]) { - ++count; - if (enableBloomFilter) { - bloomFilter->addLong(offsets[i]); - } - } - } - colIndexStatistics->increase(count); - if (count < numValues) { - colIndexStatistics->setHasNull(true); - } - } - } - } - - void ListColumnWriter::flush(std::vector<proto::Stream>& streams) { - ColumnWriter::flush(streams); - - proto::Stream stream; - stream.set_kind(proto::Stream_Kind_LENGTH); - stream.set_column(static_cast<uint32_t>(columnId)); - stream.set_length(lengthEncoder->flush()); - streams.push_back(stream); - - if (child.get()) { - child->flush(streams); - } - } - - void ListColumnWriter::writeIndex(std::vector<proto::Stream> &streams) const { - ColumnWriter::writeIndex(streams); - if (child.get()) { - child->writeIndex(streams); - } - } - - uint64_t ListColumnWriter::getEstimatedSize() const { - uint64_t size = ColumnWriter::getEstimatedSize(); - if (child.get()) { - size += lengthEncoder->getBufferSize(); - size += child->getEstimatedSize(); - } - return size; - } - - void ListColumnWriter::getColumnEncoding( - std::vector<proto::ColumnEncoding>& encodings) const { - proto::ColumnEncoding encoding; - encoding.set_kind(RleVersionMapper(rleVersion)); - encoding.set_dictionarysize(0); - if (enableBloomFilter) { - encoding.set_bloomencoding(BloomFilterVersion::UTF8); - } - encodings.push_back(encoding); - if (child.get()) { - child->getColumnEncoding(encodings); - } - } - - void ListColumnWriter::getStripeStatistics( - std::vector<proto::ColumnStatistics>& stats) const { - ColumnWriter::getStripeStatistics(stats); - if (child.get()) { - child->getStripeStatistics(stats); - } - } - - void ListColumnWriter::mergeStripeStatsIntoFileStats() { - ColumnWriter::mergeStripeStatsIntoFileStats(); - if (child.get()) { - child->mergeStripeStatsIntoFileStats(); - } - } - - void ListColumnWriter::getFileStatistics( - std::vector<proto::ColumnStatistics>& stats) const { - ColumnWriter::getFileStatistics(stats); - if (child.get()) { - child->getFileStatistics(stats); - } - } - - void ListColumnWriter::mergeRowGroupStatsIntoStripeStats() { - ColumnWriter::mergeRowGroupStatsIntoStripeStats(); - if (child.get()) { - child->mergeRowGroupStatsIntoStripeStats(); - } - } - - void ListColumnWriter::createRowIndexEntry() { - ColumnWriter::createRowIndexEntry(); - if (child.get()) { - child->createRowIndexEntry(); - } - } - - void ListColumnWriter::recordPosition() const { - ColumnWriter::recordPosition(); - lengthEncoder->recordPosition(rowIndexPosition.get()); - } - - void ListColumnWriter::reset() { - ColumnWriter::reset(); - if (child) { - child->reset(); - } - } - - void ListColumnWriter::writeDictionary() { - if (child) { - child->writeDictionary(); - } - } - - class MapColumnWriter : public ColumnWriter { - public: - MapColumnWriter(const Type& type, - const StreamsFactory& factory, - const WriterOptions& options); - ~MapColumnWriter() override; - - virtual void add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) override; - - virtual void flush(std::vector<proto::Stream>& streams) override; - - virtual uint64_t getEstimatedSize() const override; - - virtual void getColumnEncoding( - std::vector<proto::ColumnEncoding>& encodings) const override; - - virtual void getStripeStatistics( - std::vector<proto::ColumnStatistics>& stats) const override; - - virtual void getFileStatistics( - std::vector<proto::ColumnStatistics>& stats) const override; - - virtual void mergeStripeStatsIntoFileStats() override; - - virtual void mergeRowGroupStatsIntoStripeStats() override; - - virtual void createRowIndexEntry() override; - - virtual void writeIndex( - std::vector<proto::Stream> &streams) const override; - - virtual void recordPosition() const override; - - virtual void writeDictionary() override; - - virtual void reset() override; - - private: - std::unique_ptr<ColumnWriter> keyWriter; - std::unique_ptr<ColumnWriter> elemWriter; - std::unique_ptr<RleEncoder> lengthEncoder; - RleVersion rleVersion; - }; - - MapColumnWriter::MapColumnWriter(const Type& type, - const StreamsFactory& factory, - const WriterOptions& options) : - ColumnWriter(type, factory, options), - rleVersion(options.getRleVersion()){ - std::unique_ptr<BufferedOutputStream> lengthStream = - factory.createStream(proto::Stream_Kind_LENGTH); - lengthEncoder = createRleEncoder(std::move(lengthStream), - false, - rleVersion, - memPool, - options.getAlignedBitpacking()); - - if (type.getSubtypeCount() > 0) { - keyWriter = buildWriter(*type.getSubtype(0), factory, options); - } - - if (type.getSubtypeCount() > 1) { - elemWriter = buildWriter(*type.getSubtype(1), factory, options); - } - - if (enableIndex) { - recordPosition(); - } - } - - MapColumnWriter::~MapColumnWriter() { - // PASS - } - - void MapColumnWriter::add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) { - MapVectorBatch* mapBatch = dynamic_cast<MapVectorBatch*>(&rowBatch); - if (mapBatch == nullptr) { - throw InvalidArgument("Failed to cast to MapVectorBatch"); - } - - ColumnWriter::add(rowBatch, offset, numValues, incomingMask); - - int64_t* offsets = mapBatch->offsets.data() + offset; - const char* notNull = mapBatch->hasNulls ? - mapBatch->notNull.data() + offset : nullptr; - - uint64_t elemOffset = static_cast<uint64_t>(offsets[0]); - uint64_t totalNumValues = static_cast<uint64_t>(offsets[numValues] - offsets[0]); - - // translate offsets to lengths - for (uint64_t i = 0; i != numValues; ++i) { - offsets[i] = offsets[i + 1] - offsets[i]; - } - - lengthEncoder->add(offsets, numValues, notNull); - - // unnecessary to deal with null as keys and values are packed together - if (keyWriter.get()) { - keyWriter->add(*mapBatch->keys, elemOffset, totalNumValues, nullptr); - } - if (elemWriter.get()) { - elemWriter->add(*mapBatch->elements, elemOffset, totalNumValues, nullptr); - } - - if (enableIndex) { - if (!notNull) { - colIndexStatistics->increase(numValues); - } else { - uint64_t count = 0; - for (uint64_t i = 0; i < numValues; ++i) { - if (notNull[i]) { - ++count; - if (enableBloomFilter) { - bloomFilter->addLong(offsets[i]); - } - } - } - colIndexStatistics->increase(count); - if (count < numValues) { - colIndexStatistics->setHasNull(true); - } - } - } - } - - void MapColumnWriter::flush(std::vector<proto::Stream>& streams) { - ColumnWriter::flush(streams); - - proto::Stream stream; - stream.set_kind(proto::Stream_Kind_LENGTH); - stream.set_column(static_cast<uint32_t>(columnId)); - stream.set_length(lengthEncoder->flush()); - streams.push_back(stream); - - if (keyWriter.get()) { - keyWriter->flush(streams); - } - if (elemWriter.get()) { - elemWriter->flush(streams); - } - } - - void MapColumnWriter::writeIndex( - std::vector<proto::Stream> &streams) const { - ColumnWriter::writeIndex(streams); - if (keyWriter.get()) { - keyWriter->writeIndex(streams); - } - if (elemWriter.get()) { - elemWriter->writeIndex(streams); - } - } - - uint64_t MapColumnWriter::getEstimatedSize() const { - uint64_t size = ColumnWriter::getEstimatedSize(); - size += lengthEncoder->getBufferSize(); - if (keyWriter.get()) { - size += keyWriter->getEstimatedSize(); - } - if (elemWriter.get()) { - size += elemWriter->getEstimatedSize(); - } - return size; - } - - void MapColumnWriter::getColumnEncoding( - std::vector<proto::ColumnEncoding>& encodings) const { - proto::ColumnEncoding encoding; - encoding.set_kind(RleVersionMapper(rleVersion)); - encoding.set_dictionarysize(0); - if (enableBloomFilter) { - encoding.set_bloomencoding(BloomFilterVersion::UTF8); - } - encodings.push_back(encoding); - if (keyWriter.get()) { - keyWriter->getColumnEncoding(encodings); - } - if (elemWriter.get()) { - elemWriter->getColumnEncoding(encodings); - } - } - - void MapColumnWriter::getStripeStatistics( - std::vector<proto::ColumnStatistics>& stats) const { - ColumnWriter::getStripeStatistics(stats); - if (keyWriter.get()) { - keyWriter->getStripeStatistics(stats); - } - if (elemWriter.get()) { - elemWriter->getStripeStatistics(stats); - } - } - - void MapColumnWriter::mergeStripeStatsIntoFileStats() { - ColumnWriter::mergeStripeStatsIntoFileStats(); - if (keyWriter.get()) { - keyWriter->mergeStripeStatsIntoFileStats(); - } - if (elemWriter.get()) { - elemWriter->mergeStripeStatsIntoFileStats(); - } - } - - void MapColumnWriter::getFileStatistics( - std::vector<proto::ColumnStatistics>& stats) const { - ColumnWriter::getFileStatistics(stats); - if (keyWriter.get()) { - keyWriter->getFileStatistics(stats); - } - if (elemWriter.get()) { - elemWriter->getFileStatistics(stats); - } - } - - void MapColumnWriter::mergeRowGroupStatsIntoStripeStats() { - ColumnWriter::mergeRowGroupStatsIntoStripeStats(); - if (keyWriter.get()) { - keyWriter->mergeRowGroupStatsIntoStripeStats(); - } - if (elemWriter.get()) { - elemWriter->mergeRowGroupStatsIntoStripeStats(); - } - } - - void MapColumnWriter::createRowIndexEntry() { - ColumnWriter::createRowIndexEntry(); - if (keyWriter.get()) { - keyWriter->createRowIndexEntry(); - } - if (elemWriter.get()) { - elemWriter->createRowIndexEntry(); - } - } - - void MapColumnWriter::recordPosition() const { - ColumnWriter::recordPosition(); - lengthEncoder->recordPosition(rowIndexPosition.get()); - } - - void MapColumnWriter::reset() { - ColumnWriter::reset(); - if (keyWriter) { - keyWriter->reset(); - } - if (elemWriter) { - elemWriter->reset(); - } - } - - void MapColumnWriter::writeDictionary() { - if (keyWriter) { - keyWriter->writeDictionary(); - } - if (elemWriter) { - elemWriter->writeDictionary(); - } - } - - class UnionColumnWriter : public ColumnWriter { - public: - UnionColumnWriter(const Type& type, - const StreamsFactory& factory, - const WriterOptions& options); - ~UnionColumnWriter() override; - - virtual void add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) override; - - virtual void flush(std::vector<proto::Stream>& streams) override; - - virtual uint64_t getEstimatedSize() const override; - - virtual void getColumnEncoding( - std::vector<proto::ColumnEncoding>& encodings) const override; - - virtual void getStripeStatistics( - std::vector<proto::ColumnStatistics>& stats) const override; - - virtual void getFileStatistics( - std::vector<proto::ColumnStatistics>& stats) const override; - - virtual void mergeStripeStatsIntoFileStats() override; - - virtual void mergeRowGroupStatsIntoStripeStats() override; - - virtual void createRowIndexEntry() override; - - virtual void writeIndex( - std::vector<proto::Stream> &streams) const override; - - virtual void recordPosition() const override; - - virtual void writeDictionary() override; - - virtual void reset() override; - - private: - std::unique_ptr<ByteRleEncoder> rleEncoder; - std::vector<ColumnWriter*> children; - }; - - UnionColumnWriter::UnionColumnWriter(const Type& type, - const StreamsFactory& factory, - const WriterOptions& options) : - ColumnWriter(type, factory, options) { - - std::unique_ptr<BufferedOutputStream> dataStream = - factory.createStream(proto::Stream_Kind_DATA); - rleEncoder = createByteRleEncoder(std::move(dataStream)); - - for (uint64_t i = 0; i != type.getSubtypeCount(); ++i) { - children.push_back(buildWriter(*type.getSubtype(i), - factory, - options).release()); - } - - if (enableIndex) { - recordPosition(); - } - } - - UnionColumnWriter::~UnionColumnWriter() { - for (uint32_t i = 0; i < children.size(); ++i) { - delete children[i]; - } - } - - void UnionColumnWriter::add(ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues, - const char* incomingMask) { - UnionVectorBatch* unionBatch = dynamic_cast<UnionVectorBatch*>(&rowBatch); - if (unionBatch == nullptr) { - throw InvalidArgument("Failed to cast to UnionVectorBatch"); - } - - ColumnWriter::add(rowBatch, offset, numValues, incomingMask); - - const char* notNull = unionBatch->hasNulls ? - unionBatch->notNull.data() + offset : nullptr; - unsigned char * tags = unionBatch->tags.data() + offset; - uint64_t * offsets = unionBatch->offsets.data() + offset; - - std::vector<int64_t> childOffset(children.size(), -1); - std::vector<uint64_t> childLength(children.size(), 0); - - for (uint64_t i = 0; i != numValues; ++i) { - if (childOffset[tags[i]] == -1) { - childOffset[tags[i]] = static_cast<int64_t>(offsets[i]); - } - ++childLength[tags[i]]; - } - - rleEncoder->add(reinterpret_cast<char*>(tags), numValues, notNull); - - for (uint32_t i = 0; i < children.size(); ++i) { - if (childLength[i] > 0) { - children[i]->add(*unionBatch->children[i], - static_cast<uint64_t>(childOffset[i]), - childLength[i], nullptr); - } - } - - // update stats - if (enableIndex) { - if (!notNull) { - colIndexStatistics->increase(numValues); - } else { - uint64_t count = 0; - for (uint64_t i = 0; i < numValues; ++i) { - if (notNull[i]) { - ++count; - if (enableBloomFilter) { - bloomFilter->addLong(tags[i]); - } - } - } - colIndexStatistics->increase(count); - if (count < numValues) { - colIndexStatistics->setHasNull(true); - } - } - } - } - - void UnionColumnWriter::flush(std::vector<proto::Stream>& streams) { - ColumnWriter::flush(streams); - - proto::Stream stream; - stream.set_kind(proto::Stream_Kind_DATA); - stream.set_column(static_cast<uint32_t>(columnId)); - stream.set_length(rleEncoder->flush()); - streams.push_back(stream); - - for (uint32_t i = 0; i < children.size(); ++i) { - children[i]->flush(streams); - } - } - - void UnionColumnWriter::writeIndex(std::vector<proto::Stream> &streams) const { - ColumnWriter::writeIndex(streams); - for (uint32_t i = 0; i < children.size(); ++i) { - children[i]->writeIndex(streams); - } - } - - uint64_t UnionColumnWriter::getEstimatedSize() const { - uint64_t size = ColumnWriter::getEstimatedSize(); - size += rleEncoder->getBufferSize(); - for (uint32_t i = 0; i < children.size(); ++i) { - size += children[i]->getEstimatedSize(); - } - return size; - } - - void UnionColumnWriter::getColumnEncoding( - std::vector<proto::ColumnEncoding>& encodings) const { - proto::ColumnEncoding encoding; - encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT); - encoding.set_dictionarysize(0); - if (enableBloomFilter) { - encoding.set_bloomencoding(BloomFilterVersion::UTF8); - } - encodings.push_back(encoding); - for (uint32_t i = 0; i < children.size(); ++i) { - children[i]->getColumnEncoding(encodings); - } - } - - void UnionColumnWriter::getStripeStatistics( - std::vector<proto::ColumnStatistics>& stats) const { - ColumnWriter::getStripeStatistics(stats); - for (uint32_t i = 0; i < children.size(); ++i) { - children[i]->getStripeStatistics(stats); - } - } - - void UnionColumnWriter::mergeStripeStatsIntoFileStats() { - ColumnWriter::mergeStripeStatsIntoFileStats(); - for (uint32_t i = 0; i < children.size(); ++i) { - children[i]->mergeStripeStatsIntoFileStats(); - } - } - - void UnionColumnWriter::getFileStatistics( - std::vector<proto::ColumnStatistics>& stats) const { - ColumnWriter::getFileStatistics(stats); - for (uint32_t i = 0; i < children.size(); ++i) { - children[i]->getFileStatistics(stats); - } - } - - void UnionColumnWriter::mergeRowGroupStatsIntoStripeStats() { - ColumnWriter::mergeRowGroupStatsIntoStripeStats(); - for (uint32_t i = 0; i < children.size(); ++i) { - children[i]->mergeRowGroupStatsIntoStripeStats(); - } - } - - void UnionColumnWriter::createRowIndexEntry() { - ColumnWriter::createRowIndexEntry(); - for (uint32_t i = 0; i < children.size(); ++i) { - children[i]->createRowIndexEntry(); - } - } - - void UnionColumnWriter::recordPosition() const { - ColumnWriter::recordPosition(); - rleEncoder->recordPosition(rowIndexPosition.get()); - } - - void UnionColumnWriter::reset() { - ColumnWriter::reset(); - for (uint32_t i = 0; i < children.size(); ++i) { - children[i]->reset(); - } - } - - void UnionColumnWriter::writeDictionary() { - for (uint32_t i = 0; i < children.size(); ++i) { - children[i]->writeDictionary(); - } - } - - std::unique_ptr<ColumnWriter> buildWriter( - const Type& type, - const StreamsFactory& factory, - const WriterOptions& options) { - switch (static_cast<int64_t>(type.getKind())) { - case STRUCT: - return std::unique_ptr<ColumnWriter>( - new StructColumnWriter( - type, - factory, - options)); - case INT: - case LONG: - case SHORT: - return std::unique_ptr<ColumnWriter>( - new IntegerColumnWriter( - type, - factory, - options)); - case BYTE: - return std::unique_ptr<ColumnWriter>( - new ByteColumnWriter( - type, - factory, - options)); - case BOOLEAN: - return std::unique_ptr<ColumnWriter>( - new BooleanColumnWriter( - type, - factory, - options)); - case DOUBLE: - return std::unique_ptr<ColumnWriter>( - new DoubleColumnWriter( - type, - factory, - options, - false)); - case FLOAT: - return std::unique_ptr<ColumnWriter>( - new DoubleColumnWriter( - type, - factory, - options, - true)); - case BINARY: - return std::unique_ptr<ColumnWriter>( - new BinaryColumnWriter( - type, - factory, - options)); - case STRING: - return std::unique_ptr<ColumnWriter>( - new StringColumnWriter( - type, - factory, - options)); - case CHAR: - return std::unique_ptr<ColumnWriter>( - new CharColumnWriter( - type, - factory, - options)); - case VARCHAR: - return std::unique_ptr<ColumnWriter>( - new VarCharColumnWriter( - type, - factory, - options)); - case DATE: - return std::unique_ptr<ColumnWriter>( - new DateColumnWriter( - type, - factory, - options)); - case TIMESTAMP: - return std::unique_ptr<ColumnWriter>( - new TimestampColumnWriter( - type, - factory, - options)); - case DECIMAL: - if (type.getPrecision() <= Decimal64ColumnWriter::MAX_PRECISION_64) { - return std::unique_ptr<ColumnWriter>( - new Decimal64ColumnWriter( - type, - factory, - options)); - } else if (type.getPrecision() <= Decimal64ColumnWriter::MAX_PRECISION_128) { - return std::unique_ptr<ColumnWriter>( - new Decimal128ColumnWriter( - type, - factory, - options)); - } else { - throw NotImplementedYet("Decimal precision more than 38 is not " - "supported"); - } - case LIST: - return std::unique_ptr<ColumnWriter>( - new ListColumnWriter( - type, - factory, - options)); - case MAP: - return std::unique_ptr<ColumnWriter>( - new MapColumnWriter( - type, - factory, - options)); - case UNION: - return std::unique_ptr<ColumnWriter>( - new UnionColumnWriter( - type, - factory, - options)); - default: - throw NotImplementedYet("Type is not supported yet for creating " - "ColumnWriter."); - } - } -} + secs[i] += 1; + } + + secs[i] -= timezone.getEpoch(); + nanos[i] = formatNano(nanos[i]); + } + } + tsStats->increase(count); + if (count < numValues) { + tsStats->setHasNull(true); + } + + secRleEncoder->add(secs, numValues, notNull); + nanoRleEncoder->add(nanos, numValues, notNull); + } + + void TimestampColumnWriter::flush(std::vector<proto::Stream>& streams) { + ColumnWriter::flush(streams); + + proto::Stream dataStream; + dataStream.set_kind(proto::Stream_Kind_DATA); + dataStream.set_column(static_cast<uint32_t>(columnId)); + dataStream.set_length(secRleEncoder->flush()); + streams.push_back(dataStream); + + proto::Stream secondaryStream; + secondaryStream.set_kind(proto::Stream_Kind_SECONDARY); + secondaryStream.set_column(static_cast<uint32_t>(columnId)); + secondaryStream.set_length(nanoRleEncoder->flush()); + streams.push_back(secondaryStream); + } + + uint64_t TimestampColumnWriter::getEstimatedSize() const { + uint64_t size = ColumnWriter::getEstimatedSize(); + size += secRleEncoder->getBufferSize(); + size += nanoRleEncoder->getBufferSize(); + return size; + } + + void TimestampColumnWriter::getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const { + proto::ColumnEncoding encoding; + encoding.set_kind(RleVersionMapper(rleVersion)); + encoding.set_dictionarysize(0); + if (enableBloomFilter) { + encoding.set_bloomencoding(BloomFilterVersion::UTF8); + } + encodings.push_back(encoding); + } + + void TimestampColumnWriter::recordPosition() const { + ColumnWriter::recordPosition(); + secRleEncoder->recordPosition(rowIndexPosition.get()); + nanoRleEncoder->recordPosition(rowIndexPosition.get()); + } + + class DateColumnWriter : public IntegerColumnWriter { + public: + DateColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options); + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) override; + }; + + DateColumnWriter::DateColumnWriter( + const Type &type, + const StreamsFactory &factory, + const WriterOptions &options) : + IntegerColumnWriter(type, factory, options) { + // PASS + } + + void DateColumnWriter::add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) { + const LongVectorBatch* longBatch = + dynamic_cast<const LongVectorBatch*>(&rowBatch); + if (longBatch == nullptr) { + throw InvalidArgument("Failed to cast to LongVectorBatch"); + } + + DateColumnStatisticsImpl* dateStats = + dynamic_cast<DateColumnStatisticsImpl*>(colIndexStatistics.get()); + if (dateStats == nullptr) { + throw InvalidArgument("Failed to cast to DateColumnStatisticsImpl"); + } + + ColumnWriter::add(rowBatch, offset, numValues, incomingMask); + + const int64_t* data = longBatch->data.data() + offset; + const char* notNull = longBatch->hasNulls ? + longBatch->notNull.data() + offset : nullptr; + + rleEncoder->add(data, numValues, notNull); + + uint64_t count = 0; + for (uint64_t i = 0; i < numValues; ++i) { + if (!notNull || notNull[i]) { + ++count; + dateStats->update(static_cast<int32_t>(data[i])); + if (enableBloomFilter) { + bloomFilter->addLong(data[i]); + } + } + } + dateStats->increase(count); + if (count < numValues) { + dateStats->setHasNull(true); + } + } + + class Decimal64ColumnWriter : public ColumnWriter { + public: + static const uint32_t MAX_PRECISION_64 = 18; + static const uint32_t MAX_PRECISION_128 = 38; + + Decimal64ColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options); + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) override; + + virtual void flush(std::vector<proto::Stream>& streams) override; + + virtual uint64_t getEstimatedSize() const override; + + virtual void getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const override; + + virtual void recordPosition() const override; + + protected: + RleVersion rleVersion; + uint64_t precision; + uint64_t scale; + std::unique_ptr<AppendOnlyBufferedStream> valueStream; + std::unique_ptr<RleEncoder> scaleEncoder; + + private: + char buffer[10]; + }; + + Decimal64ColumnWriter::Decimal64ColumnWriter( + const Type& type, + const StreamsFactory& factory, + const WriterOptions& options) : + ColumnWriter(type, factory, options), + rleVersion(options.getRleVersion()), + precision(type.getPrecision()), + scale(type.getScale()) { + valueStream.reset(new AppendOnlyBufferedStream( + factory.createStream(proto::Stream_Kind_DATA))); + std::unique_ptr<BufferedOutputStream> scaleStream = + factory.createStream(proto::Stream_Kind_SECONDARY); + scaleEncoder = createRleEncoder(std::move(scaleStream), + true, + rleVersion, + memPool, + options.getAlignedBitpacking()); + + if (enableIndex) { + recordPosition(); + } + } + + void Decimal64ColumnWriter::add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) { + const Decimal64VectorBatch* decBatch = + dynamic_cast<const Decimal64VectorBatch*>(&rowBatch); + if (decBatch == nullptr) { + throw InvalidArgument("Failed to cast to Decimal64VectorBatch"); + } + + DecimalColumnStatisticsImpl* decStats = + dynamic_cast<DecimalColumnStatisticsImpl*>(colIndexStatistics.get()); + if (decStats == nullptr) { + throw InvalidArgument("Failed to cast to DecimalColumnStatisticsImpl"); + } + + ColumnWriter::add(rowBatch, offset, numValues, incomingMask); + + const char* notNull = decBatch->hasNulls ? + decBatch->notNull.data() + offset : nullptr; + const int64_t* values = decBatch->values.data() + offset; + + uint64_t count = 0; + for (uint64_t i = 0; i < numValues; ++i) { + if (!notNull || notNull[i]) { + int64_t val = zigZag(values[i]); + char* data = buffer; + while (true) { + if ((val & ~0x7f) == 0) { + *(data++) = (static_cast<char>(val)); + break; + } else { + *(data++) = static_cast<char>(0x80 | (val & 0x7f)); + // cast val to unsigned so as to force 0-fill right shift + val = (static_cast<uint64_t>(val) >> 7); + } + } + valueStream->write(buffer, static_cast<size_t>(data - buffer)); + ++count; + if (enableBloomFilter) { + std::string decimal = Decimal( + values[i], static_cast<int32_t>(scale)).toString(); + bloomFilter->addBytes( + decimal.c_str(), static_cast<int64_t>(decimal.size())); + } + decStats->update(Decimal(values[i], static_cast<int32_t>(scale))); + } + } + decStats->increase(count); + if (count < numValues) { + decStats->setHasNull(true); + } + std::vector<int64_t> scales(numValues, static_cast<int64_t>(scale)); + scaleEncoder->add(scales.data(), numValues, notNull); + } + + void Decimal64ColumnWriter::flush(std::vector<proto::Stream>& streams) { + ColumnWriter::flush(streams); + + proto::Stream dataStream; + dataStream.set_kind(proto::Stream_Kind_DATA); + dataStream.set_column(static_cast<uint32_t>(columnId)); + dataStream.set_length(valueStream->flush()); + streams.push_back(dataStream); + + proto::Stream secondaryStream; + secondaryStream.set_kind(proto::Stream_Kind_SECONDARY); + secondaryStream.set_column(static_cast<uint32_t>(columnId)); + secondaryStream.set_length(scaleEncoder->flush()); + streams.push_back(secondaryStream); + } + + uint64_t Decimal64ColumnWriter::getEstimatedSize() const { + uint64_t size = ColumnWriter::getEstimatedSize(); + size += valueStream->getSize(); + size += scaleEncoder->getBufferSize(); + return size; + } + + void Decimal64ColumnWriter::getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const { + proto::ColumnEncoding encoding; + encoding.set_kind(RleVersionMapper(rleVersion)); + encoding.set_dictionarysize(0); + if (enableBloomFilter) { + encoding.set_bloomencoding(BloomFilterVersion::UTF8); + } + encodings.push_back(encoding); + } + + void Decimal64ColumnWriter::recordPosition() const { + ColumnWriter::recordPosition(); + valueStream->recordPosition(rowIndexPosition.get()); + scaleEncoder->recordPosition(rowIndexPosition.get()); + } + + class Decimal128ColumnWriter : public Decimal64ColumnWriter { + public: + Decimal128ColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options); + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) override; + + private: + char buffer[20]; + }; + + Decimal128ColumnWriter::Decimal128ColumnWriter( + const Type& type, + const StreamsFactory& factory, + const WriterOptions& options) : + Decimal64ColumnWriter(type, factory, options) { + // PASS + } + + // Zigzag encoding moves the sign bit to the least significant bit using the + // expression (val « 1) ^ (val » 63) and derives its name from the fact that + // positive and negative numbers alternate once encoded. + Int128 zigZagInt128(const Int128& value) { + bool isNegative = value < 0; + Int128 val = value.abs(); + val <<= 1; + if (isNegative) { + val -= 1; + } + return val; + } + + void Decimal128ColumnWriter::add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) { + const Decimal128VectorBatch* decBatch = + dynamic_cast<const Decimal128VectorBatch*>(&rowBatch); + if (decBatch == nullptr) { + throw InvalidArgument("Failed to cast to Decimal128VectorBatch"); + } + + DecimalColumnStatisticsImpl* decStats = + dynamic_cast<DecimalColumnStatisticsImpl*>(colIndexStatistics.get()); + if (decStats == nullptr) { + throw InvalidArgument("Failed to cast to DecimalColumnStatisticsImpl"); + } + + ColumnWriter::add(rowBatch, offset, numValues, incomingMask); + + const char* notNull = decBatch->hasNulls ? + decBatch->notNull.data() + offset : nullptr; + const Int128* values = decBatch->values.data() + offset; + + // The current encoding of decimal columns stores the integer representation + // of the value as an unbounded length zigzag encoded base 128 varint. + uint64_t count = 0; + for (uint64_t i = 0; i < numValues; ++i) { + if (!notNull || notNull[i]) { + Int128 val = zigZagInt128(values[i]); + char* data = buffer; + while (true) { + if ((val & ~0x7f) == 0) { + *(data++) = (static_cast<char>(val.getLowBits())); + break; + } else { + *(data++) = static_cast<char>(0x80 | (val.getLowBits() & 0x7f)); + val >>= 7; + } + } + valueStream->write(buffer, static_cast<size_t>(data - buffer)); + + ++count; + if (enableBloomFilter) { + std::string decimal = Decimal( + values[i], static_cast<int32_t>(scale)).toString(); + bloomFilter->addBytes( + decimal.c_str(), static_cast<int64_t>(decimal.size())); + } + decStats->update(Decimal(values[i], static_cast<int32_t>(scale))); + } + } + decStats->increase(count); + if (count < numValues) { + decStats->setHasNull(true); + } + std::vector<int64_t> scales(numValues, static_cast<int64_t>(scale)); + scaleEncoder->add(scales.data(), numValues, notNull); + } + + class ListColumnWriter : public ColumnWriter { + public: + ListColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options); + ~ListColumnWriter() override; + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) override; + + virtual void flush(std::vector<proto::Stream>& streams) override; + + virtual uint64_t getEstimatedSize() const override; + + virtual void getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const override; + + virtual void getStripeStatistics( + std::vector<proto::ColumnStatistics>& stats) const override; + + virtual void getFileStatistics( + std::vector<proto::ColumnStatistics>& stats) const override; + + virtual void mergeStripeStatsIntoFileStats() override; + + virtual void mergeRowGroupStatsIntoStripeStats() override; + + virtual void createRowIndexEntry() override; + + virtual void writeIndex( + std::vector<proto::Stream> &streams) const override; + + virtual void recordPosition() const override; + + virtual void writeDictionary() override; + + virtual void reset() override; + + private: + std::unique_ptr<RleEncoder> lengthEncoder; + RleVersion rleVersion; + std::unique_ptr<ColumnWriter> child; + }; + + ListColumnWriter::ListColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options) : + ColumnWriter(type, factory, options), + rleVersion(options.getRleVersion()){ + + std::unique_ptr<BufferedOutputStream> lengthStream = + factory.createStream(proto::Stream_Kind_LENGTH); + lengthEncoder = createRleEncoder(std::move(lengthStream), + false, + rleVersion, + memPool, + options.getAlignedBitpacking()); + + if (type.getSubtypeCount() == 1) { + child = buildWriter(*type.getSubtype(0), factory, options); + } + + if (enableIndex) { + recordPosition(); + } + } + + ListColumnWriter::~ListColumnWriter() { + // PASS + } + + void ListColumnWriter::add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) { + ListVectorBatch* listBatch = dynamic_cast<ListVectorBatch*>(&rowBatch); + if (listBatch == nullptr) { + throw InvalidArgument("Failed to cast to ListVectorBatch"); + } + + ColumnWriter::add(rowBatch, offset, numValues, incomingMask); + + int64_t* offsets = listBatch->offsets.data() + offset; + const char* notNull = listBatch->hasNulls ? + listBatch->notNull.data() + offset : nullptr; + + uint64_t elemOffset = static_cast<uint64_t>(offsets[0]); + uint64_t totalNumValues = static_cast<uint64_t>(offsets[numValues] - offsets[0]); + + // translate offsets to lengths + for (uint64_t i = 0; i != numValues; ++i) { + offsets[i] = offsets[i + 1] - offsets[i]; + } + + // unnecessary to deal with null as elements are packed together + if (child.get()) { + child->add(*listBatch->elements, elemOffset, totalNumValues, nullptr); + } + lengthEncoder->add(offsets, numValues, notNull); + + if (enableIndex) { + if (!notNull) { + colIndexStatistics->increase(numValues); + } else { + uint64_t count = 0; + for (uint64_t i = 0; i < numValues; ++i) { + if (notNull[i]) { + ++count; + if (enableBloomFilter) { + bloomFilter->addLong(offsets[i]); + } + } + } + colIndexStatistics->increase(count); + if (count < numValues) { + colIndexStatistics->setHasNull(true); + } + } + } + } + + void ListColumnWriter::flush(std::vector<proto::Stream>& streams) { + ColumnWriter::flush(streams); + + proto::Stream stream; + stream.set_kind(proto::Stream_Kind_LENGTH); + stream.set_column(static_cast<uint32_t>(columnId)); + stream.set_length(lengthEncoder->flush()); + streams.push_back(stream); + + if (child.get()) { + child->flush(streams); + } + } + + void ListColumnWriter::writeIndex(std::vector<proto::Stream> &streams) const { + ColumnWriter::writeIndex(streams); + if (child.get()) { + child->writeIndex(streams); + } + } + + uint64_t ListColumnWriter::getEstimatedSize() const { + uint64_t size = ColumnWriter::getEstimatedSize(); + if (child.get()) { + size += lengthEncoder->getBufferSize(); + size += child->getEstimatedSize(); + } + return size; + } + + void ListColumnWriter::getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const { + proto::ColumnEncoding encoding; + encoding.set_kind(RleVersionMapper(rleVersion)); + encoding.set_dictionarysize(0); + if (enableBloomFilter) { + encoding.set_bloomencoding(BloomFilterVersion::UTF8); + } + encodings.push_back(encoding); + if (child.get()) { + child->getColumnEncoding(encodings); + } + } + + void ListColumnWriter::getStripeStatistics( + std::vector<proto::ColumnStatistics>& stats) const { + ColumnWriter::getStripeStatistics(stats); + if (child.get()) { + child->getStripeStatistics(stats); + } + } + + void ListColumnWriter::mergeStripeStatsIntoFileStats() { + ColumnWriter::mergeStripeStatsIntoFileStats(); + if (child.get()) { + child->mergeStripeStatsIntoFileStats(); + } + } + + void ListColumnWriter::getFileStatistics( + std::vector<proto::ColumnStatistics>& stats) const { + ColumnWriter::getFileStatistics(stats); + if (child.get()) { + child->getFileStatistics(stats); + } + } + + void ListColumnWriter::mergeRowGroupStatsIntoStripeStats() { + ColumnWriter::mergeRowGroupStatsIntoStripeStats(); + if (child.get()) { + child->mergeRowGroupStatsIntoStripeStats(); + } + } + + void ListColumnWriter::createRowIndexEntry() { + ColumnWriter::createRowIndexEntry(); + if (child.get()) { + child->createRowIndexEntry(); + } + } + + void ListColumnWriter::recordPosition() const { + ColumnWriter::recordPosition(); + lengthEncoder->recordPosition(rowIndexPosition.get()); + } + + void ListColumnWriter::reset() { + ColumnWriter::reset(); + if (child) { + child->reset(); + } + } + + void ListColumnWriter::writeDictionary() { + if (child) { + child->writeDictionary(); + } + } + + class MapColumnWriter : public ColumnWriter { + public: + MapColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options); + ~MapColumnWriter() override; + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) override; + + virtual void flush(std::vector<proto::Stream>& streams) override; + + virtual uint64_t getEstimatedSize() const override; + + virtual void getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const override; + + virtual void getStripeStatistics( + std::vector<proto::ColumnStatistics>& stats) const override; + + virtual void getFileStatistics( + std::vector<proto::ColumnStatistics>& stats) const override; + + virtual void mergeStripeStatsIntoFileStats() override; + + virtual void mergeRowGroupStatsIntoStripeStats() override; + + virtual void createRowIndexEntry() override; + + virtual void writeIndex( + std::vector<proto::Stream> &streams) const override; + + virtual void recordPosition() const override; + + virtual void writeDictionary() override; + + virtual void reset() override; + + private: + std::unique_ptr<ColumnWriter> keyWriter; + std::unique_ptr<ColumnWriter> elemWriter; + std::unique_ptr<RleEncoder> lengthEncoder; + RleVersion rleVersion; + }; + + MapColumnWriter::MapColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options) : + ColumnWriter(type, factory, options), + rleVersion(options.getRleVersion()){ + std::unique_ptr<BufferedOutputStream> lengthStream = + factory.createStream(proto::Stream_Kind_LENGTH); + lengthEncoder = createRleEncoder(std::move(lengthStream), + false, + rleVersion, + memPool, + options.getAlignedBitpacking()); + + if (type.getSubtypeCount() > 0) { + keyWriter = buildWriter(*type.getSubtype(0), factory, options); + } + + if (type.getSubtypeCount() > 1) { + elemWriter = buildWriter(*type.getSubtype(1), factory, options); + } + + if (enableIndex) { + recordPosition(); + } + } + + MapColumnWriter::~MapColumnWriter() { + // PASS + } + + void MapColumnWriter::add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) { + MapVectorBatch* mapBatch = dynamic_cast<MapVectorBatch*>(&rowBatch); + if (mapBatch == nullptr) { + throw InvalidArgument("Failed to cast to MapVectorBatch"); + } + + ColumnWriter::add(rowBatch, offset, numValues, incomingMask); + + int64_t* offsets = mapBatch->offsets.data() + offset; + const char* notNull = mapBatch->hasNulls ? + mapBatch->notNull.data() + offset : nullptr; + + uint64_t elemOffset = static_cast<uint64_t>(offsets[0]); + uint64_t totalNumValues = static_cast<uint64_t>(offsets[numValues] - offsets[0]); + + // translate offsets to lengths + for (uint64_t i = 0; i != numValues; ++i) { + offsets[i] = offsets[i + 1] - offsets[i]; + } + + lengthEncoder->add(offsets, numValues, notNull); + + // unnecessary to deal with null as keys and values are packed together + if (keyWriter.get()) { + keyWriter->add(*mapBatch->keys, elemOffset, totalNumValues, nullptr); + } + if (elemWriter.get()) { + elemWriter->add(*mapBatch->elements, elemOffset, totalNumValues, nullptr); + } + + if (enableIndex) { + if (!notNull) { + colIndexStatistics->increase(numValues); + } else { + uint64_t count = 0; + for (uint64_t i = 0; i < numValues; ++i) { + if (notNull[i]) { + ++count; + if (enableBloomFilter) { + bloomFilter->addLong(offsets[i]); + } + } + } + colIndexStatistics->increase(count); + if (count < numValues) { + colIndexStatistics->setHasNull(true); + } + } + } + } + + void MapColumnWriter::flush(std::vector<proto::Stream>& streams) { + ColumnWriter::flush(streams); + + proto::Stream stream; + stream.set_kind(proto::Stream_Kind_LENGTH); + stream.set_column(static_cast<uint32_t>(columnId)); + stream.set_length(lengthEncoder->flush()); + streams.push_back(stream); + + if (keyWriter.get()) { + keyWriter->flush(streams); + } + if (elemWriter.get()) { + elemWriter->flush(streams); + } + } + + void MapColumnWriter::writeIndex( + std::vector<proto::Stream> &streams) const { + ColumnWriter::writeIndex(streams); + if (keyWriter.get()) { + keyWriter->writeIndex(streams); + } + if (elemWriter.get()) { + elemWriter->writeIndex(streams); + } + } + + uint64_t MapColumnWriter::getEstimatedSize() const { + uint64_t size = ColumnWriter::getEstimatedSize(); + size += lengthEncoder->getBufferSize(); + if (keyWriter.get()) { + size += keyWriter->getEstimatedSize(); + } + if (elemWriter.get()) { + size += elemWriter->getEstimatedSize(); + } + return size; + } + + void MapColumnWriter::getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const { + proto::ColumnEncoding encoding; + encoding.set_kind(RleVersionMapper(rleVersion)); + encoding.set_dictionarysize(0); + if (enableBloomFilter) { + encoding.set_bloomencoding(BloomFilterVersion::UTF8); + } + encodings.push_back(encoding); + if (keyWriter.get()) { + keyWriter->getColumnEncoding(encodings); + } + if (elemWriter.get()) { + elemWriter->getColumnEncoding(encodings); + } + } + + void MapColumnWriter::getStripeStatistics( + std::vector<proto::ColumnStatistics>& stats) const { + ColumnWriter::getStripeStatistics(stats); + if (keyWriter.get()) { + keyWriter->getStripeStatistics(stats); + } + if (elemWriter.get()) { + elemWriter->getStripeStatistics(stats); + } + } + + void MapColumnWriter::mergeStripeStatsIntoFileStats() { + ColumnWriter::mergeStripeStatsIntoFileStats(); + if (keyWriter.get()) { + keyWriter->mergeStripeStatsIntoFileStats(); + } + if (elemWriter.get()) { + elemWriter->mergeStripeStatsIntoFileStats(); + } + } + + void MapColumnWriter::getFileStatistics( + std::vector<proto::ColumnStatistics>& stats) const { + ColumnWriter::getFileStatistics(stats); + if (keyWriter.get()) { + keyWriter->getFileStatistics(stats); + } + if (elemWriter.get()) { + elemWriter->getFileStatistics(stats); + } + } + + void MapColumnWriter::mergeRowGroupStatsIntoStripeStats() { + ColumnWriter::mergeRowGroupStatsIntoStripeStats(); + if (keyWriter.get()) { + keyWriter->mergeRowGroupStatsIntoStripeStats(); + } + if (elemWriter.get()) { + elemWriter->mergeRowGroupStatsIntoStripeStats(); + } + } + + void MapColumnWriter::createRowIndexEntry() { + ColumnWriter::createRowIndexEntry(); + if (keyWriter.get()) { + keyWriter->createRowIndexEntry(); + } + if (elemWriter.get()) { + elemWriter->createRowIndexEntry(); + } + } + + void MapColumnWriter::recordPosition() const { + ColumnWriter::recordPosition(); + lengthEncoder->recordPosition(rowIndexPosition.get()); + } + + void MapColumnWriter::reset() { + ColumnWriter::reset(); + if (keyWriter) { + keyWriter->reset(); + } + if (elemWriter) { + elemWriter->reset(); + } + } + + void MapColumnWriter::writeDictionary() { + if (keyWriter) { + keyWriter->writeDictionary(); + } + if (elemWriter) { + elemWriter->writeDictionary(); + } + } + + class UnionColumnWriter : public ColumnWriter { + public: + UnionColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options); + ~UnionColumnWriter() override; + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) override; + + virtual void flush(std::vector<proto::Stream>& streams) override; + + virtual uint64_t getEstimatedSize() const override; + + virtual void getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const override; + + virtual void getStripeStatistics( + std::vector<proto::ColumnStatistics>& stats) const override; + + virtual void getFileStatistics( + std::vector<proto::ColumnStatistics>& stats) const override; + + virtual void mergeStripeStatsIntoFileStats() override; + + virtual void mergeRowGroupStatsIntoStripeStats() override; + + virtual void createRowIndexEntry() override; + + virtual void writeIndex( + std::vector<proto::Stream> &streams) const override; + + virtual void recordPosition() const override; + + virtual void writeDictionary() override; + + virtual void reset() override; + + private: + std::unique_ptr<ByteRleEncoder> rleEncoder; + std::vector<ColumnWriter*> children; + }; + + UnionColumnWriter::UnionColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options) : + ColumnWriter(type, factory, options) { + + std::unique_ptr<BufferedOutputStream> dataStream = + factory.createStream(proto::Stream_Kind_DATA); + rleEncoder = createByteRleEncoder(std::move(dataStream)); + + for (uint64_t i = 0; i != type.getSubtypeCount(); ++i) { + children.push_back(buildWriter(*type.getSubtype(i), + factory, + options).release()); + } + + if (enableIndex) { + recordPosition(); + } + } + + UnionColumnWriter::~UnionColumnWriter() { + for (uint32_t i = 0; i < children.size(); ++i) { + delete children[i]; + } + } + + void UnionColumnWriter::add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues, + const char* incomingMask) { + UnionVectorBatch* unionBatch = dynamic_cast<UnionVectorBatch*>(&rowBatch); + if (unionBatch == nullptr) { + throw InvalidArgument("Failed to cast to UnionVectorBatch"); + } + + ColumnWriter::add(rowBatch, offset, numValues, incomingMask); + + const char* notNull = unionBatch->hasNulls ? + unionBatch->notNull.data() + offset : nullptr; + unsigned char * tags = unionBatch->tags.data() + offset; + uint64_t * offsets = unionBatch->offsets.data() + offset; + + std::vector<int64_t> childOffset(children.size(), -1); + std::vector<uint64_t> childLength(children.size(), 0); + + for (uint64_t i = 0; i != numValues; ++i) { + if (childOffset[tags[i]] == -1) { + childOffset[tags[i]] = static_cast<int64_t>(offsets[i]); + } + ++childLength[tags[i]]; + } + + rleEncoder->add(reinterpret_cast<char*>(tags), numValues, notNull); + + for (uint32_t i = 0; i < children.size(); ++i) { + if (childLength[i] > 0) { + children[i]->add(*unionBatch->children[i], + static_cast<uint64_t>(childOffset[i]), + childLength[i], nullptr); + } + } + + // update stats + if (enableIndex) { + if (!notNull) { + colIndexStatistics->increase(numValues); + } else { + uint64_t count = 0; + for (uint64_t i = 0; i < numValues; ++i) { + if (notNull[i]) { + ++count; + if (enableBloomFilter) { + bloomFilter->addLong(tags[i]); + } + } + } + colIndexStatistics->increase(count); + if (count < numValues) { + colIndexStatistics->setHasNull(true); + } + } + } + } + + void UnionColumnWriter::flush(std::vector<proto::Stream>& streams) { + ColumnWriter::flush(streams); + + proto::Stream stream; + stream.set_kind(proto::Stream_Kind_DATA); + stream.set_column(static_cast<uint32_t>(columnId)); + stream.set_length(rleEncoder->flush()); + streams.push_back(stream); + + for (uint32_t i = 0; i < children.size(); ++i) { + children[i]->flush(streams); + } + } + + void UnionColumnWriter::writeIndex(std::vector<proto::Stream> &streams) const { + ColumnWriter::writeIndex(streams); + for (uint32_t i = 0; i < children.size(); ++i) { + children[i]->writeIndex(streams); + } + } + + uint64_t UnionColumnWriter::getEstimatedSize() const { + uint64_t size = ColumnWriter::getEstimatedSize(); + size += rleEncoder->getBufferSize(); + for (uint32_t i = 0; i < children.size(); ++i) { + size += children[i]->getEstimatedSize(); + } + return size; + } + + void UnionColumnWriter::getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const { + proto::ColumnEncoding encoding; + encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT); + encoding.set_dictionarysize(0); + if (enableBloomFilter) { + encoding.set_bloomencoding(BloomFilterVersion::UTF8); + } + encodings.push_back(encoding); + for (uint32_t i = 0; i < children.size(); ++i) { + children[i]->getColumnEncoding(encodings); + } + } + + void UnionColumnWriter::getStripeStatistics( + std::vector<proto::ColumnStatistics>& stats) const { + ColumnWriter::getStripeStatistics(stats); + for (uint32_t i = 0; i < children.size(); ++i) { + children[i]->getStripeStatistics(stats); + } + } + + void UnionColumnWriter::mergeStripeStatsIntoFileStats() { + ColumnWriter::mergeStripeStatsIntoFileStats(); + for (uint32_t i = 0; i < children.size(); ++i) { + children[i]->mergeStripeStatsIntoFileStats(); + } + } + + void UnionColumnWriter::getFileStatistics( + std::vector<proto::ColumnStatistics>& stats) const { + ColumnWriter::getFileStatistics(stats); + for (uint32_t i = 0; i < children.size(); ++i) { + children[i]->getFileStatistics(stats); + } + } + + void UnionColumnWriter::mergeRowGroupStatsIntoStripeStats() { + ColumnWriter::mergeRowGroupStatsIntoStripeStats(); + for (uint32_t i = 0; i < children.size(); ++i) { + children[i]->mergeRowGroupStatsIntoStripeStats(); + } + } + + void UnionColumnWriter::createRowIndexEntry() { + ColumnWriter::createRowIndexEntry(); + for (uint32_t i = 0; i < children.size(); ++i) { + children[i]->createRowIndexEntry(); + } + } + + void UnionColumnWriter::recordPosition() const { + ColumnWriter::recordPosition(); + rleEncoder->recordPosition(rowIndexPosition.get()); + } + + void UnionColumnWriter::reset() { + ColumnWriter::reset(); + for (uint32_t i = 0; i < children.size(); ++i) { + children[i]->reset(); + } + } + + void UnionColumnWriter::writeDictionary() { + for (uint32_t i = 0; i < children.size(); ++i) { + children[i]->writeDictionary(); + } + } + + std::unique_ptr<ColumnWriter> buildWriter( + const Type& type, + const StreamsFactory& factory, + const WriterOptions& options) { + switch (static_cast<int64_t>(type.getKind())) { + case STRUCT: + return std::unique_ptr<ColumnWriter>( + new StructColumnWriter( + type, + factory, + options)); + case INT: + case LONG: + case SHORT: + return std::unique_ptr<ColumnWriter>( + new IntegerColumnWriter( + type, + factory, + options)); + case BYTE: + return std::unique_ptr<ColumnWriter>( + new ByteColumnWriter( + type, + factory, + options)); + case BOOLEAN: + return std::unique_ptr<ColumnWriter>( + new BooleanColumnWriter( + type, + factory, + options)); + case DOUBLE: + return std::unique_ptr<ColumnWriter>( + new DoubleColumnWriter( + type, + factory, + options, + false)); + case FLOAT: + return std::unique_ptr<ColumnWriter>( + new DoubleColumnWriter( + type, + factory, + options, + true)); + case BINARY: + return std::unique_ptr<ColumnWriter>( + new BinaryColumnWriter( + type, + factory, + options)); + case STRING: + return std::unique_ptr<ColumnWriter>( + new StringColumnWriter( + type, + factory, + options)); + case CHAR: + return std::unique_ptr<ColumnWriter>( + new CharColumnWriter( + type, + factory, + options)); + case VARCHAR: + return std::unique_ptr<ColumnWriter>( + new VarCharColumnWriter( + type, + factory, + options)); + case DATE: + return std::unique_ptr<ColumnWriter>( + new DateColumnWriter( + type, + factory, + options)); + case TIMESTAMP: + return std::unique_ptr<ColumnWriter>( + new TimestampColumnWriter( + type, + factory, + options)); + case DECIMAL: + if (type.getPrecision() <= Decimal64ColumnWriter::MAX_PRECISION_64) { + return std::unique_ptr<ColumnWriter>( + new Decimal64ColumnWriter( + type, + factory, + options)); + } else if (type.getPrecision() <= Decimal64ColumnWriter::MAX_PRECISION_128) { + return std::unique_ptr<ColumnWriter>( + new Decimal128ColumnWriter( + type, + factory, + options)); + } else { + throw NotImplementedYet("Decimal precision more than 38 is not " + "supported"); + } + case LIST: + return std::unique_ptr<ColumnWriter>( + new ListColumnWriter( + type, + factory, + options)); + case MAP: + return std::unique_ptr<ColumnWriter>( + new MapColumnWriter( + type, + factory, + options)); + case UNION: + return std::unique_ptr<ColumnWriter>( + new UnionColumnWriter( + type, + factory, + options)); + default: + throw NotImplementedYet("Type is not supported yet for creating " + "ColumnWriter."); + } + } +} |