aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/streams/brotli
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/streams/brotli
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/streams/brotli')
-rw-r--r--library/cpp/streams/brotli/brotli.cpp231
-rw-r--r--library/cpp/streams/brotli/brotli.h52
-rw-r--r--library/cpp/streams/brotli/brotli_ut.cpp89
-rw-r--r--library/cpp/streams/brotli/ut/ya.make12
-rw-r--r--library/cpp/streams/brotli/ya.make17
5 files changed, 401 insertions, 0 deletions
diff --git a/library/cpp/streams/brotli/brotli.cpp b/library/cpp/streams/brotli/brotli.cpp
new file mode 100644
index 00000000000..38052cb6882
--- /dev/null
+++ b/library/cpp/streams/brotli/brotli.cpp
@@ -0,0 +1,231 @@
+#include "brotli.h"
+
+#include <contrib/libs/brotli/include/brotli/decode.h>
+#include <contrib/libs/brotli/include/brotli/encode.h>
+
+#include <util/generic/yexception.h>
+#include <util/memory/addstorage.h>
+
+namespace {
+ struct TAllocator {
+ static void* Allocate(void* /* opaque */, size_t size) {
+ return ::operator new(size);
+ }
+
+ static void Deallocate(void* /* opaque */, void* ptr) noexcept {
+ ::operator delete(ptr);
+ }
+ };
+
+}
+
+class TBrotliCompress::TImpl {
+public:
+ TImpl(IOutputStream* slave, int quality)
+ : Slave_(slave)
+ , EncoderState_(BrotliEncoderCreateInstance(&TAllocator::Allocate, &TAllocator::Deallocate, nullptr))
+ {
+ if (!EncoderState_) {
+ ythrow yexception() << "Brotli encoder initialization failed";
+ }
+
+ auto res = BrotliEncoderSetParameter(
+ EncoderState_,
+ BROTLI_PARAM_QUALITY,
+ quality);
+
+ if (!res) {
+ BrotliEncoderDestroyInstance(EncoderState_);
+ ythrow yexception() << "Failed to set brotli encoder quality to " << quality;
+ }
+ }
+
+ ~TImpl() {
+ BrotliEncoderDestroyInstance(EncoderState_);
+ }
+
+ void Write(const void* buffer, size_t size) {
+ DoWrite(buffer, size, BROTLI_OPERATION_PROCESS);
+ }
+
+ void Flush() {
+ DoWrite(nullptr, 0, BROTLI_OPERATION_FLUSH);
+ }
+
+ void Finish() {
+ Flush();
+ DoWrite(nullptr, 0, BROTLI_OPERATION_FINISH);
+ Y_VERIFY(BrotliEncoderIsFinished(EncoderState_));
+ }
+
+private:
+ IOutputStream* Slave_;
+ BrotliEncoderState* EncoderState_;
+
+ void DoWrite(const void* buffer, size_t size, BrotliEncoderOperation operation) {
+ size_t availableOut = 0;
+ ui8* outputBuffer = nullptr;
+
+ const ui8* uBuffer = static_cast<const ui8*>(buffer);
+
+ do {
+ auto result = BrotliEncoderCompressStream(
+ EncoderState_,
+ operation,
+ &size,
+ &uBuffer,
+ &availableOut,
+ &outputBuffer,
+ nullptr);
+
+ if (result == BROTLI_FALSE) {
+ ythrow yexception() << "Brotli encoder failed to process buffer";
+ }
+
+ size_t outputLength = 0;
+ const ui8* output = BrotliEncoderTakeOutput(EncoderState_, &outputLength);
+ if (outputLength > 0) {
+ Slave_->Write(output, outputLength);
+ }
+ } while (size > 0 || BrotliEncoderHasMoreOutput(EncoderState_));
+ }
+};
+
+TBrotliCompress::TBrotliCompress(IOutputStream* slave, int quality) {
+ Impl_.Reset(new TImpl(slave, quality));
+}
+
+TBrotliCompress::~TBrotliCompress() {
+ try {
+ Finish();
+ } catch (...) {
+ }
+}
+
+void TBrotliCompress::DoWrite(const void* buffer, size_t size) {
+ Impl_->Write(buffer, size);
+}
+
+void TBrotliCompress::DoFlush() {
+ if (Impl_) {
+ Impl_->Flush();
+ }
+}
+
+void TBrotliCompress::DoFinish() {
+ THolder<TImpl> impl(Impl_.Release());
+
+ if (impl) {
+ impl->Finish();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TBrotliDecompress::TImpl: public TAdditionalStorage<TImpl> {
+public:
+ TImpl(IInputStream* slave)
+ : Slave_(slave)
+ {
+ InitDecoder();
+ }
+
+ ~TImpl() {
+ FreeDecoder();
+ }
+
+ size_t Read(void* buffer, size_t size) {
+ Y_ASSERT(size > 0);
+
+ ui8* outBuffer = static_cast<ui8*>(buffer);
+ size_t availableOut = size;
+ size_t decompressedSize = 0;
+
+ BrotliDecoderResult result;
+ do {
+ if (InputAvailable_ == 0 && !InputExhausted_) {
+ InputBuffer_ = TmpBuf();
+ InputAvailable_ = Slave_->Read((void*)InputBuffer_, TmpBufLen());
+ if (InputAvailable_ == 0) {
+ InputExhausted_ = true;
+ }
+ }
+
+ if (SubstreamFinished_ && !InputExhausted_) {
+ ResetState();
+ }
+
+ result = BrotliDecoderDecompressStream(
+ DecoderState_,
+ &InputAvailable_,
+ &InputBuffer_,
+ &availableOut,
+ &outBuffer,
+ nullptr);
+
+ decompressedSize = size - availableOut;
+ SubstreamFinished_ = (result == BROTLI_DECODER_RESULT_SUCCESS);
+
+ if (result == BROTLI_DECODER_RESULT_ERROR) {
+ BrotliDecoderErrorCode code = BrotliDecoderGetErrorCode(DecoderState_);
+ ythrow yexception() << "Brotli decoder failed to decompress buffer: "
+ << BrotliDecoderErrorString(code);
+ } else if (result == BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT) {
+ Y_VERIFY(availableOut != size,
+ "Buffer passed to read in Brotli decoder is too small");
+ break;
+ }
+ } while (decompressedSize == 0 && result == BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT && !InputExhausted_);
+
+ if (!SubstreamFinished_ && decompressedSize == 0) {
+ ythrow yexception() << "Input stream is incomplete";
+ }
+
+ return decompressedSize;
+ }
+
+private:
+ IInputStream* Slave_;
+ BrotliDecoderState* DecoderState_;
+
+ bool SubstreamFinished_ = false;
+ bool InputExhausted_ = false;
+ const ui8* InputBuffer_ = nullptr;
+ size_t InputAvailable_ = 0;
+
+ unsigned char* TmpBuf() noexcept {
+ return static_cast<unsigned char*>(AdditionalData());
+ }
+
+ size_t TmpBufLen() const noexcept {
+ return AdditionalDataLength();
+ }
+
+ void InitDecoder() {
+ DecoderState_ = BrotliDecoderCreateInstance(&TAllocator::Allocate, &TAllocator::Deallocate, nullptr);
+ if (!DecoderState_) {
+ ythrow yexception() << "Brotli decoder initialization failed";
+ }
+ }
+
+ void FreeDecoder() {
+ BrotliDecoderDestroyInstance(DecoderState_);
+ }
+
+ void ResetState() {
+ Y_VERIFY(BrotliDecoderIsFinished(DecoderState_));
+ FreeDecoder();
+ InitDecoder();
+ }
+};
+
+TBrotliDecompress::TBrotliDecompress(IInputStream* slave, size_t bufferSize)
+ : Impl_(new (bufferSize) TImpl(slave))
+{
+}
+
+TBrotliDecompress::~TBrotliDecompress() = default;
+
+size_t TBrotliDecompress::DoRead(void* buffer, size_t size) {
+ return Impl_->Read(buffer, size);
+}
diff --git a/library/cpp/streams/brotli/brotli.h b/library/cpp/streams/brotli/brotli.h
new file mode 100644
index 00000000000..b3af869e29c
--- /dev/null
+++ b/library/cpp/streams/brotli/brotli.h
@@ -0,0 +1,52 @@
+#pragma once
+
+#include <util/generic/ptr.h>
+#include <util/stream/input.h>
+#include <util/stream/output.h>
+
+/**
+ * @addtogroup Streams_Archs
+ * @{
+ */
+
+class TBrotliCompress: public IOutputStream {
+public:
+ static constexpr int BEST_QUALITY = 11;
+
+ /**
+ @param slave stream to write compressed data to
+ @param quality the higher the quality, the slower and better the compression. Range is 0 to 11.
+ */
+ explicit TBrotliCompress(IOutputStream* slave, int quality = BEST_QUALITY);
+ ~TBrotliCompress() override;
+
+private:
+ void DoWrite(const void* buffer, size_t size) override;
+ void DoFlush() override;
+ void DoFinish() override;
+
+public:
+ class TImpl;
+ THolder<TImpl> Impl_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TBrotliDecompress: public IInputStream {
+public:
+ /**
+ @param slave stream to read compressed data from
+ @param bufferSize approximate size of buffer compressed data is read in
+ */
+ explicit TBrotliDecompress(IInputStream* slave, size_t bufferSize = 8 * 1024);
+ ~TBrotliDecompress() override;
+
+private:
+ size_t DoRead(void* buffer, size_t size) override;
+
+private:
+ class TImpl;
+ THolder<TImpl> Impl_;
+};
+
+/** @} */
diff --git a/library/cpp/streams/brotli/brotli_ut.cpp b/library/cpp/streams/brotli/brotli_ut.cpp
new file mode 100644
index 00000000000..aeb2e284dc3
--- /dev/null
+++ b/library/cpp/streams/brotli/brotli_ut.cpp
@@ -0,0 +1,89 @@
+#include "brotli.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/random/fast.h>
+
+Y_UNIT_TEST_SUITE(TBrotliTestSuite) {
+ TString Compress(TString data) {
+ TString compressed;
+ TStringOutput output(compressed);
+ TBrotliCompress compressStream(&output, 11);
+ compressStream.Write(data.data(), data.size());
+ compressStream.Finish();
+ output.Finish();
+ return compressed;
+ }
+
+ TString Decompress(TString data) {
+ TStringInput input(data);
+ TBrotliDecompress decompressStream(&input);
+ return decompressStream.ReadAll();
+ }
+
+ void TestCase(const TString& s) {
+ UNIT_ASSERT_VALUES_EQUAL(s, Decompress(Compress(s)));
+ }
+
+ TString GenerateRandomString(size_t size) {
+ TReallyFastRng32 rng(42);
+ TString result;
+ result.reserve(size + sizeof(ui64));
+ while (result.size() < size) {
+ ui64 value = rng.GenRand64();
+ result += TStringBuf(reinterpret_cast<const char*>(&value), sizeof(value));
+ }
+ result.resize(size);
+ return result;
+ }
+
+ Y_UNIT_TEST(TestHelloWorld) {
+ TestCase("hello world");
+ }
+
+ Y_UNIT_TEST(TestFlush) {
+ TStringStream ss;
+ TBrotliCompress compressStream(&ss);
+ TBrotliDecompress decompressStream(&ss);
+
+ for (size_t i = 0; i < 3; ++i) {
+ TString s = GenerateRandomString(1 << 15);
+ compressStream.Write(s.data(), s.size());
+ compressStream.Flush();
+
+ TString r(s.size(), '*');
+ decompressStream.Load((char*)r.data(), r.size());
+
+ UNIT_ASSERT_VALUES_EQUAL(s, r);
+ }
+ }
+
+ Y_UNIT_TEST(TestSeveralStreams) {
+ auto s1 = GenerateRandomString(1 << 15);
+ auto s2 = GenerateRandomString(1 << 15);
+ auto c1 = Compress(s1);
+ auto c2 = Compress(s2);
+ UNIT_ASSERT_VALUES_EQUAL(s1 + s2, Decompress(c1 + c2));
+ }
+
+ Y_UNIT_TEST(TestIncompleteStream) {
+ TString manyAs(64 * 1024, 'a');
+ auto compressed = Compress(manyAs);
+ TString truncated(compressed.data(), compressed.size() - 1);
+ UNIT_CHECK_GENERATED_EXCEPTION(Decompress(truncated), std::exception);
+ }
+
+ Y_UNIT_TEST(Test64KB) {
+ auto manyAs = TString(64 * 1024, 'a');
+ TString str("Hello from the Matrix!@#% How are you?}{\n\t\a");
+ TestCase(manyAs + str + manyAs);
+ }
+
+ Y_UNIT_TEST(Test1MB) {
+ TestCase(GenerateRandomString(1 * 1024 * 1024));
+ }
+
+ Y_UNIT_TEST(TestEmpty) {
+ TestCase("");
+ }
+}
diff --git a/library/cpp/streams/brotli/ut/ya.make b/library/cpp/streams/brotli/ut/ya.make
new file mode 100644
index 00000000000..243462f1b2b
--- /dev/null
+++ b/library/cpp/streams/brotli/ut/ya.make
@@ -0,0 +1,12 @@
+UNITTEST_FOR(library/cpp/streams/brotli)
+
+OWNER(
+ levysotsky
+ g:util
+)
+
+SRCS(
+ brotli_ut.cpp
+)
+
+END()
diff --git a/library/cpp/streams/brotli/ya.make b/library/cpp/streams/brotli/ya.make
new file mode 100644
index 00000000000..fa2bfec9cc5
--- /dev/null
+++ b/library/cpp/streams/brotli/ya.make
@@ -0,0 +1,17 @@
+LIBRARY()
+
+OWNER(
+ levysotsky
+ g:util
+)
+
+PEERDIR(
+ contrib/libs/brotli/enc
+ contrib/libs/brotli/dec
+)
+
+SRCS(
+ brotli.cpp
+)
+
+END()