diff options
author | iaz1607 <iaz1607@yandex-team.ru> | 2022-02-10 16:45:37 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:37 +0300 |
commit | 94e51c602b555459333b3c6ae92476c424c930bc (patch) | |
tree | b2cc84ee7850122e7ccf51d0ea21e4fa7e7a5685 /contrib/libs/apache/orc/c++/src/Writer.cc | |
parent | e5437feb4ac2d2dc044e1090b9312dde5ef197e0 (diff) | |
download | ydb-94e51c602b555459333b3c6ae92476c424c930bc.tar.gz |
Restoring authorship annotation for <iaz1607@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/libs/apache/orc/c++/src/Writer.cc')
-rw-r--r-- | contrib/libs/apache/orc/c++/src/Writer.cc | 1280 |
1 files changed, 640 insertions, 640 deletions
diff --git a/contrib/libs/apache/orc/c++/src/Writer.cc b/contrib/libs/apache/orc/c++/src/Writer.cc index 8b13750865..b5bd19b304 100644 --- a/contrib/libs/apache/orc/c++/src/Writer.cc +++ b/contrib/libs/apache/orc/c++/src/Writer.cc @@ -1,641 +1,641 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "orc/Common.hh" -#include "orc/OrcFile.hh" - -#include "ColumnWriter.hh" -#include "Timezone.hh" - -#include <memory> - -namespace orc { - - struct WriterOptionsPrivate { - uint64_t stripeSize; - uint64_t compressionBlockSize; - uint64_t rowIndexStride; - CompressionKind compression; - CompressionStrategy compressionStrategy; - MemoryPool* memoryPool; - double paddingTolerance; - std::ostream* errorStream; - FileVersion fileVersion; - double dictionaryKeySizeThreshold; - bool enableIndex; - std::set<uint64_t> columnsUseBloomFilter; - double bloomFilterFalsePositiveProb; - BloomFilterVersion bloomFilterVersion; - - WriterOptionsPrivate() : - fileVersion(FileVersion::v_0_12()) { // default to Hive_0_12 - stripeSize = 64 * 1024 * 1024; // 64M - compressionBlockSize = 64 * 1024; // 64K - rowIndexStride = 10000; - compression = CompressionKind_ZLIB; - compressionStrategy = CompressionStrategy_SPEED; - memoryPool = getDefaultPool(); - paddingTolerance = 0.0; - errorStream = &std::cerr; - dictionaryKeySizeThreshold = 0.0; - enableIndex = true; - bloomFilterFalsePositiveProb = 0.05; - bloomFilterVersion = UTF8; - } - }; - - WriterOptions::WriterOptions(): - privateBits(std::unique_ptr<WriterOptionsPrivate> - (new WriterOptionsPrivate())) { - // PASS - } - - WriterOptions::WriterOptions(const WriterOptions& rhs): - privateBits(std::unique_ptr<WriterOptionsPrivate> - (new WriterOptionsPrivate(*(rhs.privateBits.get())))) { - // PASS - } - - WriterOptions::WriterOptions(WriterOptions& rhs) { - // swap privateBits with rhs - WriterOptionsPrivate* l = privateBits.release(); - privateBits.reset(rhs.privateBits.release()); - rhs.privateBits.reset(l); - } - - WriterOptions& WriterOptions::operator=(const WriterOptions& rhs) { - if (this != &rhs) { - privateBits.reset(new WriterOptionsPrivate(*(rhs.privateBits.get()))); - } - return *this; - } - - WriterOptions::~WriterOptions() { - // PASS - } - RleVersion WriterOptions::getRleVersion() const { - if(privateBits->fileVersion == FileVersion::v_0_11()) - { - return RleVersion_1; - } - - return RleVersion_2; - } - - WriterOptions& WriterOptions::setStripeSize(uint64_t size) { - privateBits->stripeSize = size; - return *this; - } - - uint64_t WriterOptions::getStripeSize() const { - return privateBits->stripeSize; - } - - WriterOptions& WriterOptions::setCompressionBlockSize(uint64_t size) { - privateBits->compressionBlockSize = size; - return *this; - } - - uint64_t WriterOptions::getCompressionBlockSize() const { - return privateBits->compressionBlockSize; - } - - WriterOptions& WriterOptions::setRowIndexStride(uint64_t stride) { - privateBits->rowIndexStride = stride; - privateBits->enableIndex = (stride != 0); - return *this; - } - - uint64_t WriterOptions::getRowIndexStride() const { - return privateBits->rowIndexStride; - } - - WriterOptions& WriterOptions::setDictionaryKeySizeThreshold(double val) { - privateBits->dictionaryKeySizeThreshold = val; - return *this; - } - - double WriterOptions::getDictionaryKeySizeThreshold() const { - return privateBits->dictionaryKeySizeThreshold; - } - - WriterOptions& WriterOptions::setFileVersion(const FileVersion& version) { - // Only Hive_0_11 and Hive_0_12 version are supported currently - if (version.getMajor() == 0 && (version.getMinor() == 11 || version.getMinor() == 12)) { - privateBits->fileVersion = version; - return *this; - } - throw std::logic_error("Unsupported file version specified."); - } - - FileVersion WriterOptions::getFileVersion() const { - return privateBits->fileVersion; - } - - WriterOptions& WriterOptions::setCompression(CompressionKind comp) { - privateBits->compression = comp; - return *this; - } - - CompressionKind WriterOptions::getCompression() const { - return privateBits->compression; - } - - WriterOptions& WriterOptions::setCompressionStrategy( - CompressionStrategy strategy) { - privateBits->compressionStrategy = strategy; - return *this; - } - - CompressionStrategy WriterOptions::getCompressionStrategy() const { - return privateBits->compressionStrategy; - } - - bool WriterOptions::getAlignedBitpacking() const { - return privateBits->compressionStrategy == CompressionStrategy ::CompressionStrategy_SPEED; - } - - WriterOptions& WriterOptions::setPaddingTolerance(double tolerance) { - privateBits->paddingTolerance = tolerance; - return *this; - } - - double WriterOptions::getPaddingTolerance() const { - return privateBits->paddingTolerance; - } - - WriterOptions& WriterOptions::setMemoryPool(MemoryPool* memoryPool) { - privateBits->memoryPool = memoryPool; - return *this; - } - - MemoryPool* WriterOptions::getMemoryPool() const { - return privateBits->memoryPool; - } - - WriterOptions& WriterOptions::setErrorStream(std::ostream& errStream) { - privateBits->errorStream = &errStream; - return *this; - } - - std::ostream* WriterOptions::getErrorStream() const { - return privateBits->errorStream; - } - - bool WriterOptions::getEnableIndex() const { - return privateBits->enableIndex; - } - - bool WriterOptions::getEnableDictionary() const { - return privateBits->dictionaryKeySizeThreshold > 0.0; - } - - WriterOptions& WriterOptions::setColumnsUseBloomFilter( - const std::set<uint64_t>& columns) { - privateBits->columnsUseBloomFilter = columns; - return *this; - } - - bool WriterOptions::isColumnUseBloomFilter(uint64_t column) const { - return privateBits->columnsUseBloomFilter.find(column) != - privateBits->columnsUseBloomFilter.end(); - } - - WriterOptions& WriterOptions::setBloomFilterFPP(double fpp) { - privateBits->bloomFilterFalsePositiveProb = fpp; - return *this; - } - - double WriterOptions::getBloomFilterFPP() const { - return privateBits->bloomFilterFalsePositiveProb; - } - - // delibrately not provide setter to write bloom filter version because - // we only support UTF8 for now. - BloomFilterVersion WriterOptions::getBloomFilterVersion() const { - return privateBits->bloomFilterVersion; - } - - Writer::~Writer() { - // PASS - } - - class WriterImpl : public Writer { - private: - std::unique_ptr<ColumnWriter> columnWriter; - std::unique_ptr<BufferedOutputStream> compressionStream; - std::unique_ptr<BufferedOutputStream> bufferedStream; - std::unique_ptr<StreamsFactory> streamsFactory; - OutputStream* outStream; - WriterOptions options; - const Type& type; - uint64_t stripeRows, totalRows, indexRows; - uint64_t currentOffset; - proto::Footer fileFooter; - proto::PostScript postScript; - proto::StripeInformation stripeInfo; - proto::Metadata metadata; - - static const char* magicId; - static const WriterId writerId; - - public: - WriterImpl( - const Type& type, - OutputStream* stream, - const WriterOptions& options); - - std::unique_ptr<ColumnVectorBatch> createRowBatch(uint64_t size) - const override; - - void add(ColumnVectorBatch& rowsToAdd) override; - - void close() override; - - void addUserMetadata(const std::string name, const std::string value) override; - - private: - void init(); - void initStripe(); - void writeStripe(); - void writeMetadata(); - void writeFileFooter(); - void writePostscript(); - void buildFooterType(const Type& t, proto::Footer& footer, uint32_t& index); - static proto::CompressionKind convertCompressionKind( - const CompressionKind& kind); - }; - - const char * WriterImpl::magicId = "ORC"; - - const WriterId WriterImpl::writerId = WriterId::ORC_CPP_WRITER; - - WriterImpl::WriterImpl( - const Type& t, - OutputStream* stream, - const WriterOptions& opts) : - outStream(stream), - options(opts), - type(t) { - streamsFactory = createStreamsFactory(options, outStream); - columnWriter = buildWriter(type, *streamsFactory, options); - stripeRows = totalRows = indexRows = 0; - currentOffset = 0; - - // compression stream for stripe footer, file footer and metadata - compressionStream = createCompressor( - options.getCompression(), - outStream, - options.getCompressionStrategy(), - 1 * 1024 * 1024, // buffer capacity: 1M - options.getCompressionBlockSize(), - *options.getMemoryPool()); - - // uncompressed stream for post script - bufferedStream.reset(new BufferedOutputStream( - *options.getMemoryPool(), - outStream, - 1024, // buffer capacity: 1024 bytes - options.getCompressionBlockSize())); - - init(); - } - - std::unique_ptr<ColumnVectorBatch> WriterImpl::createRowBatch(uint64_t size) - const { - return type.createRowBatch(size, *options.getMemoryPool()); - } - - void WriterImpl::add(ColumnVectorBatch& rowsToAdd) { - if (options.getEnableIndex()) { - uint64_t pos = 0; - uint64_t chunkSize = 0; - uint64_t rowIndexStride = options.getRowIndexStride(); - while (pos < rowsToAdd.numElements) { - chunkSize = std::min(rowsToAdd.numElements - pos, - rowIndexStride - indexRows); - columnWriter->add(rowsToAdd, pos, chunkSize, nullptr); - - pos += chunkSize; - indexRows += chunkSize; - stripeRows += chunkSize; - - if (indexRows >= rowIndexStride) { - columnWriter->createRowIndexEntry(); - indexRows = 0; - } - } - } else { - stripeRows += rowsToAdd.numElements; - columnWriter->add(rowsToAdd, 0, rowsToAdd.numElements, nullptr); - } - - if (columnWriter->getEstimatedSize() >= options.getStripeSize()) { - writeStripe(); - } - } - - void WriterImpl::close() { - if (stripeRows > 0) { - writeStripe(); - } - writeMetadata(); - writeFileFooter(); - writePostscript(); - outStream->close(); - } - - void WriterImpl::addUserMetadata(const std::string name, const std::string value){ - proto::UserMetadataItem* userMetadataItem = fileFooter.add_metadata(); - userMetadataItem->set_name(TString(name)); - userMetadataItem->set_value(TString(value)); - } - - void WriterImpl::init() { - // Write file header - const static size_t magicIdLength = strlen(WriterImpl::magicId); - outStream->write(WriterImpl::magicId, magicIdLength); - currentOffset += magicIdLength; - - // Initialize file footer - fileFooter.set_headerlength(currentOffset); - fileFooter.set_contentlength(0); - fileFooter.set_numberofrows(0); - fileFooter.set_rowindexstride( - static_cast<uint32_t>(options.getRowIndexStride())); - fileFooter.set_writer(writerId); +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "orc/Common.hh" +#include "orc/OrcFile.hh" + +#include "ColumnWriter.hh" +#include "Timezone.hh" + +#include <memory> + +namespace orc { + + struct WriterOptionsPrivate { + uint64_t stripeSize; + uint64_t compressionBlockSize; + uint64_t rowIndexStride; + CompressionKind compression; + CompressionStrategy compressionStrategy; + MemoryPool* memoryPool; + double paddingTolerance; + std::ostream* errorStream; + FileVersion fileVersion; + double dictionaryKeySizeThreshold; + bool enableIndex; + std::set<uint64_t> columnsUseBloomFilter; + double bloomFilterFalsePositiveProb; + BloomFilterVersion bloomFilterVersion; + + WriterOptionsPrivate() : + fileVersion(FileVersion::v_0_12()) { // default to Hive_0_12 + stripeSize = 64 * 1024 * 1024; // 64M + compressionBlockSize = 64 * 1024; // 64K + rowIndexStride = 10000; + compression = CompressionKind_ZLIB; + compressionStrategy = CompressionStrategy_SPEED; + memoryPool = getDefaultPool(); + paddingTolerance = 0.0; + errorStream = &std::cerr; + dictionaryKeySizeThreshold = 0.0; + enableIndex = true; + bloomFilterFalsePositiveProb = 0.05; + bloomFilterVersion = UTF8; + } + }; + + WriterOptions::WriterOptions(): + privateBits(std::unique_ptr<WriterOptionsPrivate> + (new WriterOptionsPrivate())) { + // PASS + } + + WriterOptions::WriterOptions(const WriterOptions& rhs): + privateBits(std::unique_ptr<WriterOptionsPrivate> + (new WriterOptionsPrivate(*(rhs.privateBits.get())))) { + // PASS + } + + WriterOptions::WriterOptions(WriterOptions& rhs) { + // swap privateBits with rhs + WriterOptionsPrivate* l = privateBits.release(); + privateBits.reset(rhs.privateBits.release()); + rhs.privateBits.reset(l); + } + + WriterOptions& WriterOptions::operator=(const WriterOptions& rhs) { + if (this != &rhs) { + privateBits.reset(new WriterOptionsPrivate(*(rhs.privateBits.get()))); + } + return *this; + } + + WriterOptions::~WriterOptions() { + // PASS + } + RleVersion WriterOptions::getRleVersion() const { + if(privateBits->fileVersion == FileVersion::v_0_11()) + { + return RleVersion_1; + } + + return RleVersion_2; + } + + WriterOptions& WriterOptions::setStripeSize(uint64_t size) { + privateBits->stripeSize = size; + return *this; + } + + uint64_t WriterOptions::getStripeSize() const { + return privateBits->stripeSize; + } + + WriterOptions& WriterOptions::setCompressionBlockSize(uint64_t size) { + privateBits->compressionBlockSize = size; + return *this; + } + + uint64_t WriterOptions::getCompressionBlockSize() const { + return privateBits->compressionBlockSize; + } + + WriterOptions& WriterOptions::setRowIndexStride(uint64_t stride) { + privateBits->rowIndexStride = stride; + privateBits->enableIndex = (stride != 0); + return *this; + } + + uint64_t WriterOptions::getRowIndexStride() const { + return privateBits->rowIndexStride; + } + + WriterOptions& WriterOptions::setDictionaryKeySizeThreshold(double val) { + privateBits->dictionaryKeySizeThreshold = val; + return *this; + } + + double WriterOptions::getDictionaryKeySizeThreshold() const { + return privateBits->dictionaryKeySizeThreshold; + } + + WriterOptions& WriterOptions::setFileVersion(const FileVersion& version) { + // Only Hive_0_11 and Hive_0_12 version are supported currently + if (version.getMajor() == 0 && (version.getMinor() == 11 || version.getMinor() == 12)) { + privateBits->fileVersion = version; + return *this; + } + throw std::logic_error("Unsupported file version specified."); + } + + FileVersion WriterOptions::getFileVersion() const { + return privateBits->fileVersion; + } + + WriterOptions& WriterOptions::setCompression(CompressionKind comp) { + privateBits->compression = comp; + return *this; + } + + CompressionKind WriterOptions::getCompression() const { + return privateBits->compression; + } + + WriterOptions& WriterOptions::setCompressionStrategy( + CompressionStrategy strategy) { + privateBits->compressionStrategy = strategy; + return *this; + } + + CompressionStrategy WriterOptions::getCompressionStrategy() const { + return privateBits->compressionStrategy; + } + + bool WriterOptions::getAlignedBitpacking() const { + return privateBits->compressionStrategy == CompressionStrategy ::CompressionStrategy_SPEED; + } + + WriterOptions& WriterOptions::setPaddingTolerance(double tolerance) { + privateBits->paddingTolerance = tolerance; + return *this; + } + + double WriterOptions::getPaddingTolerance() const { + return privateBits->paddingTolerance; + } + + WriterOptions& WriterOptions::setMemoryPool(MemoryPool* memoryPool) { + privateBits->memoryPool = memoryPool; + return *this; + } + + MemoryPool* WriterOptions::getMemoryPool() const { + return privateBits->memoryPool; + } + + WriterOptions& WriterOptions::setErrorStream(std::ostream& errStream) { + privateBits->errorStream = &errStream; + return *this; + } + + std::ostream* WriterOptions::getErrorStream() const { + return privateBits->errorStream; + } + + bool WriterOptions::getEnableIndex() const { + return privateBits->enableIndex; + } + + bool WriterOptions::getEnableDictionary() const { + return privateBits->dictionaryKeySizeThreshold > 0.0; + } + + WriterOptions& WriterOptions::setColumnsUseBloomFilter( + const std::set<uint64_t>& columns) { + privateBits->columnsUseBloomFilter = columns; + return *this; + } + + bool WriterOptions::isColumnUseBloomFilter(uint64_t column) const { + return privateBits->columnsUseBloomFilter.find(column) != + privateBits->columnsUseBloomFilter.end(); + } + + WriterOptions& WriterOptions::setBloomFilterFPP(double fpp) { + privateBits->bloomFilterFalsePositiveProb = fpp; + return *this; + } + + double WriterOptions::getBloomFilterFPP() const { + return privateBits->bloomFilterFalsePositiveProb; + } + + // delibrately not provide setter to write bloom filter version because + // we only support UTF8 for now. + BloomFilterVersion WriterOptions::getBloomFilterVersion() const { + return privateBits->bloomFilterVersion; + } + + Writer::~Writer() { + // PASS + } + + class WriterImpl : public Writer { + private: + std::unique_ptr<ColumnWriter> columnWriter; + std::unique_ptr<BufferedOutputStream> compressionStream; + std::unique_ptr<BufferedOutputStream> bufferedStream; + std::unique_ptr<StreamsFactory> streamsFactory; + OutputStream* outStream; + WriterOptions options; + const Type& type; + uint64_t stripeRows, totalRows, indexRows; + uint64_t currentOffset; + proto::Footer fileFooter; + proto::PostScript postScript; + proto::StripeInformation stripeInfo; + proto::Metadata metadata; + + static const char* magicId; + static const WriterId writerId; + + public: + WriterImpl( + const Type& type, + OutputStream* stream, + const WriterOptions& options); + + std::unique_ptr<ColumnVectorBatch> createRowBatch(uint64_t size) + const override; + + void add(ColumnVectorBatch& rowsToAdd) override; + + void close() override; + + void addUserMetadata(const std::string name, const std::string value) override; + + private: + void init(); + void initStripe(); + void writeStripe(); + void writeMetadata(); + void writeFileFooter(); + void writePostscript(); + void buildFooterType(const Type& t, proto::Footer& footer, uint32_t& index); + static proto::CompressionKind convertCompressionKind( + const CompressionKind& kind); + }; + + const char * WriterImpl::magicId = "ORC"; + + const WriterId WriterImpl::writerId = WriterId::ORC_CPP_WRITER; + + WriterImpl::WriterImpl( + const Type& t, + OutputStream* stream, + const WriterOptions& opts) : + outStream(stream), + options(opts), + type(t) { + streamsFactory = createStreamsFactory(options, outStream); + columnWriter = buildWriter(type, *streamsFactory, options); + stripeRows = totalRows = indexRows = 0; + currentOffset = 0; + + // compression stream for stripe footer, file footer and metadata + compressionStream = createCompressor( + options.getCompression(), + outStream, + options.getCompressionStrategy(), + 1 * 1024 * 1024, // buffer capacity: 1M + options.getCompressionBlockSize(), + *options.getMemoryPool()); + + // uncompressed stream for post script + bufferedStream.reset(new BufferedOutputStream( + *options.getMemoryPool(), + outStream, + 1024, // buffer capacity: 1024 bytes + options.getCompressionBlockSize())); + + init(); + } + + std::unique_ptr<ColumnVectorBatch> WriterImpl::createRowBatch(uint64_t size) + const { + return type.createRowBatch(size, *options.getMemoryPool()); + } + + void WriterImpl::add(ColumnVectorBatch& rowsToAdd) { + if (options.getEnableIndex()) { + uint64_t pos = 0; + uint64_t chunkSize = 0; + uint64_t rowIndexStride = options.getRowIndexStride(); + while (pos < rowsToAdd.numElements) { + chunkSize = std::min(rowsToAdd.numElements - pos, + rowIndexStride - indexRows); + columnWriter->add(rowsToAdd, pos, chunkSize, nullptr); + + pos += chunkSize; + indexRows += chunkSize; + stripeRows += chunkSize; + + if (indexRows >= rowIndexStride) { + columnWriter->createRowIndexEntry(); + indexRows = 0; + } + } + } else { + stripeRows += rowsToAdd.numElements; + columnWriter->add(rowsToAdd, 0, rowsToAdd.numElements, nullptr); + } + + if (columnWriter->getEstimatedSize() >= options.getStripeSize()) { + writeStripe(); + } + } + + void WriterImpl::close() { + if (stripeRows > 0) { + writeStripe(); + } + writeMetadata(); + writeFileFooter(); + writePostscript(); + outStream->close(); + } + + void WriterImpl::addUserMetadata(const std::string name, const std::string value){ + proto::UserMetadataItem* userMetadataItem = fileFooter.add_metadata(); + userMetadataItem->set_name(TString(name)); + userMetadataItem->set_value(TString(value)); + } + + void WriterImpl::init() { + // Write file header + const static size_t magicIdLength = strlen(WriterImpl::magicId); + outStream->write(WriterImpl::magicId, magicIdLength); + currentOffset += magicIdLength; + + // Initialize file footer + fileFooter.set_headerlength(currentOffset); + fileFooter.set_contentlength(0); + fileFooter.set_numberofrows(0); + fileFooter.set_rowindexstride( + static_cast<uint32_t>(options.getRowIndexStride())); + fileFooter.set_writer(writerId); fileFooter.set_softwareversion(ORC_VERSION); - - uint32_t index = 0; - buildFooterType(type, fileFooter, index); - - // Initialize post script - postScript.set_footerlength(0); - postScript.set_compression( - WriterImpl::convertCompressionKind(options.getCompression())); - postScript.set_compressionblocksize(options.getCompressionBlockSize()); - - postScript.add_version(options.getFileVersion().getMajor()); - postScript.add_version(options.getFileVersion().getMinor()); - - postScript.set_writerversion(WriterVersion_ORC_135); - postScript.set_magic("ORC"); - - // Initialize first stripe - initStripe(); - } - - void WriterImpl::initStripe() { - stripeInfo.set_offset(currentOffset); - stripeInfo.set_indexlength(0); - stripeInfo.set_datalength(0); - stripeInfo.set_footerlength(0); - stripeInfo.set_numberofrows(0); - - stripeRows = indexRows = 0; - } - - void WriterImpl::writeStripe() { - if (options.getEnableIndex() && indexRows != 0) { - columnWriter->createRowIndexEntry(); - indexRows = 0; - } else { - columnWriter->mergeRowGroupStatsIntoStripeStats(); - } - - // dictionary should be written before any stream is flushed - columnWriter->writeDictionary(); - - std::vector<proto::Stream> streams; - // write ROW_INDEX streams - if (options.getEnableIndex()) { - columnWriter->writeIndex(streams); - } - // write streams like PRESENT, DATA, etc. - columnWriter->flush(streams); - - // generate and write stripe footer - proto::StripeFooter stripeFooter; - for (uint32_t i = 0; i < streams.size(); ++i) { - *stripeFooter.add_streams() = streams[i]; - } - - std::vector<proto::ColumnEncoding> encodings; - columnWriter->getColumnEncoding(encodings); - - for (uint32_t i = 0; i < encodings.size(); ++i) { - *stripeFooter.add_columns() = encodings[i]; - } - - // use GMT to guarantee TimestampVectorBatch from reader can write - // same wall clock time - stripeFooter.set_writertimezone("GMT"); - - // add stripe statistics to metadata - proto::StripeStatistics* stripeStats = metadata.add_stripestats(); - std::vector<proto::ColumnStatistics> colStats; - columnWriter->getStripeStatistics(colStats); - for (uint32_t i = 0; i != colStats.size(); ++i) { - *stripeStats->add_colstats() = colStats[i]; - } - // merge stripe stats into file stats and clear stripe stats - columnWriter->mergeStripeStatsIntoFileStats(); - - if (!stripeFooter.SerializeToZeroCopyStream(compressionStream.get())) { - throw std::logic_error("Failed to write stripe footer."); - } - uint64_t footerLength = compressionStream->flush(); - - // calculate data length and index length - uint64_t dataLength = 0; - uint64_t indexLength = 0; - for (uint32_t i = 0; i < streams.size(); ++i) { - if (streams[i].kind() == proto::Stream_Kind_ROW_INDEX || - streams[i].kind() == proto::Stream_Kind_BLOOM_FILTER_UTF8) { - indexLength += streams[i].length(); - } else { - dataLength += streams[i].length(); - } - } - - // update stripe info - stripeInfo.set_indexlength(indexLength); - stripeInfo.set_datalength(dataLength); - stripeInfo.set_footerlength(footerLength); - stripeInfo.set_numberofrows(stripeRows); - - *fileFooter.add_stripes() = stripeInfo; - - currentOffset = currentOffset + indexLength + dataLength + footerLength; - totalRows += stripeRows; - - columnWriter->reset(); - - initStripe(); - } - - void WriterImpl::writeMetadata() { - if (!metadata.SerializeToZeroCopyStream(compressionStream.get())) { - throw std::logic_error("Failed to write metadata."); - } - postScript.set_metadatalength(compressionStream.get()->flush()); - } - - void WriterImpl::writeFileFooter() { - fileFooter.set_contentlength(currentOffset - fileFooter.headerlength()); - fileFooter.set_numberofrows(totalRows); - - // update file statistics - std::vector<proto::ColumnStatistics> colStats; - columnWriter->getFileStatistics(colStats); - for (uint32_t i = 0; i != colStats.size(); ++i) { - *fileFooter.add_statistics() = colStats[i]; - } - - if (!fileFooter.SerializeToZeroCopyStream(compressionStream.get())) { - throw std::logic_error("Failed to write file footer."); - } - postScript.set_footerlength(compressionStream->flush()); - } - - void WriterImpl::writePostscript() { - if (!postScript.SerializeToZeroCopyStream(bufferedStream.get())) { - throw std::logic_error("Failed to write post script."); - } - unsigned char psLength = - static_cast<unsigned char>(bufferedStream->flush()); - outStream->write(&psLength, sizeof(unsigned char)); - } - - void WriterImpl::buildFooterType( - const Type& t, - proto::Footer& footer, - uint32_t & index) { - proto::Type protoType; - protoType.set_maximumlength(static_cast<uint32_t>(t.getMaximumLength())); - protoType.set_precision(static_cast<uint32_t>(t.getPrecision())); - protoType.set_scale(static_cast<uint32_t>(t.getScale())); - - switch (t.getKind()) { - case BOOLEAN: { - protoType.set_kind(proto::Type_Kind_BOOLEAN); - break; - } - case BYTE: { - protoType.set_kind(proto::Type_Kind_BYTE); - break; - } - case SHORT: { - protoType.set_kind(proto::Type_Kind_SHORT); - break; - } - case INT: { - protoType.set_kind(proto::Type_Kind_INT); - break; - } - case LONG: { - protoType.set_kind(proto::Type_Kind_LONG); - break; - } - case FLOAT: { - protoType.set_kind(proto::Type_Kind_FLOAT); - break; - } - case DOUBLE: { - protoType.set_kind(proto::Type_Kind_DOUBLE); - break; - } - case STRING: { - protoType.set_kind(proto::Type_Kind_STRING); - break; - } - case BINARY: { - protoType.set_kind(proto::Type_Kind_BINARY); - break; - } - case TIMESTAMP: { - protoType.set_kind(proto::Type_Kind_TIMESTAMP); - break; - } - case LIST: { - protoType.set_kind(proto::Type_Kind_LIST); - break; - } - case MAP: { - protoType.set_kind(proto::Type_Kind_MAP); - break; - } - case STRUCT: { - protoType.set_kind(proto::Type_Kind_STRUCT); - break; - } - case UNION: { - protoType.set_kind(proto::Type_Kind_UNION); - break; - } - case DECIMAL: { - protoType.set_kind(proto::Type_Kind_DECIMAL); - break; - } - case DATE: { - protoType.set_kind(proto::Type_Kind_DATE); - break; - } - case VARCHAR: { - protoType.set_kind(proto::Type_Kind_VARCHAR); - break; - } - case CHAR: { - protoType.set_kind(proto::Type_Kind_CHAR); - break; - } - default: - throw std::logic_error("Unknown type."); - } - - int pos = static_cast<int>(index); - *footer.add_types() = protoType; - - for (uint64_t i = 0; i < t.getSubtypeCount(); ++i) { - // only add subtypes' field names if this type is STRUCT - if (t.getKind() == STRUCT) { - footer.mutable_types(pos)->add_fieldnames(TString(t.getFieldName(i))); - } - footer.mutable_types(pos)->add_subtypes(++index); - buildFooterType(*t.getSubtype(i), footer, index); - } - } - - proto::CompressionKind WriterImpl::convertCompressionKind( - const CompressionKind& kind) { - return static_cast<proto::CompressionKind>(kind); - } - - std::unique_ptr<Writer> createWriter( - const Type& type, - OutputStream* stream, - const WriterOptions& options) { - return std::unique_ptr<Writer>( - new WriterImpl( - type, - stream, - options)); - } - -} - + + uint32_t index = 0; + buildFooterType(type, fileFooter, index); + + // Initialize post script + postScript.set_footerlength(0); + postScript.set_compression( + WriterImpl::convertCompressionKind(options.getCompression())); + postScript.set_compressionblocksize(options.getCompressionBlockSize()); + + postScript.add_version(options.getFileVersion().getMajor()); + postScript.add_version(options.getFileVersion().getMinor()); + + postScript.set_writerversion(WriterVersion_ORC_135); + postScript.set_magic("ORC"); + + // Initialize first stripe + initStripe(); + } + + void WriterImpl::initStripe() { + stripeInfo.set_offset(currentOffset); + stripeInfo.set_indexlength(0); + stripeInfo.set_datalength(0); + stripeInfo.set_footerlength(0); + stripeInfo.set_numberofrows(0); + + stripeRows = indexRows = 0; + } + + void WriterImpl::writeStripe() { + if (options.getEnableIndex() && indexRows != 0) { + columnWriter->createRowIndexEntry(); + indexRows = 0; + } else { + columnWriter->mergeRowGroupStatsIntoStripeStats(); + } + + // dictionary should be written before any stream is flushed + columnWriter->writeDictionary(); + + std::vector<proto::Stream> streams; + // write ROW_INDEX streams + if (options.getEnableIndex()) { + columnWriter->writeIndex(streams); + } + // write streams like PRESENT, DATA, etc. + columnWriter->flush(streams); + + // generate and write stripe footer + proto::StripeFooter stripeFooter; + for (uint32_t i = 0; i < streams.size(); ++i) { + *stripeFooter.add_streams() = streams[i]; + } + + std::vector<proto::ColumnEncoding> encodings; + columnWriter->getColumnEncoding(encodings); + + for (uint32_t i = 0; i < encodings.size(); ++i) { + *stripeFooter.add_columns() = encodings[i]; + } + + // use GMT to guarantee TimestampVectorBatch from reader can write + // same wall clock time + stripeFooter.set_writertimezone("GMT"); + + // add stripe statistics to metadata + proto::StripeStatistics* stripeStats = metadata.add_stripestats(); + std::vector<proto::ColumnStatistics> colStats; + columnWriter->getStripeStatistics(colStats); + for (uint32_t i = 0; i != colStats.size(); ++i) { + *stripeStats->add_colstats() = colStats[i]; + } + // merge stripe stats into file stats and clear stripe stats + columnWriter->mergeStripeStatsIntoFileStats(); + + if (!stripeFooter.SerializeToZeroCopyStream(compressionStream.get())) { + throw std::logic_error("Failed to write stripe footer."); + } + uint64_t footerLength = compressionStream->flush(); + + // calculate data length and index length + uint64_t dataLength = 0; + uint64_t indexLength = 0; + for (uint32_t i = 0; i < streams.size(); ++i) { + if (streams[i].kind() == proto::Stream_Kind_ROW_INDEX || + streams[i].kind() == proto::Stream_Kind_BLOOM_FILTER_UTF8) { + indexLength += streams[i].length(); + } else { + dataLength += streams[i].length(); + } + } + + // update stripe info + stripeInfo.set_indexlength(indexLength); + stripeInfo.set_datalength(dataLength); + stripeInfo.set_footerlength(footerLength); + stripeInfo.set_numberofrows(stripeRows); + + *fileFooter.add_stripes() = stripeInfo; + + currentOffset = currentOffset + indexLength + dataLength + footerLength; + totalRows += stripeRows; + + columnWriter->reset(); + + initStripe(); + } + + void WriterImpl::writeMetadata() { + if (!metadata.SerializeToZeroCopyStream(compressionStream.get())) { + throw std::logic_error("Failed to write metadata."); + } + postScript.set_metadatalength(compressionStream.get()->flush()); + } + + void WriterImpl::writeFileFooter() { + fileFooter.set_contentlength(currentOffset - fileFooter.headerlength()); + fileFooter.set_numberofrows(totalRows); + + // update file statistics + std::vector<proto::ColumnStatistics> colStats; + columnWriter->getFileStatistics(colStats); + for (uint32_t i = 0; i != colStats.size(); ++i) { + *fileFooter.add_statistics() = colStats[i]; + } + + if (!fileFooter.SerializeToZeroCopyStream(compressionStream.get())) { + throw std::logic_error("Failed to write file footer."); + } + postScript.set_footerlength(compressionStream->flush()); + } + + void WriterImpl::writePostscript() { + if (!postScript.SerializeToZeroCopyStream(bufferedStream.get())) { + throw std::logic_error("Failed to write post script."); + } + unsigned char psLength = + static_cast<unsigned char>(bufferedStream->flush()); + outStream->write(&psLength, sizeof(unsigned char)); + } + + void WriterImpl::buildFooterType( + const Type& t, + proto::Footer& footer, + uint32_t & index) { + proto::Type protoType; + protoType.set_maximumlength(static_cast<uint32_t>(t.getMaximumLength())); + protoType.set_precision(static_cast<uint32_t>(t.getPrecision())); + protoType.set_scale(static_cast<uint32_t>(t.getScale())); + + switch (t.getKind()) { + case BOOLEAN: { + protoType.set_kind(proto::Type_Kind_BOOLEAN); + break; + } + case BYTE: { + protoType.set_kind(proto::Type_Kind_BYTE); + break; + } + case SHORT: { + protoType.set_kind(proto::Type_Kind_SHORT); + break; + } + case INT: { + protoType.set_kind(proto::Type_Kind_INT); + break; + } + case LONG: { + protoType.set_kind(proto::Type_Kind_LONG); + break; + } + case FLOAT: { + protoType.set_kind(proto::Type_Kind_FLOAT); + break; + } + case DOUBLE: { + protoType.set_kind(proto::Type_Kind_DOUBLE); + break; + } + case STRING: { + protoType.set_kind(proto::Type_Kind_STRING); + break; + } + case BINARY: { + protoType.set_kind(proto::Type_Kind_BINARY); + break; + } + case TIMESTAMP: { + protoType.set_kind(proto::Type_Kind_TIMESTAMP); + break; + } + case LIST: { + protoType.set_kind(proto::Type_Kind_LIST); + break; + } + case MAP: { + protoType.set_kind(proto::Type_Kind_MAP); + break; + } + case STRUCT: { + protoType.set_kind(proto::Type_Kind_STRUCT); + break; + } + case UNION: { + protoType.set_kind(proto::Type_Kind_UNION); + break; + } + case DECIMAL: { + protoType.set_kind(proto::Type_Kind_DECIMAL); + break; + } + case DATE: { + protoType.set_kind(proto::Type_Kind_DATE); + break; + } + case VARCHAR: { + protoType.set_kind(proto::Type_Kind_VARCHAR); + break; + } + case CHAR: { + protoType.set_kind(proto::Type_Kind_CHAR); + break; + } + default: + throw std::logic_error("Unknown type."); + } + + int pos = static_cast<int>(index); + *footer.add_types() = protoType; + + for (uint64_t i = 0; i < t.getSubtypeCount(); ++i) { + // only add subtypes' field names if this type is STRUCT + if (t.getKind() == STRUCT) { + footer.mutable_types(pos)->add_fieldnames(TString(t.getFieldName(i))); + } + footer.mutable_types(pos)->add_subtypes(++index); + buildFooterType(*t.getSubtype(i), footer, index); + } + } + + proto::CompressionKind WriterImpl::convertCompressionKind( + const CompressionKind& kind) { + return static_cast<proto::CompressionKind>(kind); + } + + std::unique_ptr<Writer> createWriter( + const Type& type, + OutputStream* stream, + const WriterOptions& options) { + return std::unique_ptr<Writer>( + new WriterImpl( + type, + stream, + options)); + } + +} + |