aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/libs/apache/avro/impl/DataFile.cc
diff options
context:
space:
mode:
authorthegeorg <thegeorg@yandex-team.ru>2022-02-10 16:45:08 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:08 +0300
commit4e839db24a3bbc9f1c610c43d6faaaa99824dcca (patch)
tree506dac10f5df94fab310584ee51b24fc5a081c22 /contrib/libs/apache/avro/impl/DataFile.cc
parent2d37894b1b037cf24231090eda8589bbb44fb6fc (diff)
downloadydb-4e839db24a3bbc9f1c610c43d6faaaa99824dcca.tar.gz
Restoring authorship annotation for <thegeorg@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/libs/apache/avro/impl/DataFile.cc')
-rw-r--r--contrib/libs/apache/avro/impl/DataFile.cc1200
1 files changed, 600 insertions, 600 deletions
diff --git a/contrib/libs/apache/avro/impl/DataFile.cc b/contrib/libs/apache/avro/impl/DataFile.cc
index e20e605827..8b92440d04 100644
--- a/contrib/libs/apache/avro/impl/DataFile.cc
+++ b/contrib/libs/apache/avro/impl/DataFile.cc
@@ -1,600 +1,600 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "DataFile.hh"
-#include "Compiler.hh"
-#include "Exception.hh"
-
-#include <sstream>
-
-#include <boost/random/mersenne_twister.hpp>
-#include <boost/iostreams/device/file.hpp>
-#include <boost/iostreams/filter/gzip.hpp>
-#include <boost/iostreams/filter/zlib.hpp>
-#include <boost/crc.hpp> // for boost::crc_32_type
-
-#ifdef SNAPPY_CODEC_AVAILABLE
-#include <snappy.h>
-#endif
-
-namespace avro {
-using std::unique_ptr;
-using std::ostringstream;
-using std::istringstream;
-using std::vector;
-using std::copy;
-using std::string;
-
-using std::array;
-
-namespace {
-const string AVRO_SCHEMA_KEY("avro.schema");
-const string AVRO_CODEC_KEY("avro.codec");
-const string AVRO_NULL_CODEC("null");
-const string AVRO_DEFLATE_CODEC("deflate");
-
-#ifdef SNAPPY_CODEC_AVAILABLE
-const string AVRO_SNAPPY_CODEC = "snappy";
-#endif
-
-const size_t minSyncInterval = 32;
-const size_t maxSyncInterval = 1u << 30;
-
-boost::iostreams::zlib_params get_zlib_params() {
- boost::iostreams::zlib_params ret;
- ret.method = boost::iostreams::zlib::deflated;
- ret.noheader = true;
- return ret;
-}
-}
-
-DataFileWriterBase::DataFileWriterBase(const char* filename, const ValidSchema& schema, size_t syncInterval,
- Codec codec) :
- filename_(filename),
- schema_(schema),
- encoderPtr_(binaryEncoder()),
- syncInterval_(syncInterval),
- codec_(codec),
- stream_(fileOutputStream(filename)),
- buffer_(memoryOutputStream()),
- sync_(makeSync()),
- objectCount_(0),
- lastSync_(0)
-{
- init(schema, syncInterval, codec);
-}
-
-DataFileWriterBase::DataFileWriterBase(std::unique_ptr<OutputStream> outputStream,
- const ValidSchema& schema, size_t syncInterval, Codec codec) :
- filename_(),
- schema_(schema),
- encoderPtr_(binaryEncoder()),
- syncInterval_(syncInterval),
- codec_(codec),
- stream_(std::move(outputStream)),
- buffer_(memoryOutputStream()),
- sync_(makeSync()),
- objectCount_(0),
- lastSync_(0)
-{
- init(schema, syncInterval, codec);
-}
-
-void DataFileWriterBase::init(const ValidSchema &schema, size_t syncInterval, const Codec &codec) {
- if (syncInterval < minSyncInterval || syncInterval > maxSyncInterval) {
- throw Exception(boost::format("Invalid sync interval: %1%. "
- "Should be between %2% and %3%") % syncInterval %
- minSyncInterval % maxSyncInterval);
- }
- setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC);
-
- if (codec_ == NULL_CODEC) {
- setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC);
- } else if (codec_ == DEFLATE_CODEC) {
- setMetadata(AVRO_CODEC_KEY, AVRO_DEFLATE_CODEC);
-#ifdef SNAPPY_CODEC_AVAILABLE
- } else if (codec_ == SNAPPY_CODEC) {
- setMetadata(AVRO_CODEC_KEY, AVRO_SNAPPY_CODEC);
-#endif
- } else {
- throw Exception(boost::format("Unknown codec: %1%") % codec);
- }
- setMetadata(AVRO_SCHEMA_KEY, schema.toJson(false));
-
- writeHeader();
- encoderPtr_->init(*buffer_);
-
- lastSync_ = stream_->byteCount();
-}
-
-
-DataFileWriterBase::~DataFileWriterBase()
-{
- if (stream_.get()) {
- close();
- }
-}
-
-void DataFileWriterBase::close()
-{
- flush();
- stream_.reset();
-}
-
-void DataFileWriterBase::sync()
-{
- encoderPtr_->flush();
-
- encoderPtr_->init(*stream_);
- avro::encode(*encoderPtr_, objectCount_);
- if (codec_ == NULL_CODEC) {
- int64_t byteCount = buffer_->byteCount();
- avro::encode(*encoderPtr_, byteCount);
- encoderPtr_->flush();
- std::unique_ptr<InputStream> in = memoryInputStream(*buffer_);
- copy(*in, *stream_);
- } else if (codec_ == DEFLATE_CODEC) {
- std::vector<char> buf;
- {
- boost::iostreams::filtering_ostream os;
- os.push(boost::iostreams::zlib_compressor(get_zlib_params()));
- os.push(boost::iostreams::back_inserter(buf));
- const uint8_t* data;
- size_t len;
-
- std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
- while (input->next(&data, &len)) {
- boost::iostreams::write(os, reinterpret_cast<const char*>(data), len);
- }
- } // make sure all is flushed
- std::unique_ptr<InputStream> in = memoryInputStream(
- reinterpret_cast<const uint8_t*>(buf.data()), buf.size());
- int64_t byteCount = buf.size();
- avro::encode(*encoderPtr_, byteCount);
- encoderPtr_->flush();
- copy(*in, *stream_);
-#ifdef SNAPPY_CODEC_AVAILABLE
- } else if (codec_ == SNAPPY_CODEC) {
- std::vector<char> temp;
- std::string compressed;
- boost::crc_32_type crc;
- {
- boost::iostreams::filtering_ostream os;
- os.push(boost::iostreams::back_inserter(temp));
- const uint8_t* data;
- size_t len;
-
- std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
- while (input->next(&data, &len)) {
- boost::iostreams::write(os, reinterpret_cast<const char*>(data),
- len);
- }
- } // make sure all is flushed
-
- crc.process_bytes(reinterpret_cast<const char*>(temp.data()),
- temp.size());
- // For Snappy, add the CRC32 checksum
- int32_t checksum = crc();
-
- // Now compress
- size_t compressed_size = snappy::Compress(
- reinterpret_cast<const char*>(temp.data()), temp.size(),
- &compressed);
- temp.clear();
- {
- boost::iostreams::filtering_ostream os;
- os.push(boost::iostreams::back_inserter(temp));
- boost::iostreams::write(os, compressed.c_str(), compressed_size);
- }
- temp.push_back((checksum >> 24) & 0xFF);
- temp.push_back((checksum >> 16) & 0xFF);
- temp.push_back((checksum >> 8) & 0xFF);
- temp.push_back(checksum & 0xFF);
- std::unique_ptr<InputStream> in = memoryInputStream(
- reinterpret_cast<const uint8_t*>(temp.data()), temp.size());
- int64_t byteCount = temp.size();
- avro::encode(*encoderPtr_, byteCount);
- encoderPtr_->flush();
- copy(*in, *stream_);
-#endif
- }
-
- encoderPtr_->init(*stream_);
- avro::encode(*encoderPtr_, sync_);
- encoderPtr_->flush();
-
- lastSync_ = stream_->byteCount();
-
- buffer_ = memoryOutputStream();
- encoderPtr_->init(*buffer_);
- objectCount_ = 0;
-}
-
-void DataFileWriterBase::syncIfNeeded()
-{
- encoderPtr_->flush();
- if (buffer_->byteCount() >= syncInterval_) {
- sync();
- }
-}
-
-uint64_t DataFileWriterBase::getCurrentBlockStart()
-{
- return lastSync_;
-}
-
-void DataFileWriterBase::flush()
-{
- sync();
-}
-
-boost::mt19937 random(static_cast<uint32_t>(time(0)));
-
-DataFileSync DataFileWriterBase::makeSync()
-{
- DataFileSync sync;
- for (size_t i = 0; i < sync.size(); ++i) {
- sync[i] = random();
- }
- return sync;
-}
-
-typedef array<uint8_t, 4> Magic;
-static Magic magic = { { 'O', 'b', 'j', '\x01' } };
-
-void DataFileWriterBase::writeHeader()
-{
- encoderPtr_->init(*stream_);
- avro::encode(*encoderPtr_, magic);
- avro::encode(*encoderPtr_, metadata_);
- avro::encode(*encoderPtr_, sync_);
- encoderPtr_->flush();
-}
-
-void DataFileWriterBase::setMetadata(const string& key, const string& value)
-{
- vector<uint8_t> v(value.size());
- copy(value.begin(), value.end(), v.begin());
- metadata_[key] = v;
-}
-
-DataFileReaderBase::DataFileReaderBase(const char* filename) :
- filename_(filename), stream_(fileSeekableInputStream(filename)),
- decoder_(binaryDecoder()), objectCount_(0), eof_(false), blockStart_(-1),
- blockEnd_(-1)
-{
- readHeader();
-}
-
-DataFileReaderBase::DataFileReaderBase(std::unique_ptr<InputStream> inputStream) :
- filename_(""), stream_(std::move(inputStream)),
- decoder_(binaryDecoder()), objectCount_(0), eof_(false)
-{
- readHeader();
-}
-
-void DataFileReaderBase::init()
-{
- readerSchema_ = dataSchema_;
- dataDecoder_ = binaryDecoder();
- readDataBlock();
-}
-
-void DataFileReaderBase::init(const ValidSchema& readerSchema)
-{
- readerSchema_ = readerSchema;
- dataDecoder_ = (readerSchema_.toJson(true) != dataSchema_.toJson(true)) ?
- resolvingDecoder(dataSchema_, readerSchema_, binaryDecoder()) :
- binaryDecoder();
- readDataBlock();
-}
-
-static void drain(InputStream& in)
-{
- const uint8_t *p = 0;
- size_t n = 0;
- while (in.next(&p, &n));
-}
-
-char hex(unsigned int x)
-{
- return x + (x < 10 ? '0' : ('a' - 10));
-}
-
-std::ostream& operator << (std::ostream& os, const DataFileSync& s)
-{
- for (size_t i = 0; i < s.size(); ++i) {
- os << hex(s[i] / 16) << hex(s[i] % 16) << ' ';
- }
- os << std::endl;
- return os;
-}
-
-
-bool DataFileReaderBase::hasMore()
-{
- for (; ;) {
- if (eof_) {
- return false;
- } else if (objectCount_ != 0) {
- return true;
- }
-
- dataDecoder_->init(*dataStream_);
- drain(*dataStream_);
- DataFileSync s;
- decoder_->init(*stream_);
- avro::decode(*decoder_, s);
- if (s != sync_) {
- throw Exception("Sync mismatch");
- }
- readDataBlock();
- }
-}
-
-class BoundedInputStream : public InputStream {
- InputStream& in_;
- size_t limit_;
-
- bool next(const uint8_t** data, size_t* len) {
- if (limit_ != 0 && in_.next(data, len)) {
- if (*len > limit_) {
- in_.backup(*len - limit_);
- *len = limit_;
- }
- limit_ -= *len;
- return true;
- }
- return false;
- }
-
- void backup(size_t len) {
- in_.backup(len);
- limit_ += len;
- }
-
- void skip(size_t len) {
- if (len > limit_) {
- len = limit_;
- }
- in_.skip(len);
- limit_ -= len;
- }
-
- size_t byteCount() const {
- return in_.byteCount();
- }
-
-public:
- BoundedInputStream(InputStream& in, size_t limit) :
- in_(in), limit_(limit) { }
-};
-
-unique_ptr<InputStream> boundedInputStream(InputStream& in, size_t limit)
-{
- return unique_ptr<InputStream>(new BoundedInputStream(in, limit));
-}
-
-void DataFileReaderBase::readDataBlock()
-{
- decoder_->init(*stream_);
- blockStart_ = stream_->byteCount();
- const uint8_t* p = 0;
- size_t n = 0;
- if (! stream_->next(&p, &n)) {
- eof_ = true;
- return;
- }
- stream_->backup(n);
- avro::decode(*decoder_, objectCount_);
- int64_t byteCount;
- avro::decode(*decoder_, byteCount);
- decoder_->init(*stream_);
- blockEnd_ = stream_->byteCount() + byteCount;
-
- unique_ptr<InputStream> st = boundedInputStream(*stream_, static_cast<size_t>(byteCount));
- if (codec_ == NULL_CODEC) {
- dataDecoder_->init(*st);
- dataStream_ = std::move(st);
-#ifdef SNAPPY_CODEC_AVAILABLE
- } else if (codec_ == SNAPPY_CODEC) {
- boost::crc_32_type crc;
- uint32_t checksum = 0;
- compressed_.clear();
- uncompressed.clear();
- const uint8_t* data;
- size_t len;
- while (st->next(&data, &len)) {
- compressed_.insert(compressed_.end(), data, data + len);
- }
- len = compressed_.size();
- int b1 = compressed_[len - 4] & 0xFF;
- int b2 = compressed_[len - 3] & 0xFF;
- int b3 = compressed_[len - 2] & 0xFF;
- int b4 = compressed_[len - 1] & 0xFF;
-
- checksum = (b1 << 24) + (b2 << 16) + (b3 << 8) + (b4);
- if (!snappy::Uncompress(reinterpret_cast<const char*>(compressed_.data()),
- len - 4, &uncompressed)) {
- throw Exception(
- "Snappy Compression reported an error when decompressing");
- }
- crc.process_bytes(uncompressed.c_str(), uncompressed.size());
- uint32_t c = crc();
- if (checksum != c) {
- throw Exception(boost::format("Checksum did not match for Snappy compression: Expected: %1%, computed: %2%") % checksum % c);
- }
- os_.reset(new boost::iostreams::filtering_istream());
- os_->push(
- boost::iostreams::basic_array_source<char>(uncompressed.c_str(),
- uncompressed.size()));
- std::unique_ptr<InputStream> in = istreamInputStream(*os_);
-
- dataDecoder_->init(*in);
- dataStream_ = std::move(in);
-#endif
- } else {
- compressed_.clear();
- const uint8_t* data;
- size_t len;
- while (st->next(&data, &len)) {
- compressed_.insert(compressed_.end(), data, data + len);
- }
- // boost::iostreams::write(os, reinterpret_cast<const char*>(data), len);
- os_.reset(new boost::iostreams::filtering_istream());
- os_->push(boost::iostreams::zlib_decompressor(get_zlib_params()));
- os_->push(boost::iostreams::basic_array_source<char>(
- compressed_.data(), compressed_.size()));
-
- std::unique_ptr<InputStream> in = nonSeekableIstreamInputStream(*os_);
- dataDecoder_->init(*in);
- dataStream_ = std::move(in);
- }
-}
-
-void DataFileReaderBase::close()
-{
-}
-
-static string toString(const vector<uint8_t>& v)
-{
- string result;
- result.resize(v.size());
- copy(v.begin(), v.end(), result.begin());
- return result;
-}
-
-static ValidSchema makeSchema(const vector<uint8_t>& v)
-{
- istringstream iss(toString(v));
- ValidSchema vs;
- compileJsonSchema(iss, vs);
- return ValidSchema(vs);
-}
-
-void DataFileReaderBase::readHeader()
-{
- decoder_->init(*stream_);
- Magic m;
- avro::decode(*decoder_, m);
- if (magic != m) {
- throw Exception("Invalid data file. Magic does not match: "
- + filename_);
- }
- avro::decode(*decoder_, metadata_);
- Metadata::const_iterator it = metadata_.find(AVRO_SCHEMA_KEY);
- if (it == metadata_.end()) {
- throw Exception("No schema in metadata");
- }
-
- dataSchema_ = makeSchema(it->second);
- if (! readerSchema_.root()) {
- readerSchema_ = dataSchema();
- }
-
- it = metadata_.find(AVRO_CODEC_KEY);
- if (it != metadata_.end() && toString(it->second) == AVRO_DEFLATE_CODEC) {
- codec_ = DEFLATE_CODEC;
-#ifdef SNAPPY_CODEC_AVAILABLE
- } else if (it != metadata_.end()
- && toString(it->second) == AVRO_SNAPPY_CODEC) {
- codec_ = SNAPPY_CODEC;
-#endif
- } else {
- codec_ = NULL_CODEC;
- if (it != metadata_.end() && toString(it->second) != AVRO_NULL_CODEC) {
- throw Exception("Unknown codec in data file: " + toString(it->second));
- }
- }
-
- avro::decode(*decoder_, sync_);
- decoder_->init(*stream_);
- blockStart_ = stream_->byteCount();
-}
-
-void DataFileReaderBase::doSeek(int64_t position)
-{
- if (SeekableInputStream *ss = dynamic_cast<SeekableInputStream *>(stream_.get())) {
- if (!eof_) {
- dataDecoder_->init(*dataStream_);
- drain(*dataStream_);
- }
- decoder_->init(*stream_);
- ss->seek(position);
- eof_ = false;
- } else {
- throw Exception("seek not supported on non-SeekableInputStream");
- }
-}
-
-void DataFileReaderBase::seek(int64_t position)
-{
- doSeek(position);
- readDataBlock();
-}
-
-void DataFileReaderBase::sync(int64_t position)
-{
- doSeek(position);
- DataFileSync sync_buffer;
- const uint8_t *p = 0;
- size_t n = 0;
- size_t i = 0;
- while (i < SyncSize) {
- if (n == 0 && !stream_->next(&p, &n)) {
- eof_ = true;
- return;
- }
- int len =
- std::min(static_cast<size_t>(SyncSize - i), n);
- memcpy(&sync_buffer[i], p, len);
- p += len;
- n -= len;
- i += len;
- }
- for (;;) {
- size_t j = 0;
- for (; j < SyncSize; ++j) {
- if (sync_[j] != sync_buffer[(i + j) % SyncSize]) {
- break;
- }
- }
- if (j == SyncSize) {
- // Found the sync marker!
- break;
- }
- if (n == 0 && !stream_->next(&p, &n)) {
- eof_ = true;
- return;
- }
- sync_buffer[i++ % SyncSize] = *p++;
- --n;
- }
- stream_->backup(n);
- readDataBlock();
-}
-
-bool DataFileReaderBase::pastSync(int64_t position) {
- return !hasMore() || blockStart_ >= position + SyncSize;
-}
-
-int64_t DataFileReaderBase::previousSync() {
- return blockStart_;
-}
-
-} // namespace avro
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DataFile.hh"
+#include "Compiler.hh"
+#include "Exception.hh"
+
+#include <sstream>
+
+#include <boost/random/mersenne_twister.hpp>
+#include <boost/iostreams/device/file.hpp>
+#include <boost/iostreams/filter/gzip.hpp>
+#include <boost/iostreams/filter/zlib.hpp>
+#include <boost/crc.hpp> // for boost::crc_32_type
+
+#ifdef SNAPPY_CODEC_AVAILABLE
+#include <snappy.h>
+#endif
+
+namespace avro {
+using std::unique_ptr;
+using std::ostringstream;
+using std::istringstream;
+using std::vector;
+using std::copy;
+using std::string;
+
+using std::array;
+
+namespace {
+const string AVRO_SCHEMA_KEY("avro.schema");
+const string AVRO_CODEC_KEY("avro.codec");
+const string AVRO_NULL_CODEC("null");
+const string AVRO_DEFLATE_CODEC("deflate");
+
+#ifdef SNAPPY_CODEC_AVAILABLE
+const string AVRO_SNAPPY_CODEC = "snappy";
+#endif
+
+const size_t minSyncInterval = 32;
+const size_t maxSyncInterval = 1u << 30;
+
+boost::iostreams::zlib_params get_zlib_params() {
+ boost::iostreams::zlib_params ret;
+ ret.method = boost::iostreams::zlib::deflated;
+ ret.noheader = true;
+ return ret;
+}
+}
+
+DataFileWriterBase::DataFileWriterBase(const char* filename, const ValidSchema& schema, size_t syncInterval,
+ Codec codec) :
+ filename_(filename),
+ schema_(schema),
+ encoderPtr_(binaryEncoder()),
+ syncInterval_(syncInterval),
+ codec_(codec),
+ stream_(fileOutputStream(filename)),
+ buffer_(memoryOutputStream()),
+ sync_(makeSync()),
+ objectCount_(0),
+ lastSync_(0)
+{
+ init(schema, syncInterval, codec);
+}
+
+DataFileWriterBase::DataFileWriterBase(std::unique_ptr<OutputStream> outputStream,
+ const ValidSchema& schema, size_t syncInterval, Codec codec) :
+ filename_(),
+ schema_(schema),
+ encoderPtr_(binaryEncoder()),
+ syncInterval_(syncInterval),
+ codec_(codec),
+ stream_(std::move(outputStream)),
+ buffer_(memoryOutputStream()),
+ sync_(makeSync()),
+ objectCount_(0),
+ lastSync_(0)
+{
+ init(schema, syncInterval, codec);
+}
+
+void DataFileWriterBase::init(const ValidSchema &schema, size_t syncInterval, const Codec &codec) {
+ if (syncInterval < minSyncInterval || syncInterval > maxSyncInterval) {
+ throw Exception(boost::format("Invalid sync interval: %1%. "
+ "Should be between %2% and %3%") % syncInterval %
+ minSyncInterval % maxSyncInterval);
+ }
+ setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC);
+
+ if (codec_ == NULL_CODEC) {
+ setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC);
+ } else if (codec_ == DEFLATE_CODEC) {
+ setMetadata(AVRO_CODEC_KEY, AVRO_DEFLATE_CODEC);
+#ifdef SNAPPY_CODEC_AVAILABLE
+ } else if (codec_ == SNAPPY_CODEC) {
+ setMetadata(AVRO_CODEC_KEY, AVRO_SNAPPY_CODEC);
+#endif
+ } else {
+ throw Exception(boost::format("Unknown codec: %1%") % codec);
+ }
+ setMetadata(AVRO_SCHEMA_KEY, schema.toJson(false));
+
+ writeHeader();
+ encoderPtr_->init(*buffer_);
+
+ lastSync_ = stream_->byteCount();
+}
+
+
+DataFileWriterBase::~DataFileWriterBase()
+{
+ if (stream_.get()) {
+ close();
+ }
+}
+
+void DataFileWriterBase::close()
+{
+ flush();
+ stream_.reset();
+}
+
+void DataFileWriterBase::sync()
+{
+ encoderPtr_->flush();
+
+ encoderPtr_->init(*stream_);
+ avro::encode(*encoderPtr_, objectCount_);
+ if (codec_ == NULL_CODEC) {
+ int64_t byteCount = buffer_->byteCount();
+ avro::encode(*encoderPtr_, byteCount);
+ encoderPtr_->flush();
+ std::unique_ptr<InputStream> in = memoryInputStream(*buffer_);
+ copy(*in, *stream_);
+ } else if (codec_ == DEFLATE_CODEC) {
+ std::vector<char> buf;
+ {
+ boost::iostreams::filtering_ostream os;
+ os.push(boost::iostreams::zlib_compressor(get_zlib_params()));
+ os.push(boost::iostreams::back_inserter(buf));
+ const uint8_t* data;
+ size_t len;
+
+ std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
+ while (input->next(&data, &len)) {
+ boost::iostreams::write(os, reinterpret_cast<const char*>(data), len);
+ }
+ } // make sure all is flushed
+ std::unique_ptr<InputStream> in = memoryInputStream(
+ reinterpret_cast<const uint8_t*>(buf.data()), buf.size());
+ int64_t byteCount = buf.size();
+ avro::encode(*encoderPtr_, byteCount);
+ encoderPtr_->flush();
+ copy(*in, *stream_);
+#ifdef SNAPPY_CODEC_AVAILABLE
+ } else if (codec_ == SNAPPY_CODEC) {
+ std::vector<char> temp;
+ std::string compressed;
+ boost::crc_32_type crc;
+ {
+ boost::iostreams::filtering_ostream os;
+ os.push(boost::iostreams::back_inserter(temp));
+ const uint8_t* data;
+ size_t len;
+
+ std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
+ while (input->next(&data, &len)) {
+ boost::iostreams::write(os, reinterpret_cast<const char*>(data),
+ len);
+ }
+ } // make sure all is flushed
+
+ crc.process_bytes(reinterpret_cast<const char*>(temp.data()),
+ temp.size());
+ // For Snappy, add the CRC32 checksum
+ int32_t checksum = crc();
+
+ // Now compress
+ size_t compressed_size = snappy::Compress(
+ reinterpret_cast<const char*>(temp.data()), temp.size(),
+ &compressed);
+ temp.clear();
+ {
+ boost::iostreams::filtering_ostream os;
+ os.push(boost::iostreams::back_inserter(temp));
+ boost::iostreams::write(os, compressed.c_str(), compressed_size);
+ }
+ temp.push_back((checksum >> 24) & 0xFF);
+ temp.push_back((checksum >> 16) & 0xFF);
+ temp.push_back((checksum >> 8) & 0xFF);
+ temp.push_back(checksum & 0xFF);
+ std::unique_ptr<InputStream> in = memoryInputStream(
+ reinterpret_cast<const uint8_t*>(temp.data()), temp.size());
+ int64_t byteCount = temp.size();
+ avro::encode(*encoderPtr_, byteCount);
+ encoderPtr_->flush();
+ copy(*in, *stream_);
+#endif
+ }
+
+ encoderPtr_->init(*stream_);
+ avro::encode(*encoderPtr_, sync_);
+ encoderPtr_->flush();
+
+ lastSync_ = stream_->byteCount();
+
+ buffer_ = memoryOutputStream();
+ encoderPtr_->init(*buffer_);
+ objectCount_ = 0;
+}
+
+void DataFileWriterBase::syncIfNeeded()
+{
+ encoderPtr_->flush();
+ if (buffer_->byteCount() >= syncInterval_) {
+ sync();
+ }
+}
+
+uint64_t DataFileWriterBase::getCurrentBlockStart()
+{
+ return lastSync_;
+}
+
+void DataFileWriterBase::flush()
+{
+ sync();
+}
+
+boost::mt19937 random(static_cast<uint32_t>(time(0)));
+
+DataFileSync DataFileWriterBase::makeSync()
+{
+ DataFileSync sync;
+ for (size_t i = 0; i < sync.size(); ++i) {
+ sync[i] = random();
+ }
+ return sync;
+}
+
+typedef array<uint8_t, 4> Magic;
+static Magic magic = { { 'O', 'b', 'j', '\x01' } };
+
+void DataFileWriterBase::writeHeader()
+{
+ encoderPtr_->init(*stream_);
+ avro::encode(*encoderPtr_, magic);
+ avro::encode(*encoderPtr_, metadata_);
+ avro::encode(*encoderPtr_, sync_);
+ encoderPtr_->flush();
+}
+
+void DataFileWriterBase::setMetadata(const string& key, const string& value)
+{
+ vector<uint8_t> v(value.size());
+ copy(value.begin(), value.end(), v.begin());
+ metadata_[key] = v;
+}
+
+DataFileReaderBase::DataFileReaderBase(const char* filename) :
+ filename_(filename), stream_(fileSeekableInputStream(filename)),
+ decoder_(binaryDecoder()), objectCount_(0), eof_(false), blockStart_(-1),
+ blockEnd_(-1)
+{
+ readHeader();
+}
+
+DataFileReaderBase::DataFileReaderBase(std::unique_ptr<InputStream> inputStream) :
+ filename_(""), stream_(std::move(inputStream)),
+ decoder_(binaryDecoder()), objectCount_(0), eof_(false)
+{
+ readHeader();
+}
+
+void DataFileReaderBase::init()
+{
+ readerSchema_ = dataSchema_;
+ dataDecoder_ = binaryDecoder();
+ readDataBlock();
+}
+
+void DataFileReaderBase::init(const ValidSchema& readerSchema)
+{
+ readerSchema_ = readerSchema;
+ dataDecoder_ = (readerSchema_.toJson(true) != dataSchema_.toJson(true)) ?
+ resolvingDecoder(dataSchema_, readerSchema_, binaryDecoder()) :
+ binaryDecoder();
+ readDataBlock();
+}
+
+static void drain(InputStream& in)
+{
+ const uint8_t *p = 0;
+ size_t n = 0;
+ while (in.next(&p, &n));
+}
+
+char hex(unsigned int x)
+{
+ return x + (x < 10 ? '0' : ('a' - 10));
+}
+
+std::ostream& operator << (std::ostream& os, const DataFileSync& s)
+{
+ for (size_t i = 0; i < s.size(); ++i) {
+ os << hex(s[i] / 16) << hex(s[i] % 16) << ' ';
+ }
+ os << std::endl;
+ return os;
+}
+
+
+bool DataFileReaderBase::hasMore()
+{
+ for (; ;) {
+ if (eof_) {
+ return false;
+ } else if (objectCount_ != 0) {
+ return true;
+ }
+
+ dataDecoder_->init(*dataStream_);
+ drain(*dataStream_);
+ DataFileSync s;
+ decoder_->init(*stream_);
+ avro::decode(*decoder_, s);
+ if (s != sync_) {
+ throw Exception("Sync mismatch");
+ }
+ readDataBlock();
+ }
+}
+
+class BoundedInputStream : public InputStream {
+ InputStream& in_;
+ size_t limit_;
+
+ bool next(const uint8_t** data, size_t* len) {
+ if (limit_ != 0 && in_.next(data, len)) {
+ if (*len > limit_) {
+ in_.backup(*len - limit_);
+ *len = limit_;
+ }
+ limit_ -= *len;
+ return true;
+ }
+ return false;
+ }
+
+ void backup(size_t len) {
+ in_.backup(len);
+ limit_ += len;
+ }
+
+ void skip(size_t len) {
+ if (len > limit_) {
+ len = limit_;
+ }
+ in_.skip(len);
+ limit_ -= len;
+ }
+
+ size_t byteCount() const {
+ return in_.byteCount();
+ }
+
+public:
+ BoundedInputStream(InputStream& in, size_t limit) :
+ in_(in), limit_(limit) { }
+};
+
+unique_ptr<InputStream> boundedInputStream(InputStream& in, size_t limit)
+{
+ return unique_ptr<InputStream>(new BoundedInputStream(in, limit));
+}
+
+void DataFileReaderBase::readDataBlock()
+{
+ decoder_->init(*stream_);
+ blockStart_ = stream_->byteCount();
+ const uint8_t* p = 0;
+ size_t n = 0;
+ if (! stream_->next(&p, &n)) {
+ eof_ = true;
+ return;
+ }
+ stream_->backup(n);
+ avro::decode(*decoder_, objectCount_);
+ int64_t byteCount;
+ avro::decode(*decoder_, byteCount);
+ decoder_->init(*stream_);
+ blockEnd_ = stream_->byteCount() + byteCount;
+
+ unique_ptr<InputStream> st = boundedInputStream(*stream_, static_cast<size_t>(byteCount));
+ if (codec_ == NULL_CODEC) {
+ dataDecoder_->init(*st);
+ dataStream_ = std::move(st);
+#ifdef SNAPPY_CODEC_AVAILABLE
+ } else if (codec_ == SNAPPY_CODEC) {
+ boost::crc_32_type crc;
+ uint32_t checksum = 0;
+ compressed_.clear();
+ uncompressed.clear();
+ const uint8_t* data;
+ size_t len;
+ while (st->next(&data, &len)) {
+ compressed_.insert(compressed_.end(), data, data + len);
+ }
+ len = compressed_.size();
+ int b1 = compressed_[len - 4] & 0xFF;
+ int b2 = compressed_[len - 3] & 0xFF;
+ int b3 = compressed_[len - 2] & 0xFF;
+ int b4 = compressed_[len - 1] & 0xFF;
+
+ checksum = (b1 << 24) + (b2 << 16) + (b3 << 8) + (b4);
+ if (!snappy::Uncompress(reinterpret_cast<const char*>(compressed_.data()),
+ len - 4, &uncompressed)) {
+ throw Exception(
+ "Snappy Compression reported an error when decompressing");
+ }
+ crc.process_bytes(uncompressed.c_str(), uncompressed.size());
+ uint32_t c = crc();
+ if (checksum != c) {
+ throw Exception(boost::format("Checksum did not match for Snappy compression: Expected: %1%, computed: %2%") % checksum % c);
+ }
+ os_.reset(new boost::iostreams::filtering_istream());
+ os_->push(
+ boost::iostreams::basic_array_source<char>(uncompressed.c_str(),
+ uncompressed.size()));
+ std::unique_ptr<InputStream> in = istreamInputStream(*os_);
+
+ dataDecoder_->init(*in);
+ dataStream_ = std::move(in);
+#endif
+ } else {
+ compressed_.clear();
+ const uint8_t* data;
+ size_t len;
+ while (st->next(&data, &len)) {
+ compressed_.insert(compressed_.end(), data, data + len);
+ }
+ // boost::iostreams::write(os, reinterpret_cast<const char*>(data), len);
+ os_.reset(new boost::iostreams::filtering_istream());
+ os_->push(boost::iostreams::zlib_decompressor(get_zlib_params()));
+ os_->push(boost::iostreams::basic_array_source<char>(
+ compressed_.data(), compressed_.size()));
+
+ std::unique_ptr<InputStream> in = nonSeekableIstreamInputStream(*os_);
+ dataDecoder_->init(*in);
+ dataStream_ = std::move(in);
+ }
+}
+
+void DataFileReaderBase::close()
+{
+}
+
+static string toString(const vector<uint8_t>& v)
+{
+ string result;
+ result.resize(v.size());
+ copy(v.begin(), v.end(), result.begin());
+ return result;
+}
+
+static ValidSchema makeSchema(const vector<uint8_t>& v)
+{
+ istringstream iss(toString(v));
+ ValidSchema vs;
+ compileJsonSchema(iss, vs);
+ return ValidSchema(vs);
+}
+
+void DataFileReaderBase::readHeader()
+{
+ decoder_->init(*stream_);
+ Magic m;
+ avro::decode(*decoder_, m);
+ if (magic != m) {
+ throw Exception("Invalid data file. Magic does not match: "
+ + filename_);
+ }
+ avro::decode(*decoder_, metadata_);
+ Metadata::const_iterator it = metadata_.find(AVRO_SCHEMA_KEY);
+ if (it == metadata_.end()) {
+ throw Exception("No schema in metadata");
+ }
+
+ dataSchema_ = makeSchema(it->second);
+ if (! readerSchema_.root()) {
+ readerSchema_ = dataSchema();
+ }
+
+ it = metadata_.find(AVRO_CODEC_KEY);
+ if (it != metadata_.end() && toString(it->second) == AVRO_DEFLATE_CODEC) {
+ codec_ = DEFLATE_CODEC;
+#ifdef SNAPPY_CODEC_AVAILABLE
+ } else if (it != metadata_.end()
+ && toString(it->second) == AVRO_SNAPPY_CODEC) {
+ codec_ = SNAPPY_CODEC;
+#endif
+ } else {
+ codec_ = NULL_CODEC;
+ if (it != metadata_.end() && toString(it->second) != AVRO_NULL_CODEC) {
+ throw Exception("Unknown codec in data file: " + toString(it->second));
+ }
+ }
+
+ avro::decode(*decoder_, sync_);
+ decoder_->init(*stream_);
+ blockStart_ = stream_->byteCount();
+}
+
+void DataFileReaderBase::doSeek(int64_t position)
+{
+ if (SeekableInputStream *ss = dynamic_cast<SeekableInputStream *>(stream_.get())) {
+ if (!eof_) {
+ dataDecoder_->init(*dataStream_);
+ drain(*dataStream_);
+ }
+ decoder_->init(*stream_);
+ ss->seek(position);
+ eof_ = false;
+ } else {
+ throw Exception("seek not supported on non-SeekableInputStream");
+ }
+}
+
+void DataFileReaderBase::seek(int64_t position)
+{
+ doSeek(position);
+ readDataBlock();
+}
+
+void DataFileReaderBase::sync(int64_t position)
+{
+ doSeek(position);
+ DataFileSync sync_buffer;
+ const uint8_t *p = 0;
+ size_t n = 0;
+ size_t i = 0;
+ while (i < SyncSize) {
+ if (n == 0 && !stream_->next(&p, &n)) {
+ eof_ = true;
+ return;
+ }
+ int len =
+ std::min(static_cast<size_t>(SyncSize - i), n);
+ memcpy(&sync_buffer[i], p, len);
+ p += len;
+ n -= len;
+ i += len;
+ }
+ for (;;) {
+ size_t j = 0;
+ for (; j < SyncSize; ++j) {
+ if (sync_[j] != sync_buffer[(i + j) % SyncSize]) {
+ break;
+ }
+ }
+ if (j == SyncSize) {
+ // Found the sync marker!
+ break;
+ }
+ if (n == 0 && !stream_->next(&p, &n)) {
+ eof_ = true;
+ return;
+ }
+ sync_buffer[i++ % SyncSize] = *p++;
+ --n;
+ }
+ stream_->backup(n);
+ readDataBlock();
+}
+
+bool DataFileReaderBase::pastSync(int64_t position) {
+ return !hasMore() || blockStart_ >= position + SyncSize;
+}
+
+int64_t DataFileReaderBase::previousSync() {
+ return blockStart_;
+}
+
+} // namespace avro