aboutsummaryrefslogblamecommitdiffstats
path: root/library/cpp/streams/lz/lz.cpp
blob: b65bb3ed96503a1d9aeb75a6b4b7747a187b671c (plain) (tree)
1
2
3
4
5
6
7
8
9
               
                                
                                   
                                
                                 
                                   
                                    






                                         
                                                

             
                                                





                                                              
                                

                                                                                                                               
                                                                


                               
                         
                                                                                                                                           
 
          
                       
                            
 
          
                          

                                
                               
















                                                                           
                                     

                      
                                                      









                                  
                                         

                                      
                                            


                                                       
                                           


                               
                                                                                                                   
                                                                 
                                                             












                                                                       
                                           








                                                                 

        
                          


                          
                                            







                                                      
                            
       
                                                      
                                                                           
         
     
 



                                                                                                                                               
 
                                 

                              
                                                                   







                                                                       

                                         
                                                                                 



                 
                                                     
                       
                            


                                                      
                               



                                     
                                     


































                                                                       

                                                                             























                                                                             
                         



                             
                 


               
                                                                      
       
                                                 

                                                                                      
 
                                 
     
  

































                                                                         
                                                                         


                                                                         
                                                                         






                                                                         
                                                                  


                                                                  
                                                                  




























                                                                  
                        
     
                                                    

                                                                                               
     
                                                            







                                          
                                   







                                                                                       
                                                                                            















                                                                                           
                                                              











                                                      
                                                    

                                                      
                                                                                            





                                                                                   
                                                              
     
                                                            








                                         

            
                                  
 
                                                    
                                                      
 
                                                                                        
     
 
                                                                                   
                                                           


                                        
 
                                                              
     
 
                                                            
                     









                                      
                                  
 
                                                    
                                                                  
     
 
                                                                                            


                                                     
 




                                                                                                         
 
                                                              
     
 
                                                            
                     






                                            




                                  
                                                    


                         
                         














                                                                                                         
                                                           
                                                     










                                              
                                                                                            








                                                                                           
                                                  








                                                                            
                                                                                              










                                                                  
                                                                                                          




                                                                                   
 

                         
                                           




                                      
                                               








                                                                 
                                                                                
           
                                            


            
                                                       



                                                   
      
 
                  
                                                                                              
                        
                                                      
                           
                                                         
                            
                                                                      
                           
                                                         
                                      
                                                                    
 
                   
 
                  
                                                                                                      
                                             
                                                             
                                     
                                                
     
                   
 
                  
                                                                         
                                    
 


                                            
                                                                      
               
                                                                    
     

               
 
                                                                

                                         
                                                                   

                                            
                                                                                                

                                                       
                                                                              

                                         
                                                                                 

                                            
                                                                                                              
                                                       
#include "lz.h"

#include <util/system/yassert.h>
#include <util/system/byteorder.h>
#include <util/memory/addstorage.h>
#include <util/generic/buffer.h>
#include <util/generic/utility.h>
#include <util/generic/singleton.h>
#include <util/generic/yexception.h>
#include <util/stream/mem.h>

#include <contrib/libs/lz4/lz4.h>
#include <contrib/libs/fastlz/fastlz.h>
#include <contrib/libs/snappy/snappy.h>
#include <contrib/libs/quicklz/quicklz.h>
#include <contrib/libs/minilzo/minilzo.h>

// One-byte fields have no byte order, so the ui8 conversions used by the generic
// Save()/GLoad() helpers below are the identity.
static inline ui8 HostToLittle(ui8 value) noexcept {
    const ui8 converted = value;
    return converted;
}

static inline ui8 LittleToHost(ui8 value) noexcept {
    const ui8 converted = value;
    return converted;
}

// Size of the per-block header: a ui16 payload length followed by a ui8 "compressed" flag.
struct TCommonData {
    static constexpr size_t overhead = sizeof(ui16) + sizeof(ui8);
};

constexpr size_t SIGNATURE_SIZE = 4; // every codec signature is exactly four bytes long

template <class TCompressor, class TBase>
class TCompressorBase: public TAdditionalStorage<TCompressorBase<TCompressor, TBase>>, public TCompressor, public TCommonData {
public:
    /*
     * Block-oriented compressing writer shared by every codec in this file.
     *
     * Layout written to the slave (integers go through HostToLittle):
     *   signature (SIGNATURE_SIZE bytes) | ui32 version (always 1) | ui16 block size
     *   [codec-specific header written by the derived TImpl, e.g. the QuickLZ parameters]
     *   blocks: ui16 payload length | ui8 compressed flag | payload
     * Finish() terminates the stream with an empty, uncompressed block.
     *
     * TCompressor supplies `signature`, Compress() and SaveIncompressibleChunks(). The
     * compression scratch buffer is this object's TAdditionalStorage tail; the factories in
     * this file size it with TCompressor::Hint(blockSize). `slave` is not owned.
     */
    inline TCompressorBase(IOutputStream* slave, ui16 blockSize)
        : Slave_(slave)
        , BlockSize_(blockSize)
    {
        /*
         * save signature
         */
        static_assert(sizeof(TCompressor::signature) - 1 == SIGNATURE_SIZE, "expect sizeof(TCompressor::signature) - 1 == SIGNATURE_SIZE");
        Slave_->Write(TCompressor::signature, sizeof(TCompressor::signature) - 1);

        /*
         * save version
         */
        this->Save((ui32)1);

        /*
         * save block size
         */
        this->Save(BlockSize());
    }

    inline ~TCompressorBase() {
    }

    /*
     * Splits `buf` into chunks of at most BlockSize() bytes and writes each one as a block.
     * Throws yexception when data is written with a zero block size: no chunk could make
     * progress, so the loop below would spin forever emitting end-of-stream markers.
     */
    inline void Write(const char* buf, size_t len) {
        if (len && !this->BlockSize()) {
            ythrow yexception() << "can not write with zero block size";
        }

        while (len) {
            const ui16 toWrite = (ui16)Min<size_t>(len, this->BlockSize());

            this->WriteBlock(buf, toWrite);

            buf += toWrite;
            len -= toWrite;
        }
    }

    // Nothing is buffered between Write() calls, so there is nothing to flush.
    inline void Flush() {
    }

    // Writes the empty block that marks the end of the stream.
    inline void Finish() {
        this->Flush();
        this->WriteBlock(nullptr, 0);
    }

    // Writes `t` to `out` in little-endian byte order.
    template <class T>
    static inline void Save(T t, IOutputStream* out) {
        t = HostToLittle(t);

        out->Write(&t, sizeof(t));
    }

    // Writes `t` to the slave stream in little-endian byte order.
    template <class T>
    inline void Save(T t) {
        Save(t, Slave_);
    }

private:
    // Scratch buffer the codec compresses into.
    inline void* Block() const noexcept {
        return this->AdditionalData();
    }

    inline ui16 BlockSize() const noexcept {
        return BlockSize_;
    }

    /*
     * Writes one block: header (length + compressed flag) followed by the payload.
     * The compressed form is stored only if it is shorter than the input, or if the codec
     * demands it via SaveIncompressibleChunks(). ptr == nullptr (with len == 0) writes
     * just the header, which is the end-of-stream marker.
     */
    inline void WriteBlock(const void* ptr, ui16 len) {
        Y_ASSERT(len <= this->BlockSize());

        ui8 compressed = false;

        if (len) {
            const size_t out = this->Compress((const char*)ptr, len, (char*)Block(), this->AdditionalDataLength());
            // catch compressor buffer overrun (e.g. SEARCH-2043)
            //Y_VERIFY(out <= this->Hint(this->BlockSize()));

            if (out < len || TCompressor::SaveIncompressibleChunks()) {
                compressed = true;
                ptr = Block();
                len = (ui16)out;
            }
        }

        char tmp[overhead];
        TMemoryOutput header(tmp, sizeof(tmp));

        this->Save(len, &header);
        this->Save(compressed, &header);

        using TPart = IOutputStream::TPart;
        if (ptr) {
            // Header and payload go out in a single vectored write.
            const TPart parts[] = {
                TPart(tmp, sizeof(tmp)),
                TPart(ptr, len),
            };

            Slave_->Write(parts, sizeof(parts) / sizeof(*parts));
        } else {
            Slave_->Write(tmp, sizeof(tmp));
        }
    }

private:
    IOutputStream* Slave_;
    const ui16 BlockSize_;
};

/*
 * Reads one T from `input` and converts it from little-endian to host byte order.
 * Throws TDecompressorError ("stream error") when fewer than sizeof(T) bytes are available.
 */
template <class T>
static inline T GLoad(IInputStream* input) {
    T value;
    const size_t loaded = input->Load(&value, sizeof(value));

    if (loaded == sizeof(value)) {
        return LittleToHost(value);
    }

    ythrow TDecompressorError() << "stream error";
}

/*
 * Holds the SIGNATURE_SIZE-byte stream signature read from an input stream, so that it
 * can be compared against several codecs' signatures.
 */
class TDecompressSignature {
public:
    // Consumes exactly SIGNATURE_SIZE bytes from `input`; throws TDecompressorError on a short read.
    inline TDecompressSignature(IInputStream* input) {
        const size_t loaded = input->Load(Buffer_, SIGNATURE_SIZE);

        if (loaded != SIGNATURE_SIZE) {
            ythrow TDecompressorError() << "can not load stream signature";
        }
    }

    // True when the stored bytes equal TDecompressor::signature (without its trailing NUL).
    template <class TDecompressor>
    inline bool Check() const {
        static_assert(sizeof(TDecompressor::signature) - 1 == SIGNATURE_SIZE, "expect sizeof(TDecompressor::signature) - 1 == SIGNATURE_SIZE");
        return !memcmp(Buffer_, TDecompressor::signature, SIGNATURE_SIZE);
    }

private:
    char Buffer_[SIGNATURE_SIZE];
};

/*
 * Reads the signature from `input`, requires it to match TDecompressor's and returns
 * `input` (now positioned after the signature). Throws TDecompressorError otherwise.
 */
template <class TDecompressor>
static inline IInputStream* ConsumeSignature(IInputStream* input) {
    const TDecompressSignature signature(input);

    if (signature.Check<TDecompressor>()) {
        return input;
    }

    ythrow TDecompressorError() << "incorrect signature";
}

/*
 * Block-oriented decompressing reader shared by every codec in this file (the inverse of
 * TCompressorBase). It does not check the signature itself: the slave must already be
 * positioned right after it.
 *
 * Construction reads the ui32 version (must be 1) and the ui16 block size, then lets
 * TDecompressor consume any codec-specific header through InitFromStream(). Tmp_ is split
 * into two OutBufSize_-byte halves: In_ receives a block payload as stored in the stream,
 * Out_ receives its decompressed form.
 */
template <class TDecompressor>
class TDecompressorBaseImpl: public TDecompressor, public TCommonData {
public:
    // Returns `v` if it is the only supported stream version (1); throws otherwise.
    static inline ui32 CheckVer(ui32 v) {
        if (v != 1) {
            ythrow yexception() << TStringBuf("incorrect stream version: ") << v;
        }

        return v;
    }

    // The initializers read the header in stream order, which relies on the member
    // declaration order at the bottom of the class.
    inline TDecompressorBaseImpl(IInputStream* slave)
        : Slave_(slave)
        , Input_(nullptr, 0)
        , Eof_(false)
        , Version_(CheckVer(Load<ui32>()))
        , BlockSize_(Load<ui16>())
        , OutBufSize_(TDecompressor::Hint(BlockSize_))
        , Tmp_(2 * OutBufSize_)
        , In_(Tmp_.Data())
        , Out_(In_ + OutBufSize_)
    {
        this->InitFromStream(Slave_);
    }

    inline ~TDecompressorBaseImpl() {
    }

    /*
     * Copies up to `len` decompressed bytes into `buf`. When the current block is exhausted,
     * it pulls the next block from the slave. Returns 0 once a block that yields no data
     * (the end-of-stream marker) has been read.
     */
    inline size_t Read(void* buf, size_t len) {
        size_t ret = Input_.Read(buf, len);

        if (ret) {
            return ret;
        }

        if (Eof_) {
            return 0;
        }

        this->FillNextBlock();

        ret = Input_.Read(buf, len);

        if (ret) {
            return ret;
        }

        Eof_ = true;

        return 0;
    }

    /*
     * Reads one block (ui16 length, ui8 compressed flag, payload) and exposes its decoded
     * contents through Input_. Throws TDecompressorError on a truncated or malformed block.
     */
    inline void FillNextBlock() {
        char tmp[overhead];

        if (Slave_->Load(tmp, sizeof(tmp)) != sizeof(tmp)) {
            ythrow TDecompressorError() << "can not read block header";
        }

        TMemoryInput header(tmp, sizeof(tmp));

        const ui16 len = GLoad<ui16>(&header);
        // The payload is loaded into In_, which is only OutBufSize_ bytes long because Out_
        // starts right after it. Bounding len by the whole of Tmp_ would let a malformed
        // header spill the payload into Out_, so the codec would decompress a buffer onto
        // itself. No valid block is longer than OutBufSize_: stored chunks are at most
        // BlockSize_ bytes, and the writer compresses into a Hint(BlockSize_)-byte buffer.
        if (len > OutBufSize_) {
            ythrow TDecompressorError() << "invalid len inside block header";
        }
        const ui8 compressed = GLoad<ui8>(&header);

        if (compressed > 1) {
            ythrow TDecompressorError() << "broken header";
        }

        if (Slave_->Load(In_, len) != len) {
            ythrow TDecompressorError() << "can not read data";
        }

        if (compressed) {
            const size_t ret = this->Decompress(In_, len, Out_, OutBufSize_);

            Input_.Reset(Out_, ret);
        } else {
            Input_.Reset(In_, len);
        }
    }

    // Reads one little-endian T directly from the slave stream.
    template <class T>
    inline T Load() {
        return GLoad<T>(Slave_);
    }

protected:
    IInputStream* Slave_;
    TMemoryInput Input_;
    bool Eof_;
    const ui32 Version_;
    const ui16 BlockSize_;
    const size_t OutBufSize_;
    TBuffer Tmp_;
    char* In_;
    char* Out_;
};

/*
 * TDecompressorBaseImpl preceded by a signature check. The constructor reads
 * SIGNATURE_SIZE bytes from `slave` and throws TDecompressorError unless they match
 * TDecompressor::signature. The TBase parameter is not used here.
 */
template <class TDecompressor, class TBase>
class TDecompressorBase: public TDecompressorBaseImpl<TDecompressor> {
    using TImplBase = TDecompressorBaseImpl<TDecompressor>;

public:
    inline TDecompressorBase(IInputStream* slave)
        : TImplBase(ConsumeSignature<TDecompressor>(slave))
    {
    }

    ~TDecompressorBase() = default;
};

/*
 * DEF_COMPRESSOR_COMMON(rname, name) defines the members shared by every public
 * compressing stream `rname`, which forwards to the TImpl it holds in Impl_:
 *  - the destructor calls Finish() and deliberately swallows any exception from it;
 *  - DoWrite() / DoFlush() forward to Impl_ and throw yexception once Impl_ has been
 *    released, i.e. after the stream was finished;
 *  - DoFinish() takes Impl_ out of the object before finishing it, so the stream counts
 *    as finalized even if impl->Finish() throws, and a repeated DoFinish() does nothing.
 * The `name` argument is not used by this macro.
 * (Comments live outside the macro because a // comment would swallow the line
 * continuations.)
 */
#define DEF_COMPRESSOR_COMMON(rname, name)                              \
    rname::~rname() {                                                   \
        try {                                                           \
            Finish();                                                   \
        } catch (...) {                                                 \
        }                                                               \
    }                                                                   \
                                                                        \
    void rname::DoWrite(const void* buf, size_t len) {                  \
        if (!Impl_) {                                                   \
            ythrow yexception() << "can not write to finalized stream"; \
        }                                                               \
                                                                        \
        Impl_->Write((const char*)buf, len);                            \
    }                                                                   \
                                                                        \
    void rname::DoFlush() {                                             \
        if (!Impl_) {                                                   \
            ythrow yexception() << "can not flush finalized stream";    \
        }                                                               \
                                                                        \
        Impl_->Flush();                                                 \
    }                                                                   \
                                                                        \
    void rname::DoFinish() {                                            \
        THolder<TImpl> impl(Impl_.Release());                           \
                                                                        \
        if (impl) {                                                     \
            impl->Finish();                                             \
        }                                                               \
    }

/*
 * DEF_COMPRESSOR(rname, name) defines the compressing stream `rname` for codec `name`:
 *  - rname::TImpl is TCompressorBase<name, TImpl>. Its constructor writes the stream header.
 *  - rname's constructor allocates TImpl with name::Hint(blockSize) extra bytes, passed
 *    to TAdditionalStorage's sized operator new. Those bytes become the compression
 *    scratch buffer.
 *  - the remaining members come from DEF_COMPRESSOR_COMMON.
 */
#define DEF_COMPRESSOR(rname, name)                                     \
    class rname::TImpl: public TCompressorBase<name, TImpl> {           \
    public:                                                             \
        inline TImpl(IOutputStream* out, ui16 blockSize)                \
            : TCompressorBase<name, TImpl>(out, blockSize) {            \
        }                                                               \
    };                                                                  \
                                                                        \
    rname::rname(IOutputStream* slave, ui16 blockSize)                  \
        : Impl_(new (TImpl::Hint(blockSize)) TImpl(slave, blockSize)) { \
    }                                                                   \
                                                                        \
    DEF_COMPRESSOR_COMMON(rname, name)

/*
 * DEF_DECOMPRESSOR(rname, name) defines the decompressing stream `rname` for codec `name`:
 *  - rname::TImpl is TDecompressorBase<name, TImpl>. It verifies the signature and
 *    reads the stream header on construction, so rname's constructor may throw.
 *  - DoRead() forwards to TImpl::Read(); the destructor is empty and Impl_ cleans up.
 */
#define DEF_DECOMPRESSOR(rname, name)                            \
    class rname::TImpl: public TDecompressorBase<name, TImpl> {  \
    public:                                                      \
        inline TImpl(IInputStream* in)                           \
            : TDecompressorBase<name, TImpl>(in) {               \
        }                                                        \
    };                                                           \
                                                                 \
    rname::rname(IInputStream* slave)                            \
        : Impl_(new TImpl(slave)) {                              \
    }                                                            \
                                                                 \
    rname::~rname() {                                            \
    }                                                            \
                                                                 \
    size_t rname::DoRead(void* buf, size_t len) {                \
        return Impl_->Read(buf, len);                            \
    }

/*
 * MiniLzo
 */
/*
 * Traits shared by the MiniLZO compressor and decompressor: the "YLZO" signature, the
 * buffer size hint and the incompressible-chunk policy. Constructing one makes sure
 * lzo_init() has been run; TInit is obtained through Singleton<>.
 */
class TMiniLzo {
    // Runs lzo_init() and throws if the LZO engine refuses to initialise.
    class TInit {
    public:
        inline TInit() {
            const int rc = lzo_init();

            if (rc == LZO_E_OK) {
                return;
            }

            ythrow yexception() << "can not init lzo engine";
        }
    };

public:
    static const char signature[];

    inline TMiniLzo() {
        (void)Singleton<TInit>();
    }

    inline ~TMiniLzo() {
    }

    // Worst-case output size for `len` input bytes.
    // see SEARCH-2043 and, e.g. examples at
    // http://stackoverflow.com/questions/4235019/how-to-get-lzo-to-work-with-a-file-stream
    static inline size_t Hint(size_t len) noexcept {
        const size_t worstCaseGrowth = len / 16 + 64 + 3;
        return len + worstCaseGrowth;
    }

    // Chunks that do not shrink are stored uncompressed.
    static inline bool SaveIncompressibleChunks() noexcept {
        return false;
    }
};

const char TMiniLzo::signature[] = "YLZO";

// Zero-filled scratch area of N bytes, exposed to derived codecs as WorkMem_.
template <size_t N>
class TFixedArray {
public:
    inline TFixedArray() noexcept
        : WorkMem_() // value-initialisation zero-fills the whole array
    {
    }

protected:
    char WorkMem_[N];
};

// MiniLZO compressing codec: LZO1X-1 working in a zeroed area of LZO1X_MEM_COMPRESS (+1) bytes.
class TMiniLzoCompressor: public TMiniLzo, public TFixedArray<LZO1X_MEM_COMPRESS + 1> {
public:
    // Compresses `len` bytes of `data` into `ptr` and returns the compressed size.
    // dstMaxSize is ignored: `ptr` is expected to be sized by TMiniLzo::Hint(), and the
    // status code of lzo1x_1_compress() is not inspected.
    inline size_t Compress(const char* data, size_t len, char* ptr, size_t /*dstMaxSize*/) {
        lzo_uint compressedLen = 0;
        lzo1x_1_compress((const lzo_bytep)data, len, (lzo_bytep)ptr, &compressedLen, WorkMem_);
        return static_cast<size_t>(compressedLen);
    }
};

// MiniLZO decompressing codec (LZO1X).
class TMiniLzoDecompressor: public TMiniLzo, public TFixedArray<LZO1X_MEM_DECOMPRESS + 1> {
public:
    /*
     * Decompresses one block of `len` bytes into `ptr`, which holds `max` bytes, and returns
     * the decompressed size. The input comes straight from the stream, so the bounds-checked
     * lzo1x_decompress_safe() is used: plain lzo1x_decompress() trusts the data and could
     * write past `ptr`. Throws TDecompressorError if LZO reports any error.
     */
    inline size_t Decompress(const char* data, size_t len, char* ptr, size_t max) {
        // in: capacity of ptr; out: number of bytes produced
        lzo_uint ret = max;

        const int rc = lzo1x_decompress_safe((const lzo_bytep)data, len, (lzo_bytep)ptr, &ret, WorkMem_);

        if (rc != LZO_E_OK) {
            ythrow TDecompressorError() << "lzo: can not decompress block, error " << rc;
        }

        return ret;
    }

    // MiniLZO streams have no codec-specific header.
    inline void InitFromStream(IInputStream*) const noexcept {
    }
};

// Out-of-line members of the MiniLZO ("YLZO") stream classes; the classes themselves are
// declared elsewhere (presumably lz.h).
DEF_COMPRESSOR(TLzoCompress, TMiniLzoCompressor)
DEF_DECOMPRESSOR(TLzoDecompress, TMiniLzoDecompressor)

/*
 * FastLZ
 */
class TFastLZ {
public:
    static const char signature[];

    static inline size_t Hint(size_t len) noexcept {
        return Max<size_t>((size_t)(len * 1.06), 100);
    }

    inline size_t Compress(const char* data, size_t len, char* ptr, size_t /*dstMaxSize*/) {
        return fastlz_compress(data, len, ptr);
    }

    inline size_t Decompress(const char* data, size_t len, char* ptr, size_t max) {
        return fastlz_decompress(data, len, ptr, max);
    }

    inline void InitFromStream(IInputStream*) const noexcept {
    }

    static inline bool SaveIncompressibleChunks() noexcept {
        return false;
    }
};

const char TFastLZ::signature[] = "YLZF";

DEF_COMPRESSOR(TLzfCompress, TFastLZ)
DEF_DECOMPRESSOR(TLzfDecompress, TFastLZ)

/*
 * LZ4
 */
// LZ4 codec ("LZ.4"): both the compressing and decompressing halves.
class TLZ4 {
public:
    static const char signature[];

    // Output buffer size for `len` input bytes: 6% headroom and never below 100 bytes.
    // For some sizes (e.g. len = 200) this is below LZ4_compressBound(len), so Compress()
    // must cope with LZ4 running out of room.
    static inline size_t Hint(size_t len) noexcept {
        return Max<size_t>((size_t)(len * 1.06), 100);
    }

    /*
     * Compresses `len` bytes into `ptr` (capacity `dstMaxSize`) and returns the compressed size.
     * LZ4_compress_default() returns 0 when the result does not fit into dstMaxSize. Passing
     * that 0 on would make the writer emit an empty block flagged as compressed, which the
     * reader cannot decode, and the chunk's data would be lost. Reporting `len` ("no gain")
     * makes the writer store the chunk uncompressed instead.
     */
    inline size_t Compress(const char* data, size_t len, char* ptr, size_t dstMaxSize) {
        const int res = LZ4_compress_default(data, ptr, static_cast<int>(len), static_cast<int>(dstMaxSize));

        if (res <= 0) {
            return len;
        }

        return static_cast<size_t>(res);
    }

    // Bounds-checked decompression into `ptr` (capacity `max`); throws TDecompressorError on corrupt input.
    inline size_t Decompress(const char* data, size_t len, char* ptr, size_t max) {
        int res = LZ4_decompress_safe(data, ptr, len, max);
        if (res < 0)
            ythrow TDecompressorError();
        return res;
    }

    // LZ4 streams have no codec-specific header.
    inline void InitFromStream(IInputStream*) const noexcept {
    }

    // Chunks that do not shrink are stored uncompressed.
    static inline bool SaveIncompressibleChunks() noexcept {
        return false;
    }
};

const char TLZ4::signature[] = "LZ.4";

DEF_COMPRESSOR(TLz4Compress, TLZ4)
DEF_DECOMPRESSOR(TLz4Decompress, TLZ4)

/*
 * Snappy
 */
// Snappy codec ("Snap"): both the compressing and decompressing halves.
class TSnappy {
public:
    static const char signature[];

    // Output buffer size for `len` input bytes: snappy's worst-case compressed size,
    // never below 100 bytes.
    static inline size_t Hint(size_t len) noexcept {
        return Max<size_t>(snappy::MaxCompressedLength(len), 100);
    }

    // Compresses `len` bytes into `ptr` (sized by Hint()); dstMaxSize is not consulted.
    inline size_t Compress(const char* data, size_t len, char* ptr, size_t /*dstMaxSize*/) {
        size_t reslen = 0;
        snappy::RawCompress(data, len, ptr, &reslen);
        return reslen;
    }

    /*
     * Decompresses one block into `ptr` (capacity `max`) and returns the decompressed size.
     * RawUncompress() writes as many bytes as the block's own length preamble announces,
     * so a block claiming more than `max` bytes is rejected up front. Without this check,
     * corrupt or malicious input could overflow `ptr`. Throws TDecompressorError.
     */
    inline size_t Decompress(const char* data, size_t len, char* ptr, size_t max) {
        size_t srclen = 0;
        if (!snappy::GetUncompressedLength(data, len, &srclen) || srclen > max || !snappy::RawUncompress(data, len, ptr))
            ythrow TDecompressorError();
        return srclen;
    }

    // Snappy streams have no codec-specific header.
    inline void InitFromStream(IInputStream*) const noexcept {
    }

    // Chunks that do not shrink are stored uncompressed.
    static inline bool SaveIncompressibleChunks() noexcept {
        return false;
    }
};

const char TSnappy::signature[] = "Snap";

DEF_COMPRESSOR(TSnappyCompress, TSnappy)
DEF_DECOMPRESSOR(TSnappyDecompress, TSnappy)

/*
 * QuickLZ
 */
/*
 * State shared by the QuickLZ compressor and decompressor ("YLZQ"). The codec
 * implementation is picked at runtime by LzqTable(version, level, mode). Its working
 * memory is allocated by Init() and owned by Mem_.
 */
class TQuickLZBase {
public:
    static const char signature[];

    // Output buffer size for `len` input bytes.
    static inline size_t Hint(size_t len) noexcept {
        return 500 + len;
    }

    inline TQuickLZBase()
        : Table_{nullptr}
    {
    }

    /*
     * Selects the QuickLZ method table and allocates zeroed working memory of
     * Setting(3) + Setting(type) bytes. In this file `type` is 1 for the compressor and
     * 2 for the decompressor. Throws yexception for an unsupported combination.
     */
    inline void Init(unsigned ver, unsigned lev, unsigned mod, unsigned type) {
        Table_ = LzqTable(ver, lev, mod);

        if (Table_ == nullptr) {
            ythrow yexception() << "unsupported lzq stream(" << ver << ", " << lev << ", " << mod << ")";
        }

        const size_t stateSize = Table_->Setting(3) + Table_->Setting(type);
        void* const state = ::operator new(stateSize);

        memset(state, 0, stateSize);
        Mem_.Reset(state);
    }

    // Once the compressor has run in streaming mode (Setting(3) appears to be non-zero
    // then), incompressible chunks must still be stored in their compressed form
    // rather than "as is".
    inline bool SaveIncompressibleChunks() const noexcept {
        return Table_->Setting(3) != 0;
    }

protected:
    const TQuickLZMethods* Table_;
    THolder<void> Mem_;
};

const char TQuickLZBase::signature[] = "YLZQ";

// QuickLZ compressing codec; Init() must have been called (TLzqCompress::TImpl does it).
class TQuickLZCompress: public TQuickLZBase {
public:
    // Compresses `len` bytes into `ptr` using the state in Mem_ and returns the compressed
    // size; dstMaxSize is not consulted (the buffer is sized by Hint()).
    inline size_t Compress(const char* data, size_t len, char* ptr, size_t /*dstMaxSize*/) {
        char* const state = static_cast<char*>(Mem_.Get());
        return Table_->Compress(data, ptr, len, state);
    }
};

class TQuickLZDecompress: public TQuickLZBase {
public:
    inline size_t Decompress(const char* data, size_t /*len*/, char* ptr, size_t /*max*/) {
        return Table_->Decompress(data, ptr, (char*)Mem_.Get());
    }

    inline void InitFromStream(IInputStream* in) {
        const ui8 ver = ::GLoad<ui8>(in);
        const ui8 lev = ::GLoad<ui8>(in);
        const ui8 mod = ::GLoad<ui8>(in);

        Init(ver, lev, mod, 2);
    }
};

// Compressor behind TLzqCompress. After the common header it writes the QuickLZ version,
// level and mode as three ui8 fields, which TQuickLZDecompress::InitFromStream() reads back.
class TLzqCompress::TImpl: public TCompressorBase<TQuickLZCompress, TImpl> {
public:
    inline TImpl(IOutputStream* out, ui16 blockSize, EVersion ver, unsigned level, EMode mode)
        : TCompressorBase<TQuickLZCompress, TImpl>(out, blockSize)
    {
        // Start from a zeroed scratch buffer.
        memset(this->AdditionalData(), 0, this->AdditionalDataLength());

        this->Init(ver, level, mode, 1);

        this->Save(static_cast<ui8>(ver));
        this->Save(static_cast<ui8>(level));
        this->Save(static_cast<ui8>(mode));
    }
};

// Allocates the implementation with TImpl::Hint(blockSize) extra bytes of scratch space
// (TAdditionalStorage) and writes the stream header to `slave` right away.
TLzqCompress::TLzqCompress(IOutputStream* slave, ui16 blockSize, EVersion ver, unsigned level, EMode mode)
    : Impl_(new (TImpl::Hint(blockSize)) TImpl(slave, blockSize, ver, level, mode)) {
}

DEF_COMPRESSOR_COMMON(TLzqCompress, TQuickLZCompress)
DEF_DECOMPRESSOR(TLzqDecompress, TQuickLZDecompress)

namespace {
    // Non-owning handles (e.g. IInputStream*) are passed through unchanged.
    template <class T>
    struct TInputHolder {
        static inline T Set(T handle) noexcept {
            return handle;
        }
    };

    // Owning handles: keep the TAutoPtr alive inside the holder and hand out the raw pointer.
    template <class T>
    struct TInputHolder<TAutoPtr<T>> {
        inline T* Set(TAutoPtr<T> owned) noexcept {
            V_ = owned;

            return V_.Get();
        }

        TAutoPtr<T> V_;
    };

    // Decompressing input streams without signature verification: the caller has already
    // consumed and matched the signature (see TryOpenLzDecompressorX).
    template <class TInput, class TDecompressor>
    class TLzDecompressInput: public TInputHolder<TInput>, public IInputStream {
    public:
        // TInputHolder is the first base, so it holds `input` before Impl_ starts reading
        // the header from it (and outlives Impl_ on destruction).
        inline TLzDecompressInput(TInput input)
            : Impl_(this->Set(input))
        {
        }

    private:
        size_t DoRead(void* buf, size_t len) override {
            return Impl_.Read(buf, len);
        }

        TDecompressorBaseImpl<TDecompressor> Impl_;
    };
}

/*
 * Returns a decompressing stream over `input` for the codec whose signature matches `s`,
 * or nullptr if none does. Signatures are probed in a fixed order and the first match wins.
 * When T is a TAutoPtr, ownership moves into the returned stream; it is dropped if no
 * codec matches.
 */
template <class T>
static TAutoPtr<IInputStream> TryOpenLzDecompressorX(const TDecompressSignature& s, T input) {
    if (s.Check<TLZ4>()) {
        return new TLzDecompressInput<T, TLZ4>(input);
    } else if (s.Check<TSnappy>()) {
        return new TLzDecompressInput<T, TSnappy>(input);
    } else if (s.Check<TMiniLzo>()) {
        return new TLzDecompressInput<T, TMiniLzoDecompressor>(input);
    } else if (s.Check<TFastLZ>()) {
        return new TLzDecompressInput<T, TFastLZ>(input);
    } else if (s.Check<TQuickLZDecompress>()) {
        return new TLzDecompressInput<T, TQuickLZDecompress>(input);
    }

    return nullptr;
}

// Variant for a signature already read by the caller (`input` is presumably positioned
// after it). Returns nullptr unless `signature` is exactly SIGNATURE_SIZE bytes of a
// known codec.
template <class T>
static inline TAutoPtr<IInputStream> TryOpenLzDecompressorImpl(const TStringBuf& signature, T input) {
    if (signature.size() != SIGNATURE_SIZE) {
        return nullptr;
    }

    TMemoryInput mem(signature.data(), signature.size());
    const TDecompressSignature parsed(&mem);

    return TryOpenLzDecompressorX(parsed, input);
}

// Reads the signature from `input` itself (throwing TDecompressorError on a short read)
// and returns a matching decompressor or nullptr.
template <class T>
static inline TAutoPtr<IInputStream> TryOpenLzDecompressorImpl(T input) {
    const TDecompressSignature signature(&*input);

    return TryOpenLzDecompressorX(signature, input);
}

// Like TryOpenLzDecompressorImpl(input), but an unknown signature is an error.
template <class T>
static inline TAutoPtr<IInputStream> OpenLzDecompressorImpl(T input) {
    TAutoPtr<IInputStream> decompressor(TryOpenLzDecompressorImpl(input));

    if (!decompressor) {
        ythrow TDecompressorError() << "Unknown compression format";
    }

    return decompressor;
}

// Reads the signature from `input` and returns a decompressing stream for it; throws
// TDecompressorError for an unknown format. `input` is not owned by the result.
TAutoPtr<IInputStream> OpenLzDecompressor(IInputStream* input) {
    return OpenLzDecompressorImpl<IInputStream*>(input);
}

// Same as OpenLzDecompressor, but returns nullptr for an unknown signature (whose bytes
// have still been consumed from `input`).
TAutoPtr<IInputStream> TryOpenLzDecompressor(IInputStream* input) {
    return TryOpenLzDecompressorImpl<IInputStream*>(input);
}

// Uses a signature the caller has already read; returns nullptr if it is not a known
// SIGNATURE_SIZE-byte signature.
TAutoPtr<IInputStream> TryOpenLzDecompressor(const TStringBuf& signature, IInputStream* input) {
    return TryOpenLzDecompressorImpl<IInputStream*>(signature, input);
}

// Owning counterparts of the functions above: the returned stream takes over `input`.
// When no decompressor is produced (nullptr or an exception), `input` is released.
TAutoPtr<IInputStream> OpenOwnedLzDecompressor(TAutoPtr<IInputStream> input) {
    return OpenLzDecompressorImpl<TAutoPtr<IInputStream>>(input);
}

TAutoPtr<IInputStream> TryOpenOwnedLzDecompressor(TAutoPtr<IInputStream> input) {
    return TryOpenLzDecompressorImpl<TAutoPtr<IInputStream>>(input);
}

TAutoPtr<IInputStream> TryOpenOwnedLzDecompressor(const TStringBuf& signature, TAutoPtr<IInputStream> input) {
    return TryOpenLzDecompressorImpl<TAutoPtr<IInputStream>>(signature, input);
}