aboutsummaryrefslogblamecommitdiffstats
path: root/library/cpp/archive/yarchive.cpp
blob: 1becc3e5dac2107222eceb7525f3a388a39a7bcc (plain) (tree)
1
2
3
4
5
6
7
                     
                                   
                              
                                 
                                    







                                  
                  
                                                              



                                           
                                                                 
                                   

                  
                                          

                                               
                                                            




                           
                                                  
                                  
                





                                                           
                                                                










                                                                                      
                                                                                




                         
                                                         
                                    
                                        

         
                                                     
 
                                                      



                               
                                                     

                         
                                             

                        
                                             




                        
                      




                                                                                
                                                                 
       
                                                   




                                  
                              




                          
                                  




                                                  
                                             





                                                                
                                    


                      
                                                            
                                                                                  
 
                                  



                                         







                                                                                                            


                                    
                                                                                                  
                           
                              
     
                                                                               
                                                                                                 





                                                                                                                                  
              
                        


                              
                                                                 
                                      

 
                                   

















                               
                                                                 
                                                    


                         




                                                                                    
















                                                                                       
                                      

         
                                                  


                             
                                                                 







                                   
                              
                            
                                                                 





                                                   
                                                                     

                                        
                               


                                                                                
 

                                         
                                                                                       
           







                                                                  
                                          

                            
                                               




                                                 
 
                                                 
                                   
     
 
                                                                           




                                                                      
         
     
 
                                                              
                                       
 



                                            
         
     
 
                                                        
 
                                                                          












                                                
 
                
                                               







                                                 
                                    
 
                                               

                          
                                                    

                                
                                                      

                           
                                                                                
                                   
 
                                                                   

                                       
                                                             




                                         
#include "yarchive.h"

#include <util/generic/algorithm.h>
#include <util/generic/hash.h>
#include <util/generic/utility.h>
#include <util/generic/vector.h>
#include <util/generic/yexception.h>
#include <util/memory/blob.h>
#include <util/memory/tempbuf.h>
#include <util/stream/input.h>
#include <util/stream/length.h>
#include <util/stream/mem.h>
#include <util/stream/output.h>
#include <util/stream/zlib.h>
#include <util/system/byteorder.h>
#include <util/ysaveload.h>

template <class T>
static inline void ESSave(IOutputStream* out, const T& t_in) {
    T t = HostToLittle(t_in);

    out->Write((const void*)&t, sizeof(t));
}

static inline void ESSave(IOutputStream* out, const TString& s) {
    ESSave(out, (ui32) s.size());
    out->Write(s.data(), s.size());
}

template <class T>
static inline T ESLoad(IInputStream* in) {
    T t = T();

    if (in->Load(&t, sizeof(t)) != sizeof(t)) {
        ythrow TSerializeException() << "malformed archive";
    }

    return LittleToHost(t);
}

template <>
inline TString ESLoad<TString>(IInputStream* in) {
    size_t len = ESLoad<ui32>(in);
    TString ret;
    TTempBuf tmp;

    while (len) {
        const size_t toread = Min(len, tmp.Size());
        const size_t readed = in->Read(tmp.Data(), toread);

        if (!readed) {
            ythrow TSerializeException() << "malformed archive";
        }

        ret.append(tmp.Data(), readed);
        len -= readed;
    }

    return ret;
}

namespace {
    class TArchiveRecordDescriptor: public TSimpleRefCount<TArchiveRecordDescriptor> {
    public:
        inline TArchiveRecordDescriptor(ui64 off, ui64 len, const TString& name)
            : Off_(off)
            , Len_(len)
            , Name_(name)
        {
        }

        inline TArchiveRecordDescriptor(IInputStream* in)
            : Off_(ESLoad<ui64>(in))
            , Len_(ESLoad<ui64>(in))
            , Name_(ESLoad<TString>(in))
        {
        }

        inline ~TArchiveRecordDescriptor() = default;

        inline void SaveTo(IOutputStream* out) const {
            ESSave(out, Off_);
            ESSave(out, Len_);
            ESSave(out, Name_);
        }

        inline const TString& Name() const noexcept {
            return Name_;
        }

        inline ui64 Length() const noexcept {
            return Len_;
        }

        inline ui64 Offset() const noexcept {
            return Off_;
        }

    private:
        ui64 Off_;
        ui64 Len_;
        TString Name_;
    };

    typedef TIntrusivePtr<TArchiveRecordDescriptor> TArchiveRecordDescriptorRef;
}

class TArchiveWriter::TImpl {
    using TDict = THashMap<TString, TArchiveRecordDescriptorRef>;

public:
    inline TImpl(IOutputStream& out, bool compress)
        : Off_(0)
        , Out_(&out)
        , UseCompression(compress)
    {
    }

    inline ~TImpl() = default;

    inline void Flush() {
        Out_->Flush();
    }

    inline void Finish() {
        TCountingOutput out(Out_);

        {
            TZLibCompress compress(&out);

            ESSave(&compress, (ui32)Dict_.size());

            for (const auto& kv : Dict_) {
                kv.second->SaveTo(&compress);
            }

            ESSave(&compress, static_cast<ui8>(UseCompression));

            compress.Finish();
        }

        ESSave(Out_, out.Counter());

        Out_->Flush();
    }

    inline void Add(const TString& key, IInputStream* src) {
        Y_ENSURE(!Dict_.contains(key), "key " << key.data() << " already stored");

        TCountingOutput out(Out_);
        if (UseCompression) {
            TZLibCompress compress(&out);
            TransferData(src, &compress);
            compress.Finish();
        } else {
            size_t skip_size = ArchiveWriterDefaultDataAlignment - Off_ % ArchiveWriterDefaultDataAlignment;
            if (skip_size == ArchiveWriterDefaultDataAlignment) {
                skip_size = 0;
            }
            while(skip_size > 0) {
                Out_->Write(char(0));
                Off_ += 1;
                skip_size -= 1;
            }
            TransferData(src, &out);
            out.Finish();
        }

        TArchiveRecordDescriptorRef descr(new TArchiveRecordDescriptor(Off_, out.Counter(), key));

        Dict_[key] = descr;
        Off_ += out.Counter();
    }

    inline void AddSynonym(const TString& existingKey, const TString& newKey) {
        Y_ENSURE(Dict_.contains(existingKey), "key " << existingKey.data() << " not stored yet");
        Y_ENSURE(!Dict_.contains(newKey), "key " << newKey.data() << " already stored");

        TArchiveRecordDescriptorRef existingDescr = Dict_[existingKey];
        TArchiveRecordDescriptorRef descr(new TArchiveRecordDescriptor(existingDescr->Offset(), existingDescr->Length(), newKey));

        Dict_[newKey] = descr;
    }

private:
    ui64 Off_;
    IOutputStream* Out_;
    TDict Dict_;
    const bool UseCompression;
};

TArchiveWriter::TArchiveWriter(IOutputStream* out, bool compress)
    : Impl_(new TImpl(*out, compress))
{
}

TArchiveWriter::~TArchiveWriter() {
    try {
        Finish();
    } catch (...) {
    }
}

void TArchiveWriter::Flush() {
    if (Impl_.Get()) {
        Impl_->Flush();
    }
}

void TArchiveWriter::Finish() {
    if (Impl_.Get()) {
        Impl_->Finish();
        Impl_.Destroy();
    }
}

void TArchiveWriter::Add(const TString& key, IInputStream* src) {
    Y_ENSURE(Impl_.Get(), "archive already closed");

    Impl_->Add(key, src);
}

void TArchiveWriter::AddSynonym(const TString& existingKey, const TString& newKey) {
    Y_ENSURE(Impl_.Get(), "archive already closed");

    Impl_->AddSynonym(existingKey, newKey);
}

namespace {
    class TArchiveInputStreamBase {
    public:
        inline TArchiveInputStreamBase(const TBlob& b)
            : Blob_(b)
            , Input_(b.Data(), b.Size())
        {
        }

    protected:
        TBlob Blob_;
        TMemoryInput Input_;
    };

    class TArchiveInputStream: public TArchiveInputStreamBase, public TZLibDecompress {
    public:
        inline TArchiveInputStream(const TBlob& b)
            : TArchiveInputStreamBase(b)
            , TZLibDecompress(&Input_)
        {
        }

        ~TArchiveInputStream() override = default;
    };
}

class TArchiveReader::TImpl {
    typedef THashMap<TString, TArchiveRecordDescriptorRef> TDict;

public:
    inline TImpl(const TBlob& blob)
        : Blob_(blob)
        , UseDecompression(true)
    {
        ReadDict();
    }

    inline ~TImpl() = default;

    inline void ReadDict() {
        Y_ENSURE(Blob_.Size() >= sizeof(ui64), "too small blob");

        const char* end = (const char*)Blob_.End();
        const char* ptr = end - sizeof(ui64);
        ui64 dictlen = 0;
        memcpy(&dictlen, ptr, sizeof(ui64));
        dictlen = LittleToHost(dictlen);

        Y_ENSURE(dictlen <= Blob_.Size() - sizeof(ui64), "bad blob");

        const char* beg = ptr - dictlen;
        TMemoryInput mi(beg, dictlen);
        TZLibDecompress d(&mi);
        const ui32 count = ESLoad<ui32>(&d);

        for (size_t i = 0; i < count; ++i) {
            TArchiveRecordDescriptorRef descr(new TArchiveRecordDescriptor(&d));

            Recs_.push_back(descr);
            Dict_[descr->Name()] = descr;
        }
        Sort(Recs_.begin(), Recs_.end(), [](const auto& lhs, const auto& rhs) -> bool {
            return lhs->Offset() < rhs->Offset();
        });

        try {
            UseDecompression = static_cast<bool>(ESLoad<ui8>(&d));
        } catch (const TSerializeException&) {
            // that's ok - just old format
            UseDecompression = true;
        }
    }

    inline size_t Count() const noexcept {
        return Recs_.size();
    }

    inline TString KeyByIndex(size_t n) const {
        if (n < Count()) {
            return Recs_[n]->Name();
        }

        ythrow yexception() << "incorrect index";
    }

    inline bool Has(const TStringBuf key) const {
        return Dict_.contains(key);
    }

    inline TAutoPtr<IInputStream> ObjectByKey(const TStringBuf key) const {
        TBlob subBlob = BlobByKey(key);

        if (UseDecompression) {
            return new TArchiveInputStream(subBlob);
        } else {
            return new TMemoryInput(subBlob.Data(), subBlob.Length());
        }
    }

    inline TBlob ObjectBlobByKey(const TStringBuf key) const {
        TBlob subBlob = BlobByKey(key);

        if (UseDecompression) {
            TArchiveInputStream st(subBlob);
            return TBlob::FromStream(st);
        } else {
            return subBlob;
        }
    }

    inline TBlob BlobByKey(const TStringBuf key) const {
        const auto it = Dict_.find(key);

        Y_ENSURE(it != Dict_.end(), "key " << key.data() << " not found");

        const size_t off = it->second->Offset();
        const size_t len = it->second->Length();

        /*
             * TODO - overflow check
             */

        return Blob_.SubBlob(off, off + len);
    }

    inline bool Compressed() const {
        return UseDecompression;
    }

private:
    TBlob Blob_;
    TVector<TArchiveRecordDescriptorRef> Recs_;
    TDict Dict_;
    bool UseDecompression;
};

TArchiveReader::TArchiveReader(const TBlob& data)
    : Impl_(new TImpl(data))
{
}

TArchiveReader::~TArchiveReader() {}

size_t TArchiveReader::Count() const noexcept {
    return Impl_->Count();
}

TString TArchiveReader::KeyByIndex(size_t n) const {
    return Impl_->KeyByIndex(n);
}

bool TArchiveReader::Has(const TStringBuf key) const {
    return Impl_->Has(key);
}

TAutoPtr<IInputStream> TArchiveReader::ObjectByKey(const TStringBuf key) const {
    return Impl_->ObjectByKey(key);
}

TBlob TArchiveReader::ObjectBlobByKey(const TStringBuf key) const {
    return Impl_->ObjectBlobByKey(key);
}

TBlob TArchiveReader::BlobByKey(const TStringBuf key) const {
    return Impl_->BlobByKey(key);
}

bool TArchiveReader::Compressed() const {
    return Impl_->Compressed();
}