diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/streams/lzma | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/streams/lzma')
-rw-r--r-- | library/cpp/streams/lzma/lzma.cpp | 520 | ||||
-rw-r--r-- | library/cpp/streams/lzma/lzma.h | 37 | ||||
-rw-r--r-- | library/cpp/streams/lzma/lzma_ut.cpp | 127 | ||||
-rw-r--r-- | library/cpp/streams/lzma/ut/ya.make | 12 | ||||
-rw-r--r-- | library/cpp/streams/lzma/ya.make | 16 |
5 files changed, 712 insertions, 0 deletions
diff --git a/library/cpp/streams/lzma/lzma.cpp b/library/cpp/streams/lzma/lzma.cpp new file mode 100644 index 00000000000..f1942fa546e --- /dev/null +++ b/library/cpp/streams/lzma/lzma.cpp @@ -0,0 +1,520 @@ +#include "lzma.h" + +#include <util/stream/mem.h> +#include <util/system/context.h> +#include <util/generic/cast.h> +#include <util/memory/addstorage.h> +#include <util/generic/ptr.h> +#include <util/generic/intrlist.h> +#include <util/generic/scope.h> + +extern "C" { +#include <contrib/libs/lzmasdk/LzmaEnc.h> +#include <contrib/libs/lzmasdk/LzmaDec.h> +} + +namespace { + class TMemoryGc { + class TAllocation: public TIntrusiveListItem<TAllocation>, public TAdditionalStorage<TAllocation> { + }; + + public: + inline void* Allocate(size_t len) { + if (len > 1024 * 1024 * 1024) { + return nullptr; + } + + TAllocation* ret = new (len) TAllocation; + + Allocs_.PushBack(ret); + + return ret->AdditionalData(); + } + + inline void Deallocate(void* ptr) noexcept { + if (ptr) { + delete TAllocation::ObjectFromData(ptr); + } + } + + private: + TIntrusiveListWithAutoDelete<TAllocation, TDelete> Allocs_; + }; + + template <class T> + class TInverseFilter { + class TTrampoLine: public ITrampoLine { + public: + inline TTrampoLine(TInverseFilter* parent) + : Parent_(parent) + { + } + + void DoRun() override { + Parent_->RunFilter(); + } + + private: + TInverseFilter* Parent_; + }; + + class TInput: public IInputStream { + public: + inline TInput(TInverseFilter* parent) + : Parent_(parent) + { + } + + ~TInput() override { + } + + size_t DoRead(void* ptr, size_t len) override { + return Parent_->ReadImpl(ptr, len); + } + + private: + TInverseFilter* Parent_; + }; + + class TOutput: public IOutputStream { + public: + inline TOutput(TInverseFilter* parent) + : Parent_(parent) + { + } + + ~TOutput() override { + } + + void DoWrite(const void* ptr, size_t len) override { + Parent_->WriteImpl(ptr, len); + } + + private: + TInverseFilter* Parent_; + }; + + public: + inline TInverseFilter(IOutputStream* slave, T* filter) + : Slave_(slave) + , Filter_(filter) + , TrampoLine_(this) + , FilterCtx_(FilterClosure()) + , Finished_(false) + , In_(nullptr, 0) + { + } + + virtual ~TInverseFilter() { + if (!UncaughtException()) { + try { + Finish(); + } catch (...) { + } + } else { + //rely on gc + } + } + + inline void Write(const void* ptr, size_t len) { + In_.Reset(ptr, len); + + Y_DEFER { + In_.Reset(0, 0); + }; + + while (In_.Avail()) { + SwitchTo(); + } + } + + inline void Finish() { + if (!Finished_) { + Finished_ = true; + SwitchTo(); + } + } + + private: + inline void RunFilter() { + try { + TInput in(this); + TOutput out(this); + + (*Filter_)(&in, &out); + } catch (...) { + Err_ = std::current_exception(); + } + + SwitchFrom(); + } + + inline TContClosure FilterClosure() { + return {&TrampoLine_, TArrayRef(Stack_, sizeof(Stack_))}; + } + + inline size_t ReadImpl(void* ptr, size_t len) { + while (!Finished_) { + const size_t ret = In_.Read(ptr, len); + + if (ret) { + return ret; + } + + SwitchFrom(); + } + + return 0; + } + + inline void WriteImpl(const void* ptr, size_t len) { + Y_ASSERT(!Out_.Avail()); + + Out_.Reset(ptr, len); + + while (Out_.Avail()) { + SwitchFrom(); + } + } + + inline bool FlushImpl() { + if (Out_.Avail()) { + TransferData(&Out_, Slave_); + Out_.Reset(nullptr, 0); + + return true; + } + + return false; + } + + inline void SwitchTo() { + do { + CurrentCtx_.SwitchTo(&FilterCtx_); + + if (Err_) { + Finished_ = true; + + std::rethrow_exception(Err_); + } + } while (FlushImpl()); + } + + inline void SwitchFrom() { + FilterCtx_.SwitchTo(&CurrentCtx_); + } + + private: + IOutputStream* Slave_; + T* Filter_; + TTrampoLine TrampoLine_; + char Stack_[16 * 1024]; + TContMachineContext FilterCtx_; + TContMachineContext CurrentCtx_; + bool Finished_; + TMemoryInput In_; + TMemoryInput Out_; + std::exception_ptr Err_; + }; + + class TLzma { + public: + class TLzmaInput: public ISeqInStream { + public: + inline TLzmaInput(IInputStream* slave) + : Slave_(slave) + { + Read = ReadFunc; + } + + private: + static inline SRes ReadFunc(const ISeqInStream* p, void* ptr, size_t* len) { + *len = const_cast<TLzmaInput*>(static_cast<const TLzmaInput*>(p))->Slave_->Read(ptr, *len); + + return SZ_OK; + } + + private: + IInputStream* Slave_; + }; + + class TLzmaOutput: public ISeqOutStream { + public: + inline TLzmaOutput(IOutputStream* slave) + : Slave_(slave) + { + Write = WriteFunc; + } + + private: + static inline size_t WriteFunc(const ISeqOutStream* p, const void* ptr, size_t len) { + const_cast<TLzmaOutput*>(static_cast<const TLzmaOutput*>(p))->Slave_->Write(ptr, len); + + return len; + } + + private: + IOutputStream* Slave_; + }; + + class TAlloc: public ISzAlloc { + public: + inline TAlloc() { + Alloc = AllocFunc; + Free = FreeFunc; + } + + private: + static void* AllocFunc(const ISzAlloc* t, size_t len) { + return static_cast<TAlloc*>(((ISzAlloc*)t))->Gc_.Allocate(len); + } + + static void FreeFunc(const ISzAlloc* t, void* p) { + static_cast<TAlloc*>(((ISzAlloc*)t))->Gc_.Deallocate(p); + } + + private: + TMemoryGc Gc_; + }; + + inline ISzAlloc* Alloc() noexcept { + return &Alloc_; + } + + static inline void Check(SRes r) { + if (r != SZ_OK) { + ythrow yexception() << "lzma error(" << r << ")"; + } + } + + private: + TAlloc Alloc_; + }; + + class TLzmaCompressBase: public TLzma { + public: + inline TLzmaCompressBase(size_t level) + : H_(LzmaEnc_Create(Alloc())) + { + if (!H_) { + ythrow yexception() << "can not init lzma engine"; + } + + LzmaEncProps_Init(&Props_); + + Props_.level = level; + Props_.dictSize = 0; + Props_.lc = -1; + Props_.lp = -1; + Props_.pb = -1; + Props_.fb = -1; + Props_.numThreads = -1; + Props_.writeEndMark = 1; + + Check(LzmaEnc_SetProps(H_, &Props_)); + size_t bufLen = sizeof(PropsBuf_); + Zero(PropsBuf_); + Check(LzmaEnc_WriteProperties(H_, PropsBuf_, &bufLen)); + } + + inline ~TLzmaCompressBase() { + LzmaEnc_Destroy(H_, Alloc(), Alloc()); + } + + inline void operator()(IInputStream* in, IOutputStream* out) { + TLzmaInput input(in); + TLzmaOutput output(out); + + out->Write(PropsBuf_, sizeof(PropsBuf_)); + + Check(LzmaEnc_Encode(H_, &output, &input, nullptr, Alloc(), Alloc())); + } + + private: + CLzmaEncHandle H_; + CLzmaEncProps Props_; + Byte PropsBuf_[LZMA_PROPS_SIZE]; + }; +} + +class TLzmaCompress::TImpl: public TLzmaCompressBase, public TInverseFilter<TLzmaCompressBase> { +public: + inline TImpl(IOutputStream* slave, size_t level) + : TLzmaCompressBase(level) + , TInverseFilter<TLzmaCompressBase>(slave, this) + { + } +}; + +class TLzmaDecompress::TImpl: public TLzma { +public: + inline TImpl() + : InBegin_(nullptr) + , InEnd_(nullptr) + { + LzmaDec_Construct(&H_); + } + inline virtual ~TImpl() { + LzmaDec_Free(&H_, Alloc()); + } + + inline size_t Read(void* ptr, size_t len) { + Byte* pos = (Byte*)ptr; + Byte* end = pos + len; + + retry: + size_t availLen = InEnd_ - InBegin_; + size_t bufLen = end - pos; + ELzmaStatus status; + + Check(LzmaDec_DecodeToBuf(&H_, pos, &bufLen, (Byte*)InBegin_, &availLen, LZMA_FINISH_ANY, &status)); + + InBegin_ += availLen; + pos += bufLen; + + if (status == LZMA_STATUS_NEEDS_MORE_INPUT) { + Y_ASSERT(InEnd_ == InBegin_); + if (!Fill()) { + ythrow yexception() << "incomplete lzma stream"; + } + + goto retry; + } + + return pos - (Byte*)ptr; + } + +private: + virtual bool Fill() = 0; + +protected: + CLzmaDec H_; + char* InBegin_; + char* InEnd_; +}; + +class TLzmaDecompress::TImplStream: public TImpl { +public: + inline TImplStream(IInputStream* slave) + : Slave_(slave) + { + Byte buf[LZMA_PROPS_SIZE]; + + if (Slave_->Load(buf, sizeof(buf)) != sizeof(buf)) + ythrow yexception() << "can't read lzma header"; + + Check(LzmaDec_Allocate(&H_, buf, sizeof(buf), Alloc())); + LzmaDec_Init(&H_); + } + +private: + bool Fill() override { + size_t size = Slave_->Read(In_, sizeof(In_)); + InBegin_ = In_; + InEnd_ = In_ + size; + + return size; + } + +private: + IInputStream* Slave_; + char In_[4096]; +}; + +class TLzmaDecompress::TImplZeroCopy: public TLzmaDecompress::TImpl { +public: + inline TImplZeroCopy(IZeroCopyInput* in) + : Input_(in) + { + if (!Fill()) + ythrow yexception() << "can't read lzma header"; + + char buf[LZMA_PROPS_SIZE]; + char* header; + if (InEnd_ - InBegin_ >= LZMA_PROPS_SIZE) { + header = InBegin_; + InBegin_ += LZMA_PROPS_SIZE; + } else { + //bad luck, first part is less than header + //try to copy header part by part to the local buffer + const char* end = buf + sizeof(buf); + char* pos = buf; + while (1) { + size_t left = end - pos; + size_t avail = InEnd_ - InBegin_; + if (left < avail) { + memcpy(pos, InBegin_, left); + InBegin_ += left; + break; + } else { + memcpy(pos, InBegin_, avail); + pos += avail; + if (!Fill()) { + ythrow yexception() << "can't read lzma header"; + } + } + } + header = buf; + } + + Check(LzmaDec_Allocate(&H_, (Byte*)header, LZMA_PROPS_SIZE, Alloc())); + + LzmaDec_Init(&H_); + } + +private: + bool Fill() override { + size_t size = Input_->Next(&InBegin_); + + if (size) { + InEnd_ = InBegin_ + size; + + return true; + } + + return false; + } + + IZeroCopyInput* Input_; +}; + +TLzmaCompress::TLzmaCompress(IOutputStream* slave, size_t level) + : Impl_(new TImpl(slave, level)) +{ +} + +TLzmaCompress::~TLzmaCompress() { +} + +void TLzmaCompress::DoWrite(const void* buf, size_t len) { + if (!Impl_) { + ythrow yexception() << "can not write to finished lzma stream"; + } + + Impl_->Write(buf, len); +} + +void TLzmaCompress::DoFinish() { + THolder<TImpl> impl(Impl_.Release()); + + if (impl) { + impl->Finish(); + } +} + +TLzmaDecompress::TLzmaDecompress(IInputStream* slave) + : Impl_(new TImplStream(slave)) +{ +} + +TLzmaDecompress::TLzmaDecompress(IZeroCopyInput* input) + : Impl_(new TImplZeroCopy(input)) +{ +} + +TLzmaDecompress::~TLzmaDecompress() { +} + +size_t TLzmaDecompress::DoRead(void* buf, size_t len) { + return Impl_->Read(buf, len); +} diff --git a/library/cpp/streams/lzma/lzma.h b/library/cpp/streams/lzma/lzma.h new file mode 100644 index 00000000000..ca1e06e9ef4 --- /dev/null +++ b/library/cpp/streams/lzma/lzma.h @@ -0,0 +1,37 @@ +#pragma once + +#include <util/stream/input.h> +#include <util/stream/output.h> +#include <util/stream/zerocopy.h> + +#include <util/generic/ptr.h> + +class TLzmaCompress: public IOutputStream { +public: + TLzmaCompress(IOutputStream* slave, size_t level = 7); + ~TLzmaCompress() override; + +private: + void DoWrite(const void* buf, size_t len) override; + void DoFinish() override; + +private: + class TImpl; + THolder<TImpl> Impl_; +}; + +class TLzmaDecompress: public IInputStream { +public: + TLzmaDecompress(IInputStream* slave); + TLzmaDecompress(IZeroCopyInput* input); + ~TLzmaDecompress() override; + +private: + size_t DoRead(void* buf, size_t len) override; + +private: + class TImpl; + class TImplStream; + class TImplZeroCopy; + THolder<TImpl> Impl_; +}; diff --git a/library/cpp/streams/lzma/lzma_ut.cpp b/library/cpp/streams/lzma/lzma_ut.cpp new file mode 100644 index 00000000000..847e98d1caa --- /dev/null +++ b/library/cpp/streams/lzma/lzma_ut.cpp @@ -0,0 +1,127 @@ +#include "lzma.h" + +#include <library/cpp/testing/unittest/registar.h> + +#include <util/stream/mem.h> +#include <util/random/fast.h> +#include <util/random/random.h> + +class TStrokaByOneByte: public IZeroCopyInput { +public: + TStrokaByOneByte(const TString& s) + : Data(s) + , Pos(s.data()) + { + } + +private: + size_t DoNext(const void** ptr, size_t len) override { + if (Pos < Data.end()) { + len = Min(len, static_cast<size_t>(1)); + *ptr = Pos; + Pos += len; + return len; + } else { + return 0; + } + } + + TString Data; + const char* Pos; +}; + +class TLzmaTest: public TTestBase { + UNIT_TEST_SUITE(TLzmaTest); + UNIT_TEST(Test1) + UNIT_TEST(Test2) + UNIT_TEST_SUITE_END(); + +private: + inline TString GenData() { + TString data; + TReallyFastRng32 rnd(RandomNumber<ui64>()); + + for (size_t i = 0; i < 50000; ++i) { + const char ch = rnd.Uniform(256); + const size_t len = 1 + rnd.Uniform(10); + + data += TString(len, ch); + } + + return data; + } + + inline void Test2() { + class TExcOutput: public IOutputStream { + public: + ~TExcOutput() override { + } + + void DoWrite(const void*, size_t) override { + throw 12345; + } + }; + + TString data(GenData()); + TMemoryInput mi(data.data(), data.size()); + TExcOutput out; + + try { + TLzmaCompress c(&out); + + TransferData(&mi, &c); + } catch (int i) { + UNIT_ASSERT_EQUAL(i, 12345); + } + } + + inline void Test1() { + TString data(GenData()); + TString data1; + TString res; + + { + TMemoryInput mi(data.data(), data.size()); + TStringOutput so(res); + TLzmaCompress c(&so); + + TransferData(&mi, &c); + + c.Finish(); + } + + { + TMemoryInput mi(res.data(), res.size()); + TStringOutput so(data1); + TLzmaDecompress d((IInputStream*)&mi); + + TransferData(&d, &so); + } + + UNIT_ASSERT_EQUAL(data, data1); + + data1.clear(); + { + TMemoryInput mi(res.data(), res.size()); + TStringOutput so(data1); + TLzmaDecompress d(&mi); + + TransferData(&d, &so); + } + + UNIT_ASSERT_EQUAL(data, data1); + + data1.clear(); + { + TStrokaByOneByte mi(res); + TStringOutput so(data1); + TLzmaDecompress d(&mi); + + TransferData(&d, &so); + } + + UNIT_ASSERT_EQUAL(data, data1); + } +}; + +UNIT_TEST_SUITE_REGISTRATION(TLzmaTest); diff --git a/library/cpp/streams/lzma/ut/ya.make b/library/cpp/streams/lzma/ut/ya.make new file mode 100644 index 00000000000..01624f02590 --- /dev/null +++ b/library/cpp/streams/lzma/ut/ya.make @@ -0,0 +1,12 @@ +UNITTEST_FOR(library/cpp/streams/lzma) + +OWNER( + pg + g:util +) + +SRCS( + lzma_ut.cpp +) + +END() diff --git a/library/cpp/streams/lzma/ya.make b/library/cpp/streams/lzma/ya.make new file mode 100644 index 00000000000..38c05145c43 --- /dev/null +++ b/library/cpp/streams/lzma/ya.make @@ -0,0 +1,16 @@ +LIBRARY() + +OWNER( + pg + g:util +) + +PEERDIR( + contrib/libs/lzmasdk +) + +SRCS( + lzma.cpp +) + +END() |