diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /contrib/libs/apache/avro/api/DataFile.hh | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'contrib/libs/apache/avro/api/DataFile.hh')
-rw-r--r-- | contrib/libs/apache/avro/api/DataFile.hh | 419 |
1 files changed, 419 insertions, 0 deletions
diff --git a/contrib/libs/apache/avro/api/DataFile.hh b/contrib/libs/apache/avro/api/DataFile.hh new file mode 100644 index 0000000000..50169106b1 --- /dev/null +++ b/contrib/libs/apache/avro/api/DataFile.hh @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef avro_DataFile_hh__ +#define avro_DataFile_hh__ + +#include "Config.hh" +#include "Encoder.hh" +#include "buffer/Buffer.hh" +#include "ValidSchema.hh" +#include "Specific.hh" +#include "Stream.hh" + +#include <map> +#include <string> +#include <vector> + +#include "array" +#include "boost/utility.hpp" +#include <boost/iostreams/filtering_stream.hpp> + +namespace avro { + +/** Specify type of compression to use when writing data files. */ +enum Codec { + NULL_CODEC, + DEFLATE_CODEC, + +#ifdef SNAPPY_CODEC_AVAILABLE + SNAPPY_CODEC +#endif + +}; + +const int SyncSize = 16; +/** + * The sync value. + */ +typedef std::array<uint8_t, SyncSize> DataFileSync; + +/** + * Type-independent portion of DataFileWriter. + * At any given point in time, at most one file can be written using + * this object. + */ +class AVRO_DECL DataFileWriterBase : boost::noncopyable { + const std::string filename_; + const ValidSchema schema_; + const EncoderPtr encoderPtr_; + const size_t syncInterval_; + Codec codec_; + + std::unique_ptr<OutputStream> stream_; + std::unique_ptr<OutputStream> buffer_; + const DataFileSync sync_; + int64_t objectCount_; + + typedef std::map<std::string, std::vector<uint8_t> > Metadata; + + Metadata metadata_; + int64_t lastSync_; + + static std::unique_ptr<OutputStream> makeStream(const char* filename); + static DataFileSync makeSync(); + + void writeHeader(); + void setMetadata(const std::string& key, const std::string& value); + + /** + * Generates a sync marker in the file. + */ + void sync(); + + /** + * Shared constructor portion since we aren't using C++11 + */ + void init(const ValidSchema &schema, size_t syncInterval, const Codec &codec); + +public: + /** + * Returns the current encoder for this writer. + */ + Encoder& encoder() const { return *encoderPtr_; } + + /** + * Returns true if the buffer has sufficient data for a sync to be + * inserted. + */ + void syncIfNeeded(); + + /** + * Returns the byte offset (within the current file) of the start of the current block being written. + */ + uint64_t getCurrentBlockStart(); + + /** + * Increments the object count. + */ + void incr() { + ++objectCount_; + } + /** + * Constructs a data file writer with the given sync interval and name. + */ + DataFileWriterBase(const char* filename, const ValidSchema& schema, + size_t syncInterval, Codec codec = NULL_CODEC); + DataFileWriterBase(std::unique_ptr<OutputStream> outputStream, + const ValidSchema& schema, size_t syncInterval, Codec codec); + + ~DataFileWriterBase(); + /** + * Closes the current file. Once closed this datafile object cannot be + * used for writing any more. + */ + void close(); + + /** + * Returns the schema for this data file. + */ + const ValidSchema& schema() const { return schema_; } + + /** + * Flushes any unwritten data into the file. + */ + void flush(); +}; + +/** + * An Avro datafile that can store objects of type T. + */ +template <typename T> +class DataFileWriter : boost::noncopyable { + std::unique_ptr<DataFileWriterBase> base_; +public: + /** + * Constructs a new data file. + */ + DataFileWriter(const char* filename, const ValidSchema& schema, + size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC) : + base_(new DataFileWriterBase(filename, schema, syncInterval, codec)) { } + + DataFileWriter(std::unique_ptr<OutputStream> outputStream, const ValidSchema& schema, + size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC) : + base_(new DataFileWriterBase(std::move(outputStream), schema, syncInterval, codec)) { } + + /** + * Writes the given piece of data into the file. + */ + void write(const T& datum) { + base_->syncIfNeeded(); + avro::encode(base_->encoder(), datum); + base_->incr(); + } + + /** + * Returns the byte offset (within the current file) of the start of the current block being written. + */ + uint64_t getCurrentBlockStart() { return base_->getCurrentBlockStart(); } + + + /** + * Closes the current file. Once closed this datafile object cannot be + * used for writing any more. + */ + void close() { base_->close(); } + + /** + * Returns the schema for this data file. + */ + const ValidSchema& schema() const { return base_->schema(); } + + /** + * Flushes any unwritten data into the file. + */ + void flush() { base_->flush(); } +}; + +/** + * The type independent portion of reader. + */ +class AVRO_DECL DataFileReaderBase : boost::noncopyable { + const std::string filename_; + const std::unique_ptr<InputStream> stream_; + const DecoderPtr decoder_; + int64_t objectCount_; + bool eof_; + Codec codec_; + int64_t blockStart_; + int64_t blockEnd_; + + ValidSchema readerSchema_; + ValidSchema dataSchema_; + DecoderPtr dataDecoder_; + std::unique_ptr<InputStream> dataStream_; + typedef std::map<std::string, std::vector<uint8_t> > Metadata; + + Metadata metadata_; + DataFileSync sync_; + + // for compressed buffer + std::unique_ptr<boost::iostreams::filtering_istream> os_; + std::vector<char> compressed_; + std::string uncompressed; + void readHeader(); + + void readDataBlock(); + void doSeek(int64_t position); +public: + /** + * Returns the current decoder for this reader. + */ + Decoder& decoder() { return *dataDecoder_; } + + /** + * Returns true if and only if there is more to read. + */ + bool hasMore(); + + /** + * Decrements the number of objects yet to read. + */ + void decr() { --objectCount_; } + + /** + * Constructs the reader for the given file and the reader is + * expected to use the schema that is used with data. + * This function should be called exactly once after constructing + * the DataFileReaderBase object. + */ + DataFileReaderBase(const char* filename); + + DataFileReaderBase(std::unique_ptr<InputStream> inputStream); + + /** + * Initializes the reader so that the reader and writer schemas + * are the same. + */ + void init(); + + /** + * Initializes the reader to read objects according to the given + * schema. This gives an opportunity for the reader to see the schema + * in the data file before deciding the right schema to use for reading. + * This must be called exactly once after constructing the + * DataFileReaderBase object. + */ + void init(const ValidSchema& readerSchema); + + /** + * Returns the schema for this object. + */ + const ValidSchema& readerSchema() { return readerSchema_; } + + /** + * Returns the schema stored with the data file. + */ + const ValidSchema& dataSchema() { return dataSchema_; } + + /** + * Closes the reader. No further operation is possible on this reader. + */ + void close(); + + /** + * Move to a specific, known synchronization point, for example one returned + * from tell() after sync(). + */ + void seek(int64_t position); + + /** + * Move to the next synchronization point after a position. To process a + * range of file entries, call this with the starting position, then check + * pastSync() with the end point before each use of decoder(). + */ + void sync(int64_t position); + + /** + * Return true if past the next synchronization point after a position. + */ + bool pastSync(int64_t position); + + /** + * Return the last synchronization point before our current position. + */ + int64_t previousSync(); +}; + +/** + * Reads the contents of data file one after another. + */ +template <typename T> +class DataFileReader : boost::noncopyable { + std::unique_ptr<DataFileReaderBase> base_; +public: + /** + * Constructs the reader for the given file and the reader is + * expected to use the given schema. + */ + DataFileReader(const char* filename, const ValidSchema& readerSchema) : + base_(new DataFileReaderBase(filename)) { + base_->init(readerSchema); + } + + DataFileReader(std::unique_ptr<InputStream> inputStream, const ValidSchema& readerSchema) : + base_(new DataFileReaderBase(std::move(inputStream))) { + base_->init(readerSchema); + } + + /** + * Constructs the reader for the given file and the reader is + * expected to use the schema that is used with data. + */ + DataFileReader(const char* filename) : + base_(new DataFileReaderBase(filename)) { + base_->init(); + } + + DataFileReader(std::unique_ptr<InputStream> inputStream) : + base_(new DataFileReaderBase(std::move(inputStream))) { + base_->init(); + } + + /** + * Constructs a reader using the reader base. This form of constructor + * allows the user to examine the schema of a given file and then + * decide to use the right type of data to be deserialize. Without this + * the user must know the type of data for the template _before_ + * he knows the schema within the file. + * The schema present in the data file will be used for reading + * from this reader. + */ + DataFileReader(std::unique_ptr<DataFileReaderBase> base) : base_(std::move(base)) { + base_->init(); + } + + /** + * Constructs a reader using the reader base. This form of constructor + * allows the user to examine the schema of a given file and then + * decide to use the right type of data to be deserialize. Without this + * the user must know the type of data for the template _before_ + * he knows the schema within the file. + * The argument readerSchema will be used for reading + * from this reader. + */ + DataFileReader(std::unique_ptr<DataFileReaderBase> base, + const ValidSchema& readerSchema) : base_(std::move(base)) { + base_->init(readerSchema); + } + + /** + * Reads the next entry from the data file. + * \return true if an object has been successfully read into \p datum and + * false if there are no more entries in the file. + */ + bool read(T& datum) { + if (base_->hasMore()) { + base_->decr(); + avro::decode(base_->decoder(), datum); + return true; + } + return false; + } + + /** + * Returns the schema for this object. + */ + const ValidSchema& readerSchema() { return base_->readerSchema(); } + + /** + * Returns the schema stored with the data file. + */ + const ValidSchema& dataSchema() { return base_->dataSchema(); } + + /** + * Closes the reader. No further operation is possible on this reader. + */ + void close() { return base_->close(); } + + /** + * Move to a specific, known synchronization point, for example one returned + * from previousSync(). + */ + void seek(int64_t position) { base_->seek(position); } + + /** + * Move to the next synchronization point after a position. To process a + * range of file entries, call this with the starting position, then check + * pastSync() with the end point before each call to read(). + */ + void sync(int64_t position) { base_->sync(position); } + + /** + * Return true if past the next synchronization point after a position. + */ + bool pastSync(int64_t position) { return base_->pastSync(position); } + + /** + * Return the last synchronization point before our current position. + */ + int64_t previousSync() { return base_->previousSync(); } +}; + +} // namespace avro +#endif |