diff options
author | bulatman <bulatman@yandex-team.ru> | 2022-02-10 16:45:50 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:50 +0300 |
commit | 2f6ca198245aeffd5e2d82b65927c2465b68b4f5 (patch) | |
tree | 9142afc54d335ea52910662635b898e79e192e49 /library/cpp/streams/zstd | |
parent | 6560e4993b14d193f8c879e33a3de5e5eba6e21d (diff) | |
download | ydb-2f6ca198245aeffd5e2d82b65927c2465b68b4f5.tar.gz |
Restoring authorship annotation for <bulatman@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/streams/zstd')
-rw-r--r-- | library/cpp/streams/zstd/ut/ya.make | 22 | ||||
-rw-r--r-- | library/cpp/streams/zstd/ya.make | 32 | ||||
-rw-r--r-- | library/cpp/streams/zstd/zstd.cpp | 342 | ||||
-rw-r--r-- | library/cpp/streams/zstd/zstd.h | 102 | ||||
-rw-r--r-- | library/cpp/streams/zstd/zstd_ut.cpp | 186 |
5 files changed, 342 insertions, 342 deletions
diff --git a/library/cpp/streams/zstd/ut/ya.make b/library/cpp/streams/zstd/ut/ya.make index 64cfcabf84..1b98f0ad5e 100644 --- a/library/cpp/streams/zstd/ut/ya.make +++ b/library/cpp/streams/zstd/ut/ya.make @@ -1,12 +1,12 @@ UNITTEST_FOR(library/cpp/streams/zstd) - -OWNER( - bulatman - g:util -) - -SRCS( - zstd_ut.cpp -) - -END() + +OWNER( + bulatman + g:util +) + +SRCS( + zstd_ut.cpp +) + +END() diff --git a/library/cpp/streams/zstd/ya.make b/library/cpp/streams/zstd/ya.make index 3de497d626..c284deeeff 100644 --- a/library/cpp/streams/zstd/ya.make +++ b/library/cpp/streams/zstd/ya.make @@ -1,16 +1,16 @@ -LIBRARY() - -OWNER( - bulatman - g:util -) - -PEERDIR( - contrib/libs/zstd -) - -SRCS( - zstd.cpp -) - -END() +LIBRARY() + +OWNER( + bulatman + g:util +) + +PEERDIR( + contrib/libs/zstd +) + +SRCS( + zstd.cpp +) + +END() diff --git a/library/cpp/streams/zstd/zstd.cpp b/library/cpp/streams/zstd/zstd.cpp index 8f7cec7569..29816f6d4c 100644 --- a/library/cpp/streams/zstd/zstd.cpp +++ b/library/cpp/streams/zstd/zstd.cpp @@ -1,173 +1,173 @@ -#include "zstd.h" - -#include <util/generic/buffer.h> -#include <util/generic/yexception.h> - -#define ZSTD_STATIC_LINKING_ONLY +#include "zstd.h" + +#include <util/generic/buffer.h> +#include <util/generic/yexception.h> + +#define ZSTD_STATIC_LINKING_ONLY #include <contrib/libs/zstd/include/zstd.h> - -namespace { - inline void CheckError(const char* op, size_t code) { - if (::ZSTD_isError(code)) { + +namespace { + inline void CheckError(const char* op, size_t code) { + if (::ZSTD_isError(code)) { ythrow yexception() << op << TStringBuf(" zstd error: ") << ::ZSTD_getErrorName(code); - } - } - - struct DestroyZCStream { - static void Destroy(::ZSTD_CStream* p) noexcept { - ::ZSTD_freeCStream(p); - } - }; - - struct DestroyZDStream { - static void Destroy(::ZSTD_DStream* p) noexcept { - ::ZSTD_freeDStream(p); - } - }; -} - -class TZstdCompress::TImpl { -public: - TImpl(IOutputStream* slave, int quality) - : Slave_(slave) - , ZCtx_(::ZSTD_createCStream()) - , Buffer_(::ZSTD_CStreamOutSize()) // do reserve - { - Y_ENSURE(nullptr != ZCtx_.Get(), "Failed to allocate ZSTD_CStream"); - Y_ENSURE(0 != Buffer_.Capacity(), "ZSTD_CStreamOutSize was too small"); - CheckError("init", ZSTD_initCStream(ZCtx_.Get(), quality)); - } - - void Write(const void* buffer, size_t size) { - ::ZSTD_inBuffer zIn{buffer, size, 0}; - auto zOut = OutBuffer(); - - while (0 != zIn.size) { - CheckError("compress", ::ZSTD_compressStream(ZCtx_.Get(), &zOut, &zIn)); - DoWrite(zOut); - // forget about the data we already compressed - zIn.src = static_cast<const unsigned char*>(zIn.src) + zIn.pos; - zIn.size -= zIn.pos; - zIn.pos = 0; - } - } - - void Flush() { - auto zOut = OutBuffer(); - CheckError("flush", ::ZSTD_flushStream(ZCtx_.Get(), &zOut)); - DoWrite(zOut); - } - - void Finish() { - auto zOut = OutBuffer(); - size_t returnCode; - do { - returnCode = ::ZSTD_endStream(ZCtx_.Get(), &zOut); - CheckError("finish", returnCode); - DoWrite(zOut); - } while (0 != returnCode); // zero means there is no more bytes to flush - } - -private: - ::ZSTD_outBuffer OutBuffer() { - return {Buffer_.Data(), Buffer_.Capacity(), 0}; - } - - void DoWrite(::ZSTD_outBuffer& buffer) { - Slave_->Write(buffer.dst, buffer.pos); - buffer.pos = 0; - } -private: - IOutputStream* Slave_; - THolder<::ZSTD_CStream, DestroyZCStream> ZCtx_; - TBuffer Buffer_; -}; - -TZstdCompress::TZstdCompress(IOutputStream* slave, int quality) - : Impl_(new TImpl(slave, quality)) { -} - -TZstdCompress::~TZstdCompress() { - try { - Finish(); - } catch (...) { - } -} - -void TZstdCompress::DoWrite(const void* buffer, size_t size) { - Y_ENSURE(Impl_, "Cannot use stream after finish."); - Impl_->Write(buffer, size); -} - -void TZstdCompress::DoFlush() { - Y_ENSURE(Impl_, "Cannot use stream after finish."); - Impl_->Flush(); -} - -void TZstdCompress::DoFinish() { - // Finish should be idempotent - if (Impl_) { - auto impl = std::move(Impl_); - impl->Finish(); - } -} - -//////////////////////////////////////////////////////////////////////////////// - -class TZstdDecompress::TImpl { -public: - TImpl(IInputStream* slave, size_t bufferSize) - : Slave_(slave) - , ZCtx_(::ZSTD_createDStream()) - , Buffer_(bufferSize) // do reserve - , Offset_(0) - { - Y_ENSURE(nullptr != ZCtx_.Get(), "Failed to allocate ZSTD_DStream"); - Y_ENSURE(0 != Buffer_.Capacity(), "Buffer size was too small"); - } - - size_t Read(void* buffer, size_t size) { - Y_ASSERT(size > 0); - - ::ZSTD_outBuffer zOut{buffer, size, 0}; - ::ZSTD_inBuffer zIn{Buffer_.Data(), Buffer_.Size(), Offset_}; - - size_t returnCode = 0; - while (zOut.pos != zOut.size) { - if (zIn.pos == zIn.size) { - zIn.size = Slave_->Read(Buffer_.Data(), Buffer_.Capacity()); - Buffer_.Resize(zIn.size); - zIn.pos = Offset_ = 0; - if (0 == zIn.size) { - // end of stream, need to check that there is no uncompleted blocks - Y_ENSURE(0 == returnCode, "Incomplete block"); - break; - } - } - returnCode = ::ZSTD_decompressStream(ZCtx_.Get(), &zOut, &zIn); - CheckError("decompress", returnCode); - if (0 == returnCode) { - // The frame is over, prepare to (maybe) start a new frame - ZSTD_initDStream(ZCtx_.Get()); - } - } - Offset_ = zIn.pos; - return zOut.pos; - } - -private: - IInputStream* Slave_; - THolder<::ZSTD_DStream, DestroyZDStream> ZCtx_; - TBuffer Buffer_; - size_t Offset_; -}; - -TZstdDecompress::TZstdDecompress(IInputStream* slave, size_t bufferSize) - : Impl_(new TImpl(slave, bufferSize)) { -} - -TZstdDecompress::~TZstdDecompress() = default; - -size_t TZstdDecompress::DoRead(void* buffer, size_t size) { - return Impl_->Read(buffer, size); -} + } + } + + struct DestroyZCStream { + static void Destroy(::ZSTD_CStream* p) noexcept { + ::ZSTD_freeCStream(p); + } + }; + + struct DestroyZDStream { + static void Destroy(::ZSTD_DStream* p) noexcept { + ::ZSTD_freeDStream(p); + } + }; +} + +class TZstdCompress::TImpl { +public: + TImpl(IOutputStream* slave, int quality) + : Slave_(slave) + , ZCtx_(::ZSTD_createCStream()) + , Buffer_(::ZSTD_CStreamOutSize()) // do reserve + { + Y_ENSURE(nullptr != ZCtx_.Get(), "Failed to allocate ZSTD_CStream"); + Y_ENSURE(0 != Buffer_.Capacity(), "ZSTD_CStreamOutSize was too small"); + CheckError("init", ZSTD_initCStream(ZCtx_.Get(), quality)); + } + + void Write(const void* buffer, size_t size) { + ::ZSTD_inBuffer zIn{buffer, size, 0}; + auto zOut = OutBuffer(); + + while (0 != zIn.size) { + CheckError("compress", ::ZSTD_compressStream(ZCtx_.Get(), &zOut, &zIn)); + DoWrite(zOut); + // forget about the data we already compressed + zIn.src = static_cast<const unsigned char*>(zIn.src) + zIn.pos; + zIn.size -= zIn.pos; + zIn.pos = 0; + } + } + + void Flush() { + auto zOut = OutBuffer(); + CheckError("flush", ::ZSTD_flushStream(ZCtx_.Get(), &zOut)); + DoWrite(zOut); + } + + void Finish() { + auto zOut = OutBuffer(); + size_t returnCode; + do { + returnCode = ::ZSTD_endStream(ZCtx_.Get(), &zOut); + CheckError("finish", returnCode); + DoWrite(zOut); + } while (0 != returnCode); // zero means there is no more bytes to flush + } + +private: + ::ZSTD_outBuffer OutBuffer() { + return {Buffer_.Data(), Buffer_.Capacity(), 0}; + } + + void DoWrite(::ZSTD_outBuffer& buffer) { + Slave_->Write(buffer.dst, buffer.pos); + buffer.pos = 0; + } +private: + IOutputStream* Slave_; + THolder<::ZSTD_CStream, DestroyZCStream> ZCtx_; + TBuffer Buffer_; +}; + +TZstdCompress::TZstdCompress(IOutputStream* slave, int quality) + : Impl_(new TImpl(slave, quality)) { +} + +TZstdCompress::~TZstdCompress() { + try { + Finish(); + } catch (...) { + } +} + +void TZstdCompress::DoWrite(const void* buffer, size_t size) { + Y_ENSURE(Impl_, "Cannot use stream after finish."); + Impl_->Write(buffer, size); +} + +void TZstdCompress::DoFlush() { + Y_ENSURE(Impl_, "Cannot use stream after finish."); + Impl_->Flush(); +} + +void TZstdCompress::DoFinish() { + // Finish should be idempotent + if (Impl_) { + auto impl = std::move(Impl_); + impl->Finish(); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +class TZstdDecompress::TImpl { +public: + TImpl(IInputStream* slave, size_t bufferSize) + : Slave_(slave) + , ZCtx_(::ZSTD_createDStream()) + , Buffer_(bufferSize) // do reserve + , Offset_(0) + { + Y_ENSURE(nullptr != ZCtx_.Get(), "Failed to allocate ZSTD_DStream"); + Y_ENSURE(0 != Buffer_.Capacity(), "Buffer size was too small"); + } + + size_t Read(void* buffer, size_t size) { + Y_ASSERT(size > 0); + + ::ZSTD_outBuffer zOut{buffer, size, 0}; + ::ZSTD_inBuffer zIn{Buffer_.Data(), Buffer_.Size(), Offset_}; + + size_t returnCode = 0; + while (zOut.pos != zOut.size) { + if (zIn.pos == zIn.size) { + zIn.size = Slave_->Read(Buffer_.Data(), Buffer_.Capacity()); + Buffer_.Resize(zIn.size); + zIn.pos = Offset_ = 0; + if (0 == zIn.size) { + // end of stream, need to check that there is no uncompleted blocks + Y_ENSURE(0 == returnCode, "Incomplete block"); + break; + } + } + returnCode = ::ZSTD_decompressStream(ZCtx_.Get(), &zOut, &zIn); + CheckError("decompress", returnCode); + if (0 == returnCode) { + // The frame is over, prepare to (maybe) start a new frame + ZSTD_initDStream(ZCtx_.Get()); + } + } + Offset_ = zIn.pos; + return zOut.pos; + } + +private: + IInputStream* Slave_; + THolder<::ZSTD_DStream, DestroyZDStream> ZCtx_; + TBuffer Buffer_; + size_t Offset_; +}; + +TZstdDecompress::TZstdDecompress(IInputStream* slave, size_t bufferSize) + : Impl_(new TImpl(slave, bufferSize)) { +} + +TZstdDecompress::~TZstdDecompress() = default; + +size_t TZstdDecompress::DoRead(void* buffer, size_t size) { + return Impl_->Read(buffer, size); +} diff --git a/library/cpp/streams/zstd/zstd.h b/library/cpp/streams/zstd/zstd.h index 4f1583d067..667a0494b7 100644 --- a/library/cpp/streams/zstd/zstd.h +++ b/library/cpp/streams/zstd/zstd.h @@ -1,53 +1,53 @@ -#pragma once - -#include <util/generic/ptr.h> -#include <util/stream/input.h> -#include <util/stream/output.h> - -/** - * @addtogroup Streams_Archs - * @{ - */ - -// @brief Stream to compress into zstd archive -class TZstdCompress: public IOutputStream { -public: - /** - @param slave stream to write compressed data to - @param quality, higher quality - slower but better compression. +#pragma once + +#include <util/generic/ptr.h> +#include <util/stream/input.h> +#include <util/stream/output.h> + +/** + * @addtogroup Streams_Archs + * @{ + */ + +// @brief Stream to compress into zstd archive +class TZstdCompress: public IOutputStream { +public: + /** + @param slave stream to write compressed data to + @param quality, higher quality - slower but better compression. 0 is default compression (see constant ZSTD_CLEVEL_DEFAULT(3)) - max compression is ZSTD_MAX_CLEVEL (22) - */ + max compression is ZSTD_MAX_CLEVEL (22) + */ explicit TZstdCompress(IOutputStream* slave, int quality = 0); - ~TZstdCompress() override; -private: - void DoWrite(const void* buffer, size_t size) override; - void DoFlush() override; - void DoFinish() override; - -public: - class TImpl; - THolder<TImpl> Impl_; -}; - -//////////////////////////////////////////////////////////////////////////////// - -// @brief Buffered stream to decompress from zstd archive -class TZstdDecompress: public IInputStream { -public: - /** - @param slave stream to read compressed data from - @param bufferSize approximate size of buffer compressed data is read in - */ - explicit TZstdDecompress(IInputStream* slave, size_t bufferSize = 8 * 1024); - ~TZstdDecompress() override; - -private: - size_t DoRead(void* buffer, size_t size) override; - -private: - class TImpl; - THolder<TImpl> Impl_; -}; - -/** @} */ + ~TZstdCompress() override; +private: + void DoWrite(const void* buffer, size_t size) override; + void DoFlush() override; + void DoFinish() override; + +public: + class TImpl; + THolder<TImpl> Impl_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +// @brief Buffered stream to decompress from zstd archive +class TZstdDecompress: public IInputStream { +public: + /** + @param slave stream to read compressed data from + @param bufferSize approximate size of buffer compressed data is read in + */ + explicit TZstdDecompress(IInputStream* slave, size_t bufferSize = 8 * 1024); + ~TZstdDecompress() override; + +private: + size_t DoRead(void* buffer, size_t size) override; + +private: + class TImpl; + THolder<TImpl> Impl_; +}; + +/** @} */ diff --git a/library/cpp/streams/zstd/zstd_ut.cpp b/library/cpp/streams/zstd/zstd_ut.cpp index 01618193fa..ef479fdd97 100644 --- a/library/cpp/streams/zstd/zstd_ut.cpp +++ b/library/cpp/streams/zstd/zstd_ut.cpp @@ -1,94 +1,94 @@ -#include "zstd.h" - +#include "zstd.h" + #include <library/cpp/testing/unittest/registar.h> - -#include <util/random/fast.h> -#include <util/stream/null.h> -#include <util/stream/str.h> - -Y_UNIT_TEST_SUITE(TZstdTestSuite) { - TString Compress(TString data, int quality = -1) { - TString compressed; - TStringOutput output(compressed); - TZstdCompress compressStream(&output, quality); - compressStream.Write(data.data(), data.size()); - compressStream.Finish(); - output.Finish(); - return compressed; - } - - TString Decompress(TString data) { - TStringInput input(data); - TZstdDecompress decompressStream(&input); - return decompressStream.ReadAll(); - } - - void TestCase(const TString& s) { - UNIT_ASSERT_VALUES_EQUAL(s, Decompress(Compress(s, -1))); - UNIT_ASSERT_VALUES_EQUAL(s, Decompress(Compress(s, 0))); - UNIT_ASSERT_VALUES_EQUAL(s, Decompress(Compress(s, 22))); - UNIT_ASSERT_VALUES_EQUAL(s, Decompress(Compress(s, 11))); - UNIT_ASSERT_VALUES_EQUAL(s, Decompress(Compress(s, 100500))); - } - - TString GenerateRandomString(size_t size) { - TReallyFastRng32 rng(42); - TString result; - result.reserve(size + sizeof(ui64)); - while (result.size() < size) { - ui64 value = rng.GenRand64(); - result += TStringBuf(reinterpret_cast<const char*>(&value), sizeof(value)); - } - result.resize(size); - return result; - } - - Y_UNIT_TEST(TestHelloWorld) { - TestCase("hello world"); - } - - Y_UNIT_TEST(TestSeveralStreamsWithSameQuality) { - auto s1 = GenerateRandomString(1 << 15); - auto s2 = GenerateRandomString(1 << 15); - auto c1 = Compress(s1); - auto c2 = Compress(s2); - UNIT_ASSERT_VALUES_EQUAL(s1 + s2, Decompress(c1 + c2)); - } - - Y_UNIT_TEST(TestSeveralStreamsWithDifferentQuality) { - auto s1 = GenerateRandomString(1 << 15); - auto s2 = GenerateRandomString(1 << 15); - auto c1 = Compress(s1, 1); - auto c2 = Compress(s2, 2); - UNIT_ASSERT_VALUES_EQUAL(s1 + s2, Decompress(c1 + c2)); - } - - Y_UNIT_TEST(TestIncompleteStream) { - TString manyAs(64 * 1024, 'a'); - auto compressed = Compress(manyAs); - TString truncated(compressed.data(), compressed.size() - 1); - UNIT_CHECK_GENERATED_EXCEPTION(Decompress(truncated), std::exception); - } - - Y_UNIT_TEST(Test64KB) { - auto manyAs = TString(64 * 1024, 'a'); - TString str("Hello from the Matrix!@#% How are you?}{\n\t\a"); - TestCase(manyAs + str + manyAs); - } - - Y_UNIT_TEST(Test1MB) { - TestCase(GenerateRandomString(1 * 1024 * 1024)); - } - - Y_UNIT_TEST(TestEmpty) { - TestCase(""); - } - - Y_UNIT_TEST(TestWriteAfterFinish) { - TNullOutput output; - TZstdCompress compressStream(&output); - compressStream.Finish(); - UNIT_ASSERT_EXCEPTION_CONTAINS(compressStream.Write("a", 1), std::exception, "Cannot use stream after finish."); - UNIT_ASSERT_EXCEPTION_CONTAINS(compressStream.Flush(), std::exception, "Cannot use stream after finish."); - } -} + +#include <util/random/fast.h> +#include <util/stream/null.h> +#include <util/stream/str.h> + +Y_UNIT_TEST_SUITE(TZstdTestSuite) { + TString Compress(TString data, int quality = -1) { + TString compressed; + TStringOutput output(compressed); + TZstdCompress compressStream(&output, quality); + compressStream.Write(data.data(), data.size()); + compressStream.Finish(); + output.Finish(); + return compressed; + } + + TString Decompress(TString data) { + TStringInput input(data); + TZstdDecompress decompressStream(&input); + return decompressStream.ReadAll(); + } + + void TestCase(const TString& s) { + UNIT_ASSERT_VALUES_EQUAL(s, Decompress(Compress(s, -1))); + UNIT_ASSERT_VALUES_EQUAL(s, Decompress(Compress(s, 0))); + UNIT_ASSERT_VALUES_EQUAL(s, Decompress(Compress(s, 22))); + UNIT_ASSERT_VALUES_EQUAL(s, Decompress(Compress(s, 11))); + UNIT_ASSERT_VALUES_EQUAL(s, Decompress(Compress(s, 100500))); + } + + TString GenerateRandomString(size_t size) { + TReallyFastRng32 rng(42); + TString result; + result.reserve(size + sizeof(ui64)); + while (result.size() < size) { + ui64 value = rng.GenRand64(); + result += TStringBuf(reinterpret_cast<const char*>(&value), sizeof(value)); + } + result.resize(size); + return result; + } + + Y_UNIT_TEST(TestHelloWorld) { + TestCase("hello world"); + } + + Y_UNIT_TEST(TestSeveralStreamsWithSameQuality) { + auto s1 = GenerateRandomString(1 << 15); + auto s2 = GenerateRandomString(1 << 15); + auto c1 = Compress(s1); + auto c2 = Compress(s2); + UNIT_ASSERT_VALUES_EQUAL(s1 + s2, Decompress(c1 + c2)); + } + + Y_UNIT_TEST(TestSeveralStreamsWithDifferentQuality) { + auto s1 = GenerateRandomString(1 << 15); + auto s2 = GenerateRandomString(1 << 15); + auto c1 = Compress(s1, 1); + auto c2 = Compress(s2, 2); + UNIT_ASSERT_VALUES_EQUAL(s1 + s2, Decompress(c1 + c2)); + } + + Y_UNIT_TEST(TestIncompleteStream) { + TString manyAs(64 * 1024, 'a'); + auto compressed = Compress(manyAs); + TString truncated(compressed.data(), compressed.size() - 1); + UNIT_CHECK_GENERATED_EXCEPTION(Decompress(truncated), std::exception); + } + + Y_UNIT_TEST(Test64KB) { + auto manyAs = TString(64 * 1024, 'a'); + TString str("Hello from the Matrix!@#% How are you?}{\n\t\a"); + TestCase(manyAs + str + manyAs); + } + + Y_UNIT_TEST(Test1MB) { + TestCase(GenerateRandomString(1 * 1024 * 1024)); + } + + Y_UNIT_TEST(TestEmpty) { + TestCase(""); + } + + Y_UNIT_TEST(TestWriteAfterFinish) { + TNullOutput output; + TZstdCompress compressStream(&output); + compressStream.Finish(); + UNIT_ASSERT_EXCEPTION_CONTAINS(compressStream.Write("a", 1), std::exception, "Cannot use stream after finish."); + UNIT_ASSERT_EXCEPTION_CONTAINS(compressStream.Flush(), std::exception, "Cannot use stream after finish."); + } +} |