diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/archive/yarchive.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/archive/yarchive.cpp')
-rw-r--r-- | library/cpp/archive/yarchive.cpp | 398 |
1 files changed, 398 insertions, 0 deletions
diff --git a/library/cpp/archive/yarchive.cpp b/library/cpp/archive/yarchive.cpp new file mode 100644 index 0000000000..1becc3e5da --- /dev/null +++ b/library/cpp/archive/yarchive.cpp @@ -0,0 +1,398 @@ +#include "yarchive.h" + +#include <util/generic/algorithm.h> +#include <util/generic/hash.h> +#include <util/generic/utility.h> +#include <util/generic/vector.h> +#include <util/generic/yexception.h> +#include <util/memory/blob.h> +#include <util/memory/tempbuf.h> +#include <util/stream/input.h> +#include <util/stream/length.h> +#include <util/stream/mem.h> +#include <util/stream/output.h> +#include <util/stream/zlib.h> +#include <util/system/byteorder.h> +#include <util/ysaveload.h> + +template <class T> +static inline void ESSave(IOutputStream* out, const T& t_in) { + T t = HostToLittle(t_in); + + out->Write((const void*)&t, sizeof(t)); +} + +static inline void ESSave(IOutputStream* out, const TString& s) { + ESSave(out, (ui32) s.size()); + out->Write(s.data(), s.size()); +} + +template <class T> +static inline T ESLoad(IInputStream* in) { + T t = T(); + + if (in->Load(&t, sizeof(t)) != sizeof(t)) { + ythrow TSerializeException() << "malformed archive"; + } + + return LittleToHost(t); +} + +template <> +inline TString ESLoad<TString>(IInputStream* in) { + size_t len = ESLoad<ui32>(in); + TString ret; + TTempBuf tmp; + + while (len) { + const size_t toread = Min(len, tmp.Size()); + const size_t readed = in->Read(tmp.Data(), toread); + + if (!readed) { + ythrow TSerializeException() << "malformed archive"; + } + + ret.append(tmp.Data(), readed); + len -= readed; + } + + return ret; +} + +namespace { + class TArchiveRecordDescriptor: public TSimpleRefCount<TArchiveRecordDescriptor> { + public: + inline TArchiveRecordDescriptor(ui64 off, ui64 len, const TString& name) + : Off_(off) + , Len_(len) + , Name_(name) + { + } + + inline TArchiveRecordDescriptor(IInputStream* in) + : Off_(ESLoad<ui64>(in)) + , Len_(ESLoad<ui64>(in)) + , Name_(ESLoad<TString>(in)) + { + } + + inline ~TArchiveRecordDescriptor() = default; + + inline void SaveTo(IOutputStream* out) const { + ESSave(out, Off_); + ESSave(out, Len_); + ESSave(out, Name_); + } + + inline const TString& Name() const noexcept { + return Name_; + } + + inline ui64 Length() const noexcept { + return Len_; + } + + inline ui64 Offset() const noexcept { + return Off_; + } + + private: + ui64 Off_; + ui64 Len_; + TString Name_; + }; + + typedef TIntrusivePtr<TArchiveRecordDescriptor> TArchiveRecordDescriptorRef; +} + +class TArchiveWriter::TImpl { + using TDict = THashMap<TString, TArchiveRecordDescriptorRef>; + +public: + inline TImpl(IOutputStream& out, bool compress) + : Off_(0) + , Out_(&out) + , UseCompression(compress) + { + } + + inline ~TImpl() = default; + + inline void Flush() { + Out_->Flush(); + } + + inline void Finish() { + TCountingOutput out(Out_); + + { + TZLibCompress compress(&out); + + ESSave(&compress, (ui32)Dict_.size()); + + for (const auto& kv : Dict_) { + kv.second->SaveTo(&compress); + } + + ESSave(&compress, static_cast<ui8>(UseCompression)); + + compress.Finish(); + } + + ESSave(Out_, out.Counter()); + + Out_->Flush(); + } + + inline void Add(const TString& key, IInputStream* src) { + Y_ENSURE(!Dict_.contains(key), "key " << key.data() << " already stored"); + + TCountingOutput out(Out_); + if (UseCompression) { + TZLibCompress compress(&out); + TransferData(src, &compress); + compress.Finish(); + } else { + size_t skip_size = ArchiveWriterDefaultDataAlignment - Off_ % ArchiveWriterDefaultDataAlignment; + if (skip_size == ArchiveWriterDefaultDataAlignment) { + skip_size = 0; + } + while(skip_size > 0) { + Out_->Write(char(0)); + Off_ += 1; + skip_size -= 1; + } + TransferData(src, &out); + out.Finish(); + } + + TArchiveRecordDescriptorRef descr(new TArchiveRecordDescriptor(Off_, out.Counter(), key)); + + Dict_[key] = descr; + Off_ += out.Counter(); + } + + inline void AddSynonym(const TString& existingKey, const TString& newKey) { + Y_ENSURE(Dict_.contains(existingKey), "key " << existingKey.data() << " not stored yet"); + Y_ENSURE(!Dict_.contains(newKey), "key " << newKey.data() << " already stored"); + + TArchiveRecordDescriptorRef existingDescr = Dict_[existingKey]; + TArchiveRecordDescriptorRef descr(new TArchiveRecordDescriptor(existingDescr->Offset(), existingDescr->Length(), newKey)); + + Dict_[newKey] = descr; + } + +private: + ui64 Off_; + IOutputStream* Out_; + TDict Dict_; + const bool UseCompression; +}; + +TArchiveWriter::TArchiveWriter(IOutputStream* out, bool compress) + : Impl_(new TImpl(*out, compress)) +{ +} + +TArchiveWriter::~TArchiveWriter() { + try { + Finish(); + } catch (...) { + } +} + +void TArchiveWriter::Flush() { + if (Impl_.Get()) { + Impl_->Flush(); + } +} + +void TArchiveWriter::Finish() { + if (Impl_.Get()) { + Impl_->Finish(); + Impl_.Destroy(); + } +} + +void TArchiveWriter::Add(const TString& key, IInputStream* src) { + Y_ENSURE(Impl_.Get(), "archive already closed"); + + Impl_->Add(key, src); +} + +void TArchiveWriter::AddSynonym(const TString& existingKey, const TString& newKey) { + Y_ENSURE(Impl_.Get(), "archive already closed"); + + Impl_->AddSynonym(existingKey, newKey); +} + +namespace { + class TArchiveInputStreamBase { + public: + inline TArchiveInputStreamBase(const TBlob& b) + : Blob_(b) + , Input_(b.Data(), b.Size()) + { + } + + protected: + TBlob Blob_; + TMemoryInput Input_; + }; + + class TArchiveInputStream: public TArchiveInputStreamBase, public TZLibDecompress { + public: + inline TArchiveInputStream(const TBlob& b) + : TArchiveInputStreamBase(b) + , TZLibDecompress(&Input_) + { + } + + ~TArchiveInputStream() override = default; + }; +} + +class TArchiveReader::TImpl { + typedef THashMap<TString, TArchiveRecordDescriptorRef> TDict; + +public: + inline TImpl(const TBlob& blob) + : Blob_(blob) + , UseDecompression(true) + { + ReadDict(); + } + + inline ~TImpl() = default; + + inline void ReadDict() { + Y_ENSURE(Blob_.Size() >= sizeof(ui64), "too small blob"); + + const char* end = (const char*)Blob_.End(); + const char* ptr = end - sizeof(ui64); + ui64 dictlen = 0; + memcpy(&dictlen, ptr, sizeof(ui64)); + dictlen = LittleToHost(dictlen); + + Y_ENSURE(dictlen <= Blob_.Size() - sizeof(ui64), "bad blob"); + + const char* beg = ptr - dictlen; + TMemoryInput mi(beg, dictlen); + TZLibDecompress d(&mi); + const ui32 count = ESLoad<ui32>(&d); + + for (size_t i = 0; i < count; ++i) { + TArchiveRecordDescriptorRef descr(new TArchiveRecordDescriptor(&d)); + + Recs_.push_back(descr); + Dict_[descr->Name()] = descr; + } + Sort(Recs_.begin(), Recs_.end(), [](const auto& lhs, const auto& rhs) -> bool { + return lhs->Offset() < rhs->Offset(); + }); + + try { + UseDecompression = static_cast<bool>(ESLoad<ui8>(&d)); + } catch (const TSerializeException&) { + // that's ok - just old format + UseDecompression = true; + } + } + + inline size_t Count() const noexcept { + return Recs_.size(); + } + + inline TString KeyByIndex(size_t n) const { + if (n < Count()) { + return Recs_[n]->Name(); + } + + ythrow yexception() << "incorrect index"; + } + + inline bool Has(const TStringBuf key) const { + return Dict_.contains(key); + } + + inline TAutoPtr<IInputStream> ObjectByKey(const TStringBuf key) const { + TBlob subBlob = BlobByKey(key); + + if (UseDecompression) { + return new TArchiveInputStream(subBlob); + } else { + return new TMemoryInput(subBlob.Data(), subBlob.Length()); + } + } + + inline TBlob ObjectBlobByKey(const TStringBuf key) const { + TBlob subBlob = BlobByKey(key); + + if (UseDecompression) { + TArchiveInputStream st(subBlob); + return TBlob::FromStream(st); + } else { + return subBlob; + } + } + + inline TBlob BlobByKey(const TStringBuf key) const { + const auto it = Dict_.find(key); + + Y_ENSURE(it != Dict_.end(), "key " << key.data() << " not found"); + + const size_t off = it->second->Offset(); + const size_t len = it->second->Length(); + + /* + * TODO - overflow check + */ + + return Blob_.SubBlob(off, off + len); + } + + inline bool Compressed() const { + return UseDecompression; + } + +private: + TBlob Blob_; + TVector<TArchiveRecordDescriptorRef> Recs_; + TDict Dict_; + bool UseDecompression; +}; + +TArchiveReader::TArchiveReader(const TBlob& data) + : Impl_(new TImpl(data)) +{ +} + +TArchiveReader::~TArchiveReader() {} + +size_t TArchiveReader::Count() const noexcept { + return Impl_->Count(); +} + +TString TArchiveReader::KeyByIndex(size_t n) const { + return Impl_->KeyByIndex(n); +} + +bool TArchiveReader::Has(const TStringBuf key) const { + return Impl_->Has(key); +} + +TAutoPtr<IInputStream> TArchiveReader::ObjectByKey(const TStringBuf key) const { + return Impl_->ObjectByKey(key); +} + +TBlob TArchiveReader::ObjectBlobByKey(const TStringBuf key) const { + return Impl_->ObjectBlobByKey(key); +} + +TBlob TArchiveReader::BlobByKey(const TStringBuf key) const { + return Impl_->BlobByKey(key); +} + +bool TArchiveReader::Compressed() const { + return Impl_->Compressed(); +} |