path: root/contrib/libs/apache/orc/c++/src/ColumnWriter.cc
author    iaz1607 <iaz1607@yandex-team.ru>    2022-02-10 16:45:37 +0300
committer Daniil Cherednik <dcherednik@yandex-team.ru>    2022-02-10 16:45:37 +0300
commit    e5437feb4ac2d2dc044e1090b9312dde5ef197e0 (patch)
tree      f5a238c69dd20a1fa2092127a31b8aff25020f7d /contrib/libs/apache/orc/c++/src/ColumnWriter.cc
parent    f4945d0a44b8770f0801de3056aa41639b0b7bd2 (diff)
download  ydb-e5437feb4ac2d2dc044e1090b9312dde5ef197e0.tar.gz
Restoring authorship annotation for <iaz1607@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/libs/apache/orc/c++/src/ColumnWriter.cc')
-rw-r--r--  contrib/libs/apache/orc/c++/src/ColumnWriter.cc  6024
1 file changed, 3012 insertions, 3012 deletions
diff --git a/contrib/libs/apache/orc/c++/src/ColumnWriter.cc b/contrib/libs/apache/orc/c++/src/ColumnWriter.cc
index 1408a15457..8d4d00cc61 100644
--- a/contrib/libs/apache/orc/c++/src/ColumnWriter.cc
+++ b/contrib/libs/apache/orc/c++/src/ColumnWriter.cc
@@ -1,3013 +1,3013 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "orc/Int128.hh"
-#include "orc/Writer.hh"
-
-#include "ByteRLE.hh"
-#include "ColumnWriter.hh"
-#include "RLE.hh"
-#include "Statistics.hh"
-#include "Timezone.hh"
-
-namespace orc {
- StreamsFactory::~StreamsFactory() {
- //PASS
- }
-
- class StreamsFactoryImpl : public StreamsFactory {
- public:
- StreamsFactoryImpl(
- const WriterOptions& writerOptions,
- OutputStream* outputStream) :
- options(writerOptions),
- outStream(outputStream) {
- }
-
- virtual std::unique_ptr<BufferedOutputStream>
- createStream(proto::Stream_Kind kind) const override;
- private:
- const WriterOptions& options;
- OutputStream* outStream;
- };
-
- std::unique_ptr<BufferedOutputStream> StreamsFactoryImpl::createStream(
- proto::Stream_Kind) const {
- // In the future, we can decide compression strategy and modifier
- // based on stream kind. But for now we just use the setting from
-    // WriterOptions
- return createCompressor(
- options.getCompression(),
- outStream,
- options.getCompressionStrategy(),
- // BufferedOutputStream initial capacity
- 1 * 1024 * 1024,
- options.getCompressionBlockSize(),
- *options.getMemoryPool());
- }
-
- std::unique_ptr<StreamsFactory> createStreamsFactory(
- const WriterOptions& options,
- OutputStream* outStream) {
- return std::unique_ptr<StreamsFactory>(
- new StreamsFactoryImpl(options, outStream));
- }
-
- RowIndexPositionRecorder::~RowIndexPositionRecorder() {
- // PASS
- }
-
- proto::ColumnEncoding_Kind RleVersionMapper(RleVersion rleVersion)
- {
- switch (rleVersion)
- {
- case RleVersion_1:
- return proto::ColumnEncoding_Kind_DIRECT;
- case RleVersion_2:
- return proto::ColumnEncoding_Kind_DIRECT_V2;
- default:
- throw InvalidArgument("Invalid param");
- }
- }
-
- ColumnWriter::ColumnWriter(
- const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options) :
- columnId(type.getColumnId()),
- colIndexStatistics(),
- colStripeStatistics(),
- colFileStatistics(),
- enableIndex(options.getEnableIndex()),
- rowIndex(),
- rowIndexEntry(),
- rowIndexPosition(),
- enableBloomFilter(false),
- memPool(*options.getMemoryPool()),
- indexStream(),
- bloomFilterStream() {
-
- std::unique_ptr<BufferedOutputStream> presentStream =
- factory.createStream(proto::Stream_Kind_PRESENT);
- notNullEncoder = createBooleanRleEncoder(std::move(presentStream));
-
- colIndexStatistics = createColumnStatistics(type);
- colStripeStatistics = createColumnStatistics(type);
- colFileStatistics = createColumnStatistics(type);
-
- if (enableIndex) {
- rowIndex = std::unique_ptr<proto::RowIndex>(new proto::RowIndex());
- rowIndexEntry =
- std::unique_ptr<proto::RowIndexEntry>(new proto::RowIndexEntry());
- rowIndexPosition = std::unique_ptr<RowIndexPositionRecorder>(
- new RowIndexPositionRecorder(*rowIndexEntry));
- indexStream =
- factory.createStream(proto::Stream_Kind_ROW_INDEX);
-
- // BloomFilters for non-UTF8 strings and non-UTC timestamps are not supported
- if (options.isColumnUseBloomFilter(columnId)
- && options.getBloomFilterVersion() == BloomFilterVersion::UTF8) {
- enableBloomFilter = true;
- bloomFilter.reset(new BloomFilterImpl(
- options.getRowIndexStride(), options.getBloomFilterFPP()));
- bloomFilterIndex.reset(new proto::BloomFilterIndex());
- bloomFilterStream = factory.createStream(proto::Stream_Kind_BLOOM_FILTER_UTF8);
- }
- }
- }
-
- ColumnWriter::~ColumnWriter() {
- // PASS
- }
-
- void ColumnWriter::add(ColumnVectorBatch& batch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) {
- notNullEncoder->add(batch.notNull.data() + offset, numValues, incomingMask);
- }
-
- void ColumnWriter::flush(std::vector<proto::Stream>& streams) {
- proto::Stream stream;
- stream.set_kind(proto::Stream_Kind_PRESENT);
- stream.set_column(static_cast<uint32_t>(columnId));
- stream.set_length(notNullEncoder->flush());
- streams.push_back(stream);
- }
-
- uint64_t ColumnWriter::getEstimatedSize() const {
- return notNullEncoder->getBufferSize();
- }
-
- void ColumnWriter::getStripeStatistics(
- std::vector<proto::ColumnStatistics>& stats) const {
- getProtoBufStatistics(stats, colStripeStatistics.get());
- }
-
- void ColumnWriter::mergeStripeStatsIntoFileStats() {
- colFileStatistics->merge(*colStripeStatistics);
- colStripeStatistics->reset();
- }
-
- void ColumnWriter::mergeRowGroupStatsIntoStripeStats() {
- colStripeStatistics->merge(*colIndexStatistics);
- colIndexStatistics->reset();
- }
-
- void ColumnWriter::getFileStatistics(
- std::vector<proto::ColumnStatistics>& stats) const {
- getProtoBufStatistics(stats, colFileStatistics.get());
- }
-
- void ColumnWriter::createRowIndexEntry() {
- proto::ColumnStatistics *indexStats = rowIndexEntry->mutable_statistics();
- colIndexStatistics->toProtoBuf(*indexStats);
-
- *rowIndex->add_entry() = *rowIndexEntry;
-
- rowIndexEntry->clear_positions();
- rowIndexEntry->clear_statistics();
-
- colStripeStatistics->merge(*colIndexStatistics);
- colIndexStatistics->reset();
-
- addBloomFilterEntry();
-
- recordPosition();
- }
-
- void ColumnWriter::addBloomFilterEntry() {
- if (enableBloomFilter) {
- BloomFilterUTF8Utils::serialize(*bloomFilter, *bloomFilterIndex->add_bloomfilter());
- bloomFilter->reset();
- }
- }
-
- void ColumnWriter::writeIndex(std::vector<proto::Stream> &streams) const {
- // write row index to output stream
- rowIndex->SerializeToZeroCopyStream(indexStream.get());
-
- // construct row index stream
- proto::Stream stream;
- stream.set_kind(proto::Stream_Kind_ROW_INDEX);
- stream.set_column(static_cast<uint32_t>(columnId));
- stream.set_length(indexStream->flush());
- streams.push_back(stream);
-
- // write BLOOM_FILTER_UTF8 stream
- if (enableBloomFilter) {
- if (!bloomFilterIndex->SerializeToZeroCopyStream(bloomFilterStream.get())) {
- throw std::logic_error("Failed to write bloom filter stream.");
- }
- stream.set_kind(proto::Stream_Kind_BLOOM_FILTER_UTF8);
- stream.set_column(static_cast<uint32_t>(columnId));
- stream.set_length(bloomFilterStream->flush());
- streams.push_back(stream);
- }
- }
-
- void ColumnWriter::recordPosition() const {
- notNullEncoder->recordPosition(rowIndexPosition.get());
- }
-
- void ColumnWriter::reset() {
- if (enableIndex) {
- // clear row index
- rowIndex->clear_entry();
- rowIndexEntry->clear_positions();
- rowIndexEntry->clear_statistics();
-
- // write current positions
- recordPosition();
- }
-
- if (enableBloomFilter) {
- bloomFilter->reset();
- bloomFilterIndex->clear_bloomfilter();
- }
- }
-
- void ColumnWriter::writeDictionary() {
- // PASS
- }
-
- class StructColumnWriter : public ColumnWriter {
- public:
- StructColumnWriter(
- const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options);
- ~StructColumnWriter() override;
-
- virtual void add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) override;
-
- virtual void flush(std::vector<proto::Stream>& streams) override;
-
- virtual uint64_t getEstimatedSize() const override;
- virtual void getColumnEncoding(
- std::vector<proto::ColumnEncoding>& encodings) const override;
-
- virtual void getStripeStatistics(
- std::vector<proto::ColumnStatistics>& stats) const override;
-
- virtual void getFileStatistics(
- std::vector<proto::ColumnStatistics>& stats) const override;
-
- virtual void mergeStripeStatsIntoFileStats() override;
-
- virtual void mergeRowGroupStatsIntoStripeStats() override;
-
- virtual void createRowIndexEntry() override;
-
- virtual void writeIndex(
- std::vector<proto::Stream> &streams) const override;
-
- virtual void writeDictionary() override;
-
- virtual void reset() override;
-
- private:
- std::vector<ColumnWriter *> children;
- };
-
- StructColumnWriter::StructColumnWriter(
- const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options) :
- ColumnWriter(type, factory, options) {
- for(unsigned int i = 0; i < type.getSubtypeCount(); ++i) {
- const Type& child = *type.getSubtype(i);
- children.push_back(buildWriter(child, factory, options).release());
- }
-
- if (enableIndex) {
- recordPosition();
- }
- }
-
- StructColumnWriter::~StructColumnWriter() {
- for (uint32_t i = 0; i < children.size(); ++i) {
- delete children[i];
- }
- }
-
- void StructColumnWriter::add(
- ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) {
- const StructVectorBatch* structBatch =
- dynamic_cast<const StructVectorBatch *>(&rowBatch);
- if (structBatch == nullptr) {
- throw InvalidArgument("Failed to cast to StructVectorBatch");
- }
-
- ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
- const char* notNull = structBatch->hasNulls ?
- structBatch->notNull.data() + offset : nullptr;
- for (uint32_t i = 0; i < children.size(); ++i) {
- children[i]->add(*structBatch->fields[i], offset, numValues, notNull);
- }
-
- // update stats
- if (!notNull) {
- colIndexStatistics->increase(numValues);
- } else {
- uint64_t count = 0;
- for (uint64_t i = 0; i < numValues; ++i) {
- if (notNull[i]) {
- ++count;
- }
- }
- colIndexStatistics->increase(count);
- if (count < numValues) {
- colIndexStatistics->setHasNull(true);
- }
- }
- }
-
- void StructColumnWriter::flush(std::vector<proto::Stream>& streams) {
- ColumnWriter::flush(streams);
- for (uint32_t i = 0; i < children.size(); ++i) {
- children[i]->flush(streams);
- }
- }
-
- void StructColumnWriter::writeIndex(
- std::vector<proto::Stream> &streams) const {
- ColumnWriter::writeIndex(streams);
- for (uint32_t i = 0; i < children.size(); ++i) {
- children[i]->writeIndex(streams);
- }
- }
-
- uint64_t StructColumnWriter::getEstimatedSize() const {
- uint64_t size = ColumnWriter::getEstimatedSize();
- for (uint32_t i = 0; i < children.size(); ++i) {
- size += children[i]->getEstimatedSize();
- }
- return size;
- }
-
- void StructColumnWriter::getColumnEncoding(
- std::vector<proto::ColumnEncoding>& encodings) const {
- proto::ColumnEncoding encoding;
- encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
- encoding.set_dictionarysize(0);
- encodings.push_back(encoding);
- for (uint32_t i = 0; i < children.size(); ++i) {
- children[i]->getColumnEncoding(encodings);
- }
- }
-
- void StructColumnWriter::getStripeStatistics(
- std::vector<proto::ColumnStatistics>& stats) const {
- ColumnWriter::getStripeStatistics(stats);
-
- for (uint32_t i = 0; i < children.size(); ++i) {
- children[i]->getStripeStatistics(stats);
- }
- }
-
- void StructColumnWriter::mergeStripeStatsIntoFileStats() {
- ColumnWriter::mergeStripeStatsIntoFileStats();
-
- for (uint32_t i = 0; i < children.size(); ++i) {
- children[i]->mergeStripeStatsIntoFileStats();
- }
- }
-
- void StructColumnWriter::getFileStatistics(
- std::vector<proto::ColumnStatistics>& stats) const {
- ColumnWriter::getFileStatistics(stats);
-
- for (uint32_t i = 0; i < children.size(); ++i) {
- children[i]->getFileStatistics(stats);
- }
- }
-
- void StructColumnWriter::mergeRowGroupStatsIntoStripeStats() {
- ColumnWriter::mergeRowGroupStatsIntoStripeStats();
-
- for (uint32_t i = 0; i < children.size(); ++i) {
- children[i]->mergeRowGroupStatsIntoStripeStats();
- }
- }
-
- void StructColumnWriter::createRowIndexEntry() {
- ColumnWriter::createRowIndexEntry();
-
- for (uint32_t i = 0; i < children.size(); ++i) {
- children[i]->createRowIndexEntry();
- }
- }
-
- void StructColumnWriter::reset() {
- ColumnWriter::reset();
-
- for (uint32_t i = 0; i < children.size(); ++i) {
- children[i]->reset();
- }
- }
-
- void StructColumnWriter::writeDictionary() {
- for (uint32_t i = 0; i < children.size(); ++i) {
- children[i]->writeDictionary();
- }
- }
-
- class IntegerColumnWriter : public ColumnWriter {
- public:
- IntegerColumnWriter(
- const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options);
-
- virtual void add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) override;
-
- virtual void flush(std::vector<proto::Stream>& streams) override;
-
- virtual uint64_t getEstimatedSize() const override;
-
- virtual void getColumnEncoding(
- std::vector<proto::ColumnEncoding>& encodings) const override;
-
- virtual void recordPosition() const override;
-
- protected:
- std::unique_ptr<RleEncoder> rleEncoder;
-
- private:
- RleVersion rleVersion;
- };
-
- IntegerColumnWriter::IntegerColumnWriter(
- const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options) :
- ColumnWriter(type, factory, options),
- rleVersion(options.getRleVersion()) {
- std::unique_ptr<BufferedOutputStream> dataStream =
- factory.createStream(proto::Stream_Kind_DATA);
- rleEncoder = createRleEncoder(
- std::move(dataStream),
- true,
- rleVersion,
- memPool,
- options.getAlignedBitpacking());
-
- if (enableIndex) {
- recordPosition();
- }
- }
-
- void IntegerColumnWriter::add(
- ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) {
- const LongVectorBatch* longBatch =
- dynamic_cast<const LongVectorBatch*>(&rowBatch);
- if (longBatch == nullptr) {
- throw InvalidArgument("Failed to cast to LongVectorBatch");
- }
- IntegerColumnStatisticsImpl* intStats =
- dynamic_cast<IntegerColumnStatisticsImpl*>(colIndexStatistics.get());
- if (intStats == nullptr) {
- throw InvalidArgument("Failed to cast to IntegerColumnStatisticsImpl");
- }
-
- ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
-
- const int64_t* data = longBatch->data.data() + offset;
- const char* notNull = longBatch->hasNulls ?
- longBatch->notNull.data() + offset : nullptr;
-
- rleEncoder->add(data, numValues, notNull);
-
- // update stats
- uint64_t count = 0;
- for (uint64_t i = 0; i < numValues; ++i) {
- if (notNull == nullptr || notNull[i]) {
- ++count;
- if (enableBloomFilter) {
- bloomFilter->addLong(data[i]);
- }
- intStats->update(data[i], 1);
- }
- }
- intStats->increase(count);
- if (count < numValues) {
- intStats->setHasNull(true);
- }
- }
-
- void IntegerColumnWriter::flush(std::vector<proto::Stream>& streams) {
- ColumnWriter::flush(streams);
-
- proto::Stream stream;
- stream.set_kind(proto::Stream_Kind_DATA);
- stream.set_column(static_cast<uint32_t>(columnId));
- stream.set_length(rleEncoder->flush());
- streams.push_back(stream);
- }
-
- uint64_t IntegerColumnWriter::getEstimatedSize() const {
- uint64_t size = ColumnWriter::getEstimatedSize();
- size += rleEncoder->getBufferSize();
- return size;
- }
-
- void IntegerColumnWriter::getColumnEncoding(
- std::vector<proto::ColumnEncoding>& encodings) const {
- proto::ColumnEncoding encoding;
- encoding.set_kind(RleVersionMapper(rleVersion));
- encoding.set_dictionarysize(0);
- if (enableBloomFilter) {
- encoding.set_bloomencoding(BloomFilterVersion::UTF8);
- }
- encodings.push_back(encoding);
- }
-
- void IntegerColumnWriter::recordPosition() const {
- ColumnWriter::recordPosition();
- rleEncoder->recordPosition(rowIndexPosition.get());
- }
-
- class ByteColumnWriter : public ColumnWriter {
- public:
- ByteColumnWriter(const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options);
-
- virtual void add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) override;
-
- virtual void flush(std::vector<proto::Stream>& streams) override;
-
- virtual uint64_t getEstimatedSize() const override;
-
- virtual void getColumnEncoding(
- std::vector<proto::ColumnEncoding>& encodings) const override;
-
- virtual void recordPosition() const override;
-
- private:
- std::unique_ptr<ByteRleEncoder> byteRleEncoder;
- };
-
- ByteColumnWriter::ByteColumnWriter(
- const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options) :
- ColumnWriter(type, factory, options) {
- std::unique_ptr<BufferedOutputStream> dataStream =
- factory.createStream(proto::Stream_Kind_DATA);
- byteRleEncoder = createByteRleEncoder(std::move(dataStream));
-
- if (enableIndex) {
- recordPosition();
- }
- }
-
- void ByteColumnWriter::add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) {
- LongVectorBatch* byteBatch = dynamic_cast<LongVectorBatch*>(&rowBatch);
- if (byteBatch == nullptr) {
- throw InvalidArgument("Failed to cast to LongVectorBatch");
- }
- IntegerColumnStatisticsImpl* intStats =
- dynamic_cast<IntegerColumnStatisticsImpl*>(colIndexStatistics.get());
- if (intStats == nullptr) {
- throw InvalidArgument("Failed to cast to IntegerColumnStatisticsImpl");
- }
-
- ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
-
- int64_t* data = byteBatch->data.data() + offset;
- const char* notNull = byteBatch->hasNulls ?
- byteBatch->notNull.data() + offset : nullptr;
-
- char* byteData = reinterpret_cast<char*>(data);
- for (uint64_t i = 0; i < numValues; ++i) {
- byteData[i] = static_cast<char>(data[i]);
- }
- byteRleEncoder->add(byteData, numValues, notNull);
-
- uint64_t count = 0;
- for (uint64_t i = 0; i < numValues; ++i) {
- if (notNull == nullptr || notNull[i]) {
- ++count;
- if (enableBloomFilter) {
- bloomFilter->addLong(data[i]);
- }
- intStats->update(static_cast<int64_t>(byteData[i]), 1);
- }
- }
- intStats->increase(count);
- if (count < numValues) {
- intStats->setHasNull(true);
- }
- }
-
- void ByteColumnWriter::flush(std::vector<proto::Stream>& streams) {
- ColumnWriter::flush(streams);
-
- proto::Stream stream;
- stream.set_kind(proto::Stream_Kind_DATA);
- stream.set_column(static_cast<uint32_t>(columnId));
- stream.set_length(byteRleEncoder->flush());
- streams.push_back(stream);
- }
-
- uint64_t ByteColumnWriter::getEstimatedSize() const {
- uint64_t size = ColumnWriter::getEstimatedSize();
- size += byteRleEncoder->getBufferSize();
- return size;
- }
-
- void ByteColumnWriter::getColumnEncoding(
- std::vector<proto::ColumnEncoding>& encodings) const {
- proto::ColumnEncoding encoding;
- encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
- encoding.set_dictionarysize(0);
- if (enableBloomFilter) {
- encoding.set_bloomencoding(BloomFilterVersion::UTF8);
- }
- encodings.push_back(encoding);
- }
-
- void ByteColumnWriter::recordPosition() const {
- ColumnWriter::recordPosition();
- byteRleEncoder->recordPosition(rowIndexPosition.get());
- }
-
- class BooleanColumnWriter : public ColumnWriter {
- public:
- BooleanColumnWriter(const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options);
-
- virtual void add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) override;
-
- virtual void flush(std::vector<proto::Stream>& streams) override;
-
- virtual uint64_t getEstimatedSize() const override;
-
- virtual void getColumnEncoding(
- std::vector<proto::ColumnEncoding>& encodings) const override;
-
- virtual void recordPosition() const override;
-
- private:
- std::unique_ptr<ByteRleEncoder> rleEncoder;
- };
-
- BooleanColumnWriter::BooleanColumnWriter(
- const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options) :
- ColumnWriter(type, factory, options) {
- std::unique_ptr<BufferedOutputStream> dataStream =
- factory.createStream(proto::Stream_Kind_DATA);
- rleEncoder = createBooleanRleEncoder(std::move(dataStream));
-
- if (enableIndex) {
- recordPosition();
- }
- }
-
- void BooleanColumnWriter::add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) {
- LongVectorBatch* byteBatch = dynamic_cast<LongVectorBatch*>(&rowBatch);
- if (byteBatch == nullptr) {
- throw InvalidArgument("Failed to cast to LongVectorBatch");
- }
- BooleanColumnStatisticsImpl* boolStats =
- dynamic_cast<BooleanColumnStatisticsImpl*>(colIndexStatistics.get());
- if (boolStats == nullptr) {
- throw InvalidArgument("Failed to cast to BooleanColumnStatisticsImpl");
- }
-
- ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
-
- int64_t* data = byteBatch->data.data() + offset;
- const char* notNull = byteBatch->hasNulls ?
- byteBatch->notNull.data() + offset : nullptr;
-
- char* byteData = reinterpret_cast<char*>(data);
- for (uint64_t i = 0; i < numValues; ++i) {
- byteData[i] = static_cast<char>(data[i]);
- }
- rleEncoder->add(byteData, numValues, notNull);
-
- uint64_t count = 0;
- for (uint64_t i = 0; i < numValues; ++i) {
- if (notNull == nullptr || notNull[i]) {
- ++count;
- if (enableBloomFilter) {
- bloomFilter->addLong(data[i]);
- }
- boolStats->update(byteData[i] != 0, 1);
- }
- }
- boolStats->increase(count);
- if (count < numValues) {
- boolStats->setHasNull(true);
- }
- }
-
- void BooleanColumnWriter::flush(std::vector<proto::Stream>& streams) {
- ColumnWriter::flush(streams);
-
- proto::Stream stream;
- stream.set_kind(proto::Stream_Kind_DATA);
- stream.set_column(static_cast<uint32_t>(columnId));
- stream.set_length(rleEncoder->flush());
- streams.push_back(stream);
- }
-
- uint64_t BooleanColumnWriter::getEstimatedSize() const {
- uint64_t size = ColumnWriter::getEstimatedSize();
- size += rleEncoder->getBufferSize();
- return size;
- }
-
- void BooleanColumnWriter::getColumnEncoding(
- std::vector<proto::ColumnEncoding>& encodings) const {
- proto::ColumnEncoding encoding;
- encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
- encoding.set_dictionarysize(0);
- if (enableBloomFilter) {
- encoding.set_bloomencoding(BloomFilterVersion::UTF8);
- }
- encodings.push_back(encoding);
- }
-
- void BooleanColumnWriter::recordPosition() const {
- ColumnWriter::recordPosition();
- rleEncoder->recordPosition(rowIndexPosition.get());
- }
-
- class DoubleColumnWriter : public ColumnWriter {
- public:
- DoubleColumnWriter(const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options,
- bool isFloat);
-
- virtual void add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) override;
-
- virtual void flush(std::vector<proto::Stream>& streams) override;
-
- virtual uint64_t getEstimatedSize() const override;
-
- virtual void getColumnEncoding(
- std::vector<proto::ColumnEncoding>& encodings) const override;
-
- virtual void recordPosition() const override;
-
- private:
- bool isFloat;
- std::unique_ptr<AppendOnlyBufferedStream> dataStream;
- DataBuffer<char> buffer;
- };
-
- DoubleColumnWriter::DoubleColumnWriter(
- const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options,
- bool isFloatType) :
- ColumnWriter(type, factory, options),
- isFloat(isFloatType),
- buffer(*options.getMemoryPool()) {
- dataStream.reset(new AppendOnlyBufferedStream(
- factory.createStream(proto::Stream_Kind_DATA)));
- buffer.resize(isFloat ? 4 : 8);
-
- if (enableIndex) {
- recordPosition();
- }
- }
-
- // Floating point types are stored using IEEE 754 floating point bit layout.
- // Float columns use 4 bytes per value and double columns use 8 bytes.
- template <typename FLOAT_TYPE, typename INTEGER_TYPE>
- inline void encodeFloatNum(FLOAT_TYPE input, char* output) {
- INTEGER_TYPE* intBits = reinterpret_cast<INTEGER_TYPE*>(&input);
- for (size_t i = 0; i < sizeof(INTEGER_TYPE); ++i) {
- output[i] = static_cast<char>(((*intBits) >> (8 * i)) & 0xff);
- }
- }
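
A minimal standalone sketch (illustrative only, not part of the ORC source) of the byte order this helper produces, shown for the double value 1.0 and using memcpy in place of the reinterpret_cast above:

    #include <cstdint>
    #include <cstdio>
    #include <cstring>

    int main() {
      double value = 1.0;   // IEEE 754 bit pattern: 0x3FF0000000000000
      int64_t bits = 0;
      std::memcpy(&bits, &value, sizeof(bits));
      for (int i = 0; i < 8; ++i) {
        // Prints "00 00 00 00 00 00 f0 3f": least-significant byte first,
        // the same order encodeFloatNum writes into its output buffer.
        std::printf("%02x ", static_cast<unsigned>((bits >> (8 * i)) & 0xff));
      }
      std::printf("\n");
    }
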
-
- void DoubleColumnWriter::add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) {
- const DoubleVectorBatch* dblBatch =
- dynamic_cast<const DoubleVectorBatch*>(&rowBatch);
- if (dblBatch == nullptr) {
- throw InvalidArgument("Failed to cast to DoubleVectorBatch");
- }
- DoubleColumnStatisticsImpl* doubleStats =
- dynamic_cast<DoubleColumnStatisticsImpl*>(colIndexStatistics.get());
- if (doubleStats == nullptr) {
- throw InvalidArgument("Failed to cast to DoubleColumnStatisticsImpl");
- }
-
- ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
-
- const double* doubleData = dblBatch->data.data() + offset;
- const char* notNull = dblBatch->hasNulls ?
- dblBatch->notNull.data() + offset : nullptr;
-
- size_t bytes = isFloat ? 4 : 8;
- char* data = buffer.data();
- uint64_t count = 0;
- for (uint64_t i = 0; i < numValues; ++i) {
- if (!notNull || notNull[i]) {
- if (isFloat) {
- encodeFloatNum<float, int32_t>(static_cast<float>(doubleData[i]), data);
- } else {
- encodeFloatNum<double, int64_t>(doubleData[i], data);
- }
- dataStream->write(data, bytes);
- ++count;
- if (enableBloomFilter) {
- bloomFilter->addDouble(doubleData[i]);
- }
- doubleStats->update(doubleData[i]);
- }
- }
- doubleStats->increase(count);
- if (count < numValues) {
- doubleStats->setHasNull(true);
- }
- }
-
- void DoubleColumnWriter::flush(std::vector<proto::Stream>& streams) {
- ColumnWriter::flush(streams);
-
- proto::Stream stream;
- stream.set_kind(proto::Stream_Kind_DATA);
- stream.set_column(static_cast<uint32_t>(columnId));
- stream.set_length(dataStream->flush());
- streams.push_back(stream);
- }
-
- uint64_t DoubleColumnWriter::getEstimatedSize() const {
- uint64_t size = ColumnWriter::getEstimatedSize();
- size += dataStream->getSize();
- return size;
- }
-
- void DoubleColumnWriter::getColumnEncoding(
- std::vector<proto::ColumnEncoding>& encodings) const {
- proto::ColumnEncoding encoding;
- encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
- encoding.set_dictionarysize(0);
- if (enableBloomFilter) {
- encoding.set_bloomencoding(BloomFilterVersion::UTF8);
- }
- encodings.push_back(encoding);
- }
-
- void DoubleColumnWriter::recordPosition() const {
- ColumnWriter::recordPosition();
- dataStream->recordPosition(rowIndexPosition.get());
- }
-
- /**
- * Implementation of increasing sorted string dictionary
- */
- class SortedStringDictionary {
- public:
- struct DictEntry {
- DictEntry(const char * str, size_t len):data(str),length(len) {}
- const char * data;
- size_t length;
- };
-
- SortedStringDictionary():totalLength(0) {}
-
- // insert a new string into dictionary, return its insertion order
- size_t insert(const char * data, size_t len);
-
- // write dictionary data & length to output buffer
- void flush(AppendOnlyBufferedStream * dataStream,
- RleEncoder * lengthEncoder) const;
-
- // reorder input index buffer from insertion order to dictionary order
- void reorder(std::vector<int64_t>& idxBuffer) const;
-
- // get dict entries in insertion order
- void getEntriesInInsertionOrder(std::vector<const DictEntry *>&) const;
-
- // return count of entries
- size_t size() const;
-
-    // return total length of strings in the dictionary
- uint64_t length() const;
-
- void clear();
-
- private:
- struct LessThan {
- bool operator()(const DictEntry& left, const DictEntry& right) const {
- int ret = memcmp(left.data, right.data, std::min(left.length, right.length));
- if (ret != 0) {
- return ret < 0;
- }
- return left.length < right.length;
- }
- };
-
- std::map<DictEntry, size_t, LessThan> dict;
- std::vector<std::vector<char>> data;
- uint64_t totalLength;
-
- // use friend class here to avoid being bothered by const function calls
- friend class StringColumnWriter;
- friend class CharColumnWriter;
- friend class VarCharColumnWriter;
- // store indexes of insertion order in the dictionary for not-null rows
- std::vector<int64_t> idxInDictBuffer;
- };
-
- // insert a new string into dictionary, return its insertion order
- size_t SortedStringDictionary::insert(const char * str, size_t len) {
- auto ret = dict.insert({DictEntry(str, len), dict.size()});
- if (ret.second) {
- // make a copy to internal storage
- data.push_back(std::vector<char>(len));
- memcpy(data.back().data(), str, len);
- // update dictionary entry to link pointer to internal storage
- DictEntry * entry = const_cast<DictEntry *>(&(ret.first->first));
- entry->data = data.back().data();
- totalLength += len;
- }
- return ret.first->second;
- }
-
- // write dictionary data & length to output buffer
- void SortedStringDictionary::flush(AppendOnlyBufferedStream * dataStream,
- RleEncoder * lengthEncoder) const {
- for (auto it = dict.cbegin(); it != dict.cend(); ++it) {
- dataStream->write(it->first.data, it->first.length);
- lengthEncoder->write(static_cast<int64_t>(it->first.length));
- }
- }
-
- /**
- * Reorder input index buffer from insertion order to dictionary order
- *
-   * We require this function because string values are buffered by their
-   * indexes in insertion order. Only once the entire dictionary is complete
-   * can we obtain their sorted indexes, because the ORC specification demands
-   * that the dictionary be ordered. Therefore this function transforms the
-   * indexes from insertion order to dictionary value order for the final
-   * output.
- */
- void SortedStringDictionary::reorder(std::vector<int64_t>& idxBuffer) const {
- // iterate the dictionary to get mapping from insertion order to value order
- std::vector<size_t> mapping(dict.size());
- size_t dictIdx = 0;
- for (auto it = dict.cbegin(); it != dict.cend(); ++it) {
- mapping[it->second] = dictIdx++;
- }
-
- // do the transformation
- for (size_t i = 0; i != idxBuffer.size(); ++i) {
- idxBuffer[i] = static_cast<int64_t>(
- mapping[static_cast<size_t>(idxBuffer[i])]);
- }
- }
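
To make the reordering concrete, here is a small standalone sketch (illustrative only; a plain std::map stands in for SortedStringDictionary, whose underlying map is likewise sorted by key):

    #include <cstdint>
    #include <cstdio>
    #include <map>
    #include <string>
    #include <vector>

    int main() {
      // Values arrived in this insertion order: "banana"=0, "apple"=1, "cherry"=2.
      std::map<std::string, std::size_t> dict{
          {"banana", 0}, {"apple", 1}, {"cherry", 2}};

      // Iterating the map visits keys in sorted order (apple, banana, cherry),
      // which yields the insertion-order -> dictionary-order mapping {1, 0, 2}.
      std::vector<std::size_t> mapping(dict.size());
      std::size_t dictIdx = 0;
      for (const auto& kv : dict) {
        mapping[kv.second] = dictIdx++;
      }

      // Row values were buffered as insertion-order indexes; rewrite them so
      // they point into the sorted dictionary instead.
      std::vector<int64_t> idxBuffer{0, 1, 2, 0};  // banana, apple, cherry, banana
      for (auto& idx : idxBuffer) {
        idx = static_cast<int64_t>(mapping[static_cast<std::size_t>(idx)]);
      }
      for (int64_t idx : idxBuffer) {
        std::printf("%lld ", static_cast<long long>(idx));  // prints: 1 0 2 1
      }
      std::printf("\n");
    }
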
-
- // get dict entries in insertion order
- void SortedStringDictionary::getEntriesInInsertionOrder(
- std::vector<const DictEntry *>& entries) const {
- entries.resize(dict.size());
- for (auto it = dict.cbegin(); it != dict.cend(); ++it) {
- entries[it->second] = &(it->first);
- }
- }
-
- // return count of entries
- size_t SortedStringDictionary::size() const {
- return dict.size();
- }
-
-  // return total length of strings in the dictionary
- uint64_t SortedStringDictionary::length() const {
- return totalLength;
- }
-
- void SortedStringDictionary::clear() {
- totalLength = 0;
- data.clear();
- dict.clear();
- }
-
- class StringColumnWriter : public ColumnWriter {
- public:
- StringColumnWriter(const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options);
-
- virtual void add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) override;
-
- virtual void flush(std::vector<proto::Stream>& streams) override;
-
- virtual uint64_t getEstimatedSize() const override;
-
- virtual void getColumnEncoding(
- std::vector<proto::ColumnEncoding>& encodings) const override;
-
- virtual void recordPosition() const override;
-
- virtual void createRowIndexEntry() override;
-
- virtual void writeDictionary() override;
-
- virtual void reset() override;
-
- private:
- /**
- * dictionary related functions
- */
- bool checkDictionaryKeyRatio();
- void createDirectStreams();
- void createDictStreams();
- void deleteDictStreams();
- void fallbackToDirectEncoding();
-
- protected:
- RleVersion rleVersion;
- bool useCompression;
- const StreamsFactory& streamsFactory;
- bool alignedBitPacking;
-
- // direct encoding streams
- std::unique_ptr<RleEncoder> directLengthEncoder;
- std::unique_ptr<AppendOnlyBufferedStream> directDataStream;
-
- // dictionary encoding streams
- std::unique_ptr<RleEncoder> dictDataEncoder;
- std::unique_ptr<RleEncoder> dictLengthEncoder;
- std::unique_ptr<AppendOnlyBufferedStream> dictStream;
-
- /**
- * dictionary related variables
- */
- SortedStringDictionary dictionary;
- // whether or not dictionary checking is done
- bool doneDictionaryCheck;
- // whether or not it should be used
- bool useDictionary;
- // keys in the dictionary should not exceed this ratio
- double dictSizeThreshold;
-
- // record start row of each row group; null rows are skipped
- mutable std::vector<size_t> startOfRowGroups;
- };
-
- StringColumnWriter::StringColumnWriter(
- const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options) :
- ColumnWriter(type, factory, options),
- rleVersion(options.getRleVersion()),
- useCompression(options.getCompression() != CompressionKind_NONE),
- streamsFactory(factory),
- alignedBitPacking(options.getAlignedBitpacking()),
- doneDictionaryCheck(false),
- useDictionary(options.getEnableDictionary()),
- dictSizeThreshold(options.getDictionaryKeySizeThreshold()){
- if (type.getKind() == TypeKind::BINARY) {
- useDictionary = false;
- doneDictionaryCheck = true;
- }
-
- if (useDictionary) {
- createDictStreams();
- } else {
- doneDictionaryCheck = true;
- createDirectStreams();
- }
-
- if (enableIndex) {
- recordPosition();
- }
- }
-
- void StringColumnWriter::add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) {
- const StringVectorBatch* stringBatch =
- dynamic_cast<const StringVectorBatch*>(&rowBatch);
- if (stringBatch == nullptr) {
- throw InvalidArgument("Failed to cast to StringVectorBatch");
- }
-
- StringColumnStatisticsImpl* strStats =
- dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get());
- if (strStats == nullptr) {
- throw InvalidArgument("Failed to cast to StringColumnStatisticsImpl");
- }
-
- ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
-
- char *const * data = stringBatch->data.data() + offset;
- const int64_t* length = stringBatch->length.data() + offset;
- const char* notNull = stringBatch->hasNulls ?
- stringBatch->notNull.data() + offset : nullptr;
-
- if (!useDictionary){
- directLengthEncoder->add(length, numValues, notNull);
- }
-
- uint64_t count = 0;
- for (uint64_t i = 0; i < numValues; ++i) {
- if (!notNull || notNull[i]) {
- const size_t len = static_cast<size_t>(length[i]);
- if (useDictionary) {
- size_t index = dictionary.insert(data[i], len);
- dictionary.idxInDictBuffer.push_back(static_cast<int64_t>(index));
- } else {
- directDataStream->write(data[i], len);
- }
- if (enableBloomFilter) {
- bloomFilter->addBytes(data[i], static_cast<int64_t>(len));
- }
- strStats->update(data[i], len);
- ++count;
- }
- }
- strStats->increase(count);
- if (count < numValues) {
- strStats->setHasNull(true);
- }
- }
-
- void StringColumnWriter::flush(std::vector<proto::Stream>& streams) {
- ColumnWriter::flush(streams);
-
- if (useDictionary) {
- proto::Stream data;
- data.set_kind(proto::Stream_Kind_DATA);
- data.set_column(static_cast<uint32_t>(columnId));
- data.set_length(dictDataEncoder->flush());
- streams.push_back(data);
-
- proto::Stream dict;
- dict.set_kind(proto::Stream_Kind_DICTIONARY_DATA);
- dict.set_column(static_cast<uint32_t>(columnId));
- dict.set_length(dictStream->flush());
- streams.push_back(dict);
-
- proto::Stream length;
- length.set_kind(proto::Stream_Kind_LENGTH);
- length.set_column(static_cast<uint32_t>(columnId));
- length.set_length(dictLengthEncoder->flush());
- streams.push_back(length);
- } else {
- proto::Stream length;
- length.set_kind(proto::Stream_Kind_LENGTH);
- length.set_column(static_cast<uint32_t>(columnId));
- length.set_length(directLengthEncoder->flush());
- streams.push_back(length);
-
- proto::Stream data;
- data.set_kind(proto::Stream_Kind_DATA);
- data.set_column(static_cast<uint32_t>(columnId));
- data.set_length(directDataStream->flush());
- streams.push_back(data);
- }
- }
-
- uint64_t StringColumnWriter::getEstimatedSize() const {
- uint64_t size = ColumnWriter::getEstimatedSize();
- if (!useDictionary) {
- size += directLengthEncoder->getBufferSize();
- size += directDataStream->getSize();
- } else {
- size += dictionary.length();
- size += dictionary.size() * sizeof(int32_t);
- size += dictionary.idxInDictBuffer.size() * sizeof(int32_t);
- if (useCompression) {
- size /= 3; // estimated ratio is 3:1
- }
- }
- return size;
- }
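
As a rough worked example with made-up numbers: a dictionary holding 10,000 entries totalling 100 KB of string data, with 1,000,000 buffered row indexes, is estimated at 100 KB + 10,000 * 4 B + 1,000,000 * 4 B, roughly 4.14 MB, and about a third of that (roughly 1.38 MB) when compression is enabled.
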
-
- void StringColumnWriter::getColumnEncoding(
- std::vector<proto::ColumnEncoding>& encodings) const {
- proto::ColumnEncoding encoding;
- if (!useDictionary) {
- encoding.set_kind(rleVersion == RleVersion_1 ?
- proto::ColumnEncoding_Kind_DIRECT :
- proto::ColumnEncoding_Kind_DIRECT_V2);
- } else {
- encoding.set_kind(rleVersion == RleVersion_1 ?
- proto::ColumnEncoding_Kind_DICTIONARY :
- proto::ColumnEncoding_Kind_DICTIONARY_V2);
- }
- encoding.set_dictionarysize(static_cast<uint32_t>(dictionary.size()));
- if (enableBloomFilter) {
- encoding.set_bloomencoding(BloomFilterVersion::UTF8);
- }
- encodings.push_back(encoding);
- }
-
- void StringColumnWriter::recordPosition() const {
- ColumnWriter::recordPosition();
- if (!useDictionary) {
- directDataStream->recordPosition(rowIndexPosition.get());
- directLengthEncoder->recordPosition(rowIndexPosition.get());
- } else {
- if (enableIndex) {
- startOfRowGroups.push_back(dictionary.idxInDictBuffer.size());
- }
- }
- }
-
- bool StringColumnWriter::checkDictionaryKeyRatio() {
- if (!doneDictionaryCheck) {
- useDictionary = dictionary.size() <= static_cast<size_t>(
- static_cast<double>(dictionary.idxInDictBuffer.size()) * dictSizeThreshold);
- doneDictionaryCheck = true;
- }
-
- return useDictionary;
- }
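
As a worked example with made-up numbers: with a dictionary key size threshold of 0.8, a column that has buffered 10,000 non-null values keeps dictionary encoding only if it produced at most 8,000 distinct keys; 9,500 distinct keys would fail the check and trigger the fallback to direct encoding.
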
-
- void StringColumnWriter::createRowIndexEntry() {
- if (useDictionary && !doneDictionaryCheck) {
- if (!checkDictionaryKeyRatio()) {
- fallbackToDirectEncoding();
- }
- }
- ColumnWriter::createRowIndexEntry();
- }
-
- void StringColumnWriter::reset() {
- ColumnWriter::reset();
-
- dictionary.clear();
- dictionary.idxInDictBuffer.resize(0);
- startOfRowGroups.clear();
- startOfRowGroups.push_back(0);
- }
-
- void StringColumnWriter::createDirectStreams() {
- std::unique_ptr<BufferedOutputStream> directLengthStream =
- streamsFactory.createStream(proto::Stream_Kind_LENGTH);
- directLengthEncoder = createRleEncoder(std::move(directLengthStream),
- false,
- rleVersion,
- memPool,
- alignedBitPacking);
- directDataStream.reset(new AppendOnlyBufferedStream(
- streamsFactory.createStream(proto::Stream_Kind_DATA)));
- }
-
- void StringColumnWriter::createDictStreams() {
- std::unique_ptr<BufferedOutputStream> dictDataStream =
- streamsFactory.createStream(proto::Stream_Kind_DATA);
- dictDataEncoder = createRleEncoder(std::move(dictDataStream),
- false,
- rleVersion,
- memPool,
- alignedBitPacking);
- std::unique_ptr<BufferedOutputStream> dictLengthStream =
- streamsFactory.createStream(proto::Stream_Kind_LENGTH);
- dictLengthEncoder = createRleEncoder(std::move(dictLengthStream),
- false,
- rleVersion,
- memPool,
- alignedBitPacking);
- dictStream.reset(new AppendOnlyBufferedStream(
- streamsFactory.createStream(proto::Stream_Kind_DICTIONARY_DATA)));
- }
-
- void StringColumnWriter::deleteDictStreams() {
- dictDataEncoder.reset(nullptr);
- dictLengthEncoder.reset(nullptr);
- dictStream.reset(nullptr);
-
- dictionary.clear();
- dictionary.idxInDictBuffer.clear();
- startOfRowGroups.clear();
- }
-
- void StringColumnWriter::writeDictionary() {
- if (useDictionary && !doneDictionaryCheck) {
- // when index is disabled, dictionary check happens while writing 1st stripe
- if (!checkDictionaryKeyRatio()) {
- fallbackToDirectEncoding();
- return;
- }
- }
-
- if (useDictionary) {
- // flush dictionary data & length streams
- dictionary.flush(dictStream.get(), dictLengthEncoder.get());
-
- // convert index from insertion order to dictionary order
- dictionary.reorder(dictionary.idxInDictBuffer);
-
- // write data sequences
- int64_t * data = dictionary.idxInDictBuffer.data();
- if (enableIndex) {
- size_t prevOffset = 0;
- for (size_t i = 0; i < startOfRowGroups.size(); ++i) {
- // write sequences in batch for a row group stride
- size_t offset = startOfRowGroups[i];
- dictDataEncoder->add(data + prevOffset, offset - prevOffset, nullptr);
-
- // update index positions
- int rowGroupId = static_cast<int>(i);
- proto::RowIndexEntry* indexEntry =
- (rowGroupId < rowIndex->entry_size()) ?
- rowIndex->mutable_entry(rowGroupId) : rowIndexEntry.get();
-
- // add positions for direct streams
- RowIndexPositionRecorder recorder(*indexEntry);
- dictDataEncoder->recordPosition(&recorder);
-
- prevOffset = offset;
- }
-
- dictDataEncoder->add(data + prevOffset,
- dictionary.idxInDictBuffer.size() - prevOffset,
- nullptr);
- } else {
- dictDataEncoder->add(data, dictionary.idxInDictBuffer.size(), nullptr);
- }
- }
- }
-
- void StringColumnWriter::fallbackToDirectEncoding() {
- createDirectStreams();
-
- if (enableIndex) {
- // fallback happens at the 1st row group;
- // simply complete positions for direct streams
- proto::RowIndexEntry * indexEntry = rowIndexEntry.get();
- RowIndexPositionRecorder recorder(*indexEntry);
- directDataStream->recordPosition(&recorder);
- directLengthEncoder->recordPosition(&recorder);
- }
-
- // get dictionary entries in insertion order
- std::vector<const SortedStringDictionary::DictEntry *> entries;
- dictionary.getEntriesInInsertionOrder(entries);
-
- // store each length of the data into a vector
- const SortedStringDictionary::DictEntry * dictEntry = nullptr;
- for (uint64_t i = 0; i != dictionary.idxInDictBuffer.size(); ++i) {
- // write one row data in direct encoding
- dictEntry = entries[static_cast<size_t>(dictionary.idxInDictBuffer[i])];
- directDataStream->write(dictEntry->data, dictEntry->length);
- directLengthEncoder->write(static_cast<int64_t>(dictEntry->length));
- }
-
- deleteDictStreams();
- }
-
- struct Utf8Utils {
- /**
-     * Counts how many utf-8 chars are in the input data
- */
- static uint64_t charLength(const char * data, uint64_t length) {
- uint64_t chars = 0;
- for (uint64_t i = 0; i < length; i++) {
- if (isUtfStartByte(data[i])) {
- chars++;
- }
- }
- return chars;
- }
-
- /**
- * Return the number of bytes required to read at most maxCharLength
- * characters in full from a utf-8 encoded byte array provided
- * by data. This does not validate utf-8 data, but
- * operates correctly on already valid utf-8 data.
- *
- * @param maxCharLength number of characters required
- * @param data the bytes of UTF-8
- * @param length the length of data to truncate
- */
- static uint64_t truncateBytesTo(uint64_t maxCharLength,
- const char * data,
- uint64_t length) {
- uint64_t chars = 0;
- if (length <= maxCharLength) {
- return length;
- }
- for (uint64_t i = 0; i < length; i++) {
- if (isUtfStartByte(data[i])) {
- chars++;
- }
- if (chars > maxCharLength) {
- return i;
- }
- }
- // everything fits
- return length;
- }
-
- /**
- * Checks if b is the first byte of a UTF-8 character.
- */
- inline static bool isUtfStartByte(char b) {
- return (b & 0xC0) != 0x80;
- }
-
- /**
- * Find the start of the last character that ends in the current string.
- * @param text the bytes of the utf-8
- * @param from the first byte location
- * @param until the last byte location
- * @return the index of the last character
- */
- static uint64_t findLastCharacter(const char * text, uint64_t from, uint64_t until) {
- uint64_t posn = until;
-      /* we don't expect characters longer than 5 bytes */
- while (posn >= from) {
- if (isUtfStartByte(text[posn])) {
- return posn;
- }
- posn -= 1;
- }
- /* beginning of a valid char not found */
- throw std::logic_error(
- "Could not truncate string, beginning of a valid char not found");
- }
- };
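
A small standalone sketch (illustrative only, not part of the ORC source) of the truncation rule on a multi-byte string; it repeats the same start-byte walk as truncateBytesTo:

    #include <cstdint>
    #include <cstdio>

    // Same start-byte test as Utf8Utils::isUtfStartByte above.
    static bool isUtfStartByte(char b) { return (b & 0xC0) != 0x80; }

    int main() {
      // "a" (1 byte) + the euro sign U+20AC (3 bytes: E2 82 AC) + "b" (1 byte):
      // 5 bytes, 3 characters.
      const char data[] = "a\xE2\x82\xAC" "b";
      const uint64_t length = 5;
      const uint64_t maxCharLength = 2;

      // Count character start bytes and stop at the byte that would begin
      // character number maxCharLength + 1.
      uint64_t chars = 0;
      uint64_t keepBytes = length;
      for (uint64_t i = 0; i < length; i++) {
        if (isUtfStartByte(data[i])) {
          chars++;
        }
        if (chars > maxCharLength) {
          keepBytes = i;
          break;
        }
      }
      // Prints "keep 4 of 5 bytes": the first two characters ("a" plus the euro
      // sign) occupy four bytes, so a byte-count cut at two would split a character.
      std::printf("keep %llu of %llu bytes\n",
                  static_cast<unsigned long long>(keepBytes),
                  static_cast<unsigned long long>(length));
    }
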
-
- class CharColumnWriter : public StringColumnWriter {
- public:
- CharColumnWriter(const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options) :
- StringColumnWriter(type, factory, options),
- maxLength(type.getMaximumLength()),
- padBuffer(*options.getMemoryPool()) {
-      // utf-8 characters are currently at most 4 bytes long, but could be up to 6
- padBuffer.resize(maxLength * 6);
- }
-
- virtual void add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) override;
-
- private:
- uint64_t maxLength;
- DataBuffer<char> padBuffer;
- };
-
- void CharColumnWriter::add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) {
- StringVectorBatch* charsBatch = dynamic_cast<StringVectorBatch*>(&rowBatch);
- if (charsBatch == nullptr) {
- throw InvalidArgument("Failed to cast to StringVectorBatch");
- }
-
- StringColumnStatisticsImpl* strStats =
- dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get());
- if (strStats == nullptr) {
- throw InvalidArgument("Failed to cast to StringColumnStatisticsImpl");
- }
-
- ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
-
- char** data = charsBatch->data.data() + offset;
- int64_t* length = charsBatch->length.data() + offset;
- const char* notNull = charsBatch->hasNulls ?
- charsBatch->notNull.data() + offset : nullptr;
-
- uint64_t count = 0;
- for (uint64_t i = 0; i < numValues; ++i) {
- if (!notNull || notNull[i]) {
- const char * charData = nullptr;
- uint64_t originLength = static_cast<uint64_t>(length[i]);
- uint64_t charLength = Utf8Utils::charLength(data[i], originLength);
- if (charLength >= maxLength) {
- charData = data[i];
- length[i] = static_cast<int64_t>(
- Utf8Utils::truncateBytesTo(maxLength, data[i], originLength));
- } else {
- charData = padBuffer.data();
- // the padding is exactly 1 byte per char
- length[i] = length[i] + static_cast<int64_t>(maxLength - charLength);
- memcpy(padBuffer.data(), data[i], originLength);
- memset(padBuffer.data() + originLength,
- ' ',
- static_cast<size_t>(length[i]) - originLength);
- }
-
- if (useDictionary) {
- size_t index = dictionary.insert(charData, static_cast<size_t>(length[i]));
- dictionary.idxInDictBuffer.push_back(static_cast<int64_t>(index));
- } else {
- directDataStream->write(charData, static_cast<size_t>(length[i]));
- }
-
- if (enableBloomFilter) {
- bloomFilter->addBytes(data[i], length[i]);
- }
- strStats->update(charData, static_cast<size_t>(length[i]));
- ++count;
- }
- }
-
- if (!useDictionary) {
- directLengthEncoder->add(length, numValues, notNull);
- }
-
- strStats->increase(count);
- if (count < numValues) {
- strStats->setHasNull(true);
- }
- }
-
- class VarCharColumnWriter : public StringColumnWriter {
- public:
- VarCharColumnWriter(const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options) :
- StringColumnWriter(type, factory, options),
- maxLength(type.getMaximumLength()) {
- // PASS
- }
-
- virtual void add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) override;
-
- private:
- uint64_t maxLength;
- };
-
- void VarCharColumnWriter::add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) {
- StringVectorBatch* charsBatch = dynamic_cast<StringVectorBatch*>(&rowBatch);
- if (charsBatch == nullptr) {
- throw InvalidArgument("Failed to cast to StringVectorBatch");
- }
-
- StringColumnStatisticsImpl* strStats =
- dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get());
- if (strStats == nullptr) {
- throw InvalidArgument("Failed to cast to StringColumnStatisticsImpl");
- }
-
- ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
-
- char* const* data = charsBatch->data.data() + offset;
- int64_t* length = charsBatch->length.data() + offset;
- const char* notNull = charsBatch->hasNulls ?
- charsBatch->notNull.data() + offset : nullptr;
-
- uint64_t count = 0;
- for (uint64_t i = 0; i < numValues; ++i) {
- if (!notNull || notNull[i]) {
- uint64_t itemLength = Utf8Utils::truncateBytesTo(
- maxLength, data[i], static_cast<uint64_t>(length[i]));
- length[i] = static_cast<int64_t>(itemLength);
-
- if (useDictionary) {
- size_t index = dictionary.insert(data[i], static_cast<size_t>(length[i]));
- dictionary.idxInDictBuffer.push_back(static_cast<int64_t>(index));
- } else {
- directDataStream->write(data[i], static_cast<size_t>(length[i]));
- }
-
- if (enableBloomFilter) {
- bloomFilter->addBytes(data[i], length[i]);
- }
- strStats->update(data[i], static_cast<size_t>(length[i]));
- ++count;
- }
- }
-
- if (!useDictionary) {
- directLengthEncoder->add(length, numValues, notNull);
- }
-
- strStats->increase(count);
- if (count < numValues) {
- strStats->setHasNull(true);
- }
- }
-
- class BinaryColumnWriter : public StringColumnWriter {
- public:
- BinaryColumnWriter(const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options) :
- StringColumnWriter(type, factory, options) {
- // PASS
- }
-
- virtual void add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) override;
- };
-
- void BinaryColumnWriter::add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) {
- StringVectorBatch* binBatch = dynamic_cast<StringVectorBatch*>(&rowBatch);
- if (binBatch == nullptr) {
- throw InvalidArgument("Failed to cast to StringVectorBatch");
- }
-
- BinaryColumnStatisticsImpl* binStats =
- dynamic_cast<BinaryColumnStatisticsImpl*>(colIndexStatistics.get());
- if (binStats == nullptr) {
- throw InvalidArgument("Failed to cast to BinaryColumnStatisticsImpl");
- }
-
- ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
-
- char** data = binBatch->data.data() + offset;
- int64_t* length = binBatch->length.data() + offset;
- const char* notNull = binBatch->hasNulls ?
- binBatch->notNull.data() + offset : nullptr;
-
- uint64_t count = 0;
- for (uint64_t i = 0; i < numValues; ++i) {
- uint64_t unsignedLength = static_cast<uint64_t>(length[i]);
- if (!notNull || notNull[i]) {
- directDataStream->write(data[i], unsignedLength);
-
- binStats->update(unsignedLength);
- ++count;
- }
- }
- directLengthEncoder->add(length, numValues, notNull);
- binStats->increase(count);
- if (count < numValues) {
- binStats->setHasNull(true);
- }
- }
-
- class TimestampColumnWriter : public ColumnWriter {
- public:
- TimestampColumnWriter(const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options);
-
- virtual void add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) override;
-
- virtual void flush(std::vector<proto::Stream>& streams) override;
-
- virtual uint64_t getEstimatedSize() const override;
-
- virtual void getColumnEncoding(
- std::vector<proto::ColumnEncoding>& encodings) const override;
-
- virtual void recordPosition() const override;
-
- protected:
- std::unique_ptr<RleEncoder> secRleEncoder, nanoRleEncoder;
-
- private:
- RleVersion rleVersion;
- const Timezone& timezone;
- };
-
- TimestampColumnWriter::TimestampColumnWriter(
- const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options) :
- ColumnWriter(type, factory, options),
- rleVersion(options.getRleVersion()),
- timezone(getTimezoneByName("GMT")){
- std::unique_ptr<BufferedOutputStream> dataStream =
- factory.createStream(proto::Stream_Kind_DATA);
- std::unique_ptr<BufferedOutputStream> secondaryStream =
- factory.createStream(proto::Stream_Kind_SECONDARY);
- secRleEncoder = createRleEncoder(std::move(dataStream),
- true,
- rleVersion,
- memPool,
- options.getAlignedBitpacking());
- nanoRleEncoder = createRleEncoder(std::move(secondaryStream),
- false,
- rleVersion,
- memPool,
- options.getAlignedBitpacking());
-
- if (enableIndex) {
- recordPosition();
- }
- }
-
- // Because the number of nanoseconds often has a large number of trailing zeros,
- // the number has trailing decimal zero digits removed and the last three bits
- // are used to record how many zeros were removed if the trailing zeros are
- // more than 2. Thus 1000 nanoseconds would be serialized as 0x0a and
- // 100000 would be serialized as 0x0c.
- static int64_t formatNano(int64_t nanos) {
- if (nanos == 0) {
- return 0;
- } else if (nanos % 100 != 0) {
- return (nanos) << 3;
- } else {
- nanos /= 100;
- int64_t trailingZeros = 1;
- while (nanos % 10 == 0 && trailingZeros < 7) {
- nanos /= 10;
- trailingZeros += 1;
- }
- return (nanos) << 3 | trailingZeros;
- }
- }
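
One more worked value alongside the examples in the comment above: 500 nanoseconds is divisible by 100, so it is stored as 5 with a flag of 1 (5 << 3 | 1 = 0x29) and recovered as 5 * 10^(1+1) = 500, while a value such as 123456, which has fewer than two trailing zeros, is simply stored shifted left by three bits with a zero flag.
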
-
- void TimestampColumnWriter::add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) {
- TimestampVectorBatch* tsBatch =
- dynamic_cast<TimestampVectorBatch*>(&rowBatch);
- if (tsBatch == nullptr) {
- throw InvalidArgument("Failed to cast to TimestampVectorBatch");
- }
-
- TimestampColumnStatisticsImpl* tsStats =
- dynamic_cast<TimestampColumnStatisticsImpl*>(colIndexStatistics.get());
- if (tsStats == nullptr) {
- throw InvalidArgument("Failed to cast to TimestampColumnStatisticsImpl");
- }
-
- ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
-
- const char* notNull = tsBatch->hasNulls ?
- tsBatch->notNull.data() + offset : nullptr;
- int64_t *secs = tsBatch->data.data() + offset;
- int64_t *nanos = tsBatch->nanoseconds.data() + offset;
-
- uint64_t count = 0;
- for (uint64_t i = 0; i < numValues; ++i) {
- if (notNull == nullptr || notNull[i]) {
- // TimestampVectorBatch already stores data in UTC
- int64_t millsUTC = secs[i] * 1000 + nanos[i] / 1000000;
- ++count;
- if (enableBloomFilter) {
- bloomFilter->addLong(millsUTC);
- }
- tsStats->update(millsUTC);
-
+ stream.set_length(indexStream->flush());
+ streams.push_back(stream);
+
+ // write BLOOM_FILTER_UTF8 stream
+ if (enableBloomFilter) {
+ if (!bloomFilterIndex->SerializeToZeroCopyStream(bloomFilterStream.get())) {
+ throw std::logic_error("Failed to write bloom filter stream.");
+ }
+ stream.set_kind(proto::Stream_Kind_BLOOM_FILTER_UTF8);
+ stream.set_column(static_cast<uint32_t>(columnId));
+ stream.set_length(bloomFilterStream->flush());
+ streams.push_back(stream);
+ }
+ }
+
+ void ColumnWriter::recordPosition() const {
+ notNullEncoder->recordPosition(rowIndexPosition.get());
+ }
+
+ void ColumnWriter::reset() {
+ if (enableIndex) {
+ // clear row index
+ rowIndex->clear_entry();
+ rowIndexEntry->clear_positions();
+ rowIndexEntry->clear_statistics();
+
+ // write current positions
+ recordPosition();
+ }
+
+ if (enableBloomFilter) {
+ bloomFilter->reset();
+ bloomFilterIndex->clear_bloomfilter();
+ }
+ }
+
+ void ColumnWriter::writeDictionary() {
+ // PASS
+ }
+
+ class StructColumnWriter : public ColumnWriter {
+ public:
+ StructColumnWriter(
+ const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options);
+ ~StructColumnWriter() override;
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) override;
+
+ virtual void flush(std::vector<proto::Stream>& streams) override;
+
+ virtual uint64_t getEstimatedSize() const override;
+ virtual void getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const override;
+
+ virtual void getStripeStatistics(
+ std::vector<proto::ColumnStatistics>& stats) const override;
+
+ virtual void getFileStatistics(
+ std::vector<proto::ColumnStatistics>& stats) const override;
+
+ virtual void mergeStripeStatsIntoFileStats() override;
+
+ virtual void mergeRowGroupStatsIntoStripeStats() override;
+
+ virtual void createRowIndexEntry() override;
+
+ virtual void writeIndex(
+ std::vector<proto::Stream> &streams) const override;
+
+ virtual void writeDictionary() override;
+
+ virtual void reset() override;
+
+ private:
+ std::vector<ColumnWriter *> children;
+ };
+
+ StructColumnWriter::StructColumnWriter(
+ const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options) :
+ ColumnWriter(type, factory, options) {
+ for(unsigned int i = 0; i < type.getSubtypeCount(); ++i) {
+ const Type& child = *type.getSubtype(i);
+ children.push_back(buildWriter(child, factory, options).release());
+ }
+
+ if (enableIndex) {
+ recordPosition();
+ }
+ }
+
+ StructColumnWriter::~StructColumnWriter() {
+ for (uint32_t i = 0; i < children.size(); ++i) {
+ delete children[i];
+ }
+ }
+
+ void StructColumnWriter::add(
+ ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) {
+ const StructVectorBatch* structBatch =
+ dynamic_cast<const StructVectorBatch *>(&rowBatch);
+ if (structBatch == nullptr) {
+ throw InvalidArgument("Failed to cast to StructVectorBatch");
+ }
+
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
+ const char* notNull = structBatch->hasNulls ?
+ structBatch->notNull.data() + offset : nullptr;
+ for (uint32_t i = 0; i < children.size(); ++i) {
+ children[i]->add(*structBatch->fields[i], offset, numValues, notNull);
+ }
+
+ // update stats
+ if (!notNull) {
+ colIndexStatistics->increase(numValues);
+ } else {
+ uint64_t count = 0;
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (notNull[i]) {
+ ++count;
+ }
+ }
+ colIndexStatistics->increase(count);
+ if (count < numValues) {
+ colIndexStatistics->setHasNull(true);
+ }
+ }
+ }
+
+ void StructColumnWriter::flush(std::vector<proto::Stream>& streams) {
+ ColumnWriter::flush(streams);
+ for (uint32_t i = 0; i < children.size(); ++i) {
+ children[i]->flush(streams);
+ }
+ }
+
+ void StructColumnWriter::writeIndex(
+ std::vector<proto::Stream> &streams) const {
+ ColumnWriter::writeIndex(streams);
+ for (uint32_t i = 0; i < children.size(); ++i) {
+ children[i]->writeIndex(streams);
+ }
+ }
+
+ uint64_t StructColumnWriter::getEstimatedSize() const {
+ uint64_t size = ColumnWriter::getEstimatedSize();
+ for (uint32_t i = 0; i < children.size(); ++i) {
+ size += children[i]->getEstimatedSize();
+ }
+ return size;
+ }
+
+ void StructColumnWriter::getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const {
+ proto::ColumnEncoding encoding;
+ encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
+ encoding.set_dictionarysize(0);
+ encodings.push_back(encoding);
+ for (uint32_t i = 0; i < children.size(); ++i) {
+ children[i]->getColumnEncoding(encodings);
+ }
+ }
+
+ void StructColumnWriter::getStripeStatistics(
+ std::vector<proto::ColumnStatistics>& stats) const {
+ ColumnWriter::getStripeStatistics(stats);
+
+ for (uint32_t i = 0; i < children.size(); ++i) {
+ children[i]->getStripeStatistics(stats);
+ }
+ }
+
+ void StructColumnWriter::mergeStripeStatsIntoFileStats() {
+ ColumnWriter::mergeStripeStatsIntoFileStats();
+
+ for (uint32_t i = 0; i < children.size(); ++i) {
+ children[i]->mergeStripeStatsIntoFileStats();
+ }
+ }
+
+ void StructColumnWriter::getFileStatistics(
+ std::vector<proto::ColumnStatistics>& stats) const {
+ ColumnWriter::getFileStatistics(stats);
+
+ for (uint32_t i = 0; i < children.size(); ++i) {
+ children[i]->getFileStatistics(stats);
+ }
+ }
+
+ void StructColumnWriter::mergeRowGroupStatsIntoStripeStats() {
+ ColumnWriter::mergeRowGroupStatsIntoStripeStats();
+
+ for (uint32_t i = 0; i < children.size(); ++i) {
+ children[i]->mergeRowGroupStatsIntoStripeStats();
+ }
+ }
+
+ void StructColumnWriter::createRowIndexEntry() {
+ ColumnWriter::createRowIndexEntry();
+
+ for (uint32_t i = 0; i < children.size(); ++i) {
+ children[i]->createRowIndexEntry();
+ }
+ }
+
+ void StructColumnWriter::reset() {
+ ColumnWriter::reset();
+
+ for (uint32_t i = 0; i < children.size(); ++i) {
+ children[i]->reset();
+ }
+ }
+
+ void StructColumnWriter::writeDictionary() {
+ for (uint32_t i = 0; i < children.size(); ++i) {
+ children[i]->writeDictionary();
+ }
+ }
+
+ class IntegerColumnWriter : public ColumnWriter {
+ public:
+ IntegerColumnWriter(
+ const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options);
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) override;
+
+ virtual void flush(std::vector<proto::Stream>& streams) override;
+
+ virtual uint64_t getEstimatedSize() const override;
+
+ virtual void getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const override;
+
+ virtual void recordPosition() const override;
+
+ protected:
+ std::unique_ptr<RleEncoder> rleEncoder;
+
+ private:
+ RleVersion rleVersion;
+ };
+
+ IntegerColumnWriter::IntegerColumnWriter(
+ const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options) :
+ ColumnWriter(type, factory, options),
+ rleVersion(options.getRleVersion()) {
+ std::unique_ptr<BufferedOutputStream> dataStream =
+ factory.createStream(proto::Stream_Kind_DATA);
+ rleEncoder = createRleEncoder(
+ std::move(dataStream),
+ true,
+ rleVersion,
+ memPool,
+ options.getAlignedBitpacking());
+
+ if (enableIndex) {
+ recordPosition();
+ }
+ }
+
+ void IntegerColumnWriter::add(
+ ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) {
+ const LongVectorBatch* longBatch =
+ dynamic_cast<const LongVectorBatch*>(&rowBatch);
+ if (longBatch == nullptr) {
+ throw InvalidArgument("Failed to cast to LongVectorBatch");
+ }
+ IntegerColumnStatisticsImpl* intStats =
+ dynamic_cast<IntegerColumnStatisticsImpl*>(colIndexStatistics.get());
+ if (intStats == nullptr) {
+ throw InvalidArgument("Failed to cast to IntegerColumnStatisticsImpl");
+ }
+
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
+
+ const int64_t* data = longBatch->data.data() + offset;
+ const char* notNull = longBatch->hasNulls ?
+ longBatch->notNull.data() + offset : nullptr;
+
+ rleEncoder->add(data, numValues, notNull);
+
+ // update stats
+ uint64_t count = 0;
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (notNull == nullptr || notNull[i]) {
+ ++count;
+ if (enableBloomFilter) {
+ bloomFilter->addLong(data[i]);
+ }
+ intStats->update(data[i], 1);
+ }
+ }
+ intStats->increase(count);
+ if (count < numValues) {
+ intStats->setHasNull(true);
+ }
+ }
+
+ void IntegerColumnWriter::flush(std::vector<proto::Stream>& streams) {
+ ColumnWriter::flush(streams);
+
+ proto::Stream stream;
+ stream.set_kind(proto::Stream_Kind_DATA);
+ stream.set_column(static_cast<uint32_t>(columnId));
+ stream.set_length(rleEncoder->flush());
+ streams.push_back(stream);
+ }
+
+ uint64_t IntegerColumnWriter::getEstimatedSize() const {
+ uint64_t size = ColumnWriter::getEstimatedSize();
+ size += rleEncoder->getBufferSize();
+ return size;
+ }
+
+ void IntegerColumnWriter::getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const {
+ proto::ColumnEncoding encoding;
+ encoding.set_kind(RleVersionMapper(rleVersion));
+ encoding.set_dictionarysize(0);
+ if (enableBloomFilter) {
+ encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+ }
+ encodings.push_back(encoding);
+ }
+
+ void IntegerColumnWriter::recordPosition() const {
+ ColumnWriter::recordPosition();
+ rleEncoder->recordPosition(rowIndexPosition.get());
+ }
+
+ class ByteColumnWriter : public ColumnWriter {
+ public:
+ ByteColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options);
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) override;
+
+ virtual void flush(std::vector<proto::Stream>& streams) override;
+
+ virtual uint64_t getEstimatedSize() const override;
+
+ virtual void getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const override;
+
+ virtual void recordPosition() const override;
+
+ private:
+ std::unique_ptr<ByteRleEncoder> byteRleEncoder;
+ };
+
+ ByteColumnWriter::ByteColumnWriter(
+ const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options) :
+ ColumnWriter(type, factory, options) {
+ std::unique_ptr<BufferedOutputStream> dataStream =
+ factory.createStream(proto::Stream_Kind_DATA);
+ byteRleEncoder = createByteRleEncoder(std::move(dataStream));
+
+ if (enableIndex) {
+ recordPosition();
+ }
+ }
+
+ void ByteColumnWriter::add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) {
+ LongVectorBatch* byteBatch = dynamic_cast<LongVectorBatch*>(&rowBatch);
+ if (byteBatch == nullptr) {
+ throw InvalidArgument("Failed to cast to LongVectorBatch");
+ }
+ IntegerColumnStatisticsImpl* intStats =
+ dynamic_cast<IntegerColumnStatisticsImpl*>(colIndexStatistics.get());
+ if (intStats == nullptr) {
+ throw InvalidArgument("Failed to cast to IntegerColumnStatisticsImpl");
+ }
+
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
+
+ int64_t* data = byteBatch->data.data() + offset;
+ const char* notNull = byteBatch->hasNulls ?
+ byteBatch->notNull.data() + offset : nullptr;
+
+ char* byteData = reinterpret_cast<char*>(data);
+ for (uint64_t i = 0; i < numValues; ++i) {
+ byteData[i] = static_cast<char>(data[i]);
+ }
+ byteRleEncoder->add(byteData, numValues, notNull);
+
+ uint64_t count = 0;
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (notNull == nullptr || notNull[i]) {
+ ++count;
+ if (enableBloomFilter) {
+ bloomFilter->addLong(data[i]);
+ }
+ intStats->update(static_cast<int64_t>(byteData[i]), 1);
+ }
+ }
+ intStats->increase(count);
+ if (count < numValues) {
+ intStats->setHasNull(true);
+ }
+ }
+
+ void ByteColumnWriter::flush(std::vector<proto::Stream>& streams) {
+ ColumnWriter::flush(streams);
+
+ proto::Stream stream;
+ stream.set_kind(proto::Stream_Kind_DATA);
+ stream.set_column(static_cast<uint32_t>(columnId));
+ stream.set_length(byteRleEncoder->flush());
+ streams.push_back(stream);
+ }
+
+ uint64_t ByteColumnWriter::getEstimatedSize() const {
+ uint64_t size = ColumnWriter::getEstimatedSize();
+ size += byteRleEncoder->getBufferSize();
+ return size;
+ }
+
+ void ByteColumnWriter::getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const {
+ proto::ColumnEncoding encoding;
+ encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
+ encoding.set_dictionarysize(0);
+ if (enableBloomFilter) {
+ encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+ }
+ encodings.push_back(encoding);
+ }
+
+ void ByteColumnWriter::recordPosition() const {
+ ColumnWriter::recordPosition();
+ byteRleEncoder->recordPosition(rowIndexPosition.get());
+ }
+
+ class BooleanColumnWriter : public ColumnWriter {
+ public:
+ BooleanColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options);
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) override;
+
+ virtual void flush(std::vector<proto::Stream>& streams) override;
+
+ virtual uint64_t getEstimatedSize() const override;
+
+ virtual void getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const override;
+
+ virtual void recordPosition() const override;
+
+ private:
+ std::unique_ptr<ByteRleEncoder> rleEncoder;
+ };
+
+ BooleanColumnWriter::BooleanColumnWriter(
+ const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options) :
+ ColumnWriter(type, factory, options) {
+ std::unique_ptr<BufferedOutputStream> dataStream =
+ factory.createStream(proto::Stream_Kind_DATA);
+ rleEncoder = createBooleanRleEncoder(std::move(dataStream));
+
+ if (enableIndex) {
+ recordPosition();
+ }
+ }
+
+ void BooleanColumnWriter::add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) {
+ LongVectorBatch* byteBatch = dynamic_cast<LongVectorBatch*>(&rowBatch);
+ if (byteBatch == nullptr) {
+ throw InvalidArgument("Failed to cast to LongVectorBatch");
+ }
+ BooleanColumnStatisticsImpl* boolStats =
+ dynamic_cast<BooleanColumnStatisticsImpl*>(colIndexStatistics.get());
+ if (boolStats == nullptr) {
+ throw InvalidArgument("Failed to cast to BooleanColumnStatisticsImpl");
+ }
+
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
+
+ int64_t* data = byteBatch->data.data() + offset;
+ const char* notNull = byteBatch->hasNulls ?
+ byteBatch->notNull.data() + offset : nullptr;
+
+ char* byteData = reinterpret_cast<char*>(data);
+ for (uint64_t i = 0; i < numValues; ++i) {
+ byteData[i] = static_cast<char>(data[i]);
+ }
+ rleEncoder->add(byteData, numValues, notNull);
+
+ uint64_t count = 0;
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (notNull == nullptr || notNull[i]) {
+ ++count;
+ if (enableBloomFilter) {
+ bloomFilter->addLong(data[i]);
+ }
+ boolStats->update(byteData[i] != 0, 1);
+ }
+ }
+ boolStats->increase(count);
+ if (count < numValues) {
+ boolStats->setHasNull(true);
+ }
+ }
+
+ void BooleanColumnWriter::flush(std::vector<proto::Stream>& streams) {
+ ColumnWriter::flush(streams);
+
+ proto::Stream stream;
+ stream.set_kind(proto::Stream_Kind_DATA);
+ stream.set_column(static_cast<uint32_t>(columnId));
+ stream.set_length(rleEncoder->flush());
+ streams.push_back(stream);
+ }
+
+ uint64_t BooleanColumnWriter::getEstimatedSize() const {
+ uint64_t size = ColumnWriter::getEstimatedSize();
+ size += rleEncoder->getBufferSize();
+ return size;
+ }
+
+ void BooleanColumnWriter::getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const {
+ proto::ColumnEncoding encoding;
+ encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
+ encoding.set_dictionarysize(0);
+ if (enableBloomFilter) {
+ encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+ }
+ encodings.push_back(encoding);
+ }
+
+ void BooleanColumnWriter::recordPosition() const {
+ ColumnWriter::recordPosition();
+ rleEncoder->recordPosition(rowIndexPosition.get());
+ }
+
+ class DoubleColumnWriter : public ColumnWriter {
+ public:
+ DoubleColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options,
+ bool isFloat);
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) override;
+
+ virtual void flush(std::vector<proto::Stream>& streams) override;
+
+ virtual uint64_t getEstimatedSize() const override;
+
+ virtual void getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const override;
+
+ virtual void recordPosition() const override;
+
+ private:
+ bool isFloat;
+ std::unique_ptr<AppendOnlyBufferedStream> dataStream;
+ DataBuffer<char> buffer;
+ };
+
+ DoubleColumnWriter::DoubleColumnWriter(
+ const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options,
+ bool isFloatType) :
+ ColumnWriter(type, factory, options),
+ isFloat(isFloatType),
+ buffer(*options.getMemoryPool()) {
+ dataStream.reset(new AppendOnlyBufferedStream(
+ factory.createStream(proto::Stream_Kind_DATA)));
+ buffer.resize(isFloat ? 4 : 8);
+
+ if (enableIndex) {
+ recordPosition();
+ }
+ }
+
+ // Floating point types are stored using IEEE 754 floating point bit layout.
+ // Float columns use 4 bytes per value and double columns use 8 bytes.
+ template <typename FLOAT_TYPE, typename INTEGER_TYPE>
+ inline void encodeFloatNum(FLOAT_TYPE input, char* output) {
+ INTEGER_TYPE* intBits = reinterpret_cast<INTEGER_TYPE*>(&input);
+ for (size_t i = 0; i < sizeof(INTEGER_TYPE); ++i) {
+ output[i] = static_cast<char>(((*intBits) >> (8 * i)) & 0xff);
+ }
+ }
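+  // For example (illustrative): encodeFloatNum<float, int32_t>(1.0f, out)
+  // writes the IEEE 754 bit pattern 0x3f800000 as the little-endian bytes
+  // 0x00 0x00 0x80 0x3f.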
+
+ void DoubleColumnWriter::add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) {
+ const DoubleVectorBatch* dblBatch =
+ dynamic_cast<const DoubleVectorBatch*>(&rowBatch);
+ if (dblBatch == nullptr) {
+ throw InvalidArgument("Failed to cast to DoubleVectorBatch");
+ }
+ DoubleColumnStatisticsImpl* doubleStats =
+ dynamic_cast<DoubleColumnStatisticsImpl*>(colIndexStatistics.get());
+ if (doubleStats == nullptr) {
+ throw InvalidArgument("Failed to cast to DoubleColumnStatisticsImpl");
+ }
+
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
+
+ const double* doubleData = dblBatch->data.data() + offset;
+ const char* notNull = dblBatch->hasNulls ?
+ dblBatch->notNull.data() + offset : nullptr;
+
+ size_t bytes = isFloat ? 4 : 8;
+ char* data = buffer.data();
+ uint64_t count = 0;
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (!notNull || notNull[i]) {
+ if (isFloat) {
+ encodeFloatNum<float, int32_t>(static_cast<float>(doubleData[i]), data);
+ } else {
+ encodeFloatNum<double, int64_t>(doubleData[i], data);
+ }
+ dataStream->write(data, bytes);
+ ++count;
+ if (enableBloomFilter) {
+ bloomFilter->addDouble(doubleData[i]);
+ }
+ doubleStats->update(doubleData[i]);
+ }
+ }
+ doubleStats->increase(count);
+ if (count < numValues) {
+ doubleStats->setHasNull(true);
+ }
+ }
+
+ void DoubleColumnWriter::flush(std::vector<proto::Stream>& streams) {
+ ColumnWriter::flush(streams);
+
+ proto::Stream stream;
+ stream.set_kind(proto::Stream_Kind_DATA);
+ stream.set_column(static_cast<uint32_t>(columnId));
+ stream.set_length(dataStream->flush());
+ streams.push_back(stream);
+ }
+
+ uint64_t DoubleColumnWriter::getEstimatedSize() const {
+ uint64_t size = ColumnWriter::getEstimatedSize();
+ size += dataStream->getSize();
+ return size;
+ }
+
+ void DoubleColumnWriter::getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const {
+ proto::ColumnEncoding encoding;
+ encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
+ encoding.set_dictionarysize(0);
+ if (enableBloomFilter) {
+ encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+ }
+ encodings.push_back(encoding);
+ }
+
+ void DoubleColumnWriter::recordPosition() const {
+ ColumnWriter::recordPosition();
+ dataStream->recordPosition(rowIndexPosition.get());
+ }
+
+ /**
+   * Implementation of a string dictionary kept in increasing (lexicographic) order
+ */
+ class SortedStringDictionary {
+ public:
+ struct DictEntry {
+ DictEntry(const char * str, size_t len):data(str),length(len) {}
+ const char * data;
+ size_t length;
+ };
+
+ SortedStringDictionary():totalLength(0) {}
+
+ // insert a new string into dictionary, return its insertion order
+ size_t insert(const char * data, size_t len);
+
+ // write dictionary data & length to output buffer
+ void flush(AppendOnlyBufferedStream * dataStream,
+ RleEncoder * lengthEncoder) const;
+
+ // reorder input index buffer from insertion order to dictionary order
+ void reorder(std::vector<int64_t>& idxBuffer) const;
+
+ // get dict entries in insertion order
+ void getEntriesInInsertionOrder(std::vector<const DictEntry *>&) const;
+
+ // return count of entries
+ size_t size() const;
+
+    // return total length of strings in the dictionary
+ uint64_t length() const;
+
+ void clear();
+
+ private:
+ struct LessThan {
+ bool operator()(const DictEntry& left, const DictEntry& right) const {
+ int ret = memcmp(left.data, right.data, std::min(left.length, right.length));
+ if (ret != 0) {
+ return ret < 0;
+ }
+ return left.length < right.length;
+ }
+ };
+
+ std::map<DictEntry, size_t, LessThan> dict;
+ std::vector<std::vector<char>> data;
+ uint64_t totalLength;
+
+    // use friend classes here to avoid being constrained by const member functions
+ friend class StringColumnWriter;
+ friend class CharColumnWriter;
+ friend class VarCharColumnWriter;
+ // store indexes of insertion order in the dictionary for not-null rows
+ std::vector<int64_t> idxInDictBuffer;
+ };
+
+ // insert a new string into dictionary, return its insertion order
+ size_t SortedStringDictionary::insert(const char * str, size_t len) {
+ auto ret = dict.insert({DictEntry(str, len), dict.size()});
+ if (ret.second) {
+ // make a copy to internal storage
+ data.push_back(std::vector<char>(len));
+ memcpy(data.back().data(), str, len);
+ // update dictionary entry to link pointer to internal storage
+ DictEntry * entry = const_cast<DictEntry *>(&(ret.first->first));
+ entry->data = data.back().data();
+ totalLength += len;
+ }
+ return ret.first->second;
+ }
+
+ // write dictionary data & length to output buffer
+ void SortedStringDictionary::flush(AppendOnlyBufferedStream * dataStream,
+ RleEncoder * lengthEncoder) const {
+ for (auto it = dict.cbegin(); it != dict.cend(); ++it) {
+ dataStream->write(it->first.data, it->first.length);
+ lengthEncoder->write(static_cast<int64_t>(it->first.length));
+ }
+ }
+
+ /**
+ * Reorder input index buffer from insertion order to dictionary order
+ *
+   * We need this function because string values are buffered by their
+   * insertion-order indexes. Only once the entire dictionary is complete can
+   * we obtain their sorted positions, since the ORC specification requires
+   * the dictionary to be ordered. This function therefore transforms the
+   * indexes from insertion order to dictionary value order for the final
+   * output.
+ */
+ void SortedStringDictionary::reorder(std::vector<int64_t>& idxBuffer) const {
+ // iterate the dictionary to get mapping from insertion order to value order
+ std::vector<size_t> mapping(dict.size());
+ size_t dictIdx = 0;
+ for (auto it = dict.cbegin(); it != dict.cend(); ++it) {
+ mapping[it->second] = dictIdx++;
+ }
+
+ // do the transformation
+ for (size_t i = 0; i != idxBuffer.size(); ++i) {
+ idxBuffer[i] = static_cast<int64_t>(
+ mapping[static_cast<size_t>(idxBuffer[i])]);
+ }
+ }
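+  // Illustrative usage of reorder (not part of the writer code):
+  //   SortedStringDictionary d;
+  //   d.insert("c", 1);                 // insertion order 0
+  //   d.insert("a", 1);                 // insertion order 1
+  //   std::vector<int64_t> idx = {0, 1};
+  //   d.reorder(idx);                   // idx becomes {1, 0} since "a" < "c"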
+
+ // get dict entries in insertion order
+ void SortedStringDictionary::getEntriesInInsertionOrder(
+ std::vector<const DictEntry *>& entries) const {
+ entries.resize(dict.size());
+ for (auto it = dict.cbegin(); it != dict.cend(); ++it) {
+ entries[it->second] = &(it->first);
+ }
+ }
+
+ // return count of entries
+ size_t SortedStringDictionary::size() const {
+ return dict.size();
+ }
+
+  // return total length of strings in the dictionary
+ uint64_t SortedStringDictionary::length() const {
+ return totalLength;
+ }
+
+ void SortedStringDictionary::clear() {
+ totalLength = 0;
+ data.clear();
+ dict.clear();
+ }
+
+ class StringColumnWriter : public ColumnWriter {
+ public:
+ StringColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options);
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) override;
+
+ virtual void flush(std::vector<proto::Stream>& streams) override;
+
+ virtual uint64_t getEstimatedSize() const override;
+
+ virtual void getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const override;
+
+ virtual void recordPosition() const override;
+
+ virtual void createRowIndexEntry() override;
+
+ virtual void writeDictionary() override;
+
+ virtual void reset() override;
+
+ private:
+ /**
+ * dictionary related functions
+ */
+ bool checkDictionaryKeyRatio();
+ void createDirectStreams();
+ void createDictStreams();
+ void deleteDictStreams();
+ void fallbackToDirectEncoding();
+
+ protected:
+ RleVersion rleVersion;
+ bool useCompression;
+ const StreamsFactory& streamsFactory;
+ bool alignedBitPacking;
+
+ // direct encoding streams
+ std::unique_ptr<RleEncoder> directLengthEncoder;
+ std::unique_ptr<AppendOnlyBufferedStream> directDataStream;
+
+ // dictionary encoding streams
+ std::unique_ptr<RleEncoder> dictDataEncoder;
+ std::unique_ptr<RleEncoder> dictLengthEncoder;
+ std::unique_ptr<AppendOnlyBufferedStream> dictStream;
+
+ /**
+ * dictionary related variables
+ */
+ SortedStringDictionary dictionary;
+ // whether or not dictionary checking is done
+ bool doneDictionaryCheck;
+ // whether or not it should be used
+ bool useDictionary;
+ // keys in the dictionary should not exceed this ratio
+ double dictSizeThreshold;
+
+ // record start row of each row group; null rows are skipped
+ mutable std::vector<size_t> startOfRowGroups;
+ };
+
+ StringColumnWriter::StringColumnWriter(
+ const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options) :
+ ColumnWriter(type, factory, options),
+ rleVersion(options.getRleVersion()),
+ useCompression(options.getCompression() != CompressionKind_NONE),
+ streamsFactory(factory),
+ alignedBitPacking(options.getAlignedBitpacking()),
+ doneDictionaryCheck(false),
+ useDictionary(options.getEnableDictionary()),
+ dictSizeThreshold(options.getDictionaryKeySizeThreshold()){
+ if (type.getKind() == TypeKind::BINARY) {
+ useDictionary = false;
+ doneDictionaryCheck = true;
+ }
+
+ if (useDictionary) {
+ createDictStreams();
+ } else {
+ doneDictionaryCheck = true;
+ createDirectStreams();
+ }
+
+ if (enableIndex) {
+ recordPosition();
+ }
+ }
+
+ void StringColumnWriter::add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) {
+ const StringVectorBatch* stringBatch =
+ dynamic_cast<const StringVectorBatch*>(&rowBatch);
+ if (stringBatch == nullptr) {
+ throw InvalidArgument("Failed to cast to StringVectorBatch");
+ }
+
+ StringColumnStatisticsImpl* strStats =
+ dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get());
+ if (strStats == nullptr) {
+ throw InvalidArgument("Failed to cast to StringColumnStatisticsImpl");
+ }
+
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
+
+ char *const * data = stringBatch->data.data() + offset;
+ const int64_t* length = stringBatch->length.data() + offset;
+ const char* notNull = stringBatch->hasNulls ?
+ stringBatch->notNull.data() + offset : nullptr;
+
+ if (!useDictionary){
+ directLengthEncoder->add(length, numValues, notNull);
+ }
+
+ uint64_t count = 0;
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (!notNull || notNull[i]) {
+ const size_t len = static_cast<size_t>(length[i]);
+ if (useDictionary) {
+ size_t index = dictionary.insert(data[i], len);
+ dictionary.idxInDictBuffer.push_back(static_cast<int64_t>(index));
+ } else {
+ directDataStream->write(data[i], len);
+ }
+ if (enableBloomFilter) {
+ bloomFilter->addBytes(data[i], static_cast<int64_t>(len));
+ }
+ strStats->update(data[i], len);
+ ++count;
+ }
+ }
+ strStats->increase(count);
+ if (count < numValues) {
+ strStats->setHasNull(true);
+ }
+ }
+
+ void StringColumnWriter::flush(std::vector<proto::Stream>& streams) {
+ ColumnWriter::flush(streams);
+
+ if (useDictionary) {
+ proto::Stream data;
+ data.set_kind(proto::Stream_Kind_DATA);
+ data.set_column(static_cast<uint32_t>(columnId));
+ data.set_length(dictDataEncoder->flush());
+ streams.push_back(data);
+
+ proto::Stream dict;
+ dict.set_kind(proto::Stream_Kind_DICTIONARY_DATA);
+ dict.set_column(static_cast<uint32_t>(columnId));
+ dict.set_length(dictStream->flush());
+ streams.push_back(dict);
+
+ proto::Stream length;
+ length.set_kind(proto::Stream_Kind_LENGTH);
+ length.set_column(static_cast<uint32_t>(columnId));
+ length.set_length(dictLengthEncoder->flush());
+ streams.push_back(length);
+ } else {
+ proto::Stream length;
+ length.set_kind(proto::Stream_Kind_LENGTH);
+ length.set_column(static_cast<uint32_t>(columnId));
+ length.set_length(directLengthEncoder->flush());
+ streams.push_back(length);
+
+ proto::Stream data;
+ data.set_kind(proto::Stream_Kind_DATA);
+ data.set_column(static_cast<uint32_t>(columnId));
+ data.set_length(directDataStream->flush());
+ streams.push_back(data);
+ }
+ }
+
+ uint64_t StringColumnWriter::getEstimatedSize() const {
+ uint64_t size = ColumnWriter::getEstimatedSize();
+ if (!useDictionary) {
+ size += directLengthEncoder->getBufferSize();
+ size += directDataStream->getSize();
+ } else {
+ size += dictionary.length();
+ size += dictionary.size() * sizeof(int32_t);
+ size += dictionary.idxInDictBuffer.size() * sizeof(int32_t);
+ if (useCompression) {
+ size /= 3; // estimated ratio is 3:1
+ }
+ }
+ return size;
+ }
+
+ void StringColumnWriter::getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const {
+ proto::ColumnEncoding encoding;
+ if (!useDictionary) {
+ encoding.set_kind(rleVersion == RleVersion_1 ?
+ proto::ColumnEncoding_Kind_DIRECT :
+ proto::ColumnEncoding_Kind_DIRECT_V2);
+ } else {
+ encoding.set_kind(rleVersion == RleVersion_1 ?
+ proto::ColumnEncoding_Kind_DICTIONARY :
+ proto::ColumnEncoding_Kind_DICTIONARY_V2);
+ }
+ encoding.set_dictionarysize(static_cast<uint32_t>(dictionary.size()));
+ if (enableBloomFilter) {
+ encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+ }
+ encodings.push_back(encoding);
+ }
+
+ void StringColumnWriter::recordPosition() const {
+ ColumnWriter::recordPosition();
+ if (!useDictionary) {
+ directDataStream->recordPosition(rowIndexPosition.get());
+ directLengthEncoder->recordPosition(rowIndexPosition.get());
+ } else {
+ if (enableIndex) {
+ startOfRowGroups.push_back(dictionary.idxInDictBuffer.size());
+ }
+ }
+ }
+
+ bool StringColumnWriter::checkDictionaryKeyRatio() {
+ if (!doneDictionaryCheck) {
+ useDictionary = dictionary.size() <= static_cast<size_t>(
+ static_cast<double>(dictionary.idxInDictBuffer.size()) * dictSizeThreshold);
+ doneDictionaryCheck = true;
+ }
+
+ return useDictionary;
+ }
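+  // For example (illustrative): with a dictSizeThreshold of 0.5, a stripe that
+  // has buffered 100 non-null values keeps dictionary encoding only if they
+  // produced at most 50 distinct keys; otherwise the writer falls back to
+  // direct encoding.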
+
+ void StringColumnWriter::createRowIndexEntry() {
+ if (useDictionary && !doneDictionaryCheck) {
+ if (!checkDictionaryKeyRatio()) {
+ fallbackToDirectEncoding();
+ }
+ }
+ ColumnWriter::createRowIndexEntry();
+ }
+
+ void StringColumnWriter::reset() {
+ ColumnWriter::reset();
+
+ dictionary.clear();
+ dictionary.idxInDictBuffer.resize(0);
+ startOfRowGroups.clear();
+ startOfRowGroups.push_back(0);
+ }
+
+ void StringColumnWriter::createDirectStreams() {
+ std::unique_ptr<BufferedOutputStream> directLengthStream =
+ streamsFactory.createStream(proto::Stream_Kind_LENGTH);
+ directLengthEncoder = createRleEncoder(std::move(directLengthStream),
+ false,
+ rleVersion,
+ memPool,
+ alignedBitPacking);
+ directDataStream.reset(new AppendOnlyBufferedStream(
+ streamsFactory.createStream(proto::Stream_Kind_DATA)));
+ }
+
+ void StringColumnWriter::createDictStreams() {
+ std::unique_ptr<BufferedOutputStream> dictDataStream =
+ streamsFactory.createStream(proto::Stream_Kind_DATA);
+ dictDataEncoder = createRleEncoder(std::move(dictDataStream),
+ false,
+ rleVersion,
+ memPool,
+ alignedBitPacking);
+ std::unique_ptr<BufferedOutputStream> dictLengthStream =
+ streamsFactory.createStream(proto::Stream_Kind_LENGTH);
+ dictLengthEncoder = createRleEncoder(std::move(dictLengthStream),
+ false,
+ rleVersion,
+ memPool,
+ alignedBitPacking);
+ dictStream.reset(new AppendOnlyBufferedStream(
+ streamsFactory.createStream(proto::Stream_Kind_DICTIONARY_DATA)));
+ }
+
+ void StringColumnWriter::deleteDictStreams() {
+ dictDataEncoder.reset(nullptr);
+ dictLengthEncoder.reset(nullptr);
+ dictStream.reset(nullptr);
+
+ dictionary.clear();
+ dictionary.idxInDictBuffer.clear();
+ startOfRowGroups.clear();
+ }
+
+ void StringColumnWriter::writeDictionary() {
+ if (useDictionary && !doneDictionaryCheck) {
+ // when index is disabled, dictionary check happens while writing 1st stripe
+ if (!checkDictionaryKeyRatio()) {
+ fallbackToDirectEncoding();
+ return;
+ }
+ }
+
+ if (useDictionary) {
+ // flush dictionary data & length streams
+ dictionary.flush(dictStream.get(), dictLengthEncoder.get());
+
+ // convert index from insertion order to dictionary order
+ dictionary.reorder(dictionary.idxInDictBuffer);
+
+ // write data sequences
+ int64_t * data = dictionary.idxInDictBuffer.data();
+ if (enableIndex) {
+ size_t prevOffset = 0;
+ for (size_t i = 0; i < startOfRowGroups.size(); ++i) {
+ // write sequences in batch for a row group stride
+ size_t offset = startOfRowGroups[i];
+ dictDataEncoder->add(data + prevOffset, offset - prevOffset, nullptr);
+
+ // update index positions
+ int rowGroupId = static_cast<int>(i);
+ proto::RowIndexEntry* indexEntry =
+ (rowGroupId < rowIndex->entry_size()) ?
+ rowIndex->mutable_entry(rowGroupId) : rowIndexEntry.get();
+
+ // add positions for direct streams
+ RowIndexPositionRecorder recorder(*indexEntry);
+ dictDataEncoder->recordPosition(&recorder);
+
+ prevOffset = offset;
+ }
+
+ dictDataEncoder->add(data + prevOffset,
+ dictionary.idxInDictBuffer.size() - prevOffset,
+ nullptr);
+ } else {
+ dictDataEncoder->add(data, dictionary.idxInDictBuffer.size(), nullptr);
+ }
+ }
+ }
+
+ void StringColumnWriter::fallbackToDirectEncoding() {
+ createDirectStreams();
+
+ if (enableIndex) {
+ // fallback happens at the 1st row group;
+ // simply complete positions for direct streams
+ proto::RowIndexEntry * indexEntry = rowIndexEntry.get();
+ RowIndexPositionRecorder recorder(*indexEntry);
+ directDataStream->recordPosition(&recorder);
+ directLengthEncoder->recordPosition(&recorder);
+ }
+
+ // get dictionary entries in insertion order
+ std::vector<const SortedStringDictionary::DictEntry *> entries;
+ dictionary.getEntriesInInsertionOrder(entries);
+
+ // store each length of the data into a vector
+ const SortedStringDictionary::DictEntry * dictEntry = nullptr;
+ for (uint64_t i = 0; i != dictionary.idxInDictBuffer.size(); ++i) {
+ // write one row data in direct encoding
+ dictEntry = entries[static_cast<size_t>(dictionary.idxInDictBuffer[i])];
+ directDataStream->write(dictEntry->data, dictEntry->length);
+ directLengthEncoder->write(static_cast<int64_t>(dictEntry->length));
+ }
+
+ deleteDictStreams();
+ }
+
+ struct Utf8Utils {
+ /**
+     * Counts how many utf-8 characters the input data contains
+ */
+ static uint64_t charLength(const char * data, uint64_t length) {
+ uint64_t chars = 0;
+ for (uint64_t i = 0; i < length; i++) {
+ if (isUtfStartByte(data[i])) {
+ chars++;
+ }
+ }
+ return chars;
+ }
+
+ /**
+ * Return the number of bytes required to read at most maxCharLength
+ * characters in full from a utf-8 encoded byte array provided
+ * by data. This does not validate utf-8 data, but
+ * operates correctly on already valid utf-8 data.
+ *
+ * @param maxCharLength number of characters required
+ * @param data the bytes of UTF-8
+ * @param length the length of data to truncate
+ */
+ static uint64_t truncateBytesTo(uint64_t maxCharLength,
+ const char * data,
+ uint64_t length) {
+ uint64_t chars = 0;
+ if (length <= maxCharLength) {
+ return length;
+ }
+ for (uint64_t i = 0; i < length; i++) {
+ if (isUtfStartByte(data[i])) {
+ chars++;
+ }
+ if (chars > maxCharLength) {
+ return i;
+ }
+ }
+ // everything fits
+ return length;
+ }
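+    // For example (illustrative): for the 4-byte UTF-8 sequence "d\xC3\xA9f"
+    // ("déf"), truncateBytesTo(2, data, 4) returns 3, keeping "dé" intact.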
+
+ /**
+     * Checks if b is the first byte of a UTF-8 character
+     * (i.e. not a 10xxxxxx continuation byte).
+ */
+ inline static bool isUtfStartByte(char b) {
+ return (b & 0xC0) != 0x80;
+ }
+
+ /**
+ * Find the start of the last character that ends in the current string.
+ * @param text the bytes of the utf-8
+ * @param from the first byte location
+ * @param until the last byte location
+ * @return the index of the last character
+ */
+ static uint64_t findLastCharacter(const char * text, uint64_t from, uint64_t until) {
+ uint64_t posn = until;
+      /* we don't expect characters longer than 5 bytes */
+ while (posn >= from) {
+ if (isUtfStartByte(text[posn])) {
+ return posn;
+ }
+ posn -= 1;
+ }
+ /* beginning of a valid char not found */
+ throw std::logic_error(
+ "Could not truncate string, beginning of a valid char not found");
+ }
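+    // For example (illustrative): in the 3-byte sequence "a\xC3\xA9" ("aé"),
+    // findLastCharacter(text, 0, 2) returns 1, the start of the final character.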
+ };
+
+ class CharColumnWriter : public StringColumnWriter {
+ public:
+ CharColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options) :
+ StringColumnWriter(type, factory, options),
+ maxLength(type.getMaximumLength()),
+ padBuffer(*options.getMemoryPool()) {
+      // a utf-8 character is currently at most 4 bytes long, but could be up to 6
+ padBuffer.resize(maxLength * 6);
+ }
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) override;
+
+ private:
+ uint64_t maxLength;
+ DataBuffer<char> padBuffer;
+ };
+
+ void CharColumnWriter::add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) {
+ StringVectorBatch* charsBatch = dynamic_cast<StringVectorBatch*>(&rowBatch);
+ if (charsBatch == nullptr) {
+ throw InvalidArgument("Failed to cast to StringVectorBatch");
+ }
+
+ StringColumnStatisticsImpl* strStats =
+ dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get());
+ if (strStats == nullptr) {
+ throw InvalidArgument("Failed to cast to StringColumnStatisticsImpl");
+ }
+
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
+
+ char** data = charsBatch->data.data() + offset;
+ int64_t* length = charsBatch->length.data() + offset;
+ const char* notNull = charsBatch->hasNulls ?
+ charsBatch->notNull.data() + offset : nullptr;
+
+ uint64_t count = 0;
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (!notNull || notNull[i]) {
+ const char * charData = nullptr;
+ uint64_t originLength = static_cast<uint64_t>(length[i]);
+ uint64_t charLength = Utf8Utils::charLength(data[i], originLength);
+ if (charLength >= maxLength) {
+ charData = data[i];
+ length[i] = static_cast<int64_t>(
+ Utf8Utils::truncateBytesTo(maxLength, data[i], originLength));
+ } else {
+ charData = padBuffer.data();
+ // the padding is exactly 1 byte per char
+ length[i] = length[i] + static_cast<int64_t>(maxLength - charLength);
+ memcpy(padBuffer.data(), data[i], originLength);
+ memset(padBuffer.data() + originLength,
+ ' ',
+ static_cast<size_t>(length[i]) - originLength);
+ }
+
+ if (useDictionary) {
+ size_t index = dictionary.insert(charData, static_cast<size_t>(length[i]));
+ dictionary.idxInDictBuffer.push_back(static_cast<int64_t>(index));
+ } else {
+ directDataStream->write(charData, static_cast<size_t>(length[i]));
+ }
+
+ if (enableBloomFilter) {
+ bloomFilter->addBytes(data[i], length[i]);
+ }
+ strStats->update(charData, static_cast<size_t>(length[i]));
+ ++count;
+ }
+ }
+
+ if (!useDictionary) {
+ directLengthEncoder->add(length, numValues, notNull);
+ }
+
+ strStats->increase(count);
+ if (count < numValues) {
+ strStats->setHasNull(true);
+ }
+ }
+
+ class VarCharColumnWriter : public StringColumnWriter {
+ public:
+ VarCharColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options) :
+ StringColumnWriter(type, factory, options),
+ maxLength(type.getMaximumLength()) {
+ // PASS
+ }
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) override;
+
+ private:
+ uint64_t maxLength;
+ };
+
+ void VarCharColumnWriter::add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) {
+ StringVectorBatch* charsBatch = dynamic_cast<StringVectorBatch*>(&rowBatch);
+ if (charsBatch == nullptr) {
+ throw InvalidArgument("Failed to cast to StringVectorBatch");
+ }
+
+ StringColumnStatisticsImpl* strStats =
+ dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get());
+ if (strStats == nullptr) {
+ throw InvalidArgument("Failed to cast to StringColumnStatisticsImpl");
+ }
+
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
+
+ char* const* data = charsBatch->data.data() + offset;
+ int64_t* length = charsBatch->length.data() + offset;
+ const char* notNull = charsBatch->hasNulls ?
+ charsBatch->notNull.data() + offset : nullptr;
+
+ uint64_t count = 0;
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (!notNull || notNull[i]) {
+ uint64_t itemLength = Utf8Utils::truncateBytesTo(
+ maxLength, data[i], static_cast<uint64_t>(length[i]));
+ length[i] = static_cast<int64_t>(itemLength);
+
+ if (useDictionary) {
+ size_t index = dictionary.insert(data[i], static_cast<size_t>(length[i]));
+ dictionary.idxInDictBuffer.push_back(static_cast<int64_t>(index));
+ } else {
+ directDataStream->write(data[i], static_cast<size_t>(length[i]));
+ }
+
+ if (enableBloomFilter) {
+ bloomFilter->addBytes(data[i], length[i]);
+ }
+ strStats->update(data[i], static_cast<size_t>(length[i]));
+ ++count;
+ }
+ }
+
+ if (!useDictionary) {
+ directLengthEncoder->add(length, numValues, notNull);
+ }
+
+ strStats->increase(count);
+ if (count < numValues) {
+ strStats->setHasNull(true);
+ }
+ }
+
+ class BinaryColumnWriter : public StringColumnWriter {
+ public:
+ BinaryColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options) :
+ StringColumnWriter(type, factory, options) {
+ // PASS
+ }
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) override;
+ };
+
+ void BinaryColumnWriter::add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) {
+ StringVectorBatch* binBatch = dynamic_cast<StringVectorBatch*>(&rowBatch);
+ if (binBatch == nullptr) {
+ throw InvalidArgument("Failed to cast to StringVectorBatch");
+ }
+
+ BinaryColumnStatisticsImpl* binStats =
+ dynamic_cast<BinaryColumnStatisticsImpl*>(colIndexStatistics.get());
+ if (binStats == nullptr) {
+ throw InvalidArgument("Failed to cast to BinaryColumnStatisticsImpl");
+ }
+
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
+
+ char** data = binBatch->data.data() + offset;
+ int64_t* length = binBatch->length.data() + offset;
+ const char* notNull = binBatch->hasNulls ?
+ binBatch->notNull.data() + offset : nullptr;
+
+ uint64_t count = 0;
+ for (uint64_t i = 0; i < numValues; ++i) {
+ uint64_t unsignedLength = static_cast<uint64_t>(length[i]);
+ if (!notNull || notNull[i]) {
+ directDataStream->write(data[i], unsignedLength);
+
+ binStats->update(unsignedLength);
+ ++count;
+ }
+ }
+ directLengthEncoder->add(length, numValues, notNull);
+ binStats->increase(count);
+ if (count < numValues) {
+ binStats->setHasNull(true);
+ }
+ }
+
+ class TimestampColumnWriter : public ColumnWriter {
+ public:
+ TimestampColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options);
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) override;
+
+ virtual void flush(std::vector<proto::Stream>& streams) override;
+
+ virtual uint64_t getEstimatedSize() const override;
+
+ virtual void getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const override;
+
+ virtual void recordPosition() const override;
+
+ protected:
+ std::unique_ptr<RleEncoder> secRleEncoder, nanoRleEncoder;
+
+ private:
+ RleVersion rleVersion;
+ const Timezone& timezone;
+ };
+
+ TimestampColumnWriter::TimestampColumnWriter(
+ const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options) :
+ ColumnWriter(type, factory, options),
+ rleVersion(options.getRleVersion()),
+ timezone(getTimezoneByName("GMT")){
+ std::unique_ptr<BufferedOutputStream> dataStream =
+ factory.createStream(proto::Stream_Kind_DATA);
+ std::unique_ptr<BufferedOutputStream> secondaryStream =
+ factory.createStream(proto::Stream_Kind_SECONDARY);
+ secRleEncoder = createRleEncoder(std::move(dataStream),
+ true,
+ rleVersion,
+ memPool,
+ options.getAlignedBitpacking());
+ nanoRleEncoder = createRleEncoder(std::move(secondaryStream),
+ false,
+ rleVersion,
+ memPool,
+ options.getAlignedBitpacking());
+
+ if (enableIndex) {
+ recordPosition();
+ }
+ }
+
+  // Because the number of nanoseconds often has a large number of trailing zeros,
+  // trailing decimal zeros are stripped when there are at least two of them, and
+  // the last three bits record one less than the number of zeros removed.
+  // Thus 1000 nanoseconds is serialized as 0x0a and 100000 as 0x0c.
+ static int64_t formatNano(int64_t nanos) {
+ if (nanos == 0) {
+ return 0;
+ } else if (nanos % 100 != 0) {
+ return (nanos) << 3;
+ } else {
+ nanos /= 100;
+ int64_t trailingZeros = 1;
+ while (nanos % 10 == 0 && trailingZeros < 7) {
+ nanos /= 10;
+ trailingZeros += 1;
+ }
+ return (nanos) << 3 | trailingZeros;
+ }
+ }
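+  // For example (illustrative): 123456 nanoseconds has fewer than two trailing
+  // zeros, so it is stored simply as 123456 << 3 with the low three bits zero.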
+
+ void TimestampColumnWriter::add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) {
+ TimestampVectorBatch* tsBatch =
+ dynamic_cast<TimestampVectorBatch*>(&rowBatch);
+ if (tsBatch == nullptr) {
+ throw InvalidArgument("Failed to cast to TimestampVectorBatch");
+ }
+
+ TimestampColumnStatisticsImpl* tsStats =
+ dynamic_cast<TimestampColumnStatisticsImpl*>(colIndexStatistics.get());
+ if (tsStats == nullptr) {
+ throw InvalidArgument("Failed to cast to TimestampColumnStatisticsImpl");
+ }
+
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
+
+ const char* notNull = tsBatch->hasNulls ?
+ tsBatch->notNull.data() + offset : nullptr;
+ int64_t *secs = tsBatch->data.data() + offset;
+ int64_t *nanos = tsBatch->nanoseconds.data() + offset;
+
+ uint64_t count = 0;
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (notNull == nullptr || notNull[i]) {
+ // TimestampVectorBatch already stores data in UTC
+ int64_t millsUTC = secs[i] * 1000 + nanos[i] / 1000000;
+ ++count;
+ if (enableBloomFilter) {
+ bloomFilter->addLong(millsUTC);
+ }
+ tsStats->update(millsUTC);
+
if (secs[i] < 0 && nanos[i] > 999999) {
- secs[i] += 1;
- }
-
- secs[i] -= timezone.getEpoch();
- nanos[i] = formatNano(nanos[i]);
- }
- }
- tsStats->increase(count);
- if (count < numValues) {
- tsStats->setHasNull(true);
- }
-
- secRleEncoder->add(secs, numValues, notNull);
- nanoRleEncoder->add(nanos, numValues, notNull);
- }
-
- void TimestampColumnWriter::flush(std::vector<proto::Stream>& streams) {
- ColumnWriter::flush(streams);
-
- proto::Stream dataStream;
- dataStream.set_kind(proto::Stream_Kind_DATA);
- dataStream.set_column(static_cast<uint32_t>(columnId));
- dataStream.set_length(secRleEncoder->flush());
- streams.push_back(dataStream);
-
- proto::Stream secondaryStream;
- secondaryStream.set_kind(proto::Stream_Kind_SECONDARY);
- secondaryStream.set_column(static_cast<uint32_t>(columnId));
- secondaryStream.set_length(nanoRleEncoder->flush());
- streams.push_back(secondaryStream);
- }
-
- uint64_t TimestampColumnWriter::getEstimatedSize() const {
- uint64_t size = ColumnWriter::getEstimatedSize();
- size += secRleEncoder->getBufferSize();
- size += nanoRleEncoder->getBufferSize();
- return size;
- }
-
- void TimestampColumnWriter::getColumnEncoding(
- std::vector<proto::ColumnEncoding>& encodings) const {
- proto::ColumnEncoding encoding;
- encoding.set_kind(RleVersionMapper(rleVersion));
- encoding.set_dictionarysize(0);
- if (enableBloomFilter) {
- encoding.set_bloomencoding(BloomFilterVersion::UTF8);
- }
- encodings.push_back(encoding);
- }
-
- void TimestampColumnWriter::recordPosition() const {
- ColumnWriter::recordPosition();
- secRleEncoder->recordPosition(rowIndexPosition.get());
- nanoRleEncoder->recordPosition(rowIndexPosition.get());
- }
-
- class DateColumnWriter : public IntegerColumnWriter {
- public:
- DateColumnWriter(const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options);
-
- virtual void add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) override;
- };
-
- DateColumnWriter::DateColumnWriter(
- const Type &type,
- const StreamsFactory &factory,
- const WriterOptions &options) :
- IntegerColumnWriter(type, factory, options) {
- // PASS
- }
-
- void DateColumnWriter::add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) {
- const LongVectorBatch* longBatch =
- dynamic_cast<const LongVectorBatch*>(&rowBatch);
- if (longBatch == nullptr) {
- throw InvalidArgument("Failed to cast to LongVectorBatch");
- }
-
- DateColumnStatisticsImpl* dateStats =
- dynamic_cast<DateColumnStatisticsImpl*>(colIndexStatistics.get());
- if (dateStats == nullptr) {
- throw InvalidArgument("Failed to cast to DateColumnStatisticsImpl");
- }
-
- ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
-
- const int64_t* data = longBatch->data.data() + offset;
- const char* notNull = longBatch->hasNulls ?
- longBatch->notNull.data() + offset : nullptr;
-
- rleEncoder->add(data, numValues, notNull);
-
- uint64_t count = 0;
- for (uint64_t i = 0; i < numValues; ++i) {
- if (!notNull || notNull[i]) {
- ++count;
- dateStats->update(static_cast<int32_t>(data[i]));
- if (enableBloomFilter) {
- bloomFilter->addLong(data[i]);
- }
- }
- }
- dateStats->increase(count);
- if (count < numValues) {
- dateStats->setHasNull(true);
- }
- }
-
- class Decimal64ColumnWriter : public ColumnWriter {
- public:
- static const uint32_t MAX_PRECISION_64 = 18;
- static const uint32_t MAX_PRECISION_128 = 38;
-
- Decimal64ColumnWriter(const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options);
-
- virtual void add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) override;
-
- virtual void flush(std::vector<proto::Stream>& streams) override;
-
- virtual uint64_t getEstimatedSize() const override;
-
- virtual void getColumnEncoding(
- std::vector<proto::ColumnEncoding>& encodings) const override;
-
- virtual void recordPosition() const override;
-
- protected:
- RleVersion rleVersion;
- uint64_t precision;
- uint64_t scale;
- std::unique_ptr<AppendOnlyBufferedStream> valueStream;
- std::unique_ptr<RleEncoder> scaleEncoder;
-
- private:
- char buffer[10];
- };
-
- Decimal64ColumnWriter::Decimal64ColumnWriter(
- const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options) :
- ColumnWriter(type, factory, options),
- rleVersion(options.getRleVersion()),
- precision(type.getPrecision()),
- scale(type.getScale()) {
- valueStream.reset(new AppendOnlyBufferedStream(
- factory.createStream(proto::Stream_Kind_DATA)));
- std::unique_ptr<BufferedOutputStream> scaleStream =
- factory.createStream(proto::Stream_Kind_SECONDARY);
- scaleEncoder = createRleEncoder(std::move(scaleStream),
- true,
- rleVersion,
- memPool,
- options.getAlignedBitpacking());
-
- if (enableIndex) {
- recordPosition();
- }
- }
-
- void Decimal64ColumnWriter::add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) {
- const Decimal64VectorBatch* decBatch =
- dynamic_cast<const Decimal64VectorBatch*>(&rowBatch);
- if (decBatch == nullptr) {
- throw InvalidArgument("Failed to cast to Decimal64VectorBatch");
- }
-
- DecimalColumnStatisticsImpl* decStats =
- dynamic_cast<DecimalColumnStatisticsImpl*>(colIndexStatistics.get());
- if (decStats == nullptr) {
- throw InvalidArgument("Failed to cast to DecimalColumnStatisticsImpl");
- }
-
- ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
-
- const char* notNull = decBatch->hasNulls ?
- decBatch->notNull.data() + offset : nullptr;
- const int64_t* values = decBatch->values.data() + offset;
-
- uint64_t count = 0;
- for (uint64_t i = 0; i < numValues; ++i) {
- if (!notNull || notNull[i]) {
- int64_t val = zigZag(values[i]);
- char* data = buffer;
- while (true) {
- if ((val & ~0x7f) == 0) {
- *(data++) = (static_cast<char>(val));
- break;
- } else {
- *(data++) = static_cast<char>(0x80 | (val & 0x7f));
- // cast val to unsigned so as to force 0-fill right shift
- val = (static_cast<uint64_t>(val) >> 7);
- }
- }
- valueStream->write(buffer, static_cast<size_t>(data - buffer));
- ++count;
- if (enableBloomFilter) {
- std::string decimal = Decimal(
- values[i], static_cast<int32_t>(scale)).toString();
- bloomFilter->addBytes(
- decimal.c_str(), static_cast<int64_t>(decimal.size()));
- }
- decStats->update(Decimal(values[i], static_cast<int32_t>(scale)));
- }
- }
- decStats->increase(count);
- if (count < numValues) {
- decStats->setHasNull(true);
- }
- std::vector<int64_t> scales(numValues, static_cast<int64_t>(scale));
- scaleEncoder->add(scales.data(), numValues, notNull);
- }
-
- void Decimal64ColumnWriter::flush(std::vector<proto::Stream>& streams) {
- ColumnWriter::flush(streams);
-
- proto::Stream dataStream;
- dataStream.set_kind(proto::Stream_Kind_DATA);
- dataStream.set_column(static_cast<uint32_t>(columnId));
- dataStream.set_length(valueStream->flush());
- streams.push_back(dataStream);
-
- proto::Stream secondaryStream;
- secondaryStream.set_kind(proto::Stream_Kind_SECONDARY);
- secondaryStream.set_column(static_cast<uint32_t>(columnId));
- secondaryStream.set_length(scaleEncoder->flush());
- streams.push_back(secondaryStream);
- }
-
- uint64_t Decimal64ColumnWriter::getEstimatedSize() const {
- uint64_t size = ColumnWriter::getEstimatedSize();
- size += valueStream->getSize();
- size += scaleEncoder->getBufferSize();
- return size;
- }
-
- void Decimal64ColumnWriter::getColumnEncoding(
- std::vector<proto::ColumnEncoding>& encodings) const {
- proto::ColumnEncoding encoding;
- encoding.set_kind(RleVersionMapper(rleVersion));
- encoding.set_dictionarysize(0);
- if (enableBloomFilter) {
- encoding.set_bloomencoding(BloomFilterVersion::UTF8);
- }
- encodings.push_back(encoding);
- }
-
- void Decimal64ColumnWriter::recordPosition() const {
- ColumnWriter::recordPosition();
- valueStream->recordPosition(rowIndexPosition.get());
- scaleEncoder->recordPosition(rowIndexPosition.get());
- }
-
- class Decimal128ColumnWriter : public Decimal64ColumnWriter {
- public:
- Decimal128ColumnWriter(const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options);
-
- virtual void add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) override;
-
- private:
- char buffer[20];
- };
-
- Decimal128ColumnWriter::Decimal128ColumnWriter(
- const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options) :
- Decimal64ColumnWriter(type, factory, options) {
- // PASS
- }
-
- // Zigzag encoding moves the sign bit to the least significant bit using the
- // expression (val << 1) ^ (val >> 63) and derives its name from the fact that
- // positive and negative numbers alternate once encoded.
- Int128 zigZagInt128(const Int128& value) {
- bool isNegative = value < 0;
- Int128 val = value.abs();
- val <<= 1;
- if (isNegative) {
- val -= 1;
- }
- return val;
- }
-
- void Decimal128ColumnWriter::add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) {
- const Decimal128VectorBatch* decBatch =
- dynamic_cast<const Decimal128VectorBatch*>(&rowBatch);
- if (decBatch == nullptr) {
- throw InvalidArgument("Failed to cast to Decimal128VectorBatch");
- }
-
- DecimalColumnStatisticsImpl* decStats =
- dynamic_cast<DecimalColumnStatisticsImpl*>(colIndexStatistics.get());
- if (decStats == nullptr) {
- throw InvalidArgument("Failed to cast to DecimalColumnStatisticsImpl");
- }
-
- ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
-
- const char* notNull = decBatch->hasNulls ?
- decBatch->notNull.data() + offset : nullptr;
- const Int128* values = decBatch->values.data() + offset;
-
- // The current encoding of decimal columns stores the integer representation
- // of the value as an unbounded length zigzag encoded base 128 varint.
- uint64_t count = 0;
- for (uint64_t i = 0; i < numValues; ++i) {
- if (!notNull || notNull[i]) {
- Int128 val = zigZagInt128(values[i]);
- char* data = buffer;
- while (true) {
- if ((val & ~0x7f) == 0) {
- *(data++) = (static_cast<char>(val.getLowBits()));
- break;
- } else {
- *(data++) = static_cast<char>(0x80 | (val.getLowBits() & 0x7f));
- val >>= 7;
- }
- }
- valueStream->write(buffer, static_cast<size_t>(data - buffer));
-
- ++count;
- if (enableBloomFilter) {
- std::string decimal = Decimal(
- values[i], static_cast<int32_t>(scale)).toString();
- bloomFilter->addBytes(
- decimal.c_str(), static_cast<int64_t>(decimal.size()));
- }
- decStats->update(Decimal(values[i], static_cast<int32_t>(scale)));
- }
- }
- decStats->increase(count);
- if (count < numValues) {
- decStats->setHasNull(true);
- }
- std::vector<int64_t> scales(numValues, static_cast<int64_t>(scale));
- scaleEncoder->add(scales.data(), numValues, notNull);
- }
-
- class ListColumnWriter : public ColumnWriter {
- public:
- ListColumnWriter(const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options);
- ~ListColumnWriter() override;
-
- virtual void add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) override;
-
- virtual void flush(std::vector<proto::Stream>& streams) override;
-
- virtual uint64_t getEstimatedSize() const override;
-
- virtual void getColumnEncoding(
- std::vector<proto::ColumnEncoding>& encodings) const override;
-
- virtual void getStripeStatistics(
- std::vector<proto::ColumnStatistics>& stats) const override;
-
- virtual void getFileStatistics(
- std::vector<proto::ColumnStatistics>& stats) const override;
-
- virtual void mergeStripeStatsIntoFileStats() override;
-
- virtual void mergeRowGroupStatsIntoStripeStats() override;
-
- virtual void createRowIndexEntry() override;
-
- virtual void writeIndex(
- std::vector<proto::Stream> &streams) const override;
-
- virtual void recordPosition() const override;
-
- virtual void writeDictionary() override;
-
- virtual void reset() override;
-
- private:
- std::unique_ptr<RleEncoder> lengthEncoder;
- RleVersion rleVersion;
- std::unique_ptr<ColumnWriter> child;
- };
-
- ListColumnWriter::ListColumnWriter(const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options) :
- ColumnWriter(type, factory, options),
- rleVersion(options.getRleVersion()){
-
- std::unique_ptr<BufferedOutputStream> lengthStream =
- factory.createStream(proto::Stream_Kind_LENGTH);
- lengthEncoder = createRleEncoder(std::move(lengthStream),
- false,
- rleVersion,
- memPool,
- options.getAlignedBitpacking());
-
- if (type.getSubtypeCount() == 1) {
- child = buildWriter(*type.getSubtype(0), factory, options);
- }
-
- if (enableIndex) {
- recordPosition();
- }
- }
-
- ListColumnWriter::~ListColumnWriter() {
- // PASS
- }
-
- void ListColumnWriter::add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) {
- ListVectorBatch* listBatch = dynamic_cast<ListVectorBatch*>(&rowBatch);
- if (listBatch == nullptr) {
- throw InvalidArgument("Failed to cast to ListVectorBatch");
- }
-
- ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
-
- int64_t* offsets = listBatch->offsets.data() + offset;
- const char* notNull = listBatch->hasNulls ?
- listBatch->notNull.data() + offset : nullptr;
-
- uint64_t elemOffset = static_cast<uint64_t>(offsets[0]);
- uint64_t totalNumValues = static_cast<uint64_t>(offsets[numValues] - offsets[0]);
-
- // translate offsets to lengths
- for (uint64_t i = 0; i != numValues; ++i) {
- offsets[i] = offsets[i + 1] - offsets[i];
- }
-
- // unnecessary to deal with null as elements are packed together
- if (child.get()) {
- child->add(*listBatch->elements, elemOffset, totalNumValues, nullptr);
- }
- lengthEncoder->add(offsets, numValues, notNull);
-
- if (enableIndex) {
- if (!notNull) {
- colIndexStatistics->increase(numValues);
- } else {
- uint64_t count = 0;
- for (uint64_t i = 0; i < numValues; ++i) {
- if (notNull[i]) {
- ++count;
- if (enableBloomFilter) {
- bloomFilter->addLong(offsets[i]);
- }
- }
- }
- colIndexStatistics->increase(count);
- if (count < numValues) {
- colIndexStatistics->setHasNull(true);
- }
- }
- }
- }
-
- void ListColumnWriter::flush(std::vector<proto::Stream>& streams) {
- ColumnWriter::flush(streams);
-
- proto::Stream stream;
- stream.set_kind(proto::Stream_Kind_LENGTH);
- stream.set_column(static_cast<uint32_t>(columnId));
- stream.set_length(lengthEncoder->flush());
- streams.push_back(stream);
-
- if (child.get()) {
- child->flush(streams);
- }
- }
-
- void ListColumnWriter::writeIndex(std::vector<proto::Stream> &streams) const {
- ColumnWriter::writeIndex(streams);
- if (child.get()) {
- child->writeIndex(streams);
- }
- }
-
- uint64_t ListColumnWriter::getEstimatedSize() const {
- uint64_t size = ColumnWriter::getEstimatedSize();
- if (child.get()) {
- size += lengthEncoder->getBufferSize();
- size += child->getEstimatedSize();
- }
- return size;
- }
-
- void ListColumnWriter::getColumnEncoding(
- std::vector<proto::ColumnEncoding>& encodings) const {
- proto::ColumnEncoding encoding;
- encoding.set_kind(RleVersionMapper(rleVersion));
- encoding.set_dictionarysize(0);
- if (enableBloomFilter) {
- encoding.set_bloomencoding(BloomFilterVersion::UTF8);
- }
- encodings.push_back(encoding);
- if (child.get()) {
- child->getColumnEncoding(encodings);
- }
- }
-
- void ListColumnWriter::getStripeStatistics(
- std::vector<proto::ColumnStatistics>& stats) const {
- ColumnWriter::getStripeStatistics(stats);
- if (child.get()) {
- child->getStripeStatistics(stats);
- }
- }
-
- void ListColumnWriter::mergeStripeStatsIntoFileStats() {
- ColumnWriter::mergeStripeStatsIntoFileStats();
- if (child.get()) {
- child->mergeStripeStatsIntoFileStats();
- }
- }
-
- void ListColumnWriter::getFileStatistics(
- std::vector<proto::ColumnStatistics>& stats) const {
- ColumnWriter::getFileStatistics(stats);
- if (child.get()) {
- child->getFileStatistics(stats);
- }
- }
-
- void ListColumnWriter::mergeRowGroupStatsIntoStripeStats() {
- ColumnWriter::mergeRowGroupStatsIntoStripeStats();
- if (child.get()) {
- child->mergeRowGroupStatsIntoStripeStats();
- }
- }
-
- void ListColumnWriter::createRowIndexEntry() {
- ColumnWriter::createRowIndexEntry();
- if (child.get()) {
- child->createRowIndexEntry();
- }
- }
-
- void ListColumnWriter::recordPosition() const {
- ColumnWriter::recordPosition();
- lengthEncoder->recordPosition(rowIndexPosition.get());
- }
-
- void ListColumnWriter::reset() {
- ColumnWriter::reset();
- if (child) {
- child->reset();
- }
- }
-
- void ListColumnWriter::writeDictionary() {
- if (child) {
- child->writeDictionary();
- }
- }
-
- class MapColumnWriter : public ColumnWriter {
- public:
- MapColumnWriter(const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options);
- ~MapColumnWriter() override;
-
- virtual void add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) override;
-
- virtual void flush(std::vector<proto::Stream>& streams) override;
-
- virtual uint64_t getEstimatedSize() const override;
-
- virtual void getColumnEncoding(
- std::vector<proto::ColumnEncoding>& encodings) const override;
-
- virtual void getStripeStatistics(
- std::vector<proto::ColumnStatistics>& stats) const override;
-
- virtual void getFileStatistics(
- std::vector<proto::ColumnStatistics>& stats) const override;
-
- virtual void mergeStripeStatsIntoFileStats() override;
-
- virtual void mergeRowGroupStatsIntoStripeStats() override;
-
- virtual void createRowIndexEntry() override;
-
- virtual void writeIndex(
- std::vector<proto::Stream> &streams) const override;
-
- virtual void recordPosition() const override;
-
- virtual void writeDictionary() override;
-
- virtual void reset() override;
-
- private:
- std::unique_ptr<ColumnWriter> keyWriter;
- std::unique_ptr<ColumnWriter> elemWriter;
- std::unique_ptr<RleEncoder> lengthEncoder;
- RleVersion rleVersion;
- };
-
- MapColumnWriter::MapColumnWriter(const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options) :
- ColumnWriter(type, factory, options),
- rleVersion(options.getRleVersion()){
- std::unique_ptr<BufferedOutputStream> lengthStream =
- factory.createStream(proto::Stream_Kind_LENGTH);
- lengthEncoder = createRleEncoder(std::move(lengthStream),
- false,
- rleVersion,
- memPool,
- options.getAlignedBitpacking());
-
- if (type.getSubtypeCount() > 0) {
- keyWriter = buildWriter(*type.getSubtype(0), factory, options);
- }
-
- if (type.getSubtypeCount() > 1) {
- elemWriter = buildWriter(*type.getSubtype(1), factory, options);
- }
-
- if (enableIndex) {
- recordPosition();
- }
- }
-
- MapColumnWriter::~MapColumnWriter() {
- // PASS
- }
-
- void MapColumnWriter::add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) {
- MapVectorBatch* mapBatch = dynamic_cast<MapVectorBatch*>(&rowBatch);
- if (mapBatch == nullptr) {
- throw InvalidArgument("Failed to cast to MapVectorBatch");
- }
-
- ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
-
- int64_t* offsets = mapBatch->offsets.data() + offset;
- const char* notNull = mapBatch->hasNulls ?
- mapBatch->notNull.data() + offset : nullptr;
-
- uint64_t elemOffset = static_cast<uint64_t>(offsets[0]);
- uint64_t totalNumValues = static_cast<uint64_t>(offsets[numValues] - offsets[0]);
-
- // translate offsets to lengths
- for (uint64_t i = 0; i != numValues; ++i) {
- offsets[i] = offsets[i + 1] - offsets[i];
- }
-
- lengthEncoder->add(offsets, numValues, notNull);
-
- // unnecessary to deal with null as keys and values are packed together
- if (keyWriter.get()) {
- keyWriter->add(*mapBatch->keys, elemOffset, totalNumValues, nullptr);
- }
- if (elemWriter.get()) {
- elemWriter->add(*mapBatch->elements, elemOffset, totalNumValues, nullptr);
- }
-
- if (enableIndex) {
- if (!notNull) {
- colIndexStatistics->increase(numValues);
- } else {
- uint64_t count = 0;
- for (uint64_t i = 0; i < numValues; ++i) {
- if (notNull[i]) {
- ++count;
- if (enableBloomFilter) {
- bloomFilter->addLong(offsets[i]);
- }
- }
- }
- colIndexStatistics->increase(count);
- if (count < numValues) {
- colIndexStatistics->setHasNull(true);
- }
- }
- }
- }
-
- void MapColumnWriter::flush(std::vector<proto::Stream>& streams) {
- ColumnWriter::flush(streams);
-
- proto::Stream stream;
- stream.set_kind(proto::Stream_Kind_LENGTH);
- stream.set_column(static_cast<uint32_t>(columnId));
- stream.set_length(lengthEncoder->flush());
- streams.push_back(stream);
-
- if (keyWriter.get()) {
- keyWriter->flush(streams);
- }
- if (elemWriter.get()) {
- elemWriter->flush(streams);
- }
- }
-
- void MapColumnWriter::writeIndex(
- std::vector<proto::Stream> &streams) const {
- ColumnWriter::writeIndex(streams);
- if (keyWriter.get()) {
- keyWriter->writeIndex(streams);
- }
- if (elemWriter.get()) {
- elemWriter->writeIndex(streams);
- }
- }
-
- uint64_t MapColumnWriter::getEstimatedSize() const {
- uint64_t size = ColumnWriter::getEstimatedSize();
- size += lengthEncoder->getBufferSize();
- if (keyWriter.get()) {
- size += keyWriter->getEstimatedSize();
- }
- if (elemWriter.get()) {
- size += elemWriter->getEstimatedSize();
- }
- return size;
- }
-
- void MapColumnWriter::getColumnEncoding(
- std::vector<proto::ColumnEncoding>& encodings) const {
- proto::ColumnEncoding encoding;
- encoding.set_kind(RleVersionMapper(rleVersion));
- encoding.set_dictionarysize(0);
- if (enableBloomFilter) {
- encoding.set_bloomencoding(BloomFilterVersion::UTF8);
- }
- encodings.push_back(encoding);
- if (keyWriter.get()) {
- keyWriter->getColumnEncoding(encodings);
- }
- if (elemWriter.get()) {
- elemWriter->getColumnEncoding(encodings);
- }
- }
-
- void MapColumnWriter::getStripeStatistics(
- std::vector<proto::ColumnStatistics>& stats) const {
- ColumnWriter::getStripeStatistics(stats);
- if (keyWriter.get()) {
- keyWriter->getStripeStatistics(stats);
- }
- if (elemWriter.get()) {
- elemWriter->getStripeStatistics(stats);
- }
- }
-
- void MapColumnWriter::mergeStripeStatsIntoFileStats() {
- ColumnWriter::mergeStripeStatsIntoFileStats();
- if (keyWriter.get()) {
- keyWriter->mergeStripeStatsIntoFileStats();
- }
- if (elemWriter.get()) {
- elemWriter->mergeStripeStatsIntoFileStats();
- }
- }
-
- void MapColumnWriter::getFileStatistics(
- std::vector<proto::ColumnStatistics>& stats) const {
- ColumnWriter::getFileStatistics(stats);
- if (keyWriter.get()) {
- keyWriter->getFileStatistics(stats);
- }
- if (elemWriter.get()) {
- elemWriter->getFileStatistics(stats);
- }
- }
-
- void MapColumnWriter::mergeRowGroupStatsIntoStripeStats() {
- ColumnWriter::mergeRowGroupStatsIntoStripeStats();
- if (keyWriter.get()) {
- keyWriter->mergeRowGroupStatsIntoStripeStats();
- }
- if (elemWriter.get()) {
- elemWriter->mergeRowGroupStatsIntoStripeStats();
- }
- }
-
- void MapColumnWriter::createRowIndexEntry() {
- ColumnWriter::createRowIndexEntry();
- if (keyWriter.get()) {
- keyWriter->createRowIndexEntry();
- }
- if (elemWriter.get()) {
- elemWriter->createRowIndexEntry();
- }
- }
-
- void MapColumnWriter::recordPosition() const {
- ColumnWriter::recordPosition();
- lengthEncoder->recordPosition(rowIndexPosition.get());
- }
-
- void MapColumnWriter::reset() {
- ColumnWriter::reset();
- if (keyWriter) {
- keyWriter->reset();
- }
- if (elemWriter) {
- elemWriter->reset();
- }
- }
-
- void MapColumnWriter::writeDictionary() {
- if (keyWriter) {
- keyWriter->writeDictionary();
- }
- if (elemWriter) {
- elemWriter->writeDictionary();
- }
- }
-
- class UnionColumnWriter : public ColumnWriter {
- public:
- UnionColumnWriter(const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options);
- ~UnionColumnWriter() override;
-
- virtual void add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) override;
-
- virtual void flush(std::vector<proto::Stream>& streams) override;
-
- virtual uint64_t getEstimatedSize() const override;
-
- virtual void getColumnEncoding(
- std::vector<proto::ColumnEncoding>& encodings) const override;
-
- virtual void getStripeStatistics(
- std::vector<proto::ColumnStatistics>& stats) const override;
-
- virtual void getFileStatistics(
- std::vector<proto::ColumnStatistics>& stats) const override;
-
- virtual void mergeStripeStatsIntoFileStats() override;
-
- virtual void mergeRowGroupStatsIntoStripeStats() override;
-
- virtual void createRowIndexEntry() override;
-
- virtual void writeIndex(
- std::vector<proto::Stream> &streams) const override;
-
- virtual void recordPosition() const override;
-
- virtual void writeDictionary() override;
-
- virtual void reset() override;
-
- private:
- std::unique_ptr<ByteRleEncoder> rleEncoder;
- std::vector<ColumnWriter*> children;
- };
-
- UnionColumnWriter::UnionColumnWriter(const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options) :
- ColumnWriter(type, factory, options) {
-
- std::unique_ptr<BufferedOutputStream> dataStream =
- factory.createStream(proto::Stream_Kind_DATA);
- rleEncoder = createByteRleEncoder(std::move(dataStream));
-
- for (uint64_t i = 0; i != type.getSubtypeCount(); ++i) {
- children.push_back(buildWriter(*type.getSubtype(i),
- factory,
- options).release());
- }
-
- if (enableIndex) {
- recordPosition();
- }
- }
-
- UnionColumnWriter::~UnionColumnWriter() {
- for (uint32_t i = 0; i < children.size(); ++i) {
- delete children[i];
- }
- }
-
- void UnionColumnWriter::add(ColumnVectorBatch& rowBatch,
- uint64_t offset,
- uint64_t numValues,
- const char* incomingMask) {
- UnionVectorBatch* unionBatch = dynamic_cast<UnionVectorBatch*>(&rowBatch);
- if (unionBatch == nullptr) {
- throw InvalidArgument("Failed to cast to UnionVectorBatch");
- }
-
- ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
-
- const char* notNull = unionBatch->hasNulls ?
- unionBatch->notNull.data() + offset : nullptr;
- unsigned char * tags = unionBatch->tags.data() + offset;
- uint64_t * offsets = unionBatch->offsets.data() + offset;
-
- std::vector<int64_t> childOffset(children.size(), -1);
- std::vector<uint64_t> childLength(children.size(), 0);
-
- for (uint64_t i = 0; i != numValues; ++i) {
- if (childOffset[tags[i]] == -1) {
- childOffset[tags[i]] = static_cast<int64_t>(offsets[i]);
- }
- ++childLength[tags[i]];
- }
-
- rleEncoder->add(reinterpret_cast<char*>(tags), numValues, notNull);
-
- for (uint32_t i = 0; i < children.size(); ++i) {
- if (childLength[i] > 0) {
- children[i]->add(*unionBatch->children[i],
- static_cast<uint64_t>(childOffset[i]),
- childLength[i], nullptr);
- }
- }
-
- // update stats
- if (enableIndex) {
- if (!notNull) {
- colIndexStatistics->increase(numValues);
- } else {
- uint64_t count = 0;
- for (uint64_t i = 0; i < numValues; ++i) {
- if (notNull[i]) {
- ++count;
- if (enableBloomFilter) {
- bloomFilter->addLong(tags[i]);
- }
- }
- }
- colIndexStatistics->increase(count);
- if (count < numValues) {
- colIndexStatistics->setHasNull(true);
- }
- }
- }
- }
-
- void UnionColumnWriter::flush(std::vector<proto::Stream>& streams) {
- ColumnWriter::flush(streams);
-
- proto::Stream stream;
- stream.set_kind(proto::Stream_Kind_DATA);
- stream.set_column(static_cast<uint32_t>(columnId));
- stream.set_length(rleEncoder->flush());
- streams.push_back(stream);
-
- for (uint32_t i = 0; i < children.size(); ++i) {
- children[i]->flush(streams);
- }
- }
-
- void UnionColumnWriter::writeIndex(std::vector<proto::Stream> &streams) const {
- ColumnWriter::writeIndex(streams);
- for (uint32_t i = 0; i < children.size(); ++i) {
- children[i]->writeIndex(streams);
- }
- }
-
- uint64_t UnionColumnWriter::getEstimatedSize() const {
- uint64_t size = ColumnWriter::getEstimatedSize();
- size += rleEncoder->getBufferSize();
- for (uint32_t i = 0; i < children.size(); ++i) {
- size += children[i]->getEstimatedSize();
- }
- return size;
- }
-
- void UnionColumnWriter::getColumnEncoding(
- std::vector<proto::ColumnEncoding>& encodings) const {
- proto::ColumnEncoding encoding;
- encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
- encoding.set_dictionarysize(0);
- if (enableBloomFilter) {
- encoding.set_bloomencoding(BloomFilterVersion::UTF8);
- }
- encodings.push_back(encoding);
- for (uint32_t i = 0; i < children.size(); ++i) {
- children[i]->getColumnEncoding(encodings);
- }
- }
-
- void UnionColumnWriter::getStripeStatistics(
- std::vector<proto::ColumnStatistics>& stats) const {
- ColumnWriter::getStripeStatistics(stats);
- for (uint32_t i = 0; i < children.size(); ++i) {
- children[i]->getStripeStatistics(stats);
- }
- }
-
- void UnionColumnWriter::mergeStripeStatsIntoFileStats() {
- ColumnWriter::mergeStripeStatsIntoFileStats();
- for (uint32_t i = 0; i < children.size(); ++i) {
- children[i]->mergeStripeStatsIntoFileStats();
- }
- }
-
- void UnionColumnWriter::getFileStatistics(
- std::vector<proto::ColumnStatistics>& stats) const {
- ColumnWriter::getFileStatistics(stats);
- for (uint32_t i = 0; i < children.size(); ++i) {
- children[i]->getFileStatistics(stats);
- }
- }
-
- void UnionColumnWriter::mergeRowGroupStatsIntoStripeStats() {
- ColumnWriter::mergeRowGroupStatsIntoStripeStats();
- for (uint32_t i = 0; i < children.size(); ++i) {
- children[i]->mergeRowGroupStatsIntoStripeStats();
- }
- }
-
- void UnionColumnWriter::createRowIndexEntry() {
- ColumnWriter::createRowIndexEntry();
- for (uint32_t i = 0; i < children.size(); ++i) {
- children[i]->createRowIndexEntry();
- }
- }
-
- void UnionColumnWriter::recordPosition() const {
- ColumnWriter::recordPosition();
- rleEncoder->recordPosition(rowIndexPosition.get());
- }
-
- void UnionColumnWriter::reset() {
- ColumnWriter::reset();
- for (uint32_t i = 0; i < children.size(); ++i) {
- children[i]->reset();
- }
- }
-
- void UnionColumnWriter::writeDictionary() {
- for (uint32_t i = 0; i < children.size(); ++i) {
- children[i]->writeDictionary();
- }
- }
-
- std::unique_ptr<ColumnWriter> buildWriter(
- const Type& type,
- const StreamsFactory& factory,
- const WriterOptions& options) {
- switch (static_cast<int64_t>(type.getKind())) {
- case STRUCT:
- return std::unique_ptr<ColumnWriter>(
- new StructColumnWriter(
- type,
- factory,
- options));
- case INT:
- case LONG:
- case SHORT:
- return std::unique_ptr<ColumnWriter>(
- new IntegerColumnWriter(
- type,
- factory,
- options));
- case BYTE:
- return std::unique_ptr<ColumnWriter>(
- new ByteColumnWriter(
- type,
- factory,
- options));
- case BOOLEAN:
- return std::unique_ptr<ColumnWriter>(
- new BooleanColumnWriter(
- type,
- factory,
- options));
- case DOUBLE:
- return std::unique_ptr<ColumnWriter>(
- new DoubleColumnWriter(
- type,
- factory,
- options,
- false));
- case FLOAT:
- return std::unique_ptr<ColumnWriter>(
- new DoubleColumnWriter(
- type,
- factory,
- options,
- true));
- case BINARY:
- return std::unique_ptr<ColumnWriter>(
- new BinaryColumnWriter(
- type,
- factory,
- options));
- case STRING:
- return std::unique_ptr<ColumnWriter>(
- new StringColumnWriter(
- type,
- factory,
- options));
- case CHAR:
- return std::unique_ptr<ColumnWriter>(
- new CharColumnWriter(
- type,
- factory,
- options));
- case VARCHAR:
- return std::unique_ptr<ColumnWriter>(
- new VarCharColumnWriter(
- type,
- factory,
- options));
- case DATE:
- return std::unique_ptr<ColumnWriter>(
- new DateColumnWriter(
- type,
- factory,
- options));
- case TIMESTAMP:
- return std::unique_ptr<ColumnWriter>(
- new TimestampColumnWriter(
- type,
- factory,
- options));
- case DECIMAL:
- if (type.getPrecision() <= Decimal64ColumnWriter::MAX_PRECISION_64) {
- return std::unique_ptr<ColumnWriter>(
- new Decimal64ColumnWriter(
- type,
- factory,
- options));
- } else if (type.getPrecision() <= Decimal64ColumnWriter::MAX_PRECISION_128) {
- return std::unique_ptr<ColumnWriter>(
- new Decimal128ColumnWriter(
- type,
- factory,
- options));
- } else {
- throw NotImplementedYet("Decimal precision more than 38 is not "
- "supported");
- }
- case LIST:
- return std::unique_ptr<ColumnWriter>(
- new ListColumnWriter(
- type,
- factory,
- options));
- case MAP:
- return std::unique_ptr<ColumnWriter>(
- new MapColumnWriter(
- type,
- factory,
- options));
- case UNION:
- return std::unique_ptr<ColumnWriter>(
- new UnionColumnWriter(
- type,
- factory,
- options));
- default:
- throw NotImplementedYet("Type is not supported yet for creating "
- "ColumnWriter.");
- }
- }
-}
+ secs[i] += 1;
+ }
+
+ secs[i] -= timezone.getEpoch();
+ nanos[i] = formatNano(nanos[i]);
+ }
+ }
+ tsStats->increase(count);
+ if (count < numValues) {
+ tsStats->setHasNull(true);
+ }
+
+ secRleEncoder->add(secs, numValues, notNull);
+ nanoRleEncoder->add(nanos, numValues, notNull);
+ }
+
+ void TimestampColumnWriter::flush(std::vector<proto::Stream>& streams) {
+ ColumnWriter::flush(streams);
+
+ proto::Stream dataStream;
+ dataStream.set_kind(proto::Stream_Kind_DATA);
+ dataStream.set_column(static_cast<uint32_t>(columnId));
+ dataStream.set_length(secRleEncoder->flush());
+ streams.push_back(dataStream);
+
+ proto::Stream secondaryStream;
+ secondaryStream.set_kind(proto::Stream_Kind_SECONDARY);
+ secondaryStream.set_column(static_cast<uint32_t>(columnId));
+ secondaryStream.set_length(nanoRleEncoder->flush());
+ streams.push_back(secondaryStream);
+ }
+
+ uint64_t TimestampColumnWriter::getEstimatedSize() const {
+ uint64_t size = ColumnWriter::getEstimatedSize();
+ size += secRleEncoder->getBufferSize();
+ size += nanoRleEncoder->getBufferSize();
+ return size;
+ }
+
+ void TimestampColumnWriter::getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const {
+ proto::ColumnEncoding encoding;
+ encoding.set_kind(RleVersionMapper(rleVersion));
+ encoding.set_dictionarysize(0);
+ if (enableBloomFilter) {
+ encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+ }
+ encodings.push_back(encoding);
+ }
+
+ void TimestampColumnWriter::recordPosition() const {
+ ColumnWriter::recordPosition();
+ secRleEncoder->recordPosition(rowIndexPosition.get());
+ nanoRleEncoder->recordPosition(rowIndexPosition.get());
+ }
+
+ class DateColumnWriter : public IntegerColumnWriter {
+ public:
+ DateColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options);
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) override;
+ };
+
+ DateColumnWriter::DateColumnWriter(
+ const Type &type,
+ const StreamsFactory &factory,
+ const WriterOptions &options) :
+ IntegerColumnWriter(type, factory, options) {
+ // PASS
+ }
+
+ void DateColumnWriter::add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) {
+ const LongVectorBatch* longBatch =
+ dynamic_cast<const LongVectorBatch*>(&rowBatch);
+ if (longBatch == nullptr) {
+ throw InvalidArgument("Failed to cast to LongVectorBatch");
+ }
+
+ DateColumnStatisticsImpl* dateStats =
+ dynamic_cast<DateColumnStatisticsImpl*>(colIndexStatistics.get());
+ if (dateStats == nullptr) {
+ throw InvalidArgument("Failed to cast to DateColumnStatisticsImpl");
+ }
+
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
+
+ const int64_t* data = longBatch->data.data() + offset;
+ const char* notNull = longBatch->hasNulls ?
+ longBatch->notNull.data() + offset : nullptr;
+
+ rleEncoder->add(data, numValues, notNull);
+
+ uint64_t count = 0;
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (!notNull || notNull[i]) {
+ ++count;
+ dateStats->update(static_cast<int32_t>(data[i]));
+ if (enableBloomFilter) {
+ bloomFilter->addLong(data[i]);
+ }
+ }
+ }
+ dateStats->increase(count);
+ if (count < numValues) {
+ dateStats->setHasNull(true);
+ }
+ }
+
+ class Decimal64ColumnWriter : public ColumnWriter {
+ public:
+ static const uint32_t MAX_PRECISION_64 = 18;
+ static const uint32_t MAX_PRECISION_128 = 38;
+
+ Decimal64ColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options);
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) override;
+
+ virtual void flush(std::vector<proto::Stream>& streams) override;
+
+ virtual uint64_t getEstimatedSize() const override;
+
+ virtual void getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const override;
+
+ virtual void recordPosition() const override;
+
+ protected:
+ RleVersion rleVersion;
+ uint64_t precision;
+ uint64_t scale;
+ std::unique_ptr<AppendOnlyBufferedStream> valueStream;
+ std::unique_ptr<RleEncoder> scaleEncoder;
+
+ private:
+ char buffer[10];
+ };
+
+ Decimal64ColumnWriter::Decimal64ColumnWriter(
+ const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options) :
+ ColumnWriter(type, factory, options),
+ rleVersion(options.getRleVersion()),
+ precision(type.getPrecision()),
+ scale(type.getScale()) {
+ valueStream.reset(new AppendOnlyBufferedStream(
+ factory.createStream(proto::Stream_Kind_DATA)));
+ std::unique_ptr<BufferedOutputStream> scaleStream =
+ factory.createStream(proto::Stream_Kind_SECONDARY);
+ scaleEncoder = createRleEncoder(std::move(scaleStream),
+ true,
+ rleVersion,
+ memPool,
+ options.getAlignedBitpacking());
+
+ if (enableIndex) {
+ recordPosition();
+ }
+ }
+
+ void Decimal64ColumnWriter::add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) {
+ const Decimal64VectorBatch* decBatch =
+ dynamic_cast<const Decimal64VectorBatch*>(&rowBatch);
+ if (decBatch == nullptr) {
+ throw InvalidArgument("Failed to cast to Decimal64VectorBatch");
+ }
+
+ DecimalColumnStatisticsImpl* decStats =
+ dynamic_cast<DecimalColumnStatisticsImpl*>(colIndexStatistics.get());
+ if (decStats == nullptr) {
+ throw InvalidArgument("Failed to cast to DecimalColumnStatisticsImpl");
+ }
+
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
+
+ const char* notNull = decBatch->hasNulls ?
+ decBatch->notNull.data() + offset : nullptr;
+ const int64_t* values = decBatch->values.data() + offset;
+
+ uint64_t count = 0;
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (!notNull || notNull[i]) {
+ int64_t val = zigZag(values[i]);
+ char* data = buffer;
+ while (true) {
+ if ((val & ~0x7f) == 0) {
+ *(data++) = (static_cast<char>(val));
+ break;
+ } else {
+ *(data++) = static_cast<char>(0x80 | (val & 0x7f));
+ // cast val to unsigned so as to force 0-fill right shift
+ val = (static_cast<uint64_t>(val) >> 7);
+ }
+ }
+ valueStream->write(buffer, static_cast<size_t>(data - buffer));
+ ++count;
+ if (enableBloomFilter) {
+ std::string decimal = Decimal(
+ values[i], static_cast<int32_t>(scale)).toString();
+ bloomFilter->addBytes(
+ decimal.c_str(), static_cast<int64_t>(decimal.size()));
+ }
+ decStats->update(Decimal(values[i], static_cast<int32_t>(scale)));
+ }
+ }
+ decStats->increase(count);
+ if (count < numValues) {
+ decStats->setHasNull(true);
+ }
+ std::vector<int64_t> scales(numValues, static_cast<int64_t>(scale));
+ scaleEncoder->add(scales.data(), numValues, notNull);
+ }
+
+ void Decimal64ColumnWriter::flush(std::vector<proto::Stream>& streams) {
+ ColumnWriter::flush(streams);
+
+ proto::Stream dataStream;
+ dataStream.set_kind(proto::Stream_Kind_DATA);
+ dataStream.set_column(static_cast<uint32_t>(columnId));
+ dataStream.set_length(valueStream->flush());
+ streams.push_back(dataStream);
+
+ proto::Stream secondaryStream;
+ secondaryStream.set_kind(proto::Stream_Kind_SECONDARY);
+ secondaryStream.set_column(static_cast<uint32_t>(columnId));
+ secondaryStream.set_length(scaleEncoder->flush());
+ streams.push_back(secondaryStream);
+ }
+
+ uint64_t Decimal64ColumnWriter::getEstimatedSize() const {
+ uint64_t size = ColumnWriter::getEstimatedSize();
+ size += valueStream->getSize();
+ size += scaleEncoder->getBufferSize();
+ return size;
+ }
+
+ void Decimal64ColumnWriter::getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const {
+ proto::ColumnEncoding encoding;
+ encoding.set_kind(RleVersionMapper(rleVersion));
+ encoding.set_dictionarysize(0);
+ if (enableBloomFilter) {
+ encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+ }
+ encodings.push_back(encoding);
+ }
+
+ void Decimal64ColumnWriter::recordPosition() const {
+ ColumnWriter::recordPosition();
+ valueStream->recordPosition(rowIndexPosition.get());
+ scaleEncoder->recordPosition(rowIndexPosition.get());
+ }
+
+ class Decimal128ColumnWriter : public Decimal64ColumnWriter {
+ public:
+ Decimal128ColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options);
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) override;
+
+ private:
+ char buffer[20];
+ };
+
+ Decimal128ColumnWriter::Decimal128ColumnWriter(
+ const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options) :
+ Decimal64ColumnWriter(type, factory, options) {
+ // PASS
+ }
+
+ // Zigzag encoding moves the sign bit to the least significant bit using the
+ // expression (val << 1) ^ (val >> 63) and derives its name from the fact that
+ // positive and negative numbers alternate once encoded.
+ Int128 zigZagInt128(const Int128& value) {
+ bool isNegative = value < 0;
+ Int128 val = value.abs();
+ val <<= 1;
+ if (isNegative) {
+ val -= 1;
+ }
+ return val;
+ }
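+ // For illustration, a minimal sketch of the equivalent 64-bit zigzag mapping
+ // (zigZag64 is a hypothetical helper name, not part of this file), assuming
+ // only <cstdint>:
+ //   uint64_t zigZag64(int64_t v) {
+ //     return (static_cast<uint64_t>(v) << 1) ^ static_cast<uint64_t>(v >> 63);
+ //   }
+ // Small inputs map as 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, so values close to
+ // zero keep small encodings and the varint written below stays short.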
+
+ void Decimal128ColumnWriter::add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) {
+ const Decimal128VectorBatch* decBatch =
+ dynamic_cast<const Decimal128VectorBatch*>(&rowBatch);
+ if (decBatch == nullptr) {
+ throw InvalidArgument("Failed to cast to Decimal128VectorBatch");
+ }
+
+ DecimalColumnStatisticsImpl* decStats =
+ dynamic_cast<DecimalColumnStatisticsImpl*>(colIndexStatistics.get());
+ if (decStats == nullptr) {
+ throw InvalidArgument("Failed to cast to DecimalColumnStatisticsImpl");
+ }
+
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
+
+ const char* notNull = decBatch->hasNulls ?
+ decBatch->notNull.data() + offset : nullptr;
+ const Int128* values = decBatch->values.data() + offset;
+
+ // The current encoding of decimal columns stores the integer representation
+ // of the value as an unbounded length zigzag encoded base 128 varint.
+ uint64_t count = 0;
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (!notNull || notNull[i]) {
+ Int128 val = zigZagInt128(values[i]);
+ char* data = buffer;
+ while (true) {
+ if ((val & ~0x7f) == 0) {
+ *(data++) = (static_cast<char>(val.getLowBits()));
+ break;
+ } else {
+ *(data++) = static_cast<char>(0x80 | (val.getLowBits() & 0x7f));
+ val >>= 7;
+ }
+ }
+ valueStream->write(buffer, static_cast<size_t>(data - buffer));
+
+ ++count;
+ if (enableBloomFilter) {
+ std::string decimal = Decimal(
+ values[i], static_cast<int32_t>(scale)).toString();
+ bloomFilter->addBytes(
+ decimal.c_str(), static_cast<int64_t>(decimal.size()));
+ }
+ decStats->update(Decimal(values[i], static_cast<int32_t>(scale)));
+ }
+ }
+ decStats->increase(count);
+ if (count < numValues) {
+ decStats->setHasNull(true);
+ }
+ std::vector<int64_t> scales(numValues, static_cast<int64_t>(scale));
+ scaleEncoder->add(scales.data(), numValues, notNull);
+ }
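+ // For illustration, a minimal sketch of the same base-128 varint scheme
+ // applied to a plain uint64_t (writeVarint is a hypothetical helper, assuming
+ // only <cstdint> and <cstddef>): each byte carries 7 payload bits and the
+ // high bit flags that another byte follows.
+ //   size_t writeVarint(uint64_t v, char* out) {
+ //     size_t n = 0;
+ //     while (v >= 0x80) {
+ //       out[n++] = static_cast<char>(0x80 | (v & 0x7f));
+ //       v >>= 7;
+ //     }
+ //     out[n++] = static_cast<char>(v);
+ //     return n;
+ //   }
+ // For example, 300 (0x12C) encodes as the two bytes 0xAC 0x02.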
+
+ class ListColumnWriter : public ColumnWriter {
+ public:
+ ListColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options);
+ ~ListColumnWriter() override;
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) override;
+
+ virtual void flush(std::vector<proto::Stream>& streams) override;
+
+ virtual uint64_t getEstimatedSize() const override;
+
+ virtual void getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const override;
+
+ virtual void getStripeStatistics(
+ std::vector<proto::ColumnStatistics>& stats) const override;
+
+ virtual void getFileStatistics(
+ std::vector<proto::ColumnStatistics>& stats) const override;
+
+ virtual void mergeStripeStatsIntoFileStats() override;
+
+ virtual void mergeRowGroupStatsIntoStripeStats() override;
+
+ virtual void createRowIndexEntry() override;
+
+ virtual void writeIndex(
+ std::vector<proto::Stream> &streams) const override;
+
+ virtual void recordPosition() const override;
+
+ virtual void writeDictionary() override;
+
+ virtual void reset() override;
+
+ private:
+ std::unique_ptr<RleEncoder> lengthEncoder;
+ RleVersion rleVersion;
+ std::unique_ptr<ColumnWriter> child;
+ };
+
+ ListColumnWriter::ListColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options) :
+ ColumnWriter(type, factory, options),
+ rleVersion(options.getRleVersion()){
+
+ std::unique_ptr<BufferedOutputStream> lengthStream =
+ factory.createStream(proto::Stream_Kind_LENGTH);
+ lengthEncoder = createRleEncoder(std::move(lengthStream),
+ false,
+ rleVersion,
+ memPool,
+ options.getAlignedBitpacking());
+
+ if (type.getSubtypeCount() == 1) {
+ child = buildWriter(*type.getSubtype(0), factory, options);
+ }
+
+ if (enableIndex) {
+ recordPosition();
+ }
+ }
+
+ ListColumnWriter::~ListColumnWriter() {
+ // PASS
+ }
+
+ void ListColumnWriter::add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) {
+ ListVectorBatch* listBatch = dynamic_cast<ListVectorBatch*>(&rowBatch);
+ if (listBatch == nullptr) {
+ throw InvalidArgument("Failed to cast to ListVectorBatch");
+ }
+
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
+
+ int64_t* offsets = listBatch->offsets.data() + offset;
+ const char* notNull = listBatch->hasNulls ?
+ listBatch->notNull.data() + offset : nullptr;
+
+ uint64_t elemOffset = static_cast<uint64_t>(offsets[0]);
+ uint64_t totalNumValues = static_cast<uint64_t>(offsets[numValues] - offsets[0]);
+
+ // translate offsets to lengths
+ for (uint64_t i = 0; i != numValues; ++i) {
+ offsets[i] = offsets[i + 1] - offsets[i];
+ }
+
+ // unnecessary to deal with null as elements are packed together
+ if (child.get()) {
+ child->add(*listBatch->elements, elemOffset, totalNumValues, nullptr);
+ }
+ lengthEncoder->add(offsets, numValues, notNull);
+
+ if (enableIndex) {
+ if (!notNull) {
+ colIndexStatistics->increase(numValues);
+ } else {
+ uint64_t count = 0;
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (notNull[i]) {
+ ++count;
+ if (enableBloomFilter) {
+ bloomFilter->addLong(offsets[i]);
+ }
+ }
+ }
+ colIndexStatistics->increase(count);
+ if (count < numValues) {
+ colIndexStatistics->setHasNull(true);
+ }
+ }
+ }
+ }
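+ // For illustration (hypothetical input, not taken from this file): with
+ // offsets {0, 2, 2, 5} and numValues == 3, elemOffset is 0, totalNumValues is
+ // 5, and the loop above rewrites offsets in place to the lengths {2, 0, 3};
+ // the child writer then receives the five packed elements while the LENGTH
+ // stream records the per-row list sizes.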
+
+ void ListColumnWriter::flush(std::vector<proto::Stream>& streams) {
+ ColumnWriter::flush(streams);
+
+ proto::Stream stream;
+ stream.set_kind(proto::Stream_Kind_LENGTH);
+ stream.set_column(static_cast<uint32_t>(columnId));
+ stream.set_length(lengthEncoder->flush());
+ streams.push_back(stream);
+
+ if (child.get()) {
+ child->flush(streams);
+ }
+ }
+
+ void ListColumnWriter::writeIndex(std::vector<proto::Stream> &streams) const {
+ ColumnWriter::writeIndex(streams);
+ if (child.get()) {
+ child->writeIndex(streams);
+ }
+ }
+
+ uint64_t ListColumnWriter::getEstimatedSize() const {
+ uint64_t size = ColumnWriter::getEstimatedSize();
+ if (child.get()) {
+ size += lengthEncoder->getBufferSize();
+ size += child->getEstimatedSize();
+ }
+ return size;
+ }
+
+ void ListColumnWriter::getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const {
+ proto::ColumnEncoding encoding;
+ encoding.set_kind(RleVersionMapper(rleVersion));
+ encoding.set_dictionarysize(0);
+ if (enableBloomFilter) {
+ encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+ }
+ encodings.push_back(encoding);
+ if (child.get()) {
+ child->getColumnEncoding(encodings);
+ }
+ }
+
+ void ListColumnWriter::getStripeStatistics(
+ std::vector<proto::ColumnStatistics>& stats) const {
+ ColumnWriter::getStripeStatistics(stats);
+ if (child.get()) {
+ child->getStripeStatistics(stats);
+ }
+ }
+
+ void ListColumnWriter::mergeStripeStatsIntoFileStats() {
+ ColumnWriter::mergeStripeStatsIntoFileStats();
+ if (child.get()) {
+ child->mergeStripeStatsIntoFileStats();
+ }
+ }
+
+ void ListColumnWriter::getFileStatistics(
+ std::vector<proto::ColumnStatistics>& stats) const {
+ ColumnWriter::getFileStatistics(stats);
+ if (child.get()) {
+ child->getFileStatistics(stats);
+ }
+ }
+
+ void ListColumnWriter::mergeRowGroupStatsIntoStripeStats() {
+ ColumnWriter::mergeRowGroupStatsIntoStripeStats();
+ if (child.get()) {
+ child->mergeRowGroupStatsIntoStripeStats();
+ }
+ }
+
+ void ListColumnWriter::createRowIndexEntry() {
+ ColumnWriter::createRowIndexEntry();
+ if (child.get()) {
+ child->createRowIndexEntry();
+ }
+ }
+
+ void ListColumnWriter::recordPosition() const {
+ ColumnWriter::recordPosition();
+ lengthEncoder->recordPosition(rowIndexPosition.get());
+ }
+
+ void ListColumnWriter::reset() {
+ ColumnWriter::reset();
+ if (child) {
+ child->reset();
+ }
+ }
+
+ void ListColumnWriter::writeDictionary() {
+ if (child) {
+ child->writeDictionary();
+ }
+ }
+
+ class MapColumnWriter : public ColumnWriter {
+ public:
+ MapColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options);
+ ~MapColumnWriter() override;
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) override;
+
+ virtual void flush(std::vector<proto::Stream>& streams) override;
+
+ virtual uint64_t getEstimatedSize() const override;
+
+ virtual void getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const override;
+
+ virtual void getStripeStatistics(
+ std::vector<proto::ColumnStatistics>& stats) const override;
+
+ virtual void getFileStatistics(
+ std::vector<proto::ColumnStatistics>& stats) const override;
+
+ virtual void mergeStripeStatsIntoFileStats() override;
+
+ virtual void mergeRowGroupStatsIntoStripeStats() override;
+
+ virtual void createRowIndexEntry() override;
+
+ virtual void writeIndex(
+ std::vector<proto::Stream> &streams) const override;
+
+ virtual void recordPosition() const override;
+
+ virtual void writeDictionary() override;
+
+ virtual void reset() override;
+
+ private:
+ std::unique_ptr<ColumnWriter> keyWriter;
+ std::unique_ptr<ColumnWriter> elemWriter;
+ std::unique_ptr<RleEncoder> lengthEncoder;
+ RleVersion rleVersion;
+ };
+
+ MapColumnWriter::MapColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options) :
+ ColumnWriter(type, factory, options),
+ rleVersion(options.getRleVersion()){
+ std::unique_ptr<BufferedOutputStream> lengthStream =
+ factory.createStream(proto::Stream_Kind_LENGTH);
+ lengthEncoder = createRleEncoder(std::move(lengthStream),
+ false,
+ rleVersion,
+ memPool,
+ options.getAlignedBitpacking());
+
+ if (type.getSubtypeCount() > 0) {
+ keyWriter = buildWriter(*type.getSubtype(0), factory, options);
+ }
+
+ if (type.getSubtypeCount() > 1) {
+ elemWriter = buildWriter(*type.getSubtype(1), factory, options);
+ }
+
+ if (enableIndex) {
+ recordPosition();
+ }
+ }
+
+ MapColumnWriter::~MapColumnWriter() {
+ // PASS
+ }
+
+ void MapColumnWriter::add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) {
+ MapVectorBatch* mapBatch = dynamic_cast<MapVectorBatch*>(&rowBatch);
+ if (mapBatch == nullptr) {
+ throw InvalidArgument("Failed to cast to MapVectorBatch");
+ }
+
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
+
+ int64_t* offsets = mapBatch->offsets.data() + offset;
+ const char* notNull = mapBatch->hasNulls ?
+ mapBatch->notNull.data() + offset : nullptr;
+
+ uint64_t elemOffset = static_cast<uint64_t>(offsets[0]);
+ uint64_t totalNumValues = static_cast<uint64_t>(offsets[numValues] - offsets[0]);
+
+ // translate offsets to lengths
+ for (uint64_t i = 0; i != numValues; ++i) {
+ offsets[i] = offsets[i + 1] - offsets[i];
+ }
+
+ lengthEncoder->add(offsets, numValues, notNull);
+
+ // unnecessary to deal with null as keys and values are packed together
+ if (keyWriter.get()) {
+ keyWriter->add(*mapBatch->keys, elemOffset, totalNumValues, nullptr);
+ }
+ if (elemWriter.get()) {
+ elemWriter->add(*mapBatch->elements, elemOffset, totalNumValues, nullptr);
+ }
+
+ if (enableIndex) {
+ if (!notNull) {
+ colIndexStatistics->increase(numValues);
+ } else {
+ uint64_t count = 0;
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (notNull[i]) {
+ ++count;
+ if (enableBloomFilter) {
+ bloomFilter->addLong(offsets[i]);
+ }
+ }
+ }
+ colIndexStatistics->increase(count);
+ if (count < numValues) {
+ colIndexStatistics->setHasNull(true);
+ }
+ }
+ }
+ }
+
+ void MapColumnWriter::flush(std::vector<proto::Stream>& streams) {
+ ColumnWriter::flush(streams);
+
+ proto::Stream stream;
+ stream.set_kind(proto::Stream_Kind_LENGTH);
+ stream.set_column(static_cast<uint32_t>(columnId));
+ stream.set_length(lengthEncoder->flush());
+ streams.push_back(stream);
+
+ if (keyWriter.get()) {
+ keyWriter->flush(streams);
+ }
+ if (elemWriter.get()) {
+ elemWriter->flush(streams);
+ }
+ }
+
+ void MapColumnWriter::writeIndex(
+ std::vector<proto::Stream> &streams) const {
+ ColumnWriter::writeIndex(streams);
+ if (keyWriter.get()) {
+ keyWriter->writeIndex(streams);
+ }
+ if (elemWriter.get()) {
+ elemWriter->writeIndex(streams);
+ }
+ }
+
+ uint64_t MapColumnWriter::getEstimatedSize() const {
+ uint64_t size = ColumnWriter::getEstimatedSize();
+ size += lengthEncoder->getBufferSize();
+ if (keyWriter.get()) {
+ size += keyWriter->getEstimatedSize();
+ }
+ if (elemWriter.get()) {
+ size += elemWriter->getEstimatedSize();
+ }
+ return size;
+ }
+
+ void MapColumnWriter::getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const {
+ proto::ColumnEncoding encoding;
+ encoding.set_kind(RleVersionMapper(rleVersion));
+ encoding.set_dictionarysize(0);
+ if (enableBloomFilter) {
+ encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+ }
+ encodings.push_back(encoding);
+ if (keyWriter.get()) {
+ keyWriter->getColumnEncoding(encodings);
+ }
+ if (elemWriter.get()) {
+ elemWriter->getColumnEncoding(encodings);
+ }
+ }
+
+ void MapColumnWriter::getStripeStatistics(
+ std::vector<proto::ColumnStatistics>& stats) const {
+ ColumnWriter::getStripeStatistics(stats);
+ if (keyWriter.get()) {
+ keyWriter->getStripeStatistics(stats);
+ }
+ if (elemWriter.get()) {
+ elemWriter->getStripeStatistics(stats);
+ }
+ }
+
+ void MapColumnWriter::mergeStripeStatsIntoFileStats() {
+ ColumnWriter::mergeStripeStatsIntoFileStats();
+ if (keyWriter.get()) {
+ keyWriter->mergeStripeStatsIntoFileStats();
+ }
+ if (elemWriter.get()) {
+ elemWriter->mergeStripeStatsIntoFileStats();
+ }
+ }
+
+ void MapColumnWriter::getFileStatistics(
+ std::vector<proto::ColumnStatistics>& stats) const {
+ ColumnWriter::getFileStatistics(stats);
+ if (keyWriter.get()) {
+ keyWriter->getFileStatistics(stats);
+ }
+ if (elemWriter.get()) {
+ elemWriter->getFileStatistics(stats);
+ }
+ }
+
+ void MapColumnWriter::mergeRowGroupStatsIntoStripeStats() {
+ ColumnWriter::mergeRowGroupStatsIntoStripeStats();
+ if (keyWriter.get()) {
+ keyWriter->mergeRowGroupStatsIntoStripeStats();
+ }
+ if (elemWriter.get()) {
+ elemWriter->mergeRowGroupStatsIntoStripeStats();
+ }
+ }
+
+ void MapColumnWriter::createRowIndexEntry() {
+ ColumnWriter::createRowIndexEntry();
+ if (keyWriter.get()) {
+ keyWriter->createRowIndexEntry();
+ }
+ if (elemWriter.get()) {
+ elemWriter->createRowIndexEntry();
+ }
+ }
+
+ void MapColumnWriter::recordPosition() const {
+ ColumnWriter::recordPosition();
+ lengthEncoder->recordPosition(rowIndexPosition.get());
+ }
+
+ void MapColumnWriter::reset() {
+ ColumnWriter::reset();
+ if (keyWriter) {
+ keyWriter->reset();
+ }
+ if (elemWriter) {
+ elemWriter->reset();
+ }
+ }
+
+ void MapColumnWriter::writeDictionary() {
+ if (keyWriter) {
+ keyWriter->writeDictionary();
+ }
+ if (elemWriter) {
+ elemWriter->writeDictionary();
+ }
+ }
+
+ class UnionColumnWriter : public ColumnWriter {
+ public:
+ UnionColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options);
+ ~UnionColumnWriter() override;
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) override;
+
+ virtual void flush(std::vector<proto::Stream>& streams) override;
+
+ virtual uint64_t getEstimatedSize() const override;
+
+ virtual void getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const override;
+
+ virtual void getStripeStatistics(
+ std::vector<proto::ColumnStatistics>& stats) const override;
+
+ virtual void getFileStatistics(
+ std::vector<proto::ColumnStatistics>& stats) const override;
+
+ virtual void mergeStripeStatsIntoFileStats() override;
+
+ virtual void mergeRowGroupStatsIntoStripeStats() override;
+
+ virtual void createRowIndexEntry() override;
+
+ virtual void writeIndex(
+ std::vector<proto::Stream> &streams) const override;
+
+ virtual void recordPosition() const override;
+
+ virtual void writeDictionary() override;
+
+ virtual void reset() override;
+
+ private:
+ std::unique_ptr<ByteRleEncoder> rleEncoder;
+ std::vector<ColumnWriter*> children;
+ };
+
+ UnionColumnWriter::UnionColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options) :
+ ColumnWriter(type, factory, options) {
+
+ std::unique_ptr<BufferedOutputStream> dataStream =
+ factory.createStream(proto::Stream_Kind_DATA);
+ rleEncoder = createByteRleEncoder(std::move(dataStream));
+
+ for (uint64_t i = 0; i != type.getSubtypeCount(); ++i) {
+ children.push_back(buildWriter(*type.getSubtype(i),
+ factory,
+ options).release());
+ }
+
+ if (enableIndex) {
+ recordPosition();
+ }
+ }
+
+ UnionColumnWriter::~UnionColumnWriter() {
+ for (uint32_t i = 0; i < children.size(); ++i) {
+ delete children[i];
+ }
+ }
+
+ void UnionColumnWriter::add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) {
+ UnionVectorBatch* unionBatch = dynamic_cast<UnionVectorBatch*>(&rowBatch);
+ if (unionBatch == nullptr) {
+ throw InvalidArgument("Failed to cast to UnionVectorBatch");
+ }
+
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
+
+ const char* notNull = unionBatch->hasNulls ?
+ unionBatch->notNull.data() + offset : nullptr;
+ unsigned char * tags = unionBatch->tags.data() + offset;
+ uint64_t * offsets = unionBatch->offsets.data() + offset;
+
+ std::vector<int64_t> childOffset(children.size(), -1);
+ std::vector<uint64_t> childLength(children.size(), 0);
+
+ for (uint64_t i = 0; i != numValues; ++i) {
+ if (childOffset[tags[i]] == -1) {
+ childOffset[tags[i]] = static_cast<int64_t>(offsets[i]);
+ }
+ ++childLength[tags[i]];
+ }
+
+ rleEncoder->add(reinterpret_cast<char*>(tags), numValues, notNull);
+
+ for (uint32_t i = 0; i < children.size(); ++i) {
+ if (childLength[i] > 0) {
+ children[i]->add(*unionBatch->children[i],
+ static_cast<uint64_t>(childOffset[i]),
+ childLength[i], nullptr);
+ }
+ }
+
+ // update stats
+ if (enableIndex) {
+ if (!notNull) {
+ colIndexStatistics->increase(numValues);
+ } else {
+ uint64_t count = 0;
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (notNull[i]) {
+ ++count;
+ if (enableBloomFilter) {
+ bloomFilter->addLong(tags[i]);
+ }
+ }
+ }
+ colIndexStatistics->increase(count);
+ if (count < numValues) {
+ colIndexStatistics->setHasNull(true);
+ }
+ }
+ }
+ }
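+ // For illustration (hypothetical input, not taken from this file): with
+ // tags {0, 1, 0, 1} and offsets {0, 0, 1, 1}, the loop above yields
+ // childOffset {0, 0} and childLength {2, 2}, so child 0 receives rows 0-1 of
+ // its own batch and child 1 receives rows 0-1 of its batch, while the tag
+ // stream records which child supplies each row.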
+
+ void UnionColumnWriter::flush(std::vector<proto::Stream>& streams) {
+ ColumnWriter::flush(streams);
+
+ proto::Stream stream;
+ stream.set_kind(proto::Stream_Kind_DATA);
+ stream.set_column(static_cast<uint32_t>(columnId));
+ stream.set_length(rleEncoder->flush());
+ streams.push_back(stream);
+
+ for (uint32_t i = 0; i < children.size(); ++i) {
+ children[i]->flush(streams);
+ }
+ }
+
+ void UnionColumnWriter::writeIndex(std::vector<proto::Stream> &streams) const {
+ ColumnWriter::writeIndex(streams);
+ for (uint32_t i = 0; i < children.size(); ++i) {
+ children[i]->writeIndex(streams);
+ }
+ }
+
+ uint64_t UnionColumnWriter::getEstimatedSize() const {
+ uint64_t size = ColumnWriter::getEstimatedSize();
+ size += rleEncoder->getBufferSize();
+ for (uint32_t i = 0; i < children.size(); ++i) {
+ size += children[i]->getEstimatedSize();
+ }
+ return size;
+ }
+
+ void UnionColumnWriter::getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const {
+ proto::ColumnEncoding encoding;
+ encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
+ encoding.set_dictionarysize(0);
+ if (enableBloomFilter) {
+ encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+ }
+ encodings.push_back(encoding);
+ for (uint32_t i = 0; i < children.size(); ++i) {
+ children[i]->getColumnEncoding(encodings);
+ }
+ }
+
+ void UnionColumnWriter::getStripeStatistics(
+ std::vector<proto::ColumnStatistics>& stats) const {
+ ColumnWriter::getStripeStatistics(stats);
+ for (uint32_t i = 0; i < children.size(); ++i) {
+ children[i]->getStripeStatistics(stats);
+ }
+ }
+
+ void UnionColumnWriter::mergeStripeStatsIntoFileStats() {
+ ColumnWriter::mergeStripeStatsIntoFileStats();
+ for (uint32_t i = 0; i < children.size(); ++i) {
+ children[i]->mergeStripeStatsIntoFileStats();
+ }
+ }
+
+ void UnionColumnWriter::getFileStatistics(
+ std::vector<proto::ColumnStatistics>& stats) const {
+ ColumnWriter::getFileStatistics(stats);
+ for (uint32_t i = 0; i < children.size(); ++i) {
+ children[i]->getFileStatistics(stats);
+ }
+ }
+
+ void UnionColumnWriter::mergeRowGroupStatsIntoStripeStats() {
+ ColumnWriter::mergeRowGroupStatsIntoStripeStats();
+ for (uint32_t i = 0; i < children.size(); ++i) {
+ children[i]->mergeRowGroupStatsIntoStripeStats();
+ }
+ }
+
+ void UnionColumnWriter::createRowIndexEntry() {
+ ColumnWriter::createRowIndexEntry();
+ for (uint32_t i = 0; i < children.size(); ++i) {
+ children[i]->createRowIndexEntry();
+ }
+ }
+
+ void UnionColumnWriter::recordPosition() const {
+ ColumnWriter::recordPosition();
+ rleEncoder->recordPosition(rowIndexPosition.get());
+ }
+
+ void UnionColumnWriter::reset() {
+ ColumnWriter::reset();
+ for (uint32_t i = 0; i < children.size(); ++i) {
+ children[i]->reset();
+ }
+ }
+
+ void UnionColumnWriter::writeDictionary() {
+ for (uint32_t i = 0; i < children.size(); ++i) {
+ children[i]->writeDictionary();
+ }
+ }
+
+ std::unique_ptr<ColumnWriter> buildWriter(
+ const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options) {
+ switch (static_cast<int64_t>(type.getKind())) {
+ case STRUCT:
+ return std::unique_ptr<ColumnWriter>(
+ new StructColumnWriter(
+ type,
+ factory,
+ options));
+ case INT:
+ case LONG:
+ case SHORT:
+ return std::unique_ptr<ColumnWriter>(
+ new IntegerColumnWriter(
+ type,
+ factory,
+ options));
+ case BYTE:
+ return std::unique_ptr<ColumnWriter>(
+ new ByteColumnWriter(
+ type,
+ factory,
+ options));
+ case BOOLEAN:
+ return std::unique_ptr<ColumnWriter>(
+ new BooleanColumnWriter(
+ type,
+ factory,
+ options));
+ case DOUBLE:
+ return std::unique_ptr<ColumnWriter>(
+ new DoubleColumnWriter(
+ type,
+ factory,
+ options,
+ false));
+ case FLOAT:
+ return std::unique_ptr<ColumnWriter>(
+ new DoubleColumnWriter(
+ type,
+ factory,
+ options,
+ true));
+ case BINARY:
+ return std::unique_ptr<ColumnWriter>(
+ new BinaryColumnWriter(
+ type,
+ factory,
+ options));
+ case STRING:
+ return std::unique_ptr<ColumnWriter>(
+ new StringColumnWriter(
+ type,
+ factory,
+ options));
+ case CHAR:
+ return std::unique_ptr<ColumnWriter>(
+ new CharColumnWriter(
+ type,
+ factory,
+ options));
+ case VARCHAR:
+ return std::unique_ptr<ColumnWriter>(
+ new VarCharColumnWriter(
+ type,
+ factory,
+ options));
+ case DATE:
+ return std::unique_ptr<ColumnWriter>(
+ new DateColumnWriter(
+ type,
+ factory,
+ options));
+ case TIMESTAMP:
+ return std::unique_ptr<ColumnWriter>(
+ new TimestampColumnWriter(
+ type,
+ factory,
+ options));
+ case DECIMAL:
+ if (type.getPrecision() <= Decimal64ColumnWriter::MAX_PRECISION_64) {
+ return std::unique_ptr<ColumnWriter>(
+ new Decimal64ColumnWriter(
+ type,
+ factory,
+ options));
+ } else if (type.getPrecision() <= Decimal64ColumnWriter::MAX_PRECISION_128) {
+ return std::unique_ptr<ColumnWriter>(
+ new Decimal128ColumnWriter(
+ type,
+ factory,
+ options));
+ } else {
+ throw NotImplementedYet("Decimal precision more than 38 is not "
+ "supported");
+ }
+ case LIST:
+ return std::unique_ptr<ColumnWriter>(
+ new ListColumnWriter(
+ type,
+ factory,
+ options));
+ case MAP:
+ return std::unique_ptr<ColumnWriter>(
+ new MapColumnWriter(
+ type,
+ factory,
+ options));
+ case UNION:
+ return std::unique_ptr<ColumnWriter>(
+ new UnionColumnWriter(
+ type,
+ factory,
+ options));
+ default:
+ throw NotImplementedYet("Type is not supported yet for creating "
+ "ColumnWriter.");
+ }
+ }
+}