diff options
author | thegeorg <thegeorg@yandex-team.ru> | 2022-02-10 16:45:08 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:08 +0300 |
commit | 4e839db24a3bbc9f1c610c43d6faaaa99824dcca (patch) | |
tree | 506dac10f5df94fab310584ee51b24fc5a081c22 /contrib/libs/apache/avro/impl/FileStream.cc | |
parent | 2d37894b1b037cf24231090eda8589bbb44fb6fc (diff) | |
download | ydb-4e839db24a3bbc9f1c610c43d6faaaa99824dcca.tar.gz |
Restoring authorship annotation for <thegeorg@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/libs/apache/avro/impl/FileStream.cc')
-rw-r--r-- | contrib/libs/apache/avro/impl/FileStream.cc | 794 |
1 files changed, 397 insertions, 397 deletions
diff --git a/contrib/libs/apache/avro/impl/FileStream.cc b/contrib/libs/apache/avro/impl/FileStream.cc index ed601b4c6f..03013a1224 100644 --- a/contrib/libs/apache/avro/impl/FileStream.cc +++ b/contrib/libs/apache/avro/impl/FileStream.cc @@ -1,397 +1,397 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fstream> -#include "Stream.hh" -#ifndef _WIN32 -#include "unistd.h" -#include "fcntl.h" -#include "errno.h" - -#ifndef O_BINARY -#define O_BINARY 0 -#endif -#else -#include "Windows.h" - -#ifdef min -#undef min -#endif -#endif - -using std::unique_ptr; -using std::istream; -using std::ostream; - -namespace avro { -namespace { -struct BufferCopyIn { - virtual ~BufferCopyIn() { } - virtual void seek(size_t len) = 0; - virtual bool read(uint8_t* b, size_t toRead, size_t& actual) = 0; - -}; - -struct FileBufferCopyIn : public BufferCopyIn { -#ifdef _WIN32 - HANDLE h_; - FileBufferCopyIn(const char* filename) : - h_(::CreateFileA(filename, GENERIC_READ, 0, NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL)) { - if (h_ == INVALID_HANDLE_VALUE) { - throw Exception(boost::format("Cannot open file: %1%") % ::GetLastError()); - } - } - - ~FileBufferCopyIn() { - ::CloseHandle(h_); - } - - void seek(size_t len) { - if (::SetFilePointer(h_, len, NULL, FILE_CURRENT) == INVALID_SET_FILE_POINTER && ::GetLastError() != NO_ERROR) { - throw Exception(boost::format("Cannot skip file: %1%") % ::GetLastError()); - } - } - - bool read(uint8_t* b, size_t toRead, size_t& actual) { - DWORD dw = 0; - if (! ::ReadFile(h_, b, toRead, &dw, NULL)) { - throw Exception(boost::format("Cannot read file: %1%") % ::GetLastError()); - } - actual = static_cast<size_t>(dw); - return actual != 0; - } -#else - const int fd_; - - FileBufferCopyIn(const char* filename) : - fd_(open(filename, O_RDONLY | O_BINARY)) { - if (fd_ < 0) { - throw Exception(boost::format("Cannot open file: %1%") % - ::strerror(errno)); - } - } - - ~FileBufferCopyIn() { - ::close(fd_); - } - - void seek(size_t len) { - off_t r = ::lseek(fd_, len, SEEK_CUR); - if (r == static_cast<off_t>(-1)) { - throw Exception(boost::format("Cannot skip file: %1%") % - strerror(errno)); - } - } - - bool read(uint8_t* b, size_t toRead, size_t& actual) { - int n = ::read(fd_, b, toRead); - if (n > 0) { - actual = n; - return true; - } - return false; - } -#endif - -}; - -struct IStreamBufferCopyIn : public BufferCopyIn { - istream& is_; - - IStreamBufferCopyIn(istream& is) : is_(is) { - } - - void seek(size_t len) { - if (! is_.seekg(len, std::ios_base::cur)) { - throw Exception("Cannot skip stream"); - } - } - - bool read(uint8_t* b, size_t toRead, size_t& actual) { - is_.read(reinterpret_cast<char*>(b), toRead); - if (is_.bad()) { - return false; - } - actual = static_cast<size_t>(is_.gcount()); - return (! is_.eof() || actual != 0); - } - -}; - -struct NonSeekableIStreamBufferCopyIn : public IStreamBufferCopyIn { - NonSeekableIStreamBufferCopyIn(istream& is) : IStreamBufferCopyIn(is) { } - - void seek(size_t len) { - const size_t bufSize = 4096; - uint8_t buf[bufSize]; - while (len > 0) { - size_t n = std::min(len, bufSize); - is_.read(reinterpret_cast<char*>(buf), n); - if (is_.bad()) { - throw Exception("Cannot skip stream"); - } - size_t actual = static_cast<size_t>(is_.gcount()); - if (is_.eof() && actual == 0) { - throw Exception("Cannot skip stream"); - } - len -= n; - } - } -}; - -} - -class BufferCopyInInputStream : public SeekableInputStream { - const size_t bufferSize_; - uint8_t* const buffer_; - unique_ptr<BufferCopyIn> in_; - size_t byteCount_; - uint8_t* next_; - size_t available_; - - bool next(const uint8_t** data, size_t *size) { - if (available_ == 0 && ! fill()) { - return false; - } - *data = next_; - *size = available_; - next_ += available_; - byteCount_ += available_; - available_ = 0; - return true; - } - - void backup(size_t len) { - next_ -= len; - available_ += len; - byteCount_ -= len; - } - - void skip(size_t len) { - while (len > 0) { - if (available_ == 0) { - in_->seek(len); - byteCount_ += len; - return; - } - size_t n = std::min(available_, len); - available_ -= n; - next_ += n; - len -= n; - byteCount_ += n; - } - } - - size_t byteCount() const { return byteCount_; } - - bool fill() { - size_t n = 0; - if (in_->read(buffer_, bufferSize_, n)) { - next_ = buffer_; - available_ = n; - return true; - } - return false; - } - - void seek(int64_t position) { - // BufferCopyIn::seek is relative to byteCount_, whereas position is - // absolute. - in_->seek(position - byteCount_ - available_); - byteCount_ = position; - available_ = 0; - } - -public: - BufferCopyInInputStream(unique_ptr<BufferCopyIn> in, size_t bufferSize) : - bufferSize_(bufferSize), - buffer_(new uint8_t[bufferSize]), - in_(std::move(in)), - byteCount_(0), - next_(buffer_), - available_(0) { } - - ~BufferCopyInInputStream() { - delete[] buffer_; - } -}; - -namespace { -struct BufferCopyOut { - virtual ~BufferCopyOut() { } - virtual void write(const uint8_t* b, size_t len) = 0; -}; - -struct FileBufferCopyOut : public BufferCopyOut { -#ifdef _WIN32 - HANDLE h_; - FileBufferCopyOut(const char* filename) : - h_(::CreateFileA(filename, GENERIC_WRITE, 0, NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL)) { - if (h_ == INVALID_HANDLE_VALUE) { - throw Exception(boost::format("Cannot open file: %1%") % ::GetLastError()); - } - } - - ~FileBufferCopyOut() { - ::CloseHandle(h_); - } - - void write(const uint8_t* b, size_t len) { - while (len > 0) { - DWORD dw = 0; - if (! ::WriteFile(h_, b, len, &dw, NULL)) { - throw Exception(boost::format("Cannot read file: %1%") % ::GetLastError()); - } - b += dw; - len -= dw; - } - } -#else - const int fd_; - - FileBufferCopyOut(const char* filename) : - fd_(::open(filename, O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0644)) { - - if (fd_ < 0) { - throw Exception(boost::format("Cannot open file: %1%") % - ::strerror(errno)); - } - } - - ~FileBufferCopyOut() { - ::close(fd_); - } - - void write(const uint8_t* b, size_t len) { - if (::write(fd_, b, len) < 0) { - throw Exception(boost::format("Cannot write file: %1%") % - ::strerror(errno)); - } - } -#endif - -}; - -struct OStreamBufferCopyOut : public BufferCopyOut { - ostream& os_; - - OStreamBufferCopyOut(ostream& os) : os_(os) { - } - - void write(const uint8_t* b, size_t len) { - os_.write(reinterpret_cast<const char*>(b), len); - } - -}; - -} - -class BufferCopyOutputStream : public OutputStream { - size_t bufferSize_; - uint8_t* const buffer_; - unique_ptr<BufferCopyOut> out_; - uint8_t* next_; - size_t available_; - size_t byteCount_; - - // Invaiant: byteCount_ == byteswritten + bufferSize_ - available_; - bool next(uint8_t** data, size_t* len) { - if (available_ == 0) { - flush(); - } - *data = next_; - *len = available_; - next_ += available_; - byteCount_ += available_; - available_ = 0; - return true; - } - - void backup(size_t len) { - available_ += len; - next_ -= len; - byteCount_ -= len; - } - - uint64_t byteCount() const { - return byteCount_; - } - - void flush() { - out_->write(buffer_, bufferSize_ - available_); - next_ = buffer_; - available_ = bufferSize_; - } - -public: - BufferCopyOutputStream(unique_ptr<BufferCopyOut> out, size_t bufferSize) : - bufferSize_(bufferSize), - buffer_(new uint8_t[bufferSize]), - out_(std::move(out)), - next_(buffer_), - available_(bufferSize_), byteCount_(0) { } - - ~BufferCopyOutputStream() { - delete[] buffer_; - } -}; - -unique_ptr<InputStream> fileInputStream(const char* filename, - size_t bufferSize) -{ - unique_ptr<BufferCopyIn> in(new FileBufferCopyIn(filename)); - return unique_ptr<InputStream>( new BufferCopyInInputStream(std::move(in), bufferSize)); -} - -unique_ptr<SeekableInputStream> fileSeekableInputStream(const char* filename, - size_t bufferSize) -{ - unique_ptr<BufferCopyIn> in(new FileBufferCopyIn(filename)); - return unique_ptr<SeekableInputStream>( new BufferCopyInInputStream(std::move(in), - bufferSize)); -} - -unique_ptr<InputStream> istreamInputStream(istream& is, size_t bufferSize) -{ - unique_ptr<BufferCopyIn> in(new IStreamBufferCopyIn(is)); - return unique_ptr<InputStream>( new BufferCopyInInputStream(std::move(in), bufferSize)); -} - -unique_ptr<InputStream> nonSeekableIstreamInputStream( - istream& is, size_t bufferSize) -{ - unique_ptr<BufferCopyIn> in(new NonSeekableIStreamBufferCopyIn(is)); - return unique_ptr<InputStream>( new BufferCopyInInputStream(std::move(in), bufferSize)); -} - -unique_ptr<OutputStream> fileOutputStream(const char* filename, - size_t bufferSize) -{ - unique_ptr<BufferCopyOut> out(new FileBufferCopyOut(filename)); - return unique_ptr<OutputStream>(new BufferCopyOutputStream(std::move(out), bufferSize)); -} - -unique_ptr<OutputStream> ostreamOutputStream(ostream& os, - size_t bufferSize) -{ - unique_ptr<BufferCopyOut> out(new OStreamBufferCopyOut(os)); - return unique_ptr<OutputStream>(new BufferCopyOutputStream(std::move(out), bufferSize)); -} - - -} // namespace avro +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fstream> +#include "Stream.hh" +#ifndef _WIN32 +#include "unistd.h" +#include "fcntl.h" +#include "errno.h" + +#ifndef O_BINARY +#define O_BINARY 0 +#endif +#else +#include "Windows.h" + +#ifdef min +#undef min +#endif +#endif + +using std::unique_ptr; +using std::istream; +using std::ostream; + +namespace avro { +namespace { +struct BufferCopyIn { + virtual ~BufferCopyIn() { } + virtual void seek(size_t len) = 0; + virtual bool read(uint8_t* b, size_t toRead, size_t& actual) = 0; + +}; + +struct FileBufferCopyIn : public BufferCopyIn { +#ifdef _WIN32 + HANDLE h_; + FileBufferCopyIn(const char* filename) : + h_(::CreateFileA(filename, GENERIC_READ, 0, NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL)) { + if (h_ == INVALID_HANDLE_VALUE) { + throw Exception(boost::format("Cannot open file: %1%") % ::GetLastError()); + } + } + + ~FileBufferCopyIn() { + ::CloseHandle(h_); + } + + void seek(size_t len) { + if (::SetFilePointer(h_, len, NULL, FILE_CURRENT) == INVALID_SET_FILE_POINTER && ::GetLastError() != NO_ERROR) { + throw Exception(boost::format("Cannot skip file: %1%") % ::GetLastError()); + } + } + + bool read(uint8_t* b, size_t toRead, size_t& actual) { + DWORD dw = 0; + if (! ::ReadFile(h_, b, toRead, &dw, NULL)) { + throw Exception(boost::format("Cannot read file: %1%") % ::GetLastError()); + } + actual = static_cast<size_t>(dw); + return actual != 0; + } +#else + const int fd_; + + FileBufferCopyIn(const char* filename) : + fd_(open(filename, O_RDONLY | O_BINARY)) { + if (fd_ < 0) { + throw Exception(boost::format("Cannot open file: %1%") % + ::strerror(errno)); + } + } + + ~FileBufferCopyIn() { + ::close(fd_); + } + + void seek(size_t len) { + off_t r = ::lseek(fd_, len, SEEK_CUR); + if (r == static_cast<off_t>(-1)) { + throw Exception(boost::format("Cannot skip file: %1%") % + strerror(errno)); + } + } + + bool read(uint8_t* b, size_t toRead, size_t& actual) { + int n = ::read(fd_, b, toRead); + if (n > 0) { + actual = n; + return true; + } + return false; + } +#endif + +}; + +struct IStreamBufferCopyIn : public BufferCopyIn { + istream& is_; + + IStreamBufferCopyIn(istream& is) : is_(is) { + } + + void seek(size_t len) { + if (! is_.seekg(len, std::ios_base::cur)) { + throw Exception("Cannot skip stream"); + } + } + + bool read(uint8_t* b, size_t toRead, size_t& actual) { + is_.read(reinterpret_cast<char*>(b), toRead); + if (is_.bad()) { + return false; + } + actual = static_cast<size_t>(is_.gcount()); + return (! is_.eof() || actual != 0); + } + +}; + +struct NonSeekableIStreamBufferCopyIn : public IStreamBufferCopyIn { + NonSeekableIStreamBufferCopyIn(istream& is) : IStreamBufferCopyIn(is) { } + + void seek(size_t len) { + const size_t bufSize = 4096; + uint8_t buf[bufSize]; + while (len > 0) { + size_t n = std::min(len, bufSize); + is_.read(reinterpret_cast<char*>(buf), n); + if (is_.bad()) { + throw Exception("Cannot skip stream"); + } + size_t actual = static_cast<size_t>(is_.gcount()); + if (is_.eof() && actual == 0) { + throw Exception("Cannot skip stream"); + } + len -= n; + } + } +}; + +} + +class BufferCopyInInputStream : public SeekableInputStream { + const size_t bufferSize_; + uint8_t* const buffer_; + unique_ptr<BufferCopyIn> in_; + size_t byteCount_; + uint8_t* next_; + size_t available_; + + bool next(const uint8_t** data, size_t *size) { + if (available_ == 0 && ! fill()) { + return false; + } + *data = next_; + *size = available_; + next_ += available_; + byteCount_ += available_; + available_ = 0; + return true; + } + + void backup(size_t len) { + next_ -= len; + available_ += len; + byteCount_ -= len; + } + + void skip(size_t len) { + while (len > 0) { + if (available_ == 0) { + in_->seek(len); + byteCount_ += len; + return; + } + size_t n = std::min(available_, len); + available_ -= n; + next_ += n; + len -= n; + byteCount_ += n; + } + } + + size_t byteCount() const { return byteCount_; } + + bool fill() { + size_t n = 0; + if (in_->read(buffer_, bufferSize_, n)) { + next_ = buffer_; + available_ = n; + return true; + } + return false; + } + + void seek(int64_t position) { + // BufferCopyIn::seek is relative to byteCount_, whereas position is + // absolute. + in_->seek(position - byteCount_ - available_); + byteCount_ = position; + available_ = 0; + } + +public: + BufferCopyInInputStream(unique_ptr<BufferCopyIn> in, size_t bufferSize) : + bufferSize_(bufferSize), + buffer_(new uint8_t[bufferSize]), + in_(std::move(in)), + byteCount_(0), + next_(buffer_), + available_(0) { } + + ~BufferCopyInInputStream() { + delete[] buffer_; + } +}; + +namespace { +struct BufferCopyOut { + virtual ~BufferCopyOut() { } + virtual void write(const uint8_t* b, size_t len) = 0; +}; + +struct FileBufferCopyOut : public BufferCopyOut { +#ifdef _WIN32 + HANDLE h_; + FileBufferCopyOut(const char* filename) : + h_(::CreateFileA(filename, GENERIC_WRITE, 0, NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL)) { + if (h_ == INVALID_HANDLE_VALUE) { + throw Exception(boost::format("Cannot open file: %1%") % ::GetLastError()); + } + } + + ~FileBufferCopyOut() { + ::CloseHandle(h_); + } + + void write(const uint8_t* b, size_t len) { + while (len > 0) { + DWORD dw = 0; + if (! ::WriteFile(h_, b, len, &dw, NULL)) { + throw Exception(boost::format("Cannot read file: %1%") % ::GetLastError()); + } + b += dw; + len -= dw; + } + } +#else + const int fd_; + + FileBufferCopyOut(const char* filename) : + fd_(::open(filename, O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0644)) { + + if (fd_ < 0) { + throw Exception(boost::format("Cannot open file: %1%") % + ::strerror(errno)); + } + } + + ~FileBufferCopyOut() { + ::close(fd_); + } + + void write(const uint8_t* b, size_t len) { + if (::write(fd_, b, len) < 0) { + throw Exception(boost::format("Cannot write file: %1%") % + ::strerror(errno)); + } + } +#endif + +}; + +struct OStreamBufferCopyOut : public BufferCopyOut { + ostream& os_; + + OStreamBufferCopyOut(ostream& os) : os_(os) { + } + + void write(const uint8_t* b, size_t len) { + os_.write(reinterpret_cast<const char*>(b), len); + } + +}; + +} + +class BufferCopyOutputStream : public OutputStream { + size_t bufferSize_; + uint8_t* const buffer_; + unique_ptr<BufferCopyOut> out_; + uint8_t* next_; + size_t available_; + size_t byteCount_; + + // Invaiant: byteCount_ == byteswritten + bufferSize_ - available_; + bool next(uint8_t** data, size_t* len) { + if (available_ == 0) { + flush(); + } + *data = next_; + *len = available_; + next_ += available_; + byteCount_ += available_; + available_ = 0; + return true; + } + + void backup(size_t len) { + available_ += len; + next_ -= len; + byteCount_ -= len; + } + + uint64_t byteCount() const { + return byteCount_; + } + + void flush() { + out_->write(buffer_, bufferSize_ - available_); + next_ = buffer_; + available_ = bufferSize_; + } + +public: + BufferCopyOutputStream(unique_ptr<BufferCopyOut> out, size_t bufferSize) : + bufferSize_(bufferSize), + buffer_(new uint8_t[bufferSize]), + out_(std::move(out)), + next_(buffer_), + available_(bufferSize_), byteCount_(0) { } + + ~BufferCopyOutputStream() { + delete[] buffer_; + } +}; + +unique_ptr<InputStream> fileInputStream(const char* filename, + size_t bufferSize) +{ + unique_ptr<BufferCopyIn> in(new FileBufferCopyIn(filename)); + return unique_ptr<InputStream>( new BufferCopyInInputStream(std::move(in), bufferSize)); +} + +unique_ptr<SeekableInputStream> fileSeekableInputStream(const char* filename, + size_t bufferSize) +{ + unique_ptr<BufferCopyIn> in(new FileBufferCopyIn(filename)); + return unique_ptr<SeekableInputStream>( new BufferCopyInInputStream(std::move(in), + bufferSize)); +} + +unique_ptr<InputStream> istreamInputStream(istream& is, size_t bufferSize) +{ + unique_ptr<BufferCopyIn> in(new IStreamBufferCopyIn(is)); + return unique_ptr<InputStream>( new BufferCopyInInputStream(std::move(in), bufferSize)); +} + +unique_ptr<InputStream> nonSeekableIstreamInputStream( + istream& is, size_t bufferSize) +{ + unique_ptr<BufferCopyIn> in(new NonSeekableIStreamBufferCopyIn(is)); + return unique_ptr<InputStream>( new BufferCopyInInputStream(std::move(in), bufferSize)); +} + +unique_ptr<OutputStream> fileOutputStream(const char* filename, + size_t bufferSize) +{ + unique_ptr<BufferCopyOut> out(new FileBufferCopyOut(filename)); + return unique_ptr<OutputStream>(new BufferCopyOutputStream(std::move(out), bufferSize)); +} + +unique_ptr<OutputStream> ostreamOutputStream(ostream& os, + size_t bufferSize) +{ + unique_ptr<BufferCopyOut> out(new OStreamBufferCopyOut(os)); + return unique_ptr<OutputStream>(new BufferCopyOutputStream(std::move(out), bufferSize)); +} + + +} // namespace avro |